]> git.saurik.com Git - redis.git/blob - src/networking.c
vm_blocked_clients count fixed in INFO, thanks to Pietern Noordhuis
[redis.git] / src / networking.c
1 #include "redis.h"
2
3 #include <sys/uio.h>
4
5 void *dupClientReplyValue(void *o) {
6 incrRefCount((robj*)o);
7 return o;
8 }
9
10 int listMatchObjects(void *a, void *b) {
11 return equalStringObjects(a,b);
12 }
13
14 redisClient *createClient(int fd) {
15 redisClient *c = zmalloc(sizeof(*c));
16
17 anetNonBlock(NULL,fd);
18 anetTcpNoDelay(NULL,fd);
19 if (!c) return NULL;
20 selectDb(c,0);
21 c->fd = fd;
22 c->querybuf = sdsempty();
23 c->argc = 0;
24 c->argv = NULL;
25 c->bulklen = -1;
26 c->multibulk = 0;
27 c->mbargc = 0;
28 c->mbargv = NULL;
29 c->sentlen = 0;
30 c->flags = 0;
31 c->lastinteraction = time(NULL);
32 c->authenticated = 0;
33 c->replstate = REDIS_REPL_NONE;
34 c->reply = listCreate();
35 listSetFreeMethod(c->reply,decrRefCount);
36 listSetDupMethod(c->reply,dupClientReplyValue);
37 c->blocking_keys = NULL;
38 c->blocking_keys_num = 0;
39 c->io_keys = listCreate();
40 c->watched_keys = listCreate();
41 listSetFreeMethod(c->io_keys,decrRefCount);
42 c->pubsub_channels = dictCreate(&setDictType,NULL);
43 c->pubsub_patterns = listCreate();
44 listSetFreeMethod(c->pubsub_patterns,decrRefCount);
45 listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
46 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
47 readQueryFromClient, c) == AE_ERR) {
48 freeClient(c);
49 return NULL;
50 }
51 listAddNodeTail(server.clients,c);
52 initClientMultiState(c);
53 return c;
54 }
55
56 void addReply(redisClient *c, robj *obj) {
57 if (listLength(c->reply) == 0 &&
58 (c->replstate == REDIS_REPL_NONE ||
59 c->replstate == REDIS_REPL_ONLINE) &&
60 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
61 sendReplyToClient, c) == AE_ERR) return;
62
63 if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
64 obj = dupStringObject(obj);
65 obj->refcount = 0; /* getDecodedObject() will increment the refcount */
66 }
67 listAddNodeTail(c->reply,getDecodedObject(obj));
68 }
69
70 void addReplySds(redisClient *c, sds s) {
71 robj *o = createObject(REDIS_STRING,s);
72 addReply(c,o);
73 decrRefCount(o);
74 }
75
76 void addReplyDouble(redisClient *c, double d) {
77 char buf[128];
78
79 snprintf(buf,sizeof(buf),"%.17g",d);
80 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
81 (unsigned long) strlen(buf),buf));
82 }
83
84 void addReplyLongLong(redisClient *c, long long ll) {
85 char buf[128];
86 size_t len;
87
88 if (ll == 0) {
89 addReply(c,shared.czero);
90 return;
91 } else if (ll == 1) {
92 addReply(c,shared.cone);
93 return;
94 }
95 buf[0] = ':';
96 len = ll2string(buf+1,sizeof(buf)-1,ll);
97 buf[len+1] = '\r';
98 buf[len+2] = '\n';
99 addReplySds(c,sdsnewlen(buf,len+3));
100 }
101
102 void addReplyUlong(redisClient *c, unsigned long ul) {
103 char buf[128];
104 size_t len;
105
106 if (ul == 0) {
107 addReply(c,shared.czero);
108 return;
109 } else if (ul == 1) {
110 addReply(c,shared.cone);
111 return;
112 }
113 len = snprintf(buf,sizeof(buf),":%lu\r\n",ul);
114 addReplySds(c,sdsnewlen(buf,len));
115 }
116
117 void addReplyBulkLen(redisClient *c, robj *obj) {
118 size_t len, intlen;
119 char buf[128];
120
121 if (obj->encoding == REDIS_ENCODING_RAW) {
122 len = sdslen(obj->ptr);
123 } else {
124 long n = (long)obj->ptr;
125
126 /* Compute how many bytes will take this integer as a radix 10 string */
127 len = 1;
128 if (n < 0) {
129 len++;
130 n = -n;
131 }
132 while((n = n/10) != 0) {
133 len++;
134 }
135 }
136 buf[0] = '$';
137 intlen = ll2string(buf+1,sizeof(buf)-1,(long long)len);
138 buf[intlen+1] = '\r';
139 buf[intlen+2] = '\n';
140 addReplySds(c,sdsnewlen(buf,intlen+3));
141 }
142
143 void addReplyBulk(redisClient *c, robj *obj) {
144 addReplyBulkLen(c,obj);
145 addReply(c,obj);
146 addReply(c,shared.crlf);
147 }
148
149 /* In the CONFIG command we need to add vanilla C string as bulk replies */
150 void addReplyBulkCString(redisClient *c, char *s) {
151 if (s == NULL) {
152 addReply(c,shared.nullbulk);
153 } else {
154 robj *o = createStringObject(s,strlen(s));
155 addReplyBulk(c,o);
156 decrRefCount(o);
157 }
158 }
159
160 void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
161 int cport, cfd;
162 char cip[128];
163 redisClient *c;
164 REDIS_NOTUSED(el);
165 REDIS_NOTUSED(mask);
166 REDIS_NOTUSED(privdata);
167
168 cfd = anetAccept(server.neterr, fd, cip, &cport);
169 if (cfd == AE_ERR) {
170 redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
171 return;
172 }
173 redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
174 if ((c = createClient(cfd)) == NULL) {
175 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
176 close(cfd); /* May be already closed, just ingore errors */
177 return;
178 }
179 /* If maxclient directive is set and this is one client more... close the
180 * connection. Note that we create the client instead to check before
181 * for this condition, since now the socket is already set in nonblocking
182 * mode and we can send an error for free using the Kernel I/O */
183 if (server.maxclients && listLength(server.clients) > server.maxclients) {
184 char *err = "-ERR max number of clients reached\r\n";
185
186 /* That's a best effort error message, don't check write errors */
187 if (write(c->fd,err,strlen(err)) == -1) {
188 /* Nothing to do, Just to avoid the warning... */
189 }
190 freeClient(c);
191 return;
192 }
193 server.stat_numconnections++;
194 }
195
196 static void freeClientArgv(redisClient *c) {
197 int j;
198
199 for (j = 0; j < c->argc; j++)
200 decrRefCount(c->argv[j]);
201 for (j = 0; j < c->mbargc; j++)
202 decrRefCount(c->mbargv[j]);
203 c->argc = 0;
204 c->mbargc = 0;
205 }
206
207 void freeClient(redisClient *c) {
208 listNode *ln;
209
210 /* Note that if the client we are freeing is blocked into a blocking
211 * call, we have to set querybuf to NULL *before* to call
212 * unblockClientWaitingData() to avoid processInputBuffer() will get
213 * called. Also it is important to remove the file events after
214 * this, because this call adds the READABLE event. */
215 sdsfree(c->querybuf);
216 c->querybuf = NULL;
217 if (c->flags & REDIS_BLOCKED)
218 unblockClientWaitingData(c);
219
220 /* UNWATCH all the keys */
221 unwatchAllKeys(c);
222 listRelease(c->watched_keys);
223 /* Unsubscribe from all the pubsub channels */
224 pubsubUnsubscribeAllChannels(c,0);
225 pubsubUnsubscribeAllPatterns(c,0);
226 dictRelease(c->pubsub_channels);
227 listRelease(c->pubsub_patterns);
228 /* Obvious cleanup */
229 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
230 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
231 listRelease(c->reply);
232 freeClientArgv(c);
233 close(c->fd);
234 /* Remove from the list of clients */
235 ln = listSearchKey(server.clients,c);
236 redisAssert(ln != NULL);
237 listDelNode(server.clients,ln);
238 /* Remove from the list of clients waiting for swapped keys, or ready
239 * to be restarted, but not yet woken up again. */
240 if (c->flags & REDIS_IO_WAIT) {
241 redisAssert(server.vm_enabled);
242 if (listLength(c->io_keys) == 0) {
243 ln = listSearchKey(server.io_ready_clients,c);
244
245 /* When this client is waiting to be woken up (REDIS_IO_WAIT),
246 * it should be present in the list io_ready_clients */
247 redisAssert(ln != NULL);
248 listDelNode(server.io_ready_clients,ln);
249 } else {
250 while (listLength(c->io_keys)) {
251 ln = listFirst(c->io_keys);
252 dontWaitForSwappedKey(c,ln->value);
253 }
254 }
255 server.vm_blocked_clients--;
256 }
257 listRelease(c->io_keys);
258 /* Master/slave cleanup */
259 if (c->flags & REDIS_SLAVE) {
260 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
261 close(c->repldbfd);
262 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
263 ln = listSearchKey(l,c);
264 redisAssert(ln != NULL);
265 listDelNode(l,ln);
266 }
267 if (c->flags & REDIS_MASTER) {
268 server.master = NULL;
269 server.replstate = REDIS_REPL_CONNECT;
270 }
271 /* Release memory */
272 zfree(c->argv);
273 zfree(c->mbargv);
274 freeClientMultiState(c);
275 zfree(c);
276 }
277
278 #define GLUEREPLY_UP_TO (1024)
279 static void glueReplyBuffersIfNeeded(redisClient *c) {
280 int copylen = 0;
281 char buf[GLUEREPLY_UP_TO];
282 listNode *ln;
283 listIter li;
284 robj *o;
285
286 listRewind(c->reply,&li);
287 while((ln = listNext(&li))) {
288 int objlen;
289
290 o = ln->value;
291 objlen = sdslen(o->ptr);
292 if (copylen + objlen <= GLUEREPLY_UP_TO) {
293 memcpy(buf+copylen,o->ptr,objlen);
294 copylen += objlen;
295 listDelNode(c->reply,ln);
296 } else {
297 if (copylen == 0) return;
298 break;
299 }
300 }
301 /* Now the output buffer is empty, add the new single element */
302 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
303 listAddNodeHead(c->reply,o);
304 }
305
306 void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
307 redisClient *c = privdata;
308 int nwritten = 0, totwritten = 0, objlen;
309 robj *o;
310 REDIS_NOTUSED(el);
311 REDIS_NOTUSED(mask);
312
313 /* Use writev() if we have enough buffers to send */
314 if (!server.glueoutputbuf &&
315 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
316 !(c->flags & REDIS_MASTER))
317 {
318 sendReplyToClientWritev(el, fd, privdata, mask);
319 return;
320 }
321
322 while(listLength(c->reply)) {
323 if (server.glueoutputbuf && listLength(c->reply) > 1)
324 glueReplyBuffersIfNeeded(c);
325
326 o = listNodeValue(listFirst(c->reply));
327 objlen = sdslen(o->ptr);
328
329 if (objlen == 0) {
330 listDelNode(c->reply,listFirst(c->reply));
331 continue;
332 }
333
334 if (c->flags & REDIS_MASTER) {
335 /* Don't reply to a master */
336 nwritten = objlen - c->sentlen;
337 } else {
338 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
339 if (nwritten <= 0) break;
340 }
341 c->sentlen += nwritten;
342 totwritten += nwritten;
343 /* If we fully sent the object on head go to the next one */
344 if (c->sentlen == objlen) {
345 listDelNode(c->reply,listFirst(c->reply));
346 c->sentlen = 0;
347 }
348 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
349 * bytes, in a single threaded server it's a good idea to serve
350 * other clients as well, even if a very large request comes from
351 * super fast link that is always able to accept data (in real world
352 * scenario think about 'KEYS *' against the loopback interfae) */
353 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
354 }
355 if (nwritten == -1) {
356 if (errno == EAGAIN) {
357 nwritten = 0;
358 } else {
359 redisLog(REDIS_VERBOSE,
360 "Error writing to client: %s", strerror(errno));
361 freeClient(c);
362 return;
363 }
364 }
365 if (totwritten > 0) c->lastinteraction = time(NULL);
366 if (listLength(c->reply) == 0) {
367 c->sentlen = 0;
368 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
369 }
370 }
371
372 void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
373 {
374 redisClient *c = privdata;
375 int nwritten = 0, totwritten = 0, objlen, willwrite;
376 robj *o;
377 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
378 int offset, ion = 0;
379 REDIS_NOTUSED(el);
380 REDIS_NOTUSED(mask);
381
382 listNode *node;
383 while (listLength(c->reply)) {
384 offset = c->sentlen;
385 ion = 0;
386 willwrite = 0;
387
388 /* fill-in the iov[] array */
389 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
390 o = listNodeValue(node);
391 objlen = sdslen(o->ptr);
392
393 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
394 break;
395
396 if(ion == REDIS_WRITEV_IOVEC_COUNT)
397 break; /* no more iovecs */
398
399 iov[ion].iov_base = ((char*)o->ptr) + offset;
400 iov[ion].iov_len = objlen - offset;
401 willwrite += objlen - offset;
402 offset = 0; /* just for the first item */
403 ion++;
404 }
405
406 if(willwrite == 0)
407 break;
408
409 /* write all collected blocks at once */
410 if((nwritten = writev(fd, iov, ion)) < 0) {
411 if (errno != EAGAIN) {
412 redisLog(REDIS_VERBOSE,
413 "Error writing to client: %s", strerror(errno));
414 freeClient(c);
415 return;
416 }
417 break;
418 }
419
420 totwritten += nwritten;
421 offset = c->sentlen;
422
423 /* remove written robjs from c->reply */
424 while (nwritten && listLength(c->reply)) {
425 o = listNodeValue(listFirst(c->reply));
426 objlen = sdslen(o->ptr);
427
428 if(nwritten >= objlen - offset) {
429 listDelNode(c->reply, listFirst(c->reply));
430 nwritten -= objlen - offset;
431 c->sentlen = 0;
432 } else {
433 /* partial write */
434 c->sentlen += nwritten;
435 break;
436 }
437 offset = 0;
438 }
439 }
440
441 if (totwritten > 0)
442 c->lastinteraction = time(NULL);
443
444 if (listLength(c->reply) == 0) {
445 c->sentlen = 0;
446 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
447 }
448 }
449
450 /* resetClient prepare the client to process the next command */
451 void resetClient(redisClient *c) {
452 freeClientArgv(c);
453 c->bulklen = -1;
454 c->multibulk = 0;
455 }
456
457 void closeTimedoutClients(void) {
458 redisClient *c;
459 listNode *ln;
460 time_t now = time(NULL);
461 listIter li;
462
463 listRewind(server.clients,&li);
464 while ((ln = listNext(&li)) != NULL) {
465 c = listNodeValue(ln);
466 if (server.maxidletime &&
467 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
468 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
469 dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
470 listLength(c->pubsub_patterns) == 0 &&
471 (now - c->lastinteraction > server.maxidletime))
472 {
473 redisLog(REDIS_VERBOSE,"Closing idle client");
474 freeClient(c);
475 } else if (c->flags & REDIS_BLOCKED) {
476 if (c->blockingto != 0 && c->blockingto < now) {
477 addReply(c,shared.nullmultibulk);
478 unblockClientWaitingData(c);
479 }
480 }
481 }
482 }
483
484 void processInputBuffer(redisClient *c) {
485 again:
486 /* Before to process the input buffer, make sure the client is not
487 * waitig for a blocking operation such as BLPOP. Note that the first
488 * iteration the client is never blocked, otherwise the processInputBuffer
489 * would not be called at all, but after the execution of the first commands
490 * in the input buffer the client may be blocked, and the "goto again"
491 * will try to reiterate. The following line will make it return asap. */
492 if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
493 if (c->bulklen == -1) {
494 /* Read the first line of the query */
495 char *p = strchr(c->querybuf,'\n');
496 size_t querylen;
497
498 if (p) {
499 sds query, *argv;
500 int argc, j;
501
502 query = c->querybuf;
503 c->querybuf = sdsempty();
504 querylen = 1+(p-(query));
505 if (sdslen(query) > querylen) {
506 /* leave data after the first line of the query in the buffer */
507 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
508 }
509 *p = '\0'; /* remove "\n" */
510 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
511 sdsupdatelen(query);
512
513 /* Now we can split the query in arguments */
514 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
515 sdsfree(query);
516
517 if (c->argv) zfree(c->argv);
518 c->argv = zmalloc(sizeof(robj*)*argc);
519
520 for (j = 0; j < argc; j++) {
521 if (sdslen(argv[j])) {
522 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
523 c->argc++;
524 } else {
525 sdsfree(argv[j]);
526 }
527 }
528 zfree(argv);
529 if (c->argc) {
530 /* Execute the command. If the client is still valid
531 * after processCommand() return and there is something
532 * on the query buffer try to process the next command. */
533 if (processCommand(c) && sdslen(c->querybuf)) goto again;
534 } else {
535 /* Nothing to process, argc == 0. Just process the query
536 * buffer if it's not empty or return to the caller */
537 if (sdslen(c->querybuf)) goto again;
538 }
539 return;
540 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
541 redisLog(REDIS_VERBOSE, "Client protocol error");
542 freeClient(c);
543 return;
544 }
545 } else {
546 /* Bulk read handling. Note that if we are at this point
547 the client already sent a command terminated with a newline,
548 we are reading the bulk data that is actually the last
549 argument of the command. */
550 int qbl = sdslen(c->querybuf);
551
552 if (c->bulklen <= qbl) {
553 /* Copy everything but the final CRLF as final argument */
554 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
555 c->argc++;
556 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
557 /* Process the command. If the client is still valid after
558 * the processing and there is more data in the buffer
559 * try to parse it. */
560 if (processCommand(c) && sdslen(c->querybuf)) goto again;
561 return;
562 }
563 }
564 }
565
566 void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
567 redisClient *c = (redisClient*) privdata;
568 char buf[REDIS_IOBUF_LEN];
569 int nread;
570 REDIS_NOTUSED(el);
571 REDIS_NOTUSED(mask);
572
573 nread = read(fd, buf, REDIS_IOBUF_LEN);
574 if (nread == -1) {
575 if (errno == EAGAIN) {
576 nread = 0;
577 } else {
578 redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
579 freeClient(c);
580 return;
581 }
582 } else if (nread == 0) {
583 redisLog(REDIS_VERBOSE, "Client closed connection");
584 freeClient(c);
585 return;
586 }
587 if (nread) {
588 c->querybuf = sdscatlen(c->querybuf, buf, nread);
589 c->lastinteraction = time(NULL);
590 } else {
591 return;
592 }
593 processInputBuffer(c);
594 }