4 static void setProtocolError(redisClient 
*c
, int pos
); 
   6 void *dupClientReplyValue(void *o
) { 
   7     incrRefCount((robj
*)o
); 
  11 int listMatchObjects(void *a
, void *b
) { 
  12     return equalStringObjects(a
,b
); 
  15 redisClient 
*createClient(int fd
) { 
  16     redisClient 
*c 
= zmalloc(sizeof(redisClient
)); 
  19     /* Passing -1 as fd makes it possible to create a non connected client. 
  20      * This is useful since all the Redis commands need to be executed 
  21      * in the context of a client. When commands are executed in other 
  22      * contexts (for instance a Lua script) we need a non connected client. */ 
  24         anetNonBlock(NULL
,fd
); 
  25         anetTcpNoDelay(NULL
,fd
); 
  26         if (aeCreateFileEvent(server
.el
,fd
,AE_READABLE
, 
  27             readQueryFromClient
, c
) == AE_ERR
) 
  37     c
->querybuf 
= sdsempty(); 
  41     c
->cmd 
= c
->lastcmd 
= NULL
; 
  46     c
->lastinteraction 
= time(NULL
); 
  48     c
->replstate 
= REDIS_REPL_NONE
; 
  49     c
->reply 
= listCreate(); 
  50     listSetFreeMethod(c
->reply
,decrRefCount
); 
  51     listSetDupMethod(c
->reply
,dupClientReplyValue
); 
  55     c
->bpop
.target 
= NULL
; 
  56     c
->io_keys 
= listCreate(); 
  57     c
->watched_keys 
= listCreate(); 
  58     listSetFreeMethod(c
->io_keys
,decrRefCount
); 
  59     c
->pubsub_channels 
= dictCreate(&setDictType
,NULL
); 
  60     c
->pubsub_patterns 
= listCreate(); 
  61     listSetFreeMethod(c
->pubsub_patterns
,decrRefCount
); 
  62     listSetMatchMethod(c
->pubsub_patterns
,listMatchObjects
); 
  63     if (fd 
!= -1) listAddNodeTail(server
.clients
,c
); 
  64     initClientMultiState(c
); 
  68 /* Set the event loop to listen for write events on the client's socket. 
  69  * Typically gets called every time a reply is built. */ 
  70 int _installWriteEvent(redisClient 
*c
) { 
  71     if (c
->flags 
& REDIS_LUA_CLIENT
) return REDIS_OK
; 
  72     if (c
->fd 
<= 0) return REDIS_ERR
; 
  73     if (c
->bufpos 
== 0 && listLength(c
->reply
) == 0 && 
  74         (c
->replstate 
== REDIS_REPL_NONE 
|| 
  75          c
->replstate 
== REDIS_REPL_ONLINE
) && 
  76         aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
, 
  77         sendReplyToClient
, c
) == AE_ERR
) return REDIS_ERR
; 
  81 /* Create a duplicate of the last object in the reply list when 
  82  * it is not exclusively owned by the reply list. */ 
  83 robj 
*dupLastObjectIfNeeded(list 
*reply
) { 
  86     redisAssert(listLength(reply
) > 0); 
  88     cur 
= listNodeValue(ln
); 
  89     if (cur
->refcount 
> 1) { 
  90         new = dupStringObject(cur
); 
  92         listNodeValue(ln
) = new; 
  94     return listNodeValue(ln
); 
  97 /* ----------------------------------------------------------------------------- 
  98  * Low level functions to add more data to output buffers. 
  99  * -------------------------------------------------------------------------- */ 
 101 int _addReplyToBuffer(redisClient 
*c
, char *s
, size_t len
) { 
 102     size_t available 
= sizeof(c
->buf
)-c
->bufpos
; 
 104     if (c
->flags 
& REDIS_CLOSE_AFTER_REPLY
) return REDIS_OK
; 
 106     /* If there already are entries in the reply list, we cannot 
 107      * add anything more to the static buffer. */ 
 108     if (listLength(c
->reply
) > 0) return REDIS_ERR
; 
 110     /* Check that the buffer has enough space available for this string. */ 
 111     if (len 
> available
) return REDIS_ERR
; 
 113     memcpy(c
->buf
+c
->bufpos
,s
,len
); 
 118 void _addReplyObjectToList(redisClient 
*c
, robj 
*o
) { 
 121     if (c
->flags 
& REDIS_CLOSE_AFTER_REPLY
) return; 
 123     if (listLength(c
->reply
) == 0) { 
 125         listAddNodeTail(c
->reply
,o
); 
 127         tail 
= listNodeValue(listLast(c
->reply
)); 
 129         /* Append to this object when possible. */ 
 130         if (tail
->ptr 
!= NULL 
&& 
 131             sdslen(tail
->ptr
)+sdslen(o
->ptr
) <= REDIS_REPLY_CHUNK_BYTES
) 
 133             tail 
= dupLastObjectIfNeeded(c
->reply
); 
 134             tail
->ptr 
= sdscatlen(tail
->ptr
,o
->ptr
,sdslen(o
->ptr
)); 
 137             listAddNodeTail(c
->reply
,o
); 
 142 /* This method takes responsibility over the sds. When it is no longer 
 143  * needed it will be free'd, otherwise it ends up in a robj. */ 
 144 void _addReplySdsToList(redisClient 
*c
, sds s
) { 
 147     if (c
->flags 
& REDIS_CLOSE_AFTER_REPLY
) { 
 152     if (listLength(c
->reply
) == 0) { 
 153         listAddNodeTail(c
->reply
,createObject(REDIS_STRING
,s
)); 
 155         tail 
= listNodeValue(listLast(c
->reply
)); 
 157         /* Append to this object when possible. */ 
 158         if (tail
->ptr 
!= NULL 
&& 
 159             sdslen(tail
->ptr
)+sdslen(s
) <= REDIS_REPLY_CHUNK_BYTES
) 
 161             tail 
= dupLastObjectIfNeeded(c
->reply
); 
 162             tail
->ptr 
= sdscatlen(tail
->ptr
,s
,sdslen(s
)); 
 165             listAddNodeTail(c
->reply
,createObject(REDIS_STRING
,s
)); 
 170 void _addReplyStringToList(redisClient 
*c
, char *s
, size_t len
) { 
 173     if (c
->flags 
& REDIS_CLOSE_AFTER_REPLY
) return; 
 175     if (listLength(c
->reply
) == 0) { 
 176         listAddNodeTail(c
->reply
,createStringObject(s
,len
)); 
 178         tail 
= listNodeValue(listLast(c
->reply
)); 
 180         /* Append to this object when possible. */ 
 181         if (tail
->ptr 
!= NULL 
&& 
 182             sdslen(tail
->ptr
)+len 
<= REDIS_REPLY_CHUNK_BYTES
) 
 184             tail 
= dupLastObjectIfNeeded(c
->reply
); 
 185             tail
->ptr 
= sdscatlen(tail
->ptr
,s
,len
); 
 187             listAddNodeTail(c
->reply
,createStringObject(s
,len
)); 
 192 /* ----------------------------------------------------------------------------- 
 193  * Higher level functions to queue data on the client output buffer. 
 194  * The following functions are the ones that commands implementations will call. 
 195  * -------------------------------------------------------------------------- */ 
 197 void addReply(redisClient 
*c
, robj 
*obj
) { 
 198     if (_installWriteEvent(c
) != REDIS_OK
) return; 
 200     /* This is an important place where we can avoid copy-on-write 
 201      * when there is a saving child running, avoiding touching the 
 202      * refcount field of the object if it's not needed. 
 204      * If the encoding is RAW and there is room in the static buffer 
 205      * we'll be able to send the object to the client without 
 206      * messing with its page. */ 
 207     if (obj
->encoding 
== REDIS_ENCODING_RAW
) { 
 208         if (_addReplyToBuffer(c
,obj
->ptr
,sdslen(obj
->ptr
)) != REDIS_OK
) 
 209             _addReplyObjectToList(c
,obj
); 
 211         /* FIXME: convert the long into string and use _addReplyToBuffer() 
 212          * instead of calling getDecodedObject. As this place in the 
 213          * code is too performance critical. */ 
 214         obj 
= getDecodedObject(obj
); 
 215         if (_addReplyToBuffer(c
,obj
->ptr
,sdslen(obj
->ptr
)) != REDIS_OK
) 
 216             _addReplyObjectToList(c
,obj
); 
 221 void addReplySds(redisClient 
*c
, sds s
) { 
 222     if (_installWriteEvent(c
) != REDIS_OK
) { 
 223         /* The caller expects the sds to be free'd. */ 
 227     if (_addReplyToBuffer(c
,s
,sdslen(s
)) == REDIS_OK
) { 
 230         /* This method free's the sds when it is no longer needed. */ 
 231         _addReplySdsToList(c
,s
); 
 235 void addReplyString(redisClient 
*c
, char *s
, size_t len
) { 
 236     if (_installWriteEvent(c
) != REDIS_OK
) return; 
 237     if (_addReplyToBuffer(c
,s
,len
) != REDIS_OK
) 
 238         _addReplyStringToList(c
,s
,len
); 
 241 void _addReplyError(redisClient 
*c
, char *s
, size_t len
) { 
 242     addReplyString(c
,"-ERR ",5); 
 243     addReplyString(c
,s
,len
); 
 244     addReplyString(c
,"\r\n",2); 
 247 void addReplyError(redisClient 
*c
, char *err
) { 
 248     _addReplyError(c
,err
,strlen(err
)); 
 251 void addReplyErrorFormat(redisClient 
*c
, const char *fmt
, ...) { 
 255     sds s 
= sdscatvprintf(sdsempty(),fmt
,ap
); 
 257     /* Make sure there are no newlines in the string, otherwise invalid protocol 
 260     for (j 
= 0; j 
< l
; j
++) { 
 261         if (s
[j
] == '\r' || s
[j
] == '\n') s
[j
] = ' '; 
 263     _addReplyError(c
,s
,sdslen(s
)); 
 267 void _addReplyStatus(redisClient 
*c
, char *s
, size_t len
) { 
 268     addReplyString(c
,"+",1); 
 269     addReplyString(c
,s
,len
); 
 270     addReplyString(c
,"\r\n",2); 
 273 void addReplyStatus(redisClient 
*c
, char *status
) { 
 274     _addReplyStatus(c
,status
,strlen(status
)); 
 277 void addReplyStatusFormat(redisClient 
*c
, const char *fmt
, ...) { 
 280     sds s 
= sdscatvprintf(sdsempty(),fmt
,ap
); 
 282     _addReplyStatus(c
,s
,sdslen(s
)); 
 286 /* Adds an empty object to the reply list that will contain the multi bulk 
 287  * length, which is not known when this function is called. */ 
 288 void *addDeferredMultiBulkLength(redisClient 
*c
) { 
 289     /* Note that we install the write event here even if the object is not 
 290      * ready to be sent, since we are sure that before returning to the 
 291      * event loop setDeferredMultiBulkLength() will be called. */ 
 292     if (_installWriteEvent(c
) != REDIS_OK
) return NULL
; 
 293     listAddNodeTail(c
->reply
,createObject(REDIS_STRING
,NULL
)); 
 294     return listLast(c
->reply
); 
 297 /* Populate the length object and try glueing it to the next chunk. */ 
 298 void setDeferredMultiBulkLength(redisClient 
*c
, void *node
, long length
) { 
 299     listNode 
*ln 
= (listNode
*)node
; 
 302     /* Abort when *node is NULL (see addDeferredMultiBulkLength). */ 
 303     if (node 
== NULL
) return; 
 305     len 
= listNodeValue(ln
); 
 306     len
->ptr 
= sdscatprintf(sdsempty(),"*%ld\r\n",length
); 
 307     if (ln
->next 
!= NULL
) { 
 308         next 
= listNodeValue(ln
->next
); 
 310         /* Only glue when the next node is non-NULL (an sds in this case) */ 
 311         if (next
->ptr 
!= NULL
) { 
 312             len
->ptr 
= sdscatlen(len
->ptr
,next
->ptr
,sdslen(next
->ptr
)); 
 313             listDelNode(c
->reply
,ln
->next
); 
 318 /* Add a double as a bulk reply */ 
 319 void addReplyDouble(redisClient 
*c
, double d
) { 
 320     char dbuf
[128], sbuf
[128]; 
 322     dlen 
= snprintf(dbuf
,sizeof(dbuf
),"%.17g",d
); 
 323     slen 
= snprintf(sbuf
,sizeof(sbuf
),"$%d\r\n%s\r\n",dlen
,dbuf
); 
 324     addReplyString(c
,sbuf
,slen
); 
 327 /* Add a long long as integer reply or bulk len / multi bulk count. 
 328  * Basically this is used to output <prefix><long long><crlf>. */ 
 329 void _addReplyLongLong(redisClient 
*c
, long long ll
, char prefix
) { 
 333     len 
= ll2string(buf
+1,sizeof(buf
)-1,ll
); 
 336     addReplyString(c
,buf
,len
+3); 
 339 void addReplyLongLong(redisClient 
*c
, long long ll
) { 
 341         addReply(c
,shared
.czero
); 
 343         addReply(c
,shared
.cone
); 
 345         _addReplyLongLong(c
,ll
,':'); 
 348 void addReplyMultiBulkLen(redisClient 
*c
, long length
) { 
 349     _addReplyLongLong(c
,length
,'*'); 
 352 /* Create the length prefix of a bulk reply, example: $2234 */ 
 353 void addReplyBulkLen(redisClient 
*c
, robj 
*obj
) { 
 356     if (obj
->encoding 
== REDIS_ENCODING_RAW
) { 
 357         len 
= sdslen(obj
->ptr
); 
 359         long n 
= (long)obj
->ptr
; 
 361         /* Compute how many bytes will take this integer as a radix 10 string */ 
 367         while((n 
= n
/10) != 0) { 
 371     _addReplyLongLong(c
,len
,'$'); 
 374 /* Add a Redis Object as a bulk reply */ 
 375 void addReplyBulk(redisClient 
*c
, robj 
*obj
) { 
 376     addReplyBulkLen(c
,obj
); 
 378     addReply(c
,shared
.crlf
); 
 381 /* Add a C buffer as bulk reply */ 
 382 void addReplyBulkCBuffer(redisClient 
*c
, void *p
, size_t len
) { 
 383     _addReplyLongLong(c
,len
,'$'); 
 384     addReplyString(c
,p
,len
); 
 385     addReply(c
,shared
.crlf
); 
 388 /* Add a C nul term string as bulk reply */ 
 389 void addReplyBulkCString(redisClient 
*c
, char *s
) { 
 391         addReply(c
,shared
.nullbulk
); 
 393         addReplyBulkCBuffer(c
,s
,strlen(s
)); 
 397 /* Add a long long as a bulk reply */ 
 398 void addReplyBulkLongLong(redisClient 
*c
, long long ll
) { 
 402     len 
= ll2string(buf
,64,ll
); 
 403     addReplyBulkCBuffer(c
,buf
,len
); 
 406 /* Copy 'src' client output buffers into 'dst' client output buffers. 
 407  * The function takes care of freeing the old output buffers of the 
 408  * destination client. */ 
 409 void copyClientOutputBuffer(redisClient 
*dst
, redisClient 
*src
) { 
 410     listRelease(dst
->reply
); 
 411     dst
->reply 
= listDup(src
->reply
); 
 412     memcpy(dst
->buf
,src
->buf
,src
->bufpos
); 
 413     dst
->bufpos 
= src
->bufpos
; 
 416 static void acceptCommonHandler(int fd
) { 
 418     if ((c 
= createClient(fd
)) == NULL
) { 
 419         redisLog(REDIS_WARNING
,"Error allocating resoures for the client"); 
 420         close(fd
); /* May be already closed, just ingore errors */ 
 423     /* If the maxclients directive is set and this is one client more... close the 
 424      * connection. Note that we create the client instead of checking for this 
 425      * condition beforehand, since now the socket is already set in nonblocking 
 426      * mode and we can send an error for free using the Kernel I/O */ 
 427     if (listLength(server
.clients
) > server
.maxclients
) { 
 428         char *err 
= "-ERR max number of clients reached\r\n"; 
 430         /* That's a best effort error message, don't check write errors */ 
 431         if (write(c
->fd
,err
,strlen(err
)) == -1) { 
 432             /* Nothing to do, Just to avoid the warning... */ 
 434         server
.stat_rejected_conn
++; 
 438     server
.stat_numconnections
++; 
 441 void acceptTcpHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 446     REDIS_NOTUSED(privdata
); 
 448     cfd 
= anetTcpAccept(server
.neterr
, fd
, cip
, &cport
); 
 450         redisLog(REDIS_WARNING
,"Accepting client connection: %s", server
.neterr
); 
 453     redisLog(REDIS_VERBOSE
,"Accepted %s:%d", cip
, cport
); 
 454     acceptCommonHandler(cfd
); 
 457 void acceptUnixHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 461     REDIS_NOTUSED(privdata
); 
 463     cfd 
= anetUnixAccept(server
.neterr
, fd
); 
 465         redisLog(REDIS_WARNING
,"Accepting client connection: %s", server
.neterr
); 
 468     redisLog(REDIS_VERBOSE
,"Accepted connection to %s", server
.unixsocket
); 
 469     acceptCommonHandler(cfd
); 
 473 static void freeClientArgv(redisClient 
*c
) { 
 475     for (j 
= 0; j 
< c
->argc
; j
++) 
 476         decrRefCount(c
->argv
[j
]); 
 481 void freeClient(redisClient 
*c
) { 
 484     /* Note that if the client we are freeing is blocked in a blocking 
 485      * call, we have to set querybuf to NULL *before* calling 
 486      * unblockClientWaitingData() so that processInputBuffer() does not get 
 487      * called. Also it is important to remove the file events after 
 488      * this, because this call adds the READABLE event. */ 
 489     sdsfree(c
->querybuf
); 
 491     if (c
->flags 
& REDIS_BLOCKED
) 
 492         unblockClientWaitingData(c
); 
 494     /* UNWATCH all the keys */ 
 496     listRelease(c
->watched_keys
); 
 497     /* Unsubscribe from all the pubsub channels */ 
 498     pubsubUnsubscribeAllChannels(c
,0); 
 499     pubsubUnsubscribeAllPatterns(c
,0); 
 500     dictRelease(c
->pubsub_channels
); 
 501     listRelease(c
->pubsub_patterns
); 
 502     /* Obvious cleanup */ 
 503     aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
); 
 504     aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
); 
 505     listRelease(c
->reply
); 
 508     /* Remove from the list of clients */ 
 509     ln 
= listSearchKey(server
.clients
,c
); 
 510     redisAssert(ln 
!= NULL
); 
 511     listDelNode(server
.clients
,ln
); 
 512     /* When client was just unblocked because of a blocking operation, 
 513      * remove it from the list with unblocked clients. */ 
 514     if (c
->flags 
& REDIS_UNBLOCKED
) { 
 515         ln 
= listSearchKey(server
.unblocked_clients
,c
); 
 516         redisAssert(ln 
!= NULL
); 
 517         listDelNode(server
.unblocked_clients
,ln
); 
 519     listRelease(c
->io_keys
); 
 520     /* Master/slave cleanup. 
 521      * Case 1: we lost the connection with a slave. */ 
 522     if (c
->flags 
& REDIS_SLAVE
) { 
 523         if (c
->replstate 
== REDIS_REPL_SEND_BULK 
&& c
->repldbfd 
!= -1) 
 525         list 
*l 
= (c
->flags 
& REDIS_MONITOR
) ? server
.monitors 
: server
.slaves
; 
 526         ln 
= listSearchKey(l
,c
); 
 527         redisAssert(ln 
!= NULL
); 
 531     /* Case 2: we lost the connection with the master. */ 
 532     if (c
->flags 
& REDIS_MASTER
) { 
 533         server
.master 
= NULL
; 
 534         server
.repl_state 
= REDIS_REPL_CONNECT
; 
 535         server
.repl_down_since 
= time(NULL
); 
 536         /* Since we lost the connection with the master, we should also 
 537          * close the connection with all our slaves if we have any, so 
 538          * when we'll resync with the master the other slaves will sync again 
 539          * with us as well. Note that also when the slave is not connected 
 540          * to the master it will keep refusing connections by other slaves. 
 542          * We do this only if server.masterhost != NULL. If it is NULL this 
 543          * means the user called SLAVEOF NO ONE and we are freeing our 
 544          * link with the master, so no need to close link with slaves. */ 
 545         if (server
.masterhost 
!= NULL
) { 
 546             while (listLength(server
.slaves
)) { 
 547                 ln 
= listFirst(server
.slaves
); 
 548                 freeClient((redisClient
*)ln
->value
); 
 554     freeClientMultiState(c
); 
 558 void sendReplyToClient(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 559     redisClient 
*c 
= privdata
; 
 560     int nwritten 
= 0, totwritten 
= 0, objlen
; 
 565     while(c
->bufpos 
> 0 || listLength(c
->reply
)) { 
 567             if (c
->flags 
& REDIS_MASTER
) { 
 568                 /* Don't reply to a master */ 
 569                 nwritten 
= c
->bufpos 
- c
->sentlen
; 
 571                 nwritten 
= write(fd
,c
->buf
+c
->sentlen
,c
->bufpos
-c
->sentlen
); 
 572                 if (nwritten 
<= 0) break; 
 574             c
->sentlen 
+= nwritten
; 
 575             totwritten 
+= nwritten
; 
 577             /* If the buffer was sent, set bufpos to zero to continue with 
 578              * the remainder of the reply. */ 
 579             if (c
->sentlen 
== c
->bufpos
) { 
 584             o 
= listNodeValue(listFirst(c
->reply
)); 
 585             objlen 
= sdslen(o
->ptr
); 
 588                 listDelNode(c
->reply
,listFirst(c
->reply
)); 
 592             if (c
->flags 
& REDIS_MASTER
) { 
 593                 /* Don't reply to a master */ 
 594                 nwritten 
= objlen 
- c
->sentlen
; 
 596                 nwritten 
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
,objlen
-c
->sentlen
); 
 597                 if (nwritten 
<= 0) break; 
 599             c
->sentlen 
+= nwritten
; 
 600             totwritten 
+= nwritten
; 
 602             /* If we fully sent the object on head go to the next one */ 
 603             if (c
->sentlen 
== objlen
) { 
 604                 listDelNode(c
->reply
,listFirst(c
->reply
)); 
 608         /* Note that we avoid sending more than REDIS_MAX_WRITE_PER_EVENT 
 609          * bytes, in a single threaded server it's a good idea to serve 
 610          * other clients as well, even if a very large request comes from 
 611          * super fast link that is always able to accept data (in real world 
 612          * scenario think about 'KEYS *' against the loopback interface) */ 
 613         if (totwritten 
> REDIS_MAX_WRITE_PER_EVENT
) break; 
 615     if (nwritten 
== -1) { 
 616         if (errno 
== EAGAIN
) { 
 619             redisLog(REDIS_VERBOSE
, 
 620                 "Error writing to client: %s", strerror(errno
)); 
 625     if (totwritten 
> 0) c
->lastinteraction 
= time(NULL
); 
 626     if (c
->bufpos 
== 0 && listLength(c
->reply
) == 0) { 
 628         aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
); 
 630         /* Close connection after entire reply has been sent. */ 
 631         if (c
->flags 
& REDIS_CLOSE_AFTER_REPLY
) freeClient(c
); 
 635 /* resetClient prepare the client to process the next command */ 
 636 void resetClient(redisClient 
*c
) { 
 641     /* We clear the ASKING flag as well if we are not inside a MULTI. */ 
 642     if (!(c
->flags 
& REDIS_MULTI
)) c
->flags 
&= (~REDIS_ASKING
); 
 645 void closeTimedoutClients(void) { 
 648     time_t now 
= time(NULL
); 
 651     listRewind(server
.clients
,&li
); 
 652     while ((ln 
= listNext(&li
)) != NULL
) { 
 653         c 
= listNodeValue(ln
); 
 654         if (server
.maxidletime 
&& 
 655             !(c
->flags 
& REDIS_SLAVE
) &&    /* no timeout for slaves */ 
 656             !(c
->flags 
& REDIS_MASTER
) &&   /* no timeout for masters */ 
 657             !(c
->flags 
& REDIS_BLOCKED
) &&  /* no timeout for BLPOP */ 
 658             dictSize(c
->pubsub_channels
) == 0 && /* no timeout for pubsub */ 
 659             listLength(c
->pubsub_patterns
) == 0 && 
 660             (now 
- c
->lastinteraction 
> server
.maxidletime
)) 
 662             redisLog(REDIS_VERBOSE
,"Closing idle client"); 
 664         } else if (c
->flags 
& REDIS_BLOCKED
) { 
 665             if (c
->bpop
.timeout 
!= 0 && c
->bpop
.timeout 
< now
) { 
 666                 addReply(c
,shared
.nullmultibulk
); 
 667                 unblockClientWaitingData(c
); 
 673 int processInlineBuffer(redisClient 
*c
) { 
 674     char *newline 
= strstr(c
->querybuf
,"\r\n"); 
 679     /* Nothing to do without a \r\n */ 
 680     if (newline 
== NULL
) { 
 681         if (sdslen(c
->querybuf
) > REDIS_INLINE_MAX_SIZE
) { 
 682             addReplyError(c
,"Protocol error: too big inline request"); 
 683             setProtocolError(c
,0); 
 688     /* Split the input buffer up to the \r\n */ 
 689     querylen 
= newline
-(c
->querybuf
); 
 690     argv 
= sdssplitlen(c
->querybuf
,querylen
," ",1,&argc
); 
 692     /* Leave data after the first line of the query in the buffer */ 
 693     c
->querybuf 
= sdsrange(c
->querybuf
,querylen
+2,-1); 
 695     /* Setup argv array on client structure */ 
 696     if (c
->argv
) zfree(c
->argv
); 
 697     c
->argv 
= zmalloc(sizeof(robj
*)*argc
); 
 699     /* Create redis objects for all arguments. */ 
 700     for (c
->argc 
= 0, j 
= 0; j 
< argc
; j
++) { 
 701         if (sdslen(argv
[j
])) { 
 702             c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]); 
 712 /* Helper function. Trims query buffer to make the function that processes 
 713  * multi bulk requests idempotent. */ 
 714 static void setProtocolError(redisClient 
*c
, int pos
) { 
 715     if (server
.verbosity 
>= REDIS_VERBOSE
) { 
 716         sds client 
= getClientInfoString(c
); 
 717         redisLog(REDIS_VERBOSE
, 
 718             "Protocol error from client: %s", client
); 
 721     c
->flags 
|= REDIS_CLOSE_AFTER_REPLY
; 
 722     c
->querybuf 
= sdsrange(c
->querybuf
,pos
,-1); 
 725 int processMultibulkBuffer(redisClient 
*c
) { 
 726     char *newline 
= NULL
; 
 730     if (c
->multibulklen 
== 0) { 
 731         /* The client should have been reset */ 
 732         redisAssertWithInfo(c
,NULL
,c
->argc 
== 0); 
 734         /* Multi bulk length cannot be read without a \r\n */ 
 735         newline 
= strchr(c
->querybuf
,'\r'); 
 736         if (newline 
== NULL
) { 
 737             if (sdslen(c
->querybuf
) > REDIS_INLINE_MAX_SIZE
) { 
 738                 addReplyError(c
,"Protocol error: too big mbulk count string"); 
 739                 setProtocolError(c
,0); 
 744         /* Buffer should also contain \n */ 
 745         if (newline
-(c
->querybuf
) > ((signed)sdslen(c
->querybuf
)-2)) 
 748         /* We know for sure there is a whole line since newline != NULL, 
 749          * so go ahead and find out the multi bulk length. */ 
 750         redisAssertWithInfo(c
,NULL
,c
->querybuf
[0] == '*'); 
 751         ok 
= string2ll(c
->querybuf
+1,newline
-(c
->querybuf
+1),&ll
); 
 752         if (!ok 
|| ll 
> 1024*1024) { 
 753             addReplyError(c
,"Protocol error: invalid multibulk length"); 
 754             setProtocolError(c
,pos
); 
 758         pos 
= (newline
-c
->querybuf
)+2; 
 760             c
->querybuf 
= sdsrange(c
->querybuf
,pos
,-1); 
 764         c
->multibulklen 
= ll
; 
 766         /* Setup argv array on client structure */ 
 767         if (c
->argv
) zfree(c
->argv
); 
 768         c
->argv 
= zmalloc(sizeof(robj
*)*c
->multibulklen
); 
 771     redisAssertWithInfo(c
,NULL
,c
->multibulklen 
> 0); 
 772     while(c
->multibulklen
) { 
 773         /* Read bulk length if unknown */ 
 774         if (c
->bulklen 
== -1) { 
 775             newline 
= strchr(c
->querybuf
+pos
,'\r'); 
 776             if (newline 
== NULL
) { 
 777                 if (sdslen(c
->querybuf
) > REDIS_INLINE_MAX_SIZE
) { 
 778                     addReplyError(c
,"Protocol error: too big bulk count string"); 
 779                     setProtocolError(c
,0); 
 784             /* Buffer should also contain \n */ 
 785             if (newline
-(c
->querybuf
) > ((signed)sdslen(c
->querybuf
)-2)) 
 788             if (c
->querybuf
[pos
] != '$') { 
 789                 addReplyErrorFormat(c
, 
 790                     "Protocol error: expected '$', got '%c'", 
 792                 setProtocolError(c
,pos
); 
 796             ok 
= string2ll(c
->querybuf
+pos
+1,newline
-(c
->querybuf
+pos
+1),&ll
); 
 797             if (!ok 
|| ll 
< 0 || ll 
> 512*1024*1024) { 
 798                 addReplyError(c
,"Protocol error: invalid bulk length"); 
 799                 setProtocolError(c
,pos
); 
 803             pos 
+= newline
-(c
->querybuf
+pos
)+2; 
 804             if (ll 
>= REDIS_MBULK_BIG_ARG
) { 
 805                 /* If we are going to read a large object from network 
 806                  * try to make it likely that it will start at c->querybuf 
 807                  * boundary so that we can optimize object creation 
 808                  * avoiding a large copy of data. */ 
 809                 c
->querybuf 
= sdsrange(c
->querybuf
,pos
,-1); 
 811                 /* Hint the sds library about the amount of bytes this string is 
 812                  * going to contain. */ 
 813                 c
->querybuf 
= sdsMakeRoomFor(c
->querybuf
,ll
+2); 
 818         /* Read bulk argument */ 
 819         if (sdslen(c
->querybuf
)-pos 
< (unsigned)(c
->bulklen
+2)) { 
 820             /* Not enough data (+2 == trailing \r\n) */ 
 823             /* Optimization: if the buffer contains JUST our bulk element 
 824              * instead of creating a new object by *copying* the sds we 
 825              * just use the current sds string. */ 
 827                 c
->bulklen 
>= REDIS_MBULK_BIG_ARG 
&& 
 828                 (signed) sdslen(c
->querybuf
) == c
->bulklen
+2) 
 830                 c
->argv
[c
->argc
++] = createObject(REDIS_STRING
,c
->querybuf
); 
 831                 sdsIncrLen(c
->querybuf
,-2); /* remove CRLF */ 
 832                 c
->querybuf 
= sdsempty(); 
 833                 /* Assume that if we saw a fat argument we'll see another one 
 835                 c
->querybuf 
= sdsMakeRoomFor(c
->querybuf
,c
->bulklen
+2); 
 839                     createStringObject(c
->querybuf
+pos
,c
->bulklen
); 
 848     if (pos
) c
->querybuf 
= sdsrange(c
->querybuf
,pos
,-1); 
 850     /* We're done when c->multibulklen == 0 */ 
 851     if (c
->multibulklen 
== 0) return REDIS_OK
; 
 853     /* Still not ready to process the command */ 
 857 void processInputBuffer(redisClient 
*c
) { 
 858     /* Keep processing while there is something in the input buffer */ 
 859     while(sdslen(c
->querybuf
)) { 
 860         /* Immediately abort if the client is in the middle of something. */ 
 861         if (c
->flags 
& REDIS_BLOCKED
) return; 
 863         /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is 
 864          * written to the client. Make sure to not let the reply grow after 
 865          * this flag has been set (i.e. don't process more commands). */ 
 866         if (c
->flags 
& REDIS_CLOSE_AFTER_REPLY
) return; 
 868         /* Determine request type when unknown. */ 
 870             if (c
->querybuf
[0] == '*') { 
 871                 c
->reqtype 
= REDIS_REQ_MULTIBULK
; 
 873                 c
->reqtype 
= REDIS_REQ_INLINE
; 
 877         if (c
->reqtype 
== REDIS_REQ_INLINE
) { 
 878             if (processInlineBuffer(c
) != REDIS_OK
) break; 
 879         } else if (c
->reqtype 
== REDIS_REQ_MULTIBULK
) { 
 880             if (processMultibulkBuffer(c
) != REDIS_OK
) break; 
 882             redisPanic("Unknown request type"); 
 885         /* Multibulk processing could see a <= 0 length. */ 
 889             /* Only reset the client when the command was executed. */ 
 890             if (processCommand(c
) == REDIS_OK
) 
 896 void readQueryFromClient(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 897     redisClient 
*c 
= (redisClient
*) privdata
; 
 903     readlen 
= REDIS_IOBUF_LEN
; 
 904     /* If this is a multi bulk request, and we are processing a bulk reply 
 905      * that is large enough, try to maximize the probability that the query 
 906      * buffer contains exactly the SDS string representing the object, even 
 907      * at the risk of requiring more read(2) calls. This way the function 
 908      * processMultibulkBuffer() can avoid copying buffers to create the 
 909      * Redis Object representing the argument. */ 
 910     if (c
->reqtype 
== REDIS_REQ_MULTIBULK 
&& c
->multibulklen 
&& c
->bulklen 
!= -1 
 911         && c
->bulklen 
>= REDIS_MBULK_BIG_ARG
) 
 913         int remaining 
= (unsigned)(c
->bulklen
+2)-sdslen(c
->querybuf
); 
 915         if (remaining 
< readlen
) readlen 
= remaining
; 
 918     qblen 
= sdslen(c
->querybuf
); 
 919     c
->querybuf 
= sdsMakeRoomFor(c
->querybuf
, readlen
); 
 920     nread 
= read(fd
, c
->querybuf
+qblen
, readlen
); 
 922         if (errno 
== EAGAIN
) { 
 925             redisLog(REDIS_VERBOSE
, "Reading from client: %s",strerror(errno
)); 
 929     } else if (nread 
== 0) { 
 930         redisLog(REDIS_VERBOSE
, "Client closed connection"); 
 935         sdsIncrLen(c
->querybuf
,nread
); 
 936         c
->lastinteraction 
= time(NULL
); 
 940     if (sdslen(c
->querybuf
) > server
.client_max_querybuf_len
) { 
 941         sds ci 
= getClientInfoString(c
), bytes 
= sdsempty(); 
 943         bytes 
= sdscatrepr(bytes
,c
->querybuf
,64); 
 944         redisLog(REDIS_WARNING
,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci
, bytes
); 
 950     processInputBuffer(c
); 
 953 void getClientsMaxBuffers(unsigned long *longest_output_list
, 
 954                           unsigned long *biggest_input_buffer
) { 
 958     unsigned long lol 
= 0, bib 
= 0; 
 960     listRewind(server
.clients
,&li
); 
 961     while ((ln 
= listNext(&li
)) != NULL
) { 
 962         c 
= listNodeValue(ln
); 
 964         if (listLength(c
->reply
) > lol
) lol 
= listLength(c
->reply
); 
 965         if (sdslen(c
->querybuf
) > bib
) bib 
= sdslen(c
->querybuf
); 
 967     *longest_output_list 
= lol
; 
 968     *biggest_input_buffer 
= bib
; 
 971 /* Turn a Redis client into an sds string representing its state. */ 
 972 sds 
getClientInfoString(redisClient 
*client
) { 
 973     char ip
[32], flags
[16], events
[3], *p
; 
 975     time_t now 
= time(NULL
); 
 978     if (anetPeerToString(client
->fd
,ip
,&port
) == -1) { 
 984     if (client
->flags 
& REDIS_SLAVE
) { 
 985         if (client
->flags 
& REDIS_MONITOR
) 
 990     if (client
->flags 
& REDIS_MASTER
) *p
++ = 'M'; 
 991     if (client
->flags 
& REDIS_MULTI
) *p
++ = 'x'; 
 992     if (client
->flags 
& REDIS_BLOCKED
) *p
++ = 'b'; 
 993     if (client
->flags 
& REDIS_DIRTY_CAS
) *p
++ = 'd'; 
 994     if (client
->flags 
& REDIS_CLOSE_AFTER_REPLY
) *p
++ = 'c'; 
 995     if (client
->flags 
& REDIS_UNBLOCKED
) *p
++ = 'u'; 
 996     if (p 
== flags
) *p
++ = 'N'; 
 999     emask 
= client
->fd 
== -1 ? 0 : aeGetFileEvents(server
.el
,client
->fd
); 
1001     if (emask 
& AE_READABLE
) *p
++ = 'r'; 
1002     if (emask 
& AE_WRITABLE
) *p
++ = 'w'; 
1004     return sdscatprintf(sdsempty(), 
1005         "addr=%s:%d fd=%d idle=%ld flags=%s db=%d sub=%d psub=%d qbuf=%lu obl=%lu oll=%lu events=%s cmd=%s", 
1007         (long)(now 
- client
->lastinteraction
), 
1010         (int) dictSize(client
->pubsub_channels
), 
1011         (int) listLength(client
->pubsub_patterns
), 
1012         (unsigned long) sdslen(client
->querybuf
), 
1013         (unsigned long) client
->bufpos
, 
1014         (unsigned long) listLength(client
->reply
), 
1016         client
->lastcmd 
? client
->lastcmd
->name 
: "NULL"); 
1019 sds 
getAllClientsInfoString(void) { 
1022     redisClient 
*client
; 
1025     listRewind(server
.clients
,&li
); 
1026     while ((ln 
= listNext(&li
)) != NULL
) { 
1029         client 
= listNodeValue(ln
); 
1030         cs 
= getClientInfoString(client
); 
1031         o 
= sdscatsds(o
,cs
); 
1033         o 
= sdscatlen(o
,"\n",1); 
1038 void clientCommand(redisClient 
*c
) { 
1041     redisClient 
*client
; 
1043     if (!strcasecmp(c
->argv
[1]->ptr
,"list") && c
->argc 
== 2) { 
1044         sds o 
= getAllClientsInfoString(); 
1045         addReplyBulkCBuffer(c
,o
,sdslen(o
)); 
1047     } else if (!strcasecmp(c
->argv
[1]->ptr
,"kill") && c
->argc 
== 3) { 
1048         listRewind(server
.clients
,&li
); 
1049         while ((ln 
= listNext(&li
)) != NULL
) { 
1050             char ip
[32], addr
[64]; 
1053             client 
= listNodeValue(ln
); 
1054             if (anetPeerToString(client
->fd
,ip
,&port
) == -1) continue; 
1055             snprintf(addr
,sizeof(addr
),"%s:%d",ip
,port
); 
1056             if (strcmp(addr
,c
->argv
[2]->ptr
) == 0) { 
1057                 addReply(c
,shared
.ok
); 
1059                     client
->flags 
|= REDIS_CLOSE_AFTER_REPLY
; 
1066         addReplyError(c
,"No such client"); 
1068         addReplyError(c
, "Syntax error, try CLIENT (LIST | KILL ip:port)"); 
1072 /* Rewrite the command vector of the client. All the new objects ref count 
1073  * is incremented. The old command vector is freed, and the old objects 
1074  * ref count is decremented. */ 
1075 void rewriteClientCommandVector(redisClient 
*c
, int argc
, ...) { 
1078     robj 
**argv
; /* The new argument vector */ 
1080     argv 
= zmalloc(sizeof(robj
*)*argc
); 
1082     for (j 
= 0; j 
< argc
; j
++) { 
1085         a 
= va_arg(ap
, robj
*); 
1089     /* We free the objects in the original vector at the end, so we are 
1090      * sure that if the same objects are reused in the new vector the 
1091      * refcount gets incremented before it gets decremented. */ 
1092     for (j 
= 0; j 
< c
->argc
; j
++) decrRefCount(c
->argv
[j
]); 
1094     /* Replace argv and argc with our new versions. */ 
1097     c
->cmd 
= lookupCommand(c
->argv
[0]->ptr
); 
1098     redisAssertWithInfo(c
,NULL
,c
->cmd 
!= NULL
); 
1102 /* Rewrite a single item in the command vector. 
1103  * The new val ref count is incremented, and the old decremented. */ 
1104 void rewriteClientCommandArgument(redisClient 
*c
, int i
, robj 
*newval
) { 
1107     redisAssertWithInfo(c
,NULL
,i 
< c
->argc
); 
1108     oldval 
= c
->argv
[i
]; 
1109     c
->argv
[i
] = newval
; 
1110     incrRefCount(newval
); 
1111     decrRefCount(oldval
); 
1113     /* If this is the command name make sure to fix c->cmd. */ 
1115         c
->cmd 
= lookupCommand(c
->argv
[0]->ptr
); 
1116         redisAssertWithInfo(c
,NULL
,c
->cmd 
!= NULL
);