]> git.saurik.com Git - redis.git/blame - src/networking.c
Merge remote branch 'pietern/networking-perf'
[redis.git] / src / networking.c
CommitLineData
e2641e09 1#include "redis.h"
e2641e09 2#include <sys/uio.h>
3
4void *dupClientReplyValue(void *o) {
5 incrRefCount((robj*)o);
6 return o;
7}
8
/* Match callback for lists of objects: hands both values to
 * equalStringObjects() and returns whatever it reports. */
int listMatchObjects(void *a, void *b) {
    int same = equalStringObjects(a,b);

    return same;
}
12
13redisClient *createClient(int fd) {
f3357792 14 redisClient *c = zmalloc(sizeof(redisClient));
834ef78e 15 c->bufpos = 0;
e2641e09 16
17 anetNonBlock(NULL,fd);
18 anetTcpNoDelay(NULL,fd);
19 if (!c) return NULL;
106bd87a
PN
20 if (aeCreateFileEvent(server.el,fd,AE_READABLE,
21 readQueryFromClient, c) == AE_ERR)
22 {
23 close(fd);
24 zfree(c);
25 return NULL;
26 }
27
e2641e09 28 selectDb(c,0);
29 c->fd = fd;
30 c->querybuf = sdsempty();
31 c->argc = 0;
32 c->argv = NULL;
33 c->bulklen = -1;
34 c->multibulk = 0;
35 c->mbargc = 0;
36 c->mbargv = NULL;
37 c->sentlen = 0;
38 c->flags = 0;
39 c->lastinteraction = time(NULL);
40 c->authenticated = 0;
41 c->replstate = REDIS_REPL_NONE;
42 c->reply = listCreate();
43 listSetFreeMethod(c->reply,decrRefCount);
44 listSetDupMethod(c->reply,dupClientReplyValue);
45 c->blocking_keys = NULL;
46 c->blocking_keys_num = 0;
47 c->io_keys = listCreate();
48 c->watched_keys = listCreate();
49 listSetFreeMethod(c->io_keys,decrRefCount);
50 c->pubsub_channels = dictCreate(&setDictType,NULL);
51 c->pubsub_patterns = listCreate();
52 listSetFreeMethod(c->pubsub_patterns,decrRefCount);
53 listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
e2641e09 54 listAddNodeTail(server.clients,c);
55 initClientMultiState(c);
56 return c;
57}
58
834ef78e 59int _ensureFileEvent(redisClient *c) {
57b07380 60 if (c->fd <= 0) return REDIS_ERR;
834ef78e 61 if (c->bufpos == 0 && listLength(c->reply) == 0 &&
e2641e09 62 (c->replstate == REDIS_REPL_NONE ||
63 c->replstate == REDIS_REPL_ONLINE) &&
64 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
834ef78e
PN
65 sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
66 return REDIS_OK;
67}
68
36c19d03
PN
69/* Create a duplicate of the last object in the reply list when
70 * it is not exclusively owned by the reply list. */
71robj *dupLastObjectIfNeeded(list *reply) {
72 robj *new, *cur;
73 listNode *ln;
74 redisAssert(listLength(reply) > 0);
75 ln = listLast(reply);
76 cur = listNodeValue(ln);
77 if (cur->refcount > 1) {
78 new = dupStringObject(cur);
79 decrRefCount(cur);
80 listNodeValue(ln) = new;
81 }
82 return listNodeValue(ln);
834ef78e
PN
83}
84
36c19d03 85int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
f3357792 86 size_t available = sizeof(c->buf)-c->bufpos;
36c19d03
PN
87
88 /* If there already are entries in the reply list, we cannot
89 * add anything more to the static buffer. */
90 if (listLength(c->reply) > 0) return REDIS_ERR;
91
92 /* Check that the buffer has enough space available for this string. */
93 if (len > available) return REDIS_ERR;
e2641e09 94
36c19d03
PN
95 memcpy(c->buf+c->bufpos,s,len);
96 c->bufpos+=len;
97 return REDIS_OK;
834ef78e
PN
98}
99
36c19d03
PN
100void _addReplyObjectToList(redisClient *c, robj *o) {
101 robj *tail;
102 if (listLength(c->reply) == 0) {
103 incrRefCount(o);
104 listAddNodeTail(c->reply,o);
105 } else {
106 tail = listNodeValue(listLast(c->reply));
107
108 /* Append to this object when possible. */
109 if (tail->ptr != NULL &&
110 sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
111 {
112 tail = dupLastObjectIfNeeded(c->reply);
113 tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
114 } else {
115 incrRefCount(o);
116 listAddNodeTail(c->reply,o);
117 }
118 }
119}
834ef78e 120
36c19d03
PN
121/* This method takes responsibility over the sds. When it is no longer
122 * needed it will be free'd, otherwise it ends up in a robj. */
123void _addReplySdsToList(redisClient *c, sds s) {
124 robj *tail;
125 if (listLength(c->reply) == 0) {
126 listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
127 } else {
128 tail = listNodeValue(listLast(c->reply));
129
130 /* Append to this object when possible. */
131 if (tail->ptr != NULL &&
132 sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES)
133 {
134 tail = dupLastObjectIfNeeded(c->reply);
135 tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
136 sdsfree(s);
834ef78e 137 } else {
36c19d03 138 listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
834ef78e 139 }
36c19d03
PN
140 }
141}
142
143void _addReplyStringToList(redisClient *c, char *s, size_t len) {
144 robj *tail;
145 if (listLength(c->reply) == 0) {
146 listAddNodeTail(c->reply,createStringObject(s,len));
834ef78e 147 } else {
36c19d03
PN
148 tail = listNodeValue(listLast(c->reply));
149
150 /* Append to this object when possible. */
151 if (tail->ptr != NULL &&
152 sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES)
153 {
154 tail = dupLastObjectIfNeeded(c->reply);
155 tail->ptr = sdscatlen(tail->ptr,s,len);
834ef78e 156 } else {
36c19d03 157 listAddNodeTail(c->reply,createStringObject(s,len));
834ef78e
PN
158 }
159 }
160}
e2641e09 161
834ef78e
PN
162void addReply(redisClient *c, robj *obj) {
163 if (_ensureFileEvent(c) != REDIS_OK) return;
e2641e09 164 if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
834ef78e 165 /* Returns a new object with refcount 1 */
e2641e09 166 obj = dupStringObject(obj);
834ef78e
PN
167 } else {
168 /* This increments the refcount. */
169 obj = getDecodedObject(obj);
e2641e09 170 }
36c19d03 171 if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
834ef78e 172 _addReplyObjectToList(c,obj);
36c19d03 173 decrRefCount(obj);
e2641e09 174}
175
176void addReplySds(redisClient *c, sds s) {
cd76bb65
PN
177 if (_ensureFileEvent(c) != REDIS_OK) {
178 /* The caller expects the sds to be free'd. */
179 sdsfree(s);
180 return;
181 }
36c19d03 182 if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) {
834ef78e
PN
183 sdsfree(s);
184 } else {
36c19d03
PN
185 /* This method free's the sds when it is no longer needed. */
186 _addReplySdsToList(c,s);
834ef78e 187 }
e2641e09 188}
189
834ef78e
PN
190void addReplyString(redisClient *c, char *s, size_t len) {
191 if (_ensureFileEvent(c) != REDIS_OK) return;
36c19d03
PN
192 if (_addReplyToBuffer(c,s,len) != REDIS_OK)
193 _addReplyStringToList(c,s,len);
834ef78e 194}
e2641e09 195
3ab20376
PN
196void _addReplyError(redisClient *c, char *s, size_t len) {
197 addReplyString(c,"-ERR ",5);
198 addReplyString(c,s,len);
199 addReplyString(c,"\r\n",2);
e2641e09 200}
201
3ab20376
PN
202void addReplyError(redisClient *c, char *err) {
203 _addReplyError(c,err,strlen(err));
204}
e2641e09 205
3ab20376
PN
206void addReplyErrorFormat(redisClient *c, const char *fmt, ...) {
207 va_list ap;
208 va_start(ap,fmt);
209 sds s = sdscatvprintf(sdsempty(),fmt,ap);
210 va_end(ap);
211 _addReplyError(c,s,sdslen(s));
212 sdsfree(s);
213}
214
215void _addReplyStatus(redisClient *c, char *s, size_t len) {
216 addReplyString(c,"+",1);
217 addReplyString(c,s,len);
218 addReplyString(c,"\r\n",2);
219}
220
221void addReplyStatus(redisClient *c, char *status) {
222 _addReplyStatus(c,status,strlen(status));
223}
224
225void addReplyStatusFormat(redisClient *c, const char *fmt, ...) {
226 va_list ap;
227 va_start(ap,fmt);
228 sds s = sdscatvprintf(sdsempty(),fmt,ap);
229 va_end(ap);
230 _addReplyStatus(c,s,sdslen(s));
231 sdsfree(s);
232}
233
b301c1fc
PN
234/* Adds an empty object to the reply list that will contain the multi bulk
235 * length, which is not known when this function is called. */
236void *addDeferredMultiBulkLength(redisClient *c) {
237 if (_ensureFileEvent(c) != REDIS_OK) return NULL;
36c19d03 238 listAddNodeTail(c->reply,createObject(REDIS_STRING,NULL));
b301c1fc
PN
239 return listLast(c->reply);
240}
241
242/* Populate the length object and try glueing it to the next chunk. */
243void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
244 listNode *ln = (listNode*)node;
245 robj *len, *next;
246
247 /* Abort when *node is NULL (see addDeferredMultiBulkLength). */
248 if (node == NULL) return;
249
250 len = listNodeValue(ln);
251 len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
252 if (ln->next != NULL) {
253 next = listNodeValue(ln->next);
36c19d03 254
49128f0b 255 /* Only glue when the next node is non-NULL (an sds in this case) */
36c19d03 256 if (next->ptr != NULL) {
49128f0b 257 len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr));
b301c1fc
PN
258 listDelNode(c->reply,ln->next);
259 }
e2641e09 260 }
b301c1fc
PN
261}
262
834ef78e
PN
263void addReplyDouble(redisClient *c, double d) {
264 char dbuf[128], sbuf[128];
265 int dlen, slen;
266 dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
267 slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
268 addReplyString(c,sbuf,slen);
e2641e09 269}
270
834ef78e 271void _addReplyLongLong(redisClient *c, long long ll, char prefix) {
e2641e09 272 char buf[128];
834ef78e
PN
273 int len;
274 buf[0] = prefix;
e2641e09 275 len = ll2string(buf+1,sizeof(buf)-1,ll);
276 buf[len+1] = '\r';
277 buf[len+2] = '\n';
834ef78e 278 addReplyString(c,buf,len+3);
e2641e09 279}
280
834ef78e
PN
281void addReplyLongLong(redisClient *c, long long ll) {
282 _addReplyLongLong(c,ll,':');
283}
e2641e09 284
0537e7bf
PN
285void addReplyMultiBulkLen(redisClient *c, long length) {
286 _addReplyLongLong(c,length,'*');
e2641e09 287}
288
289void addReplyBulkLen(redisClient *c, robj *obj) {
834ef78e 290 size_t len;
e2641e09 291
292 if (obj->encoding == REDIS_ENCODING_RAW) {
293 len = sdslen(obj->ptr);
294 } else {
295 long n = (long)obj->ptr;
296
297 /* Compute how many bytes will take this integer as a radix 10 string */
298 len = 1;
299 if (n < 0) {
300 len++;
301 n = -n;
302 }
303 while((n = n/10) != 0) {
304 len++;
305 }
306 }
834ef78e 307 _addReplyLongLong(c,len,'$');
e2641e09 308}
309
310void addReplyBulk(redisClient *c, robj *obj) {
311 addReplyBulkLen(c,obj);
312 addReply(c,obj);
313 addReply(c,shared.crlf);
314}
315
316/* In the CONFIG command we need to add vanilla C string as bulk replies */
317void addReplyBulkCString(redisClient *c, char *s) {
318 if (s == NULL) {
319 addReply(c,shared.nullbulk);
320 } else {
321 robj *o = createStringObject(s,strlen(s));
322 addReplyBulk(c,o);
323 decrRefCount(o);
324 }
325}
326
327void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
328 int cport, cfd;
329 char cip[128];
330 redisClient *c;
331 REDIS_NOTUSED(el);
332 REDIS_NOTUSED(mask);
333 REDIS_NOTUSED(privdata);
334
335 cfd = anetAccept(server.neterr, fd, cip, &cport);
336 if (cfd == AE_ERR) {
337 redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
338 return;
339 }
340 redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
341 if ((c = createClient(cfd)) == NULL) {
342 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
343 close(cfd); /* May be already closed, just ingore errors */
344 return;
345 }
346 /* If maxclient directive is set and this is one client more... close the
347 * connection. Note that we create the client instead to check before
348 * for this condition, since now the socket is already set in nonblocking
349 * mode and we can send an error for free using the Kernel I/O */
350 if (server.maxclients && listLength(server.clients) > server.maxclients) {
351 char *err = "-ERR max number of clients reached\r\n";
352
353 /* That's a best effort error message, don't check write errors */
354 if (write(c->fd,err,strlen(err)) == -1) {
355 /* Nothing to do, Just to avoid the warning... */
356 }
357 freeClient(c);
358 return;
359 }
360 server.stat_numconnections++;
361}
362
363static void freeClientArgv(redisClient *c) {
364 int j;
365
366 for (j = 0; j < c->argc; j++)
367 decrRefCount(c->argv[j]);
368 for (j = 0; j < c->mbargc; j++)
369 decrRefCount(c->mbargv[j]);
370 c->argc = 0;
371 c->mbargc = 0;
372}
373
374void freeClient(redisClient *c) {
375 listNode *ln;
376
377 /* Note that if the client we are freeing is blocked into a blocking
378 * call, we have to set querybuf to NULL *before* to call
379 * unblockClientWaitingData() to avoid processInputBuffer() will get
380 * called. Also it is important to remove the file events after
381 * this, because this call adds the READABLE event. */
382 sdsfree(c->querybuf);
383 c->querybuf = NULL;
384 if (c->flags & REDIS_BLOCKED)
385 unblockClientWaitingData(c);
386
387 /* UNWATCH all the keys */
388 unwatchAllKeys(c);
389 listRelease(c->watched_keys);
390 /* Unsubscribe from all the pubsub channels */
391 pubsubUnsubscribeAllChannels(c,0);
392 pubsubUnsubscribeAllPatterns(c,0);
393 dictRelease(c->pubsub_channels);
394 listRelease(c->pubsub_patterns);
395 /* Obvious cleanup */
396 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
397 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
398 listRelease(c->reply);
399 freeClientArgv(c);
400 close(c->fd);
401 /* Remove from the list of clients */
402 ln = listSearchKey(server.clients,c);
403 redisAssert(ln != NULL);
404 listDelNode(server.clients,ln);
1a71fb96 405 /* Remove from the list of clients waiting for swapped keys, or ready
406 * to be restarted, but not yet woken up again. */
407 if (c->flags & REDIS_IO_WAIT) {
408 redisAssert(server.vm_enabled);
409 if (listLength(c->io_keys) == 0) {
410 ln = listSearchKey(server.io_ready_clients,c);
411
412 /* When this client is waiting to be woken up (REDIS_IO_WAIT),
413 * it should be present in the list io_ready_clients */
414 redisAssert(ln != NULL);
e2641e09 415 listDelNode(server.io_ready_clients,ln);
1a71fb96 416 } else {
417 while (listLength(c->io_keys)) {
418 ln = listFirst(c->io_keys);
419 dontWaitForSwappedKey(c,ln->value);
420 }
e2641e09 421 }
1a71fb96 422 server.vm_blocked_clients--;
e2641e09 423 }
424 listRelease(c->io_keys);
778b2210 425 /* Master/slave cleanup.
426 * Case 1: we lost the connection with a slave. */
e2641e09 427 if (c->flags & REDIS_SLAVE) {
428 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
429 close(c->repldbfd);
430 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
431 ln = listSearchKey(l,c);
432 redisAssert(ln != NULL);
433 listDelNode(l,ln);
434 }
778b2210 435
436 /* Case 2: we lost the connection with the master. */
e2641e09 437 if (c->flags & REDIS_MASTER) {
438 server.master = NULL;
439 server.replstate = REDIS_REPL_CONNECT;
778b2210 440 /* Since we lost the connection with the master, we should also
441 * close the connection with all our slaves if we have any, so
442 * when we'll resync with the master the other slaves will sync again
443 * with us as well. Note that also when the slave is not connected
444 * to the master it will keep refusing connections by other slaves. */
445 while (listLength(server.slaves)) {
446 ln = listFirst(server.slaves);
447 freeClient((redisClient*)ln->value);
448 }
e2641e09 449 }
450 /* Release memory */
451 zfree(c->argv);
452 zfree(c->mbargv);
453 freeClientMultiState(c);
454 zfree(c);
455}
456
e2641e09 457void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
458 redisClient *c = privdata;
459 int nwritten = 0, totwritten = 0, objlen;
460 robj *o;
461 REDIS_NOTUSED(el);
462 REDIS_NOTUSED(mask);
463
464 /* Use writev() if we have enough buffers to send */
465 if (!server.glueoutputbuf &&
466 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
467 !(c->flags & REDIS_MASTER))
468 {
469 sendReplyToClientWritev(el, fd, privdata, mask);
470 return;
471 }
472
834ef78e
PN
473 while(c->bufpos > 0 || listLength(c->reply)) {
474 if (c->bufpos > 0) {
475 if (c->flags & REDIS_MASTER) {
476 /* Don't reply to a master */
477 nwritten = c->bufpos - c->sentlen;
478 } else {
479 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
480 if (nwritten <= 0) break;
481 }
482 c->sentlen += nwritten;
483 totwritten += nwritten;
484
485 /* If the buffer was sent, set bufpos to zero to continue with
486 * the remainder of the reply. */
487 if (c->sentlen == c->bufpos) {
488 c->bufpos = 0;
489 c->sentlen = 0;
490 }
491 } else {
492 o = listNodeValue(listFirst(c->reply));
493 objlen = sdslen(o->ptr);
e2641e09 494
834ef78e
PN
495 if (objlen == 0) {
496 listDelNode(c->reply,listFirst(c->reply));
497 continue;
498 }
e2641e09 499
834ef78e
PN
500 if (c->flags & REDIS_MASTER) {
501 /* Don't reply to a master */
502 nwritten = objlen - c->sentlen;
503 } else {
504 nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
505 if (nwritten <= 0) break;
506 }
507 c->sentlen += nwritten;
508 totwritten += nwritten;
e2641e09 509
834ef78e
PN
510 /* If we fully sent the object on head go to the next one */
511 if (c->sentlen == objlen) {
512 listDelNode(c->reply,listFirst(c->reply));
513 c->sentlen = 0;
514 }
e2641e09 515 }
516 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
517 * bytes, in a single threaded server it's a good idea to serve
518 * other clients as well, even if a very large request comes from
519 * super fast link that is always able to accept data (in real world
520 * scenario think about 'KEYS *' against the loopback interfae) */
521 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
522 }
523 if (nwritten == -1) {
524 if (errno == EAGAIN) {
525 nwritten = 0;
526 } else {
527 redisLog(REDIS_VERBOSE,
528 "Error writing to client: %s", strerror(errno));
529 freeClient(c);
530 return;
531 }
532 }
533 if (totwritten > 0) c->lastinteraction = time(NULL);
534 if (listLength(c->reply) == 0) {
535 c->sentlen = 0;
536 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
537 }
538}
539
540void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
541{
542 redisClient *c = privdata;
543 int nwritten = 0, totwritten = 0, objlen, willwrite;
544 robj *o;
545 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
546 int offset, ion = 0;
547 REDIS_NOTUSED(el);
548 REDIS_NOTUSED(mask);
549
550 listNode *node;
551 while (listLength(c->reply)) {
552 offset = c->sentlen;
553 ion = 0;
554 willwrite = 0;
555
556 /* fill-in the iov[] array */
557 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
558 o = listNodeValue(node);
559 objlen = sdslen(o->ptr);
560
561 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
562 break;
563
564 if(ion == REDIS_WRITEV_IOVEC_COUNT)
565 break; /* no more iovecs */
566
567 iov[ion].iov_base = ((char*)o->ptr) + offset;
568 iov[ion].iov_len = objlen - offset;
569 willwrite += objlen - offset;
570 offset = 0; /* just for the first item */
571 ion++;
572 }
573
574 if(willwrite == 0)
575 break;
576
577 /* write all collected blocks at once */
578 if((nwritten = writev(fd, iov, ion)) < 0) {
579 if (errno != EAGAIN) {
580 redisLog(REDIS_VERBOSE,
581 "Error writing to client: %s", strerror(errno));
582 freeClient(c);
583 return;
584 }
585 break;
586 }
587
588 totwritten += nwritten;
589 offset = c->sentlen;
590
591 /* remove written robjs from c->reply */
592 while (nwritten && listLength(c->reply)) {
593 o = listNodeValue(listFirst(c->reply));
594 objlen = sdslen(o->ptr);
595
596 if(nwritten >= objlen - offset) {
597 listDelNode(c->reply, listFirst(c->reply));
598 nwritten -= objlen - offset;
599 c->sentlen = 0;
600 } else {
601 /* partial write */
602 c->sentlen += nwritten;
603 break;
604 }
605 offset = 0;
606 }
607 }
608
609 if (totwritten > 0)
610 c->lastinteraction = time(NULL);
611
612 if (listLength(c->reply) == 0) {
613 c->sentlen = 0;
614 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
615 }
616}
617
618/* resetClient prepare the client to process the next command */
619void resetClient(redisClient *c) {
620 freeClientArgv(c);
621 c->bulklen = -1;
622 c->multibulk = 0;
623}
624
625void closeTimedoutClients(void) {
626 redisClient *c;
627 listNode *ln;
628 time_t now = time(NULL);
629 listIter li;
630
631 listRewind(server.clients,&li);
632 while ((ln = listNext(&li)) != NULL) {
633 c = listNodeValue(ln);
634 if (server.maxidletime &&
635 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
636 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
e452436a 637 !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */
e2641e09 638 dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
639 listLength(c->pubsub_patterns) == 0 &&
640 (now - c->lastinteraction > server.maxidletime))
641 {
642 redisLog(REDIS_VERBOSE,"Closing idle client");
643 freeClient(c);
644 } else if (c->flags & REDIS_BLOCKED) {
645 if (c->blockingto != 0 && c->blockingto < now) {
646 addReply(c,shared.nullmultibulk);
647 unblockClientWaitingData(c);
648 }
649 }
650 }
651}
652
653void processInputBuffer(redisClient *c) {
654again:
655 /* Before to process the input buffer, make sure the client is not
656 * waitig for a blocking operation such as BLPOP. Note that the first
657 * iteration the client is never blocked, otherwise the processInputBuffer
658 * would not be called at all, but after the execution of the first commands
659 * in the input buffer the client may be blocked, and the "goto again"
660 * will try to reiterate. The following line will make it return asap. */
661 if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
662 if (c->bulklen == -1) {
663 /* Read the first line of the query */
664 char *p = strchr(c->querybuf,'\n');
665 size_t querylen;
666
667 if (p) {
668 sds query, *argv;
669 int argc, j;
670
671 query = c->querybuf;
672 c->querybuf = sdsempty();
673 querylen = 1+(p-(query));
674 if (sdslen(query) > querylen) {
675 /* leave data after the first line of the query in the buffer */
676 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
677 }
678 *p = '\0'; /* remove "\n" */
679 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
680 sdsupdatelen(query);
681
682 /* Now we can split the query in arguments */
683 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
684 sdsfree(query);
685
686 if (c->argv) zfree(c->argv);
687 c->argv = zmalloc(sizeof(robj*)*argc);
688
689 for (j = 0; j < argc; j++) {
690 if (sdslen(argv[j])) {
691 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
692 c->argc++;
693 } else {
694 sdsfree(argv[j]);
695 }
696 }
697 zfree(argv);
698 if (c->argc) {
699 /* Execute the command. If the client is still valid
700 * after processCommand() return and there is something
701 * on the query buffer try to process the next command. */
702 if (processCommand(c) && sdslen(c->querybuf)) goto again;
703 } else {
704 /* Nothing to process, argc == 0. Just process the query
705 * buffer if it's not empty or return to the caller */
706 if (sdslen(c->querybuf)) goto again;
707 }
708 return;
709 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
710 redisLog(REDIS_VERBOSE, "Client protocol error");
711 freeClient(c);
712 return;
713 }
714 } else {
715 /* Bulk read handling. Note that if we are at this point
716 the client already sent a command terminated with a newline,
717 we are reading the bulk data that is actually the last
718 argument of the command. */
719 int qbl = sdslen(c->querybuf);
720
721 if (c->bulklen <= qbl) {
722 /* Copy everything but the final CRLF as final argument */
723 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
724 c->argc++;
725 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
726 /* Process the command. If the client is still valid after
727 * the processing and there is more data in the buffer
728 * try to parse it. */
729 if (processCommand(c) && sdslen(c->querybuf)) goto again;
730 return;
731 }
732 }
733}
734
735void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
736 redisClient *c = (redisClient*) privdata;
737 char buf[REDIS_IOBUF_LEN];
738 int nread;
739 REDIS_NOTUSED(el);
740 REDIS_NOTUSED(mask);
741
742 nread = read(fd, buf, REDIS_IOBUF_LEN);
743 if (nread == -1) {
744 if (errno == EAGAIN) {
745 nread = 0;
746 } else {
747 redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
748 freeClient(c);
749 return;
750 }
751 } else if (nread == 0) {
752 redisLog(REDIS_VERBOSE, "Client closed connection");
753 freeClient(c);
754 return;
755 }
756 if (nread) {
757 c->querybuf = sdscatlen(c->querybuf, buf, nread);
758 c->lastinteraction = time(NULL);
759 } else {
760 return;
761 }
762 processInputBuffer(c);
763}