4 void *dupClientReplyValue(void *o
) { 
   5     incrRefCount((robj
*)o
); 
   /* List match callback: two list values match when equalStringObjects()
    * says so. Installed on c->pubsub_patterns with listSetMatchMethod().
    *
    * a, b: the two values to compare; both are handed to
    *       equalStringObjects() unchanged.
    * Returns the result of equalStringObjects(a,b). */
   int listMatchObjects(void *a, void *b) {
       return equalStringObjects(a,b);
   }
  13 redisClient 
*createClient(int fd
) { 
  14     redisClient 
*c 
= zmalloc(sizeof(redisClient
)); 
  17     /* Passing -1 as fd makes it possible to create a non connected client.
  18      * This is useful since all Redis commands need to be executed
  19      * in the context of a client. When commands are executed in other
  20      * contexts (for instance a Lua script) we need a non connected client. */
  22         anetNonBlock(NULL
,fd
); 
  23         anetTcpNoDelay(NULL
,fd
); 
  24         if (aeCreateFileEvent(server
.el
,fd
,AE_READABLE
, 
  25             readQueryFromClient
, c
) == AE_ERR
) 
  35     c
->querybuf 
= sdsempty(); 
  43     c
->lastinteraction 
= time(NULL
); 
  45     c
->replstate 
= REDIS_REPL_NONE
; 
  46     c
->reply 
= listCreate(); 
  47     listSetFreeMethod(c
->reply
,decrRefCount
); 
  48     listSetDupMethod(c
->reply
,dupClientReplyValue
); 
  52     c
->bpop
.target 
= NULL
; 
  53     c
->io_keys 
= listCreate(); 
  54     c
->watched_keys 
= listCreate(); 
  55     listSetFreeMethod(c
->io_keys
,decrRefCount
); 
  56     c
->pubsub_channels 
= dictCreate(&setDictType
,NULL
); 
  57     c
->pubsub_patterns 
= listCreate(); 
  58     listSetFreeMethod(c
->pubsub_patterns
,decrRefCount
); 
  59     listSetMatchMethod(c
->pubsub_patterns
,listMatchObjects
); 
  60     if (fd 
!= -1) listAddNodeTail(server
.clients
,c
); 
  61     initClientMultiState(c
); 
  65 /* Set the event loop to listen for write events on the client's socket. 
  66  * Typically gets called every time a reply is built. */ 
  67 int _installWriteEvent(redisClient 
*c
) { 
  68     if (c
->flags 
& REDIS_LUA_CLIENT
) return REDIS_OK
; 
  69     if (c
->fd 
<= 0) return REDIS_ERR
; 
  70     if (c
->bufpos 
== 0 && listLength(c
->reply
) == 0 && 
  71         (c
->replstate 
== REDIS_REPL_NONE 
|| 
  72          c
->replstate 
== REDIS_REPL_ONLINE
) && 
  73         aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
, 
  74         sendReplyToClient
, c
) == AE_ERR
) return REDIS_ERR
; 
  78 /* Create a duplicate of the last object in the reply list when 
  79  * it is not exclusively owned by the reply list. */ 
  80 robj 
*dupLastObjectIfNeeded(list 
*reply
) { 
  83     redisAssert(listLength(reply
) > 0); 
  85     cur 
= listNodeValue(ln
); 
  86     if (cur
->refcount 
> 1) { 
  87         new = dupStringObject(cur
); 
  89         listNodeValue(ln
) = new; 
  91     return listNodeValue(ln
); 
  94 /* ----------------------------------------------------------------------------- 
  95  * Low level functions to add more data to output buffers. 
  96  * -------------------------------------------------------------------------- */ 
  98 int _addReplyToBuffer(redisClient 
*c
, char *s
, size_t len
) { 
  99     size_t available 
= sizeof(c
->buf
)-c
->bufpos
; 
 101     if (c
->flags 
& REDIS_CLOSE_AFTER_REPLY
) return REDIS_OK
; 
 103     /* If there already are entries in the reply list, we cannot 
 104      * add anything more to the static buffer. */ 
 105     if (listLength(c
->reply
) > 0) return REDIS_ERR
; 
 107     /* Check that the buffer has enough space available for this string. */ 
 108     if (len 
> available
) return REDIS_ERR
; 
 110     memcpy(c
->buf
+c
->bufpos
,s
,len
); 
 115 void _addReplyObjectToList(redisClient 
*c
, robj 
*o
) { 
 118     if (c
->flags 
& REDIS_CLOSE_AFTER_REPLY
) return; 
 120     if (listLength(c
->reply
) == 0) { 
 122         listAddNodeTail(c
->reply
,o
); 
 124         tail 
= listNodeValue(listLast(c
->reply
)); 
 126         /* Append to this object when possible. */ 
 127         if (tail
->ptr 
!= NULL 
&& 
 128             sdslen(tail
->ptr
)+sdslen(o
->ptr
) <= REDIS_REPLY_CHUNK_BYTES
) 
 130             tail 
= dupLastObjectIfNeeded(c
->reply
); 
 131             tail
->ptr 
= sdscatlen(tail
->ptr
,o
->ptr
,sdslen(o
->ptr
)); 
 134             listAddNodeTail(c
->reply
,o
); 
 139 /* This method takes responsibility over the sds. When it is no longer 
 140  * needed it will be free'd, otherwise it ends up in a robj. */ 
 141 void _addReplySdsToList(redisClient 
*c
, sds s
) { 
 144     if (c
->flags 
& REDIS_CLOSE_AFTER_REPLY
) { 
 149     if (listLength(c
->reply
) == 0) { 
 150         listAddNodeTail(c
->reply
,createObject(REDIS_STRING
,s
)); 
 152         tail 
= listNodeValue(listLast(c
->reply
)); 
 154         /* Append to this object when possible. */ 
 155         if (tail
->ptr 
!= NULL 
&& 
 156             sdslen(tail
->ptr
)+sdslen(s
) <= REDIS_REPLY_CHUNK_BYTES
) 
 158             tail 
= dupLastObjectIfNeeded(c
->reply
); 
 159             tail
->ptr 
= sdscatlen(tail
->ptr
,s
,sdslen(s
)); 
 162             listAddNodeTail(c
->reply
,createObject(REDIS_STRING
,s
)); 
 167 void _addReplyStringToList(redisClient 
*c
, char *s
, size_t len
) { 
 170     if (c
->flags 
& REDIS_CLOSE_AFTER_REPLY
) return; 
 172     if (listLength(c
->reply
) == 0) { 
 173         listAddNodeTail(c
->reply
,createStringObject(s
,len
)); 
 175         tail 
= listNodeValue(listLast(c
->reply
)); 
 177         /* Append to this object when possible. */ 
 178         if (tail
->ptr 
!= NULL 
&& 
 179             sdslen(tail
->ptr
)+len 
<= REDIS_REPLY_CHUNK_BYTES
) 
 181             tail 
= dupLastObjectIfNeeded(c
->reply
); 
 182             tail
->ptr 
= sdscatlen(tail
->ptr
,s
,len
); 
 184             listAddNodeTail(c
->reply
,createStringObject(s
,len
)); 
 189 /* ----------------------------------------------------------------------------- 
 190  * Higher level functions to queue data on the client output buffer. 
 191  * The following functions are the ones that commands implementations will call. 
 192  * -------------------------------------------------------------------------- */ 
 194 void addReply(redisClient 
*c
, robj 
*obj
) { 
 195     if (_installWriteEvent(c
) != REDIS_OK
) return; 
 197     /* This is an important place where we can avoid copy-on-write 
 198      * when there is a saving child running, avoiding touching the 
 199      * refcount field of the object if it's not needed. 
 201      * If the encoding is RAW and there is room in the static buffer 
 202      * we'll be able to send the object to the client without 
 203      * messing with its page. */ 
 204     if (obj
->encoding 
== REDIS_ENCODING_RAW
) { 
 205         if (_addReplyToBuffer(c
,obj
->ptr
,sdslen(obj
->ptr
)) != REDIS_OK
) 
 206             _addReplyObjectToList(c
,obj
); 
 208         /* FIXME: convert the long into string and use _addReplyToBuffer() 
 209          * instead of calling getDecodedObject. As this place in the 
 210          * code is too performance critical. */ 
 211         obj 
= getDecodedObject(obj
); 
 212         if (_addReplyToBuffer(c
,obj
->ptr
,sdslen(obj
->ptr
)) != REDIS_OK
) 
 213             _addReplyObjectToList(c
,obj
); 
 218 void addReplySds(redisClient 
*c
, sds s
) { 
 219     if (_installWriteEvent(c
) != REDIS_OK
) { 
 220         /* The caller expects the sds to be free'd. */ 
 224     if (_addReplyToBuffer(c
,s
,sdslen(s
)) == REDIS_OK
) { 
 227         /* This method free's the sds when it is no longer needed. */ 
 228         _addReplySdsToList(c
,s
); 
 232 void addReplyString(redisClient 
*c
, char *s
, size_t len
) { 
 233     if (_installWriteEvent(c
) != REDIS_OK
) return; 
 234     if (_addReplyToBuffer(c
,s
,len
) != REDIS_OK
) 
 235         _addReplyStringToList(c
,s
,len
); 
 238 void _addReplyError(redisClient 
*c
, char *s
, size_t len
) { 
 239     addReplyString(c
,"-ERR ",5); 
 240     addReplyString(c
,s
,len
); 
 241     addReplyString(c
,"\r\n",2); 
 244 void addReplyError(redisClient 
*c
, char *err
) { 
 245     _addReplyError(c
,err
,strlen(err
)); 
 248 void addReplyErrorFormat(redisClient 
*c
, const char *fmt
, ...) { 
 251     sds s 
= sdscatvprintf(sdsempty(),fmt
,ap
); 
 253     _addReplyError(c
,s
,sdslen(s
)); 
 257 void _addReplyStatus(redisClient 
*c
, char *s
, size_t len
) { 
 258     addReplyString(c
,"+",1); 
 259     addReplyString(c
,s
,len
); 
 260     addReplyString(c
,"\r\n",2); 
 263 void addReplyStatus(redisClient 
*c
, char *status
) { 
 264     _addReplyStatus(c
,status
,strlen(status
)); 
 267 void addReplyStatusFormat(redisClient 
*c
, const char *fmt
, ...) { 
 270     sds s 
= sdscatvprintf(sdsempty(),fmt
,ap
); 
 272     _addReplyStatus(c
,s
,sdslen(s
)); 
 276 /* Adds an empty object to the reply list that will contain the multi bulk 
 277  * length, which is not known when this function is called. */ 
 278 void *addDeferredMultiBulkLength(redisClient 
*c
) { 
 279     /* Note that we install the write event here even if the object is not 
 280      * ready to be sent, since we are sure that before returning to the 
 281      * event loop setDeferredMultiBulkLength() will be called. */ 
 282     if (_installWriteEvent(c
) != REDIS_OK
) return NULL
; 
 283     listAddNodeTail(c
->reply
,createObject(REDIS_STRING
,NULL
)); 
 284     return listLast(c
->reply
); 
 287 /* Populate the length object and try glueing it to the next chunk. */ 
 288 void setDeferredMultiBulkLength(redisClient 
*c
, void *node
, long length
) { 
 289     listNode 
*ln 
= (listNode
*)node
; 
 292     /* Abort when *node is NULL (see addDeferredMultiBulkLength). */ 
 293     if (node 
== NULL
) return; 
 295     len 
= listNodeValue(ln
); 
 296     len
->ptr 
= sdscatprintf(sdsempty(),"*%ld\r\n",length
); 
 297     if (ln
->next 
!= NULL
) { 
 298         next 
= listNodeValue(ln
->next
); 
 300         /* Only glue when the next node is non-NULL (an sds in this case) */ 
 301         if (next
->ptr 
!= NULL
) { 
 302             len
->ptr 
= sdscatlen(len
->ptr
,next
->ptr
,sdslen(next
->ptr
)); 
 303             listDelNode(c
->reply
,ln
->next
); 
 308 /* Add a double as a bulk reply */
 309 void addReplyDouble(redisClient 
*c
, double d
) { 
 310     char dbuf
[128], sbuf
[128]; 
 312     dlen 
= snprintf(dbuf
,sizeof(dbuf
),"%.17g",d
); 
 313     slen 
= snprintf(sbuf
,sizeof(sbuf
),"$%d\r\n%s\r\n",dlen
,dbuf
); 
 314     addReplyString(c
,sbuf
,slen
); 
 317 /* Add a long long as integer reply or bulk len / multi bulk count. 
 318  * Basically this is used to output <prefix><long long><crlf>. */ 
 319 void _addReplyLongLong(redisClient 
*c
, long long ll
, char prefix
) { 
 323     len 
= ll2string(buf
+1,sizeof(buf
)-1,ll
); 
 326     addReplyString(c
,buf
,len
+3); 
 329 void addReplyLongLong(redisClient 
*c
, long long ll
) { 
 331         addReply(c
,shared
.czero
); 
 333         addReply(c
,shared
.cone
); 
 335         _addReplyLongLong(c
,ll
,':'); 
 338 void addReplyMultiBulkLen(redisClient 
*c
, long length
) { 
 339     _addReplyLongLong(c
,length
,'*'); 
 342 /* Create the length prefix of a bulk reply, example: $2234 */ 
 343 void addReplyBulkLen(redisClient 
*c
, robj 
*obj
) { 
 346     if (obj
->encoding 
== REDIS_ENCODING_RAW
) { 
 347         len 
= sdslen(obj
->ptr
); 
 349         long n 
= (long)obj
->ptr
; 
 351         /* Compute how many bytes will take this integer as a radix 10 string */ 
 357         while((n 
= n
/10) != 0) { 
 361     _addReplyLongLong(c
,len
,'$'); 
 364 /* Add a Redis Object as a bulk reply */ 
 365 void addReplyBulk(redisClient 
*c
, robj 
*obj
) { 
 366     addReplyBulkLen(c
,obj
); 
 368     addReply(c
,shared
.crlf
); 
 371 /* Add a C buffer as bulk reply */ 
 372 void addReplyBulkCBuffer(redisClient 
*c
, void *p
, size_t len
) { 
 373     _addReplyLongLong(c
,len
,'$'); 
 374     addReplyString(c
,p
,len
); 
 375     addReply(c
,shared
.crlf
); 
 378 /* Add a C nul term string as bulk reply */ 
 379 void addReplyBulkCString(redisClient 
*c
, char *s
) { 
 381         addReply(c
,shared
.nullbulk
); 
 383         addReplyBulkCBuffer(c
,s
,strlen(s
)); 
 387 /* Add a long long as a bulk reply */ 
 388 void addReplyBulkLongLong(redisClient 
*c
, long long ll
) { 
 392     len 
= ll2string(buf
,64,ll
); 
 393     addReplyBulkCBuffer(c
,buf
,len
); 
 396 static void acceptCommonHandler(int fd
) { 
 398     if ((c 
= createClient(fd
)) == NULL
) { 
 399         redisLog(REDIS_WARNING
,"Error allocating resoures for the client"); 
 400         close(fd
); /* May be already closed, just ingore errors */ 
 403     /* If the maxclients directive is set and this is one client too many, close
 404      * the connection. Note that we create the client first instead of checking
 405      * for this condition beforehand, since now the socket is already set in
 406      * nonblocking mode and we can send an error for free using the kernel I/O. */
 407     if (server
.maxclients 
&& listLength(server
.clients
) > server
.maxclients
) { 
 408         char *err 
= "-ERR max number of clients reached\r\n"; 
 410         /* That's a best effort error message, don't check write errors */ 
 411         if (write(c
->fd
,err
,strlen(err
)) == -1) { 
 412             /* Nothing to do, Just to avoid the warning... */ 
 417     server
.stat_numconnections
++; 
 420 void acceptTcpHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 425     REDIS_NOTUSED(privdata
); 
 427     cfd 
= anetTcpAccept(server
.neterr
, fd
, cip
, &cport
); 
 429         redisLog(REDIS_WARNING
,"Accepting client connection: %s", server
.neterr
); 
 432     redisLog(REDIS_VERBOSE
,"Accepted %s:%d", cip
, cport
); 
 433     acceptCommonHandler(cfd
); 
 436 void acceptUnixHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 440     REDIS_NOTUSED(privdata
); 
 442     cfd 
= anetUnixAccept(server
.neterr
, fd
); 
 444         redisLog(REDIS_WARNING
,"Accepting client connection: %s", server
.neterr
); 
 447     redisLog(REDIS_VERBOSE
,"Accepted connection to %s", server
.unixsocket
); 
 448     acceptCommonHandler(cfd
); 
 452 static void freeClientArgv(redisClient 
*c
) { 
 454     for (j 
= 0; j 
< c
->argc
; j
++) 
 455         decrRefCount(c
->argv
[j
]); 
 459 void freeClient(redisClient 
*c
) { 
 462     /* Note that if the client we are freeing is blocked in a blocking
 463      * call, we have to set querybuf to NULL *before* calling
 464      * unblockClientWaitingData() so that processInputBuffer() does not get
 465      * called. Also it is important to remove the file events after
 466      * this, because this call adds the READABLE event. */
 467     sdsfree(c
->querybuf
); 
 469     if (c
->flags 
& REDIS_BLOCKED
) 
 470         unblockClientWaitingData(c
); 
 472     /* UNWATCH all the keys */ 
 474     listRelease(c
->watched_keys
); 
 475     /* Unsubscribe from all the pubsub channels */ 
 476     pubsubUnsubscribeAllChannels(c
,0); 
 477     pubsubUnsubscribeAllPatterns(c
,0); 
 478     dictRelease(c
->pubsub_channels
); 
 479     listRelease(c
->pubsub_patterns
); 
 480     /* Obvious cleanup */ 
 481     aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
); 
 482     aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
); 
 483     listRelease(c
->reply
); 
 486     /* Remove from the list of clients */ 
 487     ln 
= listSearchKey(server
.clients
,c
); 
 488     redisAssert(ln 
!= NULL
); 
 489     listDelNode(server
.clients
,ln
); 
 490     /* When client was just unblocked because of a blocking operation, 
 491      * remove it from the list with unblocked clients. */ 
 492     if (c
->flags 
& REDIS_UNBLOCKED
) { 
 493         ln 
= listSearchKey(server
.unblocked_clients
,c
); 
 494         redisAssert(ln 
!= NULL
); 
 495         listDelNode(server
.unblocked_clients
,ln
); 
 497     /* Remove from the list of clients waiting for swapped keys, or ready 
 498      * to be restarted, but not yet woken up again. */ 
 499     if (c
->flags 
& REDIS_IO_WAIT
) { 
 500         redisAssert(server
.ds_enabled
); 
 501         if (listLength(c
->io_keys
) == 0) { 
 502             ln 
= listSearchKey(server
.io_ready_clients
,c
); 
 504             /* When this client is waiting to be woken up (REDIS_IO_WAIT), 
 505              * it should be present in the list io_ready_clients */ 
 506             redisAssert(ln 
!= NULL
); 
 507             listDelNode(server
.io_ready_clients
,ln
); 
 509             while (listLength(c
->io_keys
)) { 
 510                 ln 
= listFirst(c
->io_keys
); 
 511                 dontWaitForSwappedKey(c
,ln
->value
); 
 514         server
.cache_blocked_clients
--; 
 516     listRelease(c
->io_keys
); 
 517     /* Master/slave cleanup. 
 518      * Case 1: we lost the connection with a slave. */ 
 519     if (c
->flags 
& REDIS_SLAVE
) { 
 520         if (c
->replstate 
== REDIS_REPL_SEND_BULK 
&& c
->repldbfd 
!= -1) 
 522         list 
*l 
= (c
->flags 
& REDIS_MONITOR
) ? server
.monitors 
: server
.slaves
; 
 523         ln 
= listSearchKey(l
,c
); 
 524         redisAssert(ln 
!= NULL
); 
 528     /* Case 2: we lost the connection with the master. */ 
 529     if (c
->flags 
& REDIS_MASTER
) { 
 530         server
.master 
= NULL
; 
 531         server
.replstate 
= REDIS_REPL_CONNECT
; 
 532         /* Since we lost the connection with the master, we should also 
 533          * close the connection with all our slaves if we have any, so 
 534          * when we'll resync with the master the other slaves will sync again 
 535          * with us as well. Note that also when the slave is not connected 
 536          * to the master it will keep refusing connections by other slaves. 
 538          * We do this only if server.masterhost != NULL. If it is NULL this 
 539          * means the user called SLAVEOF NO ONE and we are freeing our 
 540          * link with the master, so no need to close link with slaves. */ 
 541         if (server
.masterhost 
!= NULL
) { 
 542             while (listLength(server
.slaves
)) { 
 543                 ln 
= listFirst(server
.slaves
); 
 544                 freeClient((redisClient
*)ln
->value
); 
 550     freeClientMultiState(c
); 
 554 void sendReplyToClient(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 555     redisClient 
*c 
= privdata
; 
 556     int nwritten 
= 0, totwritten 
= 0, objlen
; 
 561     while(c
->bufpos 
> 0 || listLength(c
->reply
)) { 
 563             if (c
->flags 
& REDIS_MASTER
) { 
 564                 /* Don't reply to a master */ 
 565                 nwritten 
= c
->bufpos 
- c
->sentlen
; 
 567                 nwritten 
= write(fd
,c
->buf
+c
->sentlen
,c
->bufpos
-c
->sentlen
); 
 568                 if (nwritten 
<= 0) break; 
 570             c
->sentlen 
+= nwritten
; 
 571             totwritten 
+= nwritten
; 
 573             /* If the buffer was sent, set bufpos to zero to continue with 
 574              * the remainder of the reply. */ 
 575             if (c
->sentlen 
== c
->bufpos
) { 
 580             o 
= listNodeValue(listFirst(c
->reply
)); 
 581             objlen 
= sdslen(o
->ptr
); 
 584                 listDelNode(c
->reply
,listFirst(c
->reply
)); 
 588             if (c
->flags 
& REDIS_MASTER
) { 
 589                 /* Don't reply to a master */ 
 590                 nwritten 
= objlen 
- c
->sentlen
; 
 592                 nwritten 
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
,objlen
-c
->sentlen
); 
 593                 if (nwritten 
<= 0) break; 
 595             c
->sentlen 
+= nwritten
; 
 596             totwritten 
+= nwritten
; 
 598             /* If we fully sent the object on head go to the next one */ 
 599             if (c
->sentlen 
== objlen
) { 
 600                 listDelNode(c
->reply
,listFirst(c
->reply
)); 
 604         /* Note that we avoid sending more than REDIS_MAX_WRITE_PER_EVENT
 605          * bytes: in a single threaded server it's a good idea to serve
 606          * other clients as well, even if a very large request comes from a
 607          * super fast link that is always able to accept data (in a real world
 608          * scenario think about 'KEYS *' against the loopback interface). */
 609         if (totwritten 
> REDIS_MAX_WRITE_PER_EVENT
) break; 
 611     if (nwritten 
== -1) { 
 612         if (errno 
== EAGAIN
) { 
 615             redisLog(REDIS_VERBOSE
, 
 616                 "Error writing to client: %s", strerror(errno
)); 
 621     if (totwritten 
> 0) c
->lastinteraction 
= time(NULL
); 
 622     if (listLength(c
->reply
) == 0) { 
 624         aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
); 
 626         /* Close connection after entire reply has been sent. */ 
 627         if (c
->flags 
& REDIS_CLOSE_AFTER_REPLY
) freeClient(c
); 
 631 /* resetClient prepares the client to process the next command */
 632 void resetClient(redisClient 
*c
) { 
 639 void closeTimedoutClients(void) { 
 642     time_t now 
= time(NULL
); 
 645     listRewind(server
.clients
,&li
); 
 646     while ((ln 
= listNext(&li
)) != NULL
) { 
 647         c 
= listNodeValue(ln
); 
 648         if (server
.maxidletime 
&& 
 649             !(c
->flags 
& REDIS_SLAVE
) &&    /* no timeout for slaves */ 
 650             !(c
->flags 
& REDIS_MASTER
) &&   /* no timeout for masters */ 
 651             !(c
->flags 
& REDIS_BLOCKED
) &&  /* no timeout for BLPOP */ 
 652             dictSize(c
->pubsub_channels
) == 0 && /* no timeout for pubsub */ 
 653             listLength(c
->pubsub_patterns
) == 0 && 
 654             (now 
- c
->lastinteraction 
> server
.maxidletime
)) 
 656             redisLog(REDIS_VERBOSE
,"Closing idle client"); 
 658         } else if (c
->flags 
& REDIS_BLOCKED
) { 
 659             if (c
->bpop
.timeout 
!= 0 && c
->bpop
.timeout 
< now
) { 
 660                 addReply(c
,shared
.nullmultibulk
); 
 661                 unblockClientWaitingData(c
); 
 667 int processInlineBuffer(redisClient 
*c
) { 
 668     char *newline 
= strstr(c
->querybuf
,"\r\n"); 
 673     /* Nothing to do without a \r\n */ 
 677     /* Split the input buffer up to the \r\n */ 
 678     querylen 
= newline
-(c
->querybuf
); 
 679     argv 
= sdssplitlen(c
->querybuf
,querylen
," ",1,&argc
); 
 681     /* Leave data after the first line of the query in the buffer */ 
 682     c
->querybuf 
= sdsrange(c
->querybuf
,querylen
+2,-1); 
 684     /* Setup argv array on client structure */ 
 685     if (c
->argv
) zfree(c
->argv
); 
 686     c
->argv 
= zmalloc(sizeof(robj
*)*argc
); 
 688     /* Create redis objects for all arguments. */ 
 689     for (c
->argc 
= 0, j 
= 0; j 
< argc
; j
++) { 
 690         if (sdslen(argv
[j
])) { 
 691             c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]); 
 701 /* Helper function. Trims query buffer to make the function that processes 
 702  * multi bulk requests idempotent. */ 
 703 static void setProtocolError(redisClient 
*c
, int pos
) { 
 704     c
->flags 
|= REDIS_CLOSE_AFTER_REPLY
; 
 705     c
->querybuf 
= sdsrange(c
->querybuf
,pos
,-1); 
 708 int processMultibulkBuffer(redisClient 
*c
) { 
 709     char *newline 
= NULL
; 
 713     if (c
->multibulklen 
== 0) { 
 714         /* The client should have been reset */ 
 715         redisAssert(c
->argc 
== 0); 
 717         /* Multi bulk length cannot be read without a \r\n */ 
 718         newline 
= strchr(c
->querybuf
,'\r'); 
 722         /* Buffer should also contain \n */ 
 723         if (newline
-(c
->querybuf
) > ((signed)sdslen(c
->querybuf
)-2)) 
 726         /* We know for sure there is a whole line since newline != NULL, 
 727          * so go ahead and find out the multi bulk length. */ 
 728         redisAssert(c
->querybuf
[0] == '*'); 
 729         ok 
= string2ll(c
->querybuf
+1,newline
-(c
->querybuf
+1),&ll
); 
 730         if (!ok 
|| ll 
> 1024*1024) { 
 731             addReplyError(c
,"Protocol error: invalid multibulk length"); 
 732             setProtocolError(c
,pos
); 
 736         pos 
= (newline
-c
->querybuf
)+2; 
 738             c
->querybuf 
= sdsrange(c
->querybuf
,pos
,-1); 
 742         c
->multibulklen 
= ll
; 
 744         /* Setup argv array on client structure */ 
 745         if (c
->argv
) zfree(c
->argv
); 
 746         c
->argv 
= zmalloc(sizeof(robj
*)*c
->multibulklen
); 
 749     redisAssert(c
->multibulklen 
> 0); 
 750     while(c
->multibulklen
) { 
 751         /* Read bulk length if unknown */ 
 752         if (c
->bulklen 
== -1) { 
 753             newline 
= strchr(c
->querybuf
+pos
,'\r'); 
 757             /* Buffer should also contain \n */ 
 758             if (newline
-(c
->querybuf
) > ((signed)sdslen(c
->querybuf
)-2)) 
 761             if (c
->querybuf
[pos
] != '$') { 
 762                 addReplyErrorFormat(c
, 
 763                     "Protocol error: expected '$', got '%c'", 
 765                 setProtocolError(c
,pos
); 
 769             ok 
= string2ll(c
->querybuf
+pos
+1,newline
-(c
->querybuf
+pos
+1),&ll
); 
 770             if (!ok 
|| ll 
< 0 || ll 
> 512*1024*1024) { 
 771                 addReplyError(c
,"Protocol error: invalid bulk length"); 
 772                 setProtocolError(c
,pos
); 
 776             pos 
+= newline
-(c
->querybuf
+pos
)+2; 
 780         /* Read bulk argument */ 
 781         if (sdslen(c
->querybuf
)-pos 
< (unsigned)(c
->bulklen
+2)) { 
 782             /* Not enough data (+2 == trailing \r\n) */ 
 785             c
->argv
[c
->argc
++] = createStringObject(c
->querybuf
+pos
,c
->bulklen
); 
 793     c
->querybuf 
= sdsrange(c
->querybuf
,pos
,-1); 
 795     /* We're done when c->multibulklen == 0 */
 796     if (c
->multibulklen 
== 0) { 
 802 void processInputBuffer(redisClient 
*c
) { 
 803     /* Keep processing while there is something in the input buffer */ 
 804     while(sdslen(c
->querybuf
)) { 
 805         /* Immediately abort if the client is in the middle of something. */ 
 806         if (c
->flags 
& REDIS_BLOCKED 
|| c
->flags 
& REDIS_IO_WAIT
) return; 
 808         /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is 
 809          * written to the client. Make sure to not let the reply grow after 
 810          * this flag has been set (i.e. don't process more commands). */ 
 811         if (c
->flags 
& REDIS_CLOSE_AFTER_REPLY
) return; 
 813         /* Determine request type when unknown. */ 
 815             if (c
->querybuf
[0] == '*') { 
 816                 c
->reqtype 
= REDIS_REQ_MULTIBULK
; 
 818                 c
->reqtype 
= REDIS_REQ_INLINE
; 
 822         if (c
->reqtype 
== REDIS_REQ_INLINE
) { 
 823             if (processInlineBuffer(c
) != REDIS_OK
) break; 
 824         } else if (c
->reqtype 
== REDIS_REQ_MULTIBULK
) { 
 825             if (processMultibulkBuffer(c
) != REDIS_OK
) break; 
 827             redisPanic("Unknown request type"); 
 830         /* Multibulk processing could see a <= 0 length. */ 
 834             /* Only reset the client when the command was executed. */ 
 835             if (processCommand(c
) == REDIS_OK
) 
 841 void readQueryFromClient(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 842     redisClient 
*c 
= (redisClient
*) privdata
; 
 843     char buf
[REDIS_IOBUF_LEN
]; 
 848     nread 
= read(fd
, buf
, REDIS_IOBUF_LEN
); 
 850         if (errno 
== EAGAIN
) { 
 853             redisLog(REDIS_VERBOSE
, "Reading from client: %s",strerror(errno
)); 
 857     } else if (nread 
== 0) { 
 858         redisLog(REDIS_VERBOSE
, "Client closed connection"); 
 863         c
->querybuf 
= sdscatlen(c
->querybuf
,buf
,nread
); 
 864         c
->lastinteraction 
= time(NULL
); 
 868     processInputBuffer(c
); 
 871 void getClientsMaxBuffers(unsigned long *longest_output_list
, 
 872                           unsigned long *biggest_input_buffer
) { 
 876     unsigned long lol 
= 0, bib 
= 0; 
 878     listRewind(server
.clients
,&li
); 
 879     while ((ln 
= listNext(&li
)) != NULL
) { 
 880         c 
= listNodeValue(ln
); 
 882         if (listLength(c
->reply
) > lol
) lol 
= listLength(c
->reply
); 
 883         if (sdslen(c
->querybuf
) > bib
) bib 
= sdslen(c
->querybuf
); 
 885     *longest_output_list 
= lol
; 
 886     *biggest_input_buffer 
= bib
; 
 889 void clientCommand(redisClient 
*c
) { 
 894     if (!strcasecmp(c
->argv
[1]->ptr
,"list") && c
->argc 
== 2) { 
 896         time_t now 
= time(NULL
); 
 898         listRewind(server
.clients
,&li
); 
 899         while ((ln 
= listNext(&li
)) != NULL
) { 
 900             char ip
[32], flags
[16], *p
; 
 903             client 
= listNodeValue(ln
); 
 904             if (anetPeerToString(client
->fd
,ip
,&port
) == -1) continue; 
 906             if (client
->flags 
& REDIS_SLAVE
) { 
 907                 if (client
->flags 
& REDIS_MONITOR
) 
 912             if (client
->flags 
& REDIS_MASTER
) *p
++ = 'M'; 
 913             if (p 
== flags
) *p
++ = 'N'; 
 914             if (client
->flags 
& REDIS_MULTI
) *p
++ = 'x'; 
 915             if (client
->flags 
& REDIS_BLOCKED
) *p
++ = 'b'; 
 916             if (client
->flags 
& REDIS_IO_WAIT
) *p
++ = 'i'; 
 917             if (client
->flags 
& REDIS_DIRTY_CAS
) *p
++ = 'd'; 
 918             if (client
->flags 
& REDIS_CLOSE_AFTER_REPLY
) *p
++ = 'c'; 
 919             if (client
->flags 
& REDIS_UNBLOCKED
) *p
++ = 'u'; 
 922                 "addr=%s:%d fd=%d idle=%ld flags=%s db=%d sub=%d psub=%d\n", 
 924                 (long)(now 
- client
->lastinteraction
), 
 927                 (int) dictSize(client
->pubsub_channels
), 
 928                 (int) listLength(client
->pubsub_patterns
)); 
 930         addReplyBulkCBuffer(c
,o
,sdslen(o
)); 
 932     } else if (!strcasecmp(c
->argv
[1]->ptr
,"kill") && c
->argc 
== 3) { 
 933         listRewind(server
.clients
,&li
); 
 934         while ((ln 
= listNext(&li
)) != NULL
) { 
 935             char ip
[32], addr
[64]; 
 938             client 
= listNodeValue(ln
); 
 939             if (anetPeerToString(client
->fd
,ip
,&port
) == -1) continue; 
 940             snprintf(addr
,sizeof(addr
),"%s:%d",ip
,port
); 
 941             if (strcmp(addr
,c
->argv
[2]->ptr
) == 0) { 
 942                 addReply(c
,shared
.ok
); 
 944                     client
->flags 
|= REDIS_CLOSE_AFTER_REPLY
; 
 951         addReplyError(c
,"No such client"); 
 953         addReplyError(c
, "Syntax error, try CLIENT (LIST | KILL ip:port)");