4 void *dupClientReplyValue(void *o
) { 
   5     incrRefCount((robj
*)o
); 
   9 int listMatchObjects(void *a
, void *b
) { 
  10     return equalStringObjects(a
,b
); 
  13 redisClient 
*createClient(int fd
) { 
  14     redisClient 
*c 
= zmalloc(sizeof(redisClient
)); 
  17     anetNonBlock(NULL
,fd
); 
  18     anetTcpNoDelay(NULL
,fd
); 
  20     if (aeCreateFileEvent(server
.el
,fd
,AE_READABLE
, 
  21         readQueryFromClient
, c
) == AE_ERR
) 
  30     c
->querybuf 
= sdsempty(); 
  38     c
->lastinteraction 
= time(NULL
); 
  40     c
->replstate 
= REDIS_REPL_NONE
; 
  41     c
->reply 
= listCreate(); 
  42     listSetFreeMethod(c
->reply
,decrRefCount
); 
  43     listSetDupMethod(c
->reply
,dupClientReplyValue
); 
  44     c
->bstate
.keys 
= NULL
; 
  46     c
->bstate
.timeout 
= 0; 
  47     c
->bstate
.target 
= NULL
; 
  48     c
->io_keys 
= listCreate(); 
  49     c
->watched_keys 
= listCreate(); 
  50     listSetFreeMethod(c
->io_keys
,decrRefCount
); 
  51     c
->pubsub_channels 
= dictCreate(&setDictType
,NULL
); 
  52     c
->pubsub_patterns 
= listCreate(); 
  53     listSetFreeMethod(c
->pubsub_patterns
,decrRefCount
); 
  54     listSetMatchMethod(c
->pubsub_patterns
,listMatchObjects
); 
  55     listAddNodeTail(server
.clients
,c
); 
  56     initClientMultiState(c
); 
  60 /* Set the event loop to listen for write events on the client's socket. 
  61  * Typically gets called every time a reply is built. */ 
  62 int _installWriteEvent(redisClient 
*c
) { 
  63     /* When CLOSE_AFTER_REPLY is set, no more replies may be added! */ 
  64     redisAssert(!(c
->flags 
& REDIS_CLOSE_AFTER_REPLY
)); 
  66     if (c
->fd 
<= 0) return REDIS_ERR
; 
  67     if (c
->bufpos 
== 0 && listLength(c
->reply
) == 0 && 
  68         (c
->replstate 
== REDIS_REPL_NONE 
|| 
  69          c
->replstate 
== REDIS_REPL_ONLINE
) && 
  70         aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
, 
  71         sendReplyToClient
, c
) == AE_ERR
) return REDIS_ERR
; 
  75 /* Create a duplicate of the last object in the reply list when 
  76  * it is not exclusively owned by the reply list. */ 
  77 robj 
*dupLastObjectIfNeeded(list 
*reply
) { 
  80     redisAssert(listLength(reply
) > 0); 
  82     cur 
= listNodeValue(ln
); 
  83     if (cur
->refcount 
> 1) { 
  84         new = dupStringObject(cur
); 
  86         listNodeValue(ln
) = new; 
  88     return listNodeValue(ln
); 
  91 int _addReplyToBuffer(redisClient 
*c
, char *s
, size_t len
) { 
  92     size_t available 
= sizeof(c
->buf
)-c
->bufpos
; 
  94     /* If there already are entries in the reply list, we cannot 
  95      * add anything more to the static buffer. */ 
  96     if (listLength(c
->reply
) > 0) return REDIS_ERR
; 
  98     /* Check that the buffer has enough space available for this string. */ 
  99     if (len 
> available
) return REDIS_ERR
; 
 101     memcpy(c
->buf
+c
->bufpos
,s
,len
); 
 106 void _addReplyObjectToList(redisClient 
*c
, robj 
*o
) { 
 108     if (listLength(c
->reply
) == 0) { 
 110         listAddNodeTail(c
->reply
,o
); 
 112         tail 
= listNodeValue(listLast(c
->reply
)); 
 114         /* Append to this object when possible. */ 
 115         if (tail
->ptr 
!= NULL 
&& 
 116             sdslen(tail
->ptr
)+sdslen(o
->ptr
) <= REDIS_REPLY_CHUNK_BYTES
) 
 118             tail 
= dupLastObjectIfNeeded(c
->reply
); 
 119             tail
->ptr 
= sdscatlen(tail
->ptr
,o
->ptr
,sdslen(o
->ptr
)); 
 122             listAddNodeTail(c
->reply
,o
); 
 127 /* This method takes responsibility over the sds. When it is no longer 
 128  * needed it will be free'd, otherwise it ends up in a robj. */ 
 129 void _addReplySdsToList(redisClient 
*c
, sds s
) { 
 131     if (listLength(c
->reply
) == 0) { 
 132         listAddNodeTail(c
->reply
,createObject(REDIS_STRING
,s
)); 
 134         tail 
= listNodeValue(listLast(c
->reply
)); 
 136         /* Append to this object when possible. */ 
 137         if (tail
->ptr 
!= NULL 
&& 
 138             sdslen(tail
->ptr
)+sdslen(s
) <= REDIS_REPLY_CHUNK_BYTES
) 
 140             tail 
= dupLastObjectIfNeeded(c
->reply
); 
 141             tail
->ptr 
= sdscatlen(tail
->ptr
,s
,sdslen(s
)); 
 144             listAddNodeTail(c
->reply
,createObject(REDIS_STRING
,s
)); 
 149 void _addReplyStringToList(redisClient 
*c
, char *s
, size_t len
) { 
 151     if (listLength(c
->reply
) == 0) { 
 152         listAddNodeTail(c
->reply
,createStringObject(s
,len
)); 
 154         tail 
= listNodeValue(listLast(c
->reply
)); 
 156         /* Append to this object when possible. */ 
 157         if (tail
->ptr 
!= NULL 
&& 
 158             sdslen(tail
->ptr
)+len 
<= REDIS_REPLY_CHUNK_BYTES
) 
 160             tail 
= dupLastObjectIfNeeded(c
->reply
); 
 161             tail
->ptr 
= sdscatlen(tail
->ptr
,s
,len
); 
 163             listAddNodeTail(c
->reply
,createStringObject(s
,len
)); 
 168 void addReply(redisClient 
*c
, robj 
*obj
) { 
 169     if (_installWriteEvent(c
) != REDIS_OK
) return; 
 170     redisAssert(!server
.vm_enabled 
|| obj
->storage 
== REDIS_VM_MEMORY
); 
 172     /* This is an important place where we can avoid copy-on-write 
 173      * when there is a saving child running, avoiding touching the 
 174      * refcount field of the object if it's not needed. 
 176      * If the encoding is RAW and there is room in the static buffer 
 177      * we'll be able to send the object to the client without 
 178      * messing with its page. */ 
 179     if (obj
->encoding 
== REDIS_ENCODING_RAW
) { 
 180         if (_addReplyToBuffer(c
,obj
->ptr
,sdslen(obj
->ptr
)) != REDIS_OK
) 
 181             _addReplyObjectToList(c
,obj
); 
 183         obj 
= getDecodedObject(obj
); 
 184         if (_addReplyToBuffer(c
,obj
->ptr
,sdslen(obj
->ptr
)) != REDIS_OK
) 
 185             _addReplyObjectToList(c
,obj
); 
 190 void addReplySds(redisClient 
*c
, sds s
) { 
 191     if (_installWriteEvent(c
) != REDIS_OK
) { 
 192         /* The caller expects the sds to be free'd. */ 
 196     if (_addReplyToBuffer(c
,s
,sdslen(s
)) == REDIS_OK
) { 
 199         /* This method free's the sds when it is no longer needed. */ 
 200         _addReplySdsToList(c
,s
); 
 204 void addReplyString(redisClient 
*c
, char *s
, size_t len
) { 
 205     if (_installWriteEvent(c
) != REDIS_OK
) return; 
 206     if (_addReplyToBuffer(c
,s
,len
) != REDIS_OK
) 
 207         _addReplyStringToList(c
,s
,len
); 
 210 void _addReplyError(redisClient 
*c
, char *s
, size_t len
) { 
 211     addReplyString(c
,"-ERR ",5); 
 212     addReplyString(c
,s
,len
); 
 213     addReplyString(c
,"\r\n",2); 
 216 void addReplyError(redisClient 
*c
, char *err
) { 
 217     _addReplyError(c
,err
,strlen(err
)); 
 220 void addReplyErrorFormat(redisClient 
*c
, const char *fmt
, ...) { 
 223     sds s 
= sdscatvprintf(sdsempty(),fmt
,ap
); 
 225     _addReplyError(c
,s
,sdslen(s
)); 
 229 void _addReplyStatus(redisClient 
*c
, char *s
, size_t len
) { 
 230     addReplyString(c
,"+",1); 
 231     addReplyString(c
,s
,len
); 
 232     addReplyString(c
,"\r\n",2); 
 235 void addReplyStatus(redisClient 
*c
, char *status
) { 
 236     _addReplyStatus(c
,status
,strlen(status
)); 
 239 void addReplyStatusFormat(redisClient 
*c
, const char *fmt
, ...) { 
 242     sds s 
= sdscatvprintf(sdsempty(),fmt
,ap
); 
 244     _addReplyStatus(c
,s
,sdslen(s
)); 
 248 /* Adds an empty object to the reply list that will contain the multi bulk 
 249  * length, which is not known when this function is called. */ 
 250 void *addDeferredMultiBulkLength(redisClient 
*c
) { 
 251     /* Note that we install the write event here even if the object is not 
 252      * ready to be sent, since we are sure that before returning to the 
 253      * event loop setDeferredMultiBulkLength() will be called. */ 
 254     if (_installWriteEvent(c
) != REDIS_OK
) return NULL
; 
 255     listAddNodeTail(c
->reply
,createObject(REDIS_STRING
,NULL
)); 
 256     return listLast(c
->reply
); 
 259 /* Populate the length object and try glueing it to the next chunk. */ 
 260 void setDeferredMultiBulkLength(redisClient 
*c
, void *node
, long length
) { 
 261     listNode 
*ln 
= (listNode
*)node
; 
 264     /* Abort when *node is NULL (see addDeferredMultiBulkLength). */ 
 265     if (node 
== NULL
) return; 
 267     len 
= listNodeValue(ln
); 
 268     len
->ptr 
= sdscatprintf(sdsempty(),"*%ld\r\n",length
); 
 269     if (ln
->next 
!= NULL
) { 
 270         next 
= listNodeValue(ln
->next
); 
 272         /* Only glue when the next node is non-NULL (an sds in this case) */ 
 273         if (next
->ptr 
!= NULL
) { 
 274             len
->ptr 
= sdscatlen(len
->ptr
,next
->ptr
,sdslen(next
->ptr
)); 
 275             listDelNode(c
->reply
,ln
->next
); 
 280 void addReplyDouble(redisClient 
*c
, double d
) { 
 281     char dbuf
[128], sbuf
[128]; 
 283     dlen 
= snprintf(dbuf
,sizeof(dbuf
),"%.17g",d
); 
 284     slen 
= snprintf(sbuf
,sizeof(sbuf
),"$%d\r\n%s\r\n",dlen
,dbuf
); 
 285     addReplyString(c
,sbuf
,slen
); 
 288 void _addReplyLongLong(redisClient 
*c
, long long ll
, char prefix
) { 
 292     len 
= ll2string(buf
+1,sizeof(buf
)-1,ll
); 
 295     addReplyString(c
,buf
,len
+3); 
 298 void addReplyLongLong(redisClient 
*c
, long long ll
) { 
 299     _addReplyLongLong(c
,ll
,':'); 
 302 void addReplyMultiBulkLen(redisClient 
*c
, long length
) { 
 303     _addReplyLongLong(c
,length
,'*'); 
 306 void addReplyBulkLen(redisClient 
*c
, robj 
*obj
) { 
 309     if (obj
->encoding 
== REDIS_ENCODING_RAW
) { 
 310         len 
= sdslen(obj
->ptr
); 
 312         long n 
= (long)obj
->ptr
; 
 314         /* Compute how many bytes will take this integer as a radix 10 string */ 
 320         while((n 
= n
/10) != 0) { 
 324     _addReplyLongLong(c
,len
,'$'); 
 327 void addReplyBulk(redisClient 
*c
, robj 
*obj
) { 
 328     addReplyBulkLen(c
,obj
); 
 330     addReply(c
,shared
.crlf
); 
 333 /* In the CONFIG command we need to add vanilla C string as bulk replies */ 
 334 void addReplyBulkCString(redisClient 
*c
, char *s
) { 
 336         addReply(c
,shared
.nullbulk
); 
 338         robj 
*o 
= createStringObject(s
,strlen(s
)); 
 344 static void acceptCommonHandler(int fd
) { 
 346     if ((c 
= createClient(fd
)) == NULL
) { 
 347         redisLog(REDIS_WARNING
,"Error allocating resoures for the client"); 
 348         close(fd
); /* May be already closed, just ingore errors */ 
 351     /* If maxclient directive is set and this is one client more... close the 
 352      * connection. Note that we create the client instead to check before 
 353      * for this condition, since now the socket is already set in nonblocking 
 354      * mode and we can send an error for free using the Kernel I/O */ 
 355     if (server
.maxclients 
&& listLength(server
.clients
) > server
.maxclients
) { 
 356         char *err 
= "-ERR max number of clients reached\r\n"; 
 358         /* That's a best effort error message, don't check write errors */ 
 359         if (write(c
->fd
,err
,strlen(err
)) == -1) { 
 360             /* Nothing to do, Just to avoid the warning... */ 
 365     server
.stat_numconnections
++; 
 368 void acceptTcpHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 373     REDIS_NOTUSED(privdata
); 
 375     cfd 
= anetTcpAccept(server
.neterr
, fd
, cip
, &cport
); 
 377         redisLog(REDIS_VERBOSE
,"Accepting client connection: %s", server
.neterr
); 
 380     redisLog(REDIS_VERBOSE
,"Accepted %s:%d", cip
, cport
); 
 381     acceptCommonHandler(cfd
); 
 384 void acceptUnixHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 388     REDIS_NOTUSED(privdata
); 
 390     cfd 
= anetUnixAccept(server
.neterr
, fd
); 
 392         redisLog(REDIS_VERBOSE
,"Accepting client connection: %s", server
.neterr
); 
 395     redisLog(REDIS_VERBOSE
,"Accepted connection to %s", server
.unixsocket
); 
 396     acceptCommonHandler(cfd
); 
 400 static void freeClientArgv(redisClient 
*c
) { 
 402     for (j 
= 0; j 
< c
->argc
; j
++) 
 403         decrRefCount(c
->argv
[j
]); 
 407 void freeClient(redisClient 
*c
) { 
 410     /* Note that if the client we are freeing is blocked into a blocking 
 411      * call, we have to set querybuf to NULL *before* to call 
 412      * unblockClientWaitingData() to avoid processInputBuffer() will get 
 413      * called. Also it is important to remove the file events after 
 414      * this, because this call adds the READABLE event. */ 
 415     sdsfree(c
->querybuf
); 
 417     if (c
->flags 
& REDIS_BLOCKED
) 
 418         unblockClientWaitingData(c
); 
 420     /* UNWATCH all the keys */ 
 422     listRelease(c
->watched_keys
); 
 423     /* Unsubscribe from all the pubsub channels */ 
 424     pubsubUnsubscribeAllChannels(c
,0); 
 425     pubsubUnsubscribeAllPatterns(c
,0); 
 426     dictRelease(c
->pubsub_channels
); 
 427     listRelease(c
->pubsub_patterns
); 
 428     /* Obvious cleanup */ 
 429     aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
); 
 430     aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
); 
 431     listRelease(c
->reply
); 
 434     /* Remove from the list of clients */ 
 435     ln 
= listSearchKey(server
.clients
,c
); 
 436     redisAssert(ln 
!= NULL
); 
 437     listDelNode(server
.clients
,ln
); 
 438     /* Remove from the list of clients waiting for swapped keys, or ready 
 439      * to be restarted, but not yet woken up again. */ 
 440     if (c
->flags 
& REDIS_IO_WAIT
) { 
 441         redisAssert(server
.vm_enabled
); 
 442         if (listLength(c
->io_keys
) == 0) { 
 443             ln 
= listSearchKey(server
.io_ready_clients
,c
); 
 445             /* When this client is waiting to be woken up (REDIS_IO_WAIT), 
 446              * it should be present in the list io_ready_clients */ 
 447             redisAssert(ln 
!= NULL
); 
 448             listDelNode(server
.io_ready_clients
,ln
); 
 450             while (listLength(c
->io_keys
)) { 
 451                 ln 
= listFirst(c
->io_keys
); 
 452                 dontWaitForSwappedKey(c
,ln
->value
); 
 455         server
.vm_blocked_clients
--; 
 457     listRelease(c
->io_keys
); 
 458     /* Master/slave cleanup. 
 459      * Case 1: we lost the connection with a slave. */ 
 460     if (c
->flags 
& REDIS_SLAVE
) { 
 461         if (c
->replstate 
== REDIS_REPL_SEND_BULK 
&& c
->repldbfd 
!= -1) 
 463         list 
*l 
= (c
->flags 
& REDIS_MONITOR
) ? server
.monitors 
: server
.slaves
; 
 464         ln 
= listSearchKey(l
,c
); 
 465         redisAssert(ln 
!= NULL
); 
 469     /* Case 2: we lost the connection with the master. */ 
 470     if (c
->flags 
& REDIS_MASTER
) { 
 471         server
.master 
= NULL
; 
 473         server
.replstate 
= REDIS_REPL_CONNECT
; 
 474         /* Since we lost the connection with the master, we should also 
 475          * close the connection with all our slaves if we have any, so 
 476          * when we'll resync with the master the other slaves will sync again 
 477          * with us as well. Note that also when the slave is not connected 
 478          * to the master it will keep refusing connections by other slaves. */ 
 479         while (listLength(server
.slaves
)) { 
 480             ln 
= listFirst(server
.slaves
); 
 481             freeClient((redisClient
*)ln
->value
); 
 486     freeClientMultiState(c
); 
 490 void sendReplyToClient(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 491     redisClient 
*c 
= privdata
; 
 492     int nwritten 
= 0, totwritten 
= 0, objlen
; 
 497     /* Use writev() if we have enough buffers to send */ 
 498     if (!server
.glueoutputbuf 
&& 
 499         listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD 
&& 
 500         !(c
->flags 
& REDIS_MASTER
)) 
 502         sendReplyToClientWritev(el
, fd
, privdata
, mask
); 
 506     while(c
->bufpos 
> 0 || listLength(c
->reply
)) { 
 508             if (c
->flags 
& REDIS_MASTER
) { 
 509                 /* Don't reply to a master */ 
 510                 nwritten 
= c
->bufpos 
- c
->sentlen
; 
 512                 nwritten 
= write(fd
,c
->buf
+c
->sentlen
,c
->bufpos
-c
->sentlen
); 
 513                 if (nwritten 
<= 0) break; 
 515             c
->sentlen 
+= nwritten
; 
 516             totwritten 
+= nwritten
; 
 518             /* If the buffer was sent, set bufpos to zero to continue with 
 519              * the remainder of the reply. */ 
 520             if (c
->sentlen 
== c
->bufpos
) { 
 525             o 
= listNodeValue(listFirst(c
->reply
)); 
 526             objlen 
= sdslen(o
->ptr
); 
 529                 listDelNode(c
->reply
,listFirst(c
->reply
)); 
 533             if (c
->flags 
& REDIS_MASTER
) { 
 534                 /* Don't reply to a master */ 
 535                 nwritten 
= objlen 
- c
->sentlen
; 
 537                 nwritten 
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
,objlen
-c
->sentlen
); 
 538                 if (nwritten 
<= 0) break; 
 540             c
->sentlen 
+= nwritten
; 
 541             totwritten 
+= nwritten
; 
 543             /* If we fully sent the object on head go to the next one */ 
 544             if (c
->sentlen 
== objlen
) { 
 545                 listDelNode(c
->reply
,listFirst(c
->reply
)); 
 549         /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT 
 550          * bytes, in a single threaded server it's a good idea to serve 
 551          * other clients as well, even if a very large request comes from 
 552          * super fast link that is always able to accept data (in real world 
 553          * scenario think about 'KEYS *' against the loopback interfae) */ 
 554         if (totwritten 
> REDIS_MAX_WRITE_PER_EVENT
) break; 
 556     if (nwritten 
== -1) { 
 557         if (errno 
== EAGAIN
) { 
 560             redisLog(REDIS_VERBOSE
, 
 561                 "Error writing to client: %s", strerror(errno
)); 
 566     if (totwritten 
> 0) c
->lastinteraction 
= time(NULL
); 
 567     if (listLength(c
->reply
) == 0) { 
 569         aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
); 
 571         /* Close connection after entire reply has been sent. */ 
 572         if (c
->flags 
& REDIS_CLOSE_AFTER_REPLY
) freeClient(c
); 
 576 void sendReplyToClientWritev(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) 
 578     redisClient 
*c 
= privdata
; 
 579     int nwritten 
= 0, totwritten 
= 0, objlen
, willwrite
; 
 581     struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
]; 
 587     while (listLength(c
->reply
)) { 
 592         /* fill-in the iov[] array */ 
 593         for(node 
= listFirst(c
->reply
); node
; node 
= listNextNode(node
)) { 
 594             o 
= listNodeValue(node
); 
 595             objlen 
= sdslen(o
->ptr
); 
 597             if (totwritten 
+ objlen 
- offset 
> REDIS_MAX_WRITE_PER_EVENT
) 
 600             if(ion 
== REDIS_WRITEV_IOVEC_COUNT
) 
 601                 break; /* no more iovecs */ 
 603             iov
[ion
].iov_base 
= ((char*)o
->ptr
) + offset
; 
 604             iov
[ion
].iov_len 
= objlen 
- offset
; 
 605             willwrite 
+= objlen 
- offset
; 
 606             offset 
= 0; /* just for the first item */ 
 613         /* write all collected blocks at once */ 
 614         if((nwritten 
= writev(fd
, iov
, ion
)) < 0) { 
 615             if (errno 
!= EAGAIN
) { 
 616                 redisLog(REDIS_VERBOSE
, 
 617                          "Error writing to client: %s", strerror(errno
)); 
 624         totwritten 
+= nwritten
; 
 627         /* remove written robjs from c->reply */ 
 628         while (nwritten 
&& listLength(c
->reply
)) { 
 629             o 
= listNodeValue(listFirst(c
->reply
)); 
 630             objlen 
= sdslen(o
->ptr
); 
 632             if(nwritten 
>= objlen 
- offset
) { 
 633                 listDelNode(c
->reply
, listFirst(c
->reply
)); 
 634                 nwritten 
-= objlen 
- offset
; 
 638                 c
->sentlen 
+= nwritten
; 
 646         c
->lastinteraction 
= time(NULL
); 
 648     if (listLength(c
->reply
) == 0) { 
 650         aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
); 
 654 /* resetClient prepare the client to process the next command */ 
 655 void resetClient(redisClient 
*c
) { 
 662 void closeTimedoutClients(void) { 
 665     time_t now 
= time(NULL
); 
 668     listRewind(server
.clients
,&li
); 
 669     while ((ln 
= listNext(&li
)) != NULL
) { 
 670         c 
= listNodeValue(ln
); 
 671         if (server
.maxidletime 
&& 
 672             !(c
->flags 
& REDIS_SLAVE
) &&    /* no timeout for slaves */ 
 673             !(c
->flags 
& REDIS_MASTER
) &&   /* no timeout for masters */ 
 674             !(c
->flags 
& REDIS_BLOCKED
) &&  /* no timeout for BLPOP */ 
 675             dictSize(c
->pubsub_channels
) == 0 && /* no timeout for pubsub */ 
 676             listLength(c
->pubsub_patterns
) == 0 && 
 677             (now 
- c
->lastinteraction 
> server
.maxidletime
)) 
 679             redisLog(REDIS_VERBOSE
,"Closing idle client"); 
 681         } else if (c
->flags 
& REDIS_BLOCKED
) { 
 682             if (c
->bstate
.timeout 
!= 0 && c
->bstate
.timeout 
< now
) { 
 683                 addReply(c
,shared
.nullmultibulk
); 
 684                 unblockClientWaitingData(c
); 
 690 int processInlineBuffer(redisClient 
*c
) { 
 691     char *newline 
= strstr(c
->querybuf
,"\r\n"); 
 696     /* Nothing to do without a \r\n */ 
 700     /* Split the input buffer up to the \r\n */ 
 701     querylen 
= newline
-(c
->querybuf
); 
 702     argv 
= sdssplitlen(c
->querybuf
,querylen
," ",1,&argc
); 
 704     /* Leave data after the first line of the query in the buffer */ 
 705     c
->querybuf 
= sdsrange(c
->querybuf
,querylen
+2,-1); 
 707     /* Setup argv array on client structure */ 
 708     if (c
->argv
) zfree(c
->argv
); 
 709     c
->argv 
= zmalloc(sizeof(robj
*)*argc
); 
 711     /* Create redis objects for all arguments. */ 
 712     for (c
->argc 
= 0, j 
= 0; j 
< argc
; j
++) { 
 713         if (sdslen(argv
[j
])) { 
 714             c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]); 
 724 /* Helper function. Trims query buffer to make the function that processes 
 725  * multi bulk requests idempotent. */ 
 726 static void setProtocolError(redisClient 
*c
, int pos
) { 
 727     c
->flags 
|= REDIS_CLOSE_AFTER_REPLY
; 
 728     c
->querybuf 
= sdsrange(c
->querybuf
,pos
,-1); 
 731 int processMultibulkBuffer(redisClient 
*c
) { 
 732     char *newline 
= NULL
; 
 737     if (c
->multibulklen 
== 0) { 
 738         /* The client should have been reset */ 
 739         redisAssert(c
->argc 
== 0); 
 741         /* Multi bulk length cannot be read without a \r\n */ 
 742         newline 
= strstr(c
->querybuf
,"\r\n"); 
 746         /* We know for sure there is a whole line since newline != NULL, 
 747          * so go ahead and find out the multi bulk length. */ 
 748         redisAssert(c
->querybuf
[0] == '*'); 
 749         c
->multibulklen 
= strtol(c
->querybuf
+1,&eptr
,10); 
 750         pos 
= (newline
-c
->querybuf
)+2; 
 751         if (c
->multibulklen 
<= 0) { 
 752             c
->querybuf 
= sdsrange(c
->querybuf
,pos
,-1); 
 754         } else if (c
->multibulklen 
> 1024*1024) { 
 755             addReplyError(c
,"Protocol error: invalid multibulk length"); 
 756             setProtocolError(c
,pos
); 
 760         /* Setup argv array on client structure */ 
 761         if (c
->argv
) zfree(c
->argv
); 
 762         c
->argv 
= zmalloc(sizeof(robj
*)*c
->multibulklen
); 
 764         /* Search new newline */ 
 765         newline 
= strstr(c
->querybuf
+pos
,"\r\n"); 
 768     redisAssert(c
->multibulklen 
> 0); 
 769     while(c
->multibulklen
) { 
 770         /* Read bulk length if unknown */ 
 771         if (c
->bulklen 
== -1) { 
 772             newline 
= strstr(c
->querybuf
+pos
,"\r\n"); 
 773             if (newline 
!= NULL
) { 
 774                 if (c
->querybuf
[pos
] != '$') { 
 775                     addReplyErrorFormat(c
, 
 776                         "Protocol error: expected '$', got '%c'", 
 778                     setProtocolError(c
,pos
); 
 782                 bulklen 
= strtol(c
->querybuf
+pos
+1,&eptr
,10); 
 783                 tolerr 
= (eptr
[0] != '\r'); 
 784                 if (tolerr 
|| bulklen 
== LONG_MIN 
|| bulklen 
== LONG_MAX 
|| 
 785                     bulklen 
< 0 || bulklen 
> 1024*1024*1024) 
 787                     addReplyError(c
,"Protocol error: invalid bulk length"); 
 788                     setProtocolError(c
,pos
); 
 791                 pos 
+= eptr
-(c
->querybuf
+pos
)+2; 
 792                 c
->bulklen 
= bulklen
; 
 794                 /* No newline in current buffer, so wait for more data */ 
 799         /* Read bulk argument */ 
 800         if (sdslen(c
->querybuf
)-pos 
< (unsigned)(c
->bulklen
+2)) { 
 801             /* Not enough data (+2 == trailing \r\n) */ 
 804             c
->argv
[c
->argc
++] = createStringObject(c
->querybuf
+pos
,c
->bulklen
); 
 812     c
->querybuf 
= sdsrange(c
->querybuf
,pos
,-1); 
 814     /* We're done when c->multibulk == 0 */ 
 815     if (c
->multibulklen 
== 0) { 
 821 void processInputBuffer(redisClient 
*c
) { 
 822     /* Keep processing while there is something in the input buffer */ 
 823     while(sdslen(c
->querybuf
)) { 
 824         /* Immediately abort if the client is in the middle of something. */ 
 825         if (c
->flags 
& REDIS_BLOCKED 
|| c
->flags 
& REDIS_IO_WAIT
) return; 
 827         /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is 
 828          * written to the client. Make sure to not let the reply grow after 
 829          * this flag has been set (i.e. don't process more commands). */ 
 830         if (c
->flags 
& REDIS_CLOSE_AFTER_REPLY
) return; 
 832         /* Determine request type when unknown. */ 
 834             if (c
->querybuf
[0] == '*') { 
 835                 c
->reqtype 
= REDIS_REQ_MULTIBULK
; 
 837                 c
->reqtype 
= REDIS_REQ_INLINE
; 
 841         if (c
->reqtype 
== REDIS_REQ_INLINE
) { 
 842             if (processInlineBuffer(c
) != REDIS_OK
) break; 
 843         } else if (c
->reqtype 
== REDIS_REQ_MULTIBULK
) { 
 844             if (processMultibulkBuffer(c
) != REDIS_OK
) break; 
 846             redisPanic("Unknown request type"); 
 849         /* Multibulk processing could see a <= 0 length. */ 
 853             /* Only reset the client when the command was executed. */ 
 854             if (processCommand(c
) == REDIS_OK
) 
 860 void readQueryFromClient(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 861     redisClient 
*c 
= (redisClient
*) privdata
; 
 862     char buf
[REDIS_IOBUF_LEN
]; 
 867     nread 
= read(fd
, buf
, REDIS_IOBUF_LEN
); 
 869         if (errno 
== EAGAIN
) { 
 872             redisLog(REDIS_VERBOSE
, "Reading from client: %s",strerror(errno
)); 
 876     } else if (nread 
== 0) { 
 877         redisLog(REDIS_VERBOSE
, "Client closed connection"); 
 882         c
->querybuf 
= sdscatlen(c
->querybuf
,buf
,nread
); 
 883         c
->lastinteraction 
= time(NULL
); 
 887     processInputBuffer(c
);