4 static void setProtocolError(redisClient 
*c
, int pos
); 
   6 /* To evaluate the output buffer size of a client we need to get size of 
   7  * allocated objects, however we can't used zmalloc_size() directly on sds 
   8  * strings because of the trick they use to work (the header is before the 
   9  * returned pointer), so we use this helper function. */ 
  10 size_t zmalloc_size_sds(sds s
) { 
  11     return zmalloc_size(s
-sizeof(struct sdshdr
)); 
  14 void *dupClientReplyValue(void *o
) { 
  15     incrRefCount((robj
*)o
); 
  /* Match callback for lists holding string objects: delegates the comparison
   * of the two list values to equalStringObjects() and returns its result. */
  int listMatchObjects(void *a, void *b) {
      int matched = equalStringObjects(a, b);

      return matched;
  }
  23 redisClient 
*createClient(int fd
) { 
  24     redisClient 
*c 
= zmalloc(sizeof(redisClient
)); 
  26     /* passing -1 as fd it is possible to create a non connected client. 
  27      * This is useful since all the Redis commands needs to be executed 
  28      * in the context of a client. When commands are executed in other 
  29      * contexts (for instance a Lua script) we need a non connected client. */ 
  31         anetNonBlock(NULL
,fd
); 
  32         anetTcpNoDelay(NULL
,fd
); 
  33         if (aeCreateFileEvent(server
.el
,fd
,AE_READABLE
, 
  34             readQueryFromClient
, c
) == AE_ERR
) 
  45     c
->querybuf 
= sdsempty(); 
  50     c
->cmd 
= c
->lastcmd 
= NULL
; 
  55     c
->ctime 
= c
->lastinteraction 
= server
.unixtime
; 
  57     c
->replstate 
= REDIS_REPL_NONE
; 
  58     c
->reply 
= listCreate(); 
  60     c
->obuf_soft_limit_reached_time 
= 0; 
  61     listSetFreeMethod(c
->reply
,decrRefCount
); 
  62     listSetDupMethod(c
->reply
,dupClientReplyValue
); 
  66     c
->bpop
.target 
= NULL
; 
  67     c
->io_keys 
= listCreate(); 
  68     c
->watched_keys 
= listCreate(); 
  69     listSetFreeMethod(c
->io_keys
,decrRefCount
); 
  70     c
->pubsub_channels 
= dictCreate(&setDictType
,NULL
); 
  71     c
->pubsub_patterns 
= listCreate(); 
  72     listSetFreeMethod(c
->pubsub_patterns
,decrRefCount
); 
  73     listSetMatchMethod(c
->pubsub_patterns
,listMatchObjects
); 
  74     if (fd 
!= -1) listAddNodeTail(server
.clients
,c
); 
  75     initClientMultiState(c
); 
  79 /* This function is called every time we are going to transmit new data 
  80  * to the client. The behavior is the following: 
  82  * If the client should receive new data (normal clients will) the function 
  83  * returns REDIS_OK, and make sure to install the write handler in our event 
  84  * loop so that when the socket is writable new data gets written. 
  86  * If the client should not receive new data, because it is a fake client 
  87  * or a slave, or because the setup of the write handler failed, the function 
  90  * Typically gets called every time a reply is built, before adding more 
  91  * data to the clients output buffers. If the function returns REDIS_ERR no 
  92  * data should be appended to the output buffers. */ 
  93 int prepareClientToWrite(redisClient 
*c
) { 
  94     if (c
->flags 
& REDIS_LUA_CLIENT
) return REDIS_OK
; 
  95     if (c
->fd 
<= 0) return REDIS_ERR
; /* Fake client */ 
  96     if (c
->bufpos 
== 0 && listLength(c
->reply
) == 0 && 
  97         (c
->replstate 
== REDIS_REPL_NONE 
|| 
  98          c
->replstate 
== REDIS_REPL_ONLINE
) && 
  99         aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
, 
 100         sendReplyToClient
, c
) == AE_ERR
) return REDIS_ERR
; 
 104 /* Create a duplicate of the last object in the reply list when 
 105  * it is not exclusively owned by the reply list. */ 
 106 robj 
*dupLastObjectIfNeeded(list 
*reply
) { 
 109     redisAssert(listLength(reply
) > 0); 
 110     ln 
= listLast(reply
); 
 111     cur 
= listNodeValue(ln
); 
 112     if (cur
->refcount 
> 1) { 
 113         new = dupStringObject(cur
); 
 115         listNodeValue(ln
) = new; 
 117     return listNodeValue(ln
); 
 120 /* ----------------------------------------------------------------------------- 
 121  * Low level functions to add more data to output buffers. 
 122  * -------------------------------------------------------------------------- */ 
 124 int _addReplyToBuffer(redisClient 
*c
, char *s
, size_t len
) { 
 125     size_t available 
= sizeof(c
->buf
)-c
->bufpos
; 
 127     if (c
->flags 
& REDIS_CLOSE_AFTER_REPLY
) return REDIS_OK
; 
 129     /* If there already are entries in the reply list, we cannot 
 130      * add anything more to the static buffer. */ 
 131     if (listLength(c
->reply
) > 0) return REDIS_ERR
; 
 133     /* Check that the buffer has enough space available for this string. */ 
 134     if (len 
> available
) return REDIS_ERR
; 
 136     memcpy(c
->buf
+c
->bufpos
,s
,len
); 
 141 void _addReplyObjectToList(redisClient 
*c
, robj 
*o
) { 
 144     if (c
->flags 
& REDIS_CLOSE_AFTER_REPLY
) return; 
 146     if (listLength(c
->reply
) == 0) { 
 148         listAddNodeTail(c
->reply
,o
); 
 149         c
->reply_bytes 
+= zmalloc_size_sds(o
->ptr
); 
 151         tail 
= listNodeValue(listLast(c
->reply
)); 
 153         /* Append to this object when possible. */ 
 154         if (tail
->ptr 
!= NULL 
&& 
 155             sdslen(tail
->ptr
)+sdslen(o
->ptr
) <= REDIS_REPLY_CHUNK_BYTES
) 
 157             c
->reply_bytes 
-= zmalloc_size_sds(tail
->ptr
); 
 158             tail 
= dupLastObjectIfNeeded(c
->reply
); 
 159             tail
->ptr 
= sdscatlen(tail
->ptr
,o
->ptr
,sdslen(o
->ptr
)); 
 160             c
->reply_bytes 
+= zmalloc_size_sds(tail
->ptr
); 
 163             listAddNodeTail(c
->reply
,o
); 
 164             c
->reply_bytes 
+= zmalloc_size_sds(o
->ptr
); 
 167     asyncCloseClientOnOutputBufferLimitReached(c
); 
 170 /* This method takes responsibility over the sds. When it is no longer 
 171  * needed it will be free'd, otherwise it ends up in a robj. */ 
 172 void _addReplySdsToList(redisClient 
*c
, sds s
) { 
 175     if (c
->flags 
& REDIS_CLOSE_AFTER_REPLY
) { 
 180     if (listLength(c
->reply
) == 0) { 
 181         listAddNodeTail(c
->reply
,createObject(REDIS_STRING
,s
)); 
 182         c
->reply_bytes 
+= zmalloc_size_sds(s
); 
 184         tail 
= listNodeValue(listLast(c
->reply
)); 
 186         /* Append to this object when possible. */ 
 187         if (tail
->ptr 
!= NULL 
&& 
 188             sdslen(tail
->ptr
)+sdslen(s
) <= REDIS_REPLY_CHUNK_BYTES
) 
 190             c
->reply_bytes 
-= zmalloc_size_sds(tail
->ptr
); 
 191             tail 
= dupLastObjectIfNeeded(c
->reply
); 
 192             tail
->ptr 
= sdscatlen(tail
->ptr
,s
,sdslen(s
)); 
 193             c
->reply_bytes 
+= zmalloc_size_sds(tail
->ptr
); 
 196             listAddNodeTail(c
->reply
,createObject(REDIS_STRING
,s
)); 
 197             c
->reply_bytes 
+= zmalloc_size_sds(s
); 
 200     asyncCloseClientOnOutputBufferLimitReached(c
); 
 203 void _addReplyStringToList(redisClient 
*c
, char *s
, size_t len
) { 
 206     if (c
->flags 
& REDIS_CLOSE_AFTER_REPLY
) return; 
 208     if (listLength(c
->reply
) == 0) { 
 209         robj 
*o 
= createStringObject(s
,len
); 
 211         listAddNodeTail(c
->reply
,o
); 
 212         c
->reply_bytes 
+= zmalloc_size_sds(o
->ptr
); 
 214         tail 
= listNodeValue(listLast(c
->reply
)); 
 216         /* Append to this object when possible. */ 
 217         if (tail
->ptr 
!= NULL 
&& 
 218             sdslen(tail
->ptr
)+len 
<= REDIS_REPLY_CHUNK_BYTES
) 
 220             c
->reply_bytes 
-= zmalloc_size_sds(tail
->ptr
); 
 221             tail 
= dupLastObjectIfNeeded(c
->reply
); 
 222             tail
->ptr 
= sdscatlen(tail
->ptr
,s
,len
); 
 223             c
->reply_bytes 
+= zmalloc_size_sds(tail
->ptr
); 
 225             robj 
*o 
= createStringObject(s
,len
); 
 227             listAddNodeTail(c
->reply
,o
); 
 228             c
->reply_bytes 
+= zmalloc_size_sds(o
->ptr
); 
 231     asyncCloseClientOnOutputBufferLimitReached(c
); 
 234 /* ----------------------------------------------------------------------------- 
 235  * Higher level functions to queue data on the client output buffer. 
 236  * The following functions are the ones that commands implementations will call. 
 237  * -------------------------------------------------------------------------- */ 
 239 void addReply(redisClient 
*c
, robj 
*obj
) { 
 240     if (prepareClientToWrite(c
) != REDIS_OK
) return; 
 242     /* This is an important place where we can avoid copy-on-write 
 243      * when there is a saving child running, avoiding touching the 
 244      * refcount field of the object if it's not needed. 
 246      * If the encoding is RAW and there is room in the static buffer 
 247      * we'll be able to send the object to the client without 
 248      * messing with its page. */ 
 249     if (obj
->encoding 
== REDIS_ENCODING_RAW
) { 
 250         if (_addReplyToBuffer(c
,obj
->ptr
,sdslen(obj
->ptr
)) != REDIS_OK
) 
 251             _addReplyObjectToList(c
,obj
); 
 252     } else if (obj
->encoding 
== REDIS_ENCODING_INT
) { 
 253         /* Optimization: if there is room in the static buffer for 32 bytes 
 254          * (more than the max chars a 64 bit integer can take as string) we 
 255          * avoid decoding the object and go for the lower level approach. */ 
 256         if (listLength(c
->reply
) == 0 && (sizeof(c
->buf
) - c
->bufpos
) >= 32) { 
 260             len 
= ll2string(buf
,sizeof(buf
),(long)obj
->ptr
); 
 261             if (_addReplyToBuffer(c
,buf
,len
) == REDIS_OK
) 
 263             /* else... continue with the normal code path, but should never 
 264              * happen actually since we verified there is room. */ 
 266         obj 
= getDecodedObject(obj
); 
 267         if (_addReplyToBuffer(c
,obj
->ptr
,sdslen(obj
->ptr
)) != REDIS_OK
) 
 268             _addReplyObjectToList(c
,obj
); 
 271         redisPanic("Wrong obj->encoding in addReply()"); 
 275 void addReplySds(redisClient 
*c
, sds s
) { 
 276     if (prepareClientToWrite(c
) != REDIS_OK
) { 
 277         /* The caller expects the sds to be free'd. */ 
 281     if (_addReplyToBuffer(c
,s
,sdslen(s
)) == REDIS_OK
) { 
 284         /* This method free's the sds when it is no longer needed. */ 
 285         _addReplySdsToList(c
,s
); 
 289 void addReplyString(redisClient 
*c
, char *s
, size_t len
) { 
 290     if (prepareClientToWrite(c
) != REDIS_OK
) return; 
 291     if (_addReplyToBuffer(c
,s
,len
) != REDIS_OK
) 
 292         _addReplyStringToList(c
,s
,len
); 
 295 void addReplyErrorLength(redisClient 
*c
, char *s
, size_t len
) { 
 296     addReplyString(c
,"-ERR ",5); 
 297     addReplyString(c
,s
,len
); 
 298     addReplyString(c
,"\r\n",2); 
 301 void addReplyError(redisClient 
*c
, char *err
) { 
 302     addReplyErrorLength(c
,err
,strlen(err
)); 
 305 void addReplyErrorFormat(redisClient 
*c
, const char *fmt
, ...) { 
 309     sds s 
= sdscatvprintf(sdsempty(),fmt
,ap
); 
 311     /* Make sure there are no newlines in the string, otherwise invalid protocol 
 314     for (j 
= 0; j 
< l
; j
++) { 
 315         if (s
[j
] == '\r' || s
[j
] == '\n') s
[j
] = ' '; 
 317     addReplyErrorLength(c
,s
,sdslen(s
)); 
 321 void addReplyStatusLength(redisClient 
*c
, char *s
, size_t len
) { 
 322     addReplyString(c
,"+",1); 
 323     addReplyString(c
,s
,len
); 
 324     addReplyString(c
,"\r\n",2); 
 327 void addReplyStatus(redisClient 
*c
, char *status
) { 
 328     addReplyStatusLength(c
,status
,strlen(status
)); 
 331 void addReplyStatusFormat(redisClient 
*c
, const char *fmt
, ...) { 
 334     sds s 
= sdscatvprintf(sdsempty(),fmt
,ap
); 
 336     addReplyStatusLength(c
,s
,sdslen(s
)); 
 340 /* Adds an empty object to the reply list that will contain the multi bulk 
 341  * length, which is not known when this function is called. */ 
 342 void *addDeferredMultiBulkLength(redisClient 
*c
) { 
 343     /* Note that we install the write event here even if the object is not 
 344      * ready to be sent, since we are sure that before returning to the 
 345      * event loop setDeferredMultiBulkLength() will be called. */ 
 346     if (prepareClientToWrite(c
) != REDIS_OK
) return NULL
; 
 347     listAddNodeTail(c
->reply
,createObject(REDIS_STRING
,NULL
)); 
 348     return listLast(c
->reply
); 
 351 /* Populate the length object and try glueing it to the next chunk. */ 
 352 void setDeferredMultiBulkLength(redisClient 
*c
, void *node
, long length
) { 
 353     listNode 
*ln 
= (listNode
*)node
; 
 356     /* Abort when *node is NULL (see addDeferredMultiBulkLength). */ 
 357     if (node 
== NULL
) return; 
 359     len 
= listNodeValue(ln
); 
 360     len
->ptr 
= sdscatprintf(sdsempty(),"*%ld\r\n",length
); 
 361     c
->reply_bytes 
+= zmalloc_size_sds(len
->ptr
); 
 362     if (ln
->next 
!= NULL
) { 
 363         next 
= listNodeValue(ln
->next
); 
 365         /* Only glue when the next node is non-NULL (an sds in this case) */ 
 366         if (next
->ptr 
!= NULL
) { 
 367             len
->ptr 
= sdscatlen(len
->ptr
,next
->ptr
,sdslen(next
->ptr
)); 
 368             listDelNode(c
->reply
,ln
->next
); 
 371     asyncCloseClientOnOutputBufferLimitReached(c
); 
 374 /* Add a duble as a bulk reply */ 
 375 void addReplyDouble(redisClient 
*c
, double d
) { 
 376     char dbuf
[128], sbuf
[128]; 
 378     dlen 
= snprintf(dbuf
,sizeof(dbuf
),"%.17g",d
); 
 379     slen 
= snprintf(sbuf
,sizeof(sbuf
),"$%d\r\n%s\r\n",dlen
,dbuf
); 
 380     addReplyString(c
,sbuf
,slen
); 
 383 /* Add a long long as integer reply or bulk len / multi bulk count. 
 384  * Basically this is used to output <prefix><long long><crlf>. */ 
 385 void addReplyLongLongWithPrefix(redisClient 
*c
, long long ll
, char prefix
) { 
 389     /* Things like $3\r\n or *2\r\n are emitted very often by the protocol 
 390      * so we have a few shared objects to use if the integer is small 
 391      * like it is most of the times. */ 
 392     if (prefix 
== '*' && ll 
< REDIS_SHARED_BULKHDR_LEN
) { 
 393         addReply(c
,shared
.mbulkhdr
[ll
]); 
 395     } else if (prefix 
== '$' && ll 
< REDIS_SHARED_BULKHDR_LEN
) { 
 396         addReply(c
,shared
.bulkhdr
[ll
]); 
 401     len 
= ll2string(buf
+1,sizeof(buf
)-1,ll
); 
 404     addReplyString(c
,buf
,len
+3); 
 407 void addReplyLongLong(redisClient 
*c
, long long ll
) { 
 409         addReply(c
,shared
.czero
); 
 411         addReply(c
,shared
.cone
); 
 413         addReplyLongLongWithPrefix(c
,ll
,':'); 
 416 void addReplyMultiBulkLen(redisClient 
*c
, long length
) { 
 417     addReplyLongLongWithPrefix(c
,length
,'*'); 
 420 /* Create the length prefix of a bulk reply, example: $2234 */ 
 421 void addReplyBulkLen(redisClient 
*c
, robj 
*obj
) { 
 424     if (obj
->encoding 
== REDIS_ENCODING_RAW
) { 
 425         len 
= sdslen(obj
->ptr
); 
 427         long n 
= (long)obj
->ptr
; 
 429         /* Compute how many bytes will take this integer as a radix 10 string */ 
 435         while((n 
= n
/10) != 0) { 
 439     addReplyLongLongWithPrefix(c
,len
,'$'); 
 442 /* Add a Redis Object as a bulk reply */ 
 443 void addReplyBulk(redisClient 
*c
, robj 
*obj
) { 
 444     addReplyBulkLen(c
,obj
); 
 446     addReply(c
,shared
.crlf
); 
 449 /* Add a C buffer as bulk reply */ 
 450 void addReplyBulkCBuffer(redisClient 
*c
, void *p
, size_t len
) { 
 451     addReplyLongLongWithPrefix(c
,len
,'$'); 
 452     addReplyString(c
,p
,len
); 
 453     addReply(c
,shared
.crlf
); 
 456 /* Add a C nul term string as bulk reply */ 
 457 void addReplyBulkCString(redisClient 
*c
, char *s
) { 
 459         addReply(c
,shared
.nullbulk
); 
 461         addReplyBulkCBuffer(c
,s
,strlen(s
)); 
 465 /* Add a long long as a bulk reply */ 
 466 void addReplyBulkLongLong(redisClient 
*c
, long long ll
) { 
 470     len 
= ll2string(buf
,64,ll
); 
 471     addReplyBulkCBuffer(c
,buf
,len
); 
 474 /* Copy 'src' client output buffers into 'dst' client output buffers. 
 475  * The function takes care of freeing the old output buffers of the 
 476  * destination client. */ 
 477 void copyClientOutputBuffer(redisClient 
*dst
, redisClient 
*src
) { 
 478     listRelease(dst
->reply
); 
 479     dst
->reply 
= listDup(src
->reply
); 
 480     memcpy(dst
->buf
,src
->buf
,src
->bufpos
); 
 481     dst
->bufpos 
= src
->bufpos
; 
 482     dst
->reply_bytes 
= src
->reply_bytes
; 
 485 static void acceptCommonHandler(int fd
) { 
 487     if ((c 
= createClient(fd
)) == NULL
) { 
 488         redisLog(REDIS_WARNING
,"Error allocating resoures for the client"); 
 489         close(fd
); /* May be already closed, just ingore errors */ 
 492     /* If the maxclients limit is set and this client exceeds it, close the
 493      * connection. Note that we create the client first instead of checking
 494      * for this condition beforehand, since now the socket is already set in
 495      * nonblocking mode and we can send an error for free using the Kernel I/O */
 496     if (listLength(server
.clients
) > server
.maxclients
) { 
 497         char *err 
= "-ERR max number of clients reached\r\n"; 
 499         /* That's a best effort error message, don't check write errors */ 
 500         if (write(c
->fd
,err
,strlen(err
)) == -1) { 
 501             /* Nothing to do, Just to avoid the warning... */ 
 503         server
.stat_rejected_conn
++; 
 507     server
.stat_numconnections
++; 
 510 void acceptTcpHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 515     REDIS_NOTUSED(privdata
); 
 517     cfd 
= anetTcpAccept(server
.neterr
, fd
, cip
, &cport
); 
 519         redisLog(REDIS_WARNING
,"Accepting client connection: %s", server
.neterr
); 
 522     redisLog(REDIS_VERBOSE
,"Accepted %s:%d", cip
, cport
); 
 523     acceptCommonHandler(cfd
); 
 526 void acceptUnixHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 530     REDIS_NOTUSED(privdata
); 
 532     cfd 
= anetUnixAccept(server
.neterr
, fd
); 
 534         redisLog(REDIS_WARNING
,"Accepting client connection: %s", server
.neterr
); 
 537     redisLog(REDIS_VERBOSE
,"Accepted connection to %s", server
.unixsocket
); 
 538     acceptCommonHandler(cfd
); 
 542 static void freeClientArgv(redisClient 
*c
) { 
 544     for (j 
= 0; j 
< c
->argc
; j
++) 
 545         decrRefCount(c
->argv
[j
]); 
 550 /* Close all the slaves connections. This is useful in chained replication 
 551  * when we resync with our own master and want to force all our slaves to 
 552  * resync with us as well. */ 
 553 void disconnectSlaves(void) { 
 554     while (listLength(server
.slaves
)) { 
 555         listNode 
*ln 
= listFirst(server
.slaves
); 
 556         freeClient((redisClient
*)ln
->value
); 
 560 void freeClient(redisClient 
*c
) { 
 563     /* If this is marked as current client unset it */ 
 564     if (server
.current_client 
== c
) server
.current_client 
= NULL
; 
 566     /* Note that if the client we are freeing is blocked into a blocking 
 567      * call, we have to set querybuf to NULL *before* to call 
 568      * unblockClientWaitingData() to avoid processInputBuffer() will get 
 569      * called. Also it is important to remove the file events after 
 570      * this, because this call adds the READABLE event. */ 
 571     sdsfree(c
->querybuf
); 
 573     if (c
->flags 
& REDIS_BLOCKED
) 
 574         unblockClientWaitingData(c
); 
 576     /* UNWATCH all the keys */ 
 578     listRelease(c
->watched_keys
); 
 579     /* Unsubscribe from all the pubsub channels */ 
 580     pubsubUnsubscribeAllChannels(c
,0); 
 581     pubsubUnsubscribeAllPatterns(c
,0); 
 582     dictRelease(c
->pubsub_channels
); 
 583     listRelease(c
->pubsub_patterns
); 
 584     /* Obvious cleanup */ 
 585     aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
); 
 586     aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
); 
 587     listRelease(c
->reply
); 
 590     /* Remove from the list of clients */ 
 591     ln 
= listSearchKey(server
.clients
,c
); 
 592     redisAssert(ln 
!= NULL
); 
 593     listDelNode(server
.clients
,ln
); 
 594     /* When client was just unblocked because of a blocking operation, 
 595      * remove it from the list with unblocked clients. */ 
 596     if (c
->flags 
& REDIS_UNBLOCKED
) { 
 597         ln 
= listSearchKey(server
.unblocked_clients
,c
); 
 598         redisAssert(ln 
!= NULL
); 
 599         listDelNode(server
.unblocked_clients
,ln
); 
 601     listRelease(c
->io_keys
); 
 602     /* Master/slave cleanup. 
 603      * Case 1: we lost the connection with a slave. */ 
 604     if (c
->flags 
& REDIS_SLAVE
) { 
 605         if (c
->replstate 
== REDIS_REPL_SEND_BULK 
&& c
->repldbfd 
!= -1) 
 607         list 
*l 
= (c
->flags 
& REDIS_MONITOR
) ? server
.monitors 
: server
.slaves
; 
 608         ln 
= listSearchKey(l
,c
); 
 609         redisAssert(ln 
!= NULL
); 
 613     /* Case 2: we lost the connection with the master. */ 
 614     if (c
->flags 
& REDIS_MASTER
) { 
 615         server
.master 
= NULL
; 
 616         server
.repl_state 
= REDIS_REPL_CONNECT
; 
 617         server
.repl_down_since 
= server
.unixtime
; 
 618         /* We lost connection with our master, force our slaves to resync 
 619          * with us as well to load the new data set. 
 621          * If server.masterhost is NULL the user called SLAVEOF NO ONE so 
 622          * slave resync is not needed. */ 
 623         if (server
.masterhost 
!= NULL
) disconnectSlaves(); 
 626     /* If this client was scheduled for async freeing we need to remove it 
 628     if (c
->flags 
& REDIS_CLOSE_ASAP
) { 
 629         ln 
= listSearchKey(server
.clients_to_close
,c
); 
 630         redisAssert(ln 
!= NULL
); 
 631         listDelNode(server
.clients_to_close
,ln
); 
 636     freeClientMultiState(c
); 
 640 /* Schedule a client to free it at a safe time in the serverCron() function. 
 641  * This function is useful when we need to terminate a client but we are in 
 642  * a context where calling freeClient() is not possible, because the client 
 643  * should be valid for the continuation of the flow of the program. */ 
 644 void freeClientAsync(redisClient 
*c
) { 
 645     if (c
->flags 
& REDIS_CLOSE_ASAP
) return; 
 646     c
->flags 
|= REDIS_CLOSE_ASAP
; 
 647     listAddNodeTail(server
.clients_to_close
,c
); 
 650 void freeClientsInAsyncFreeQueue(void) { 
 651     while (listLength(server
.clients_to_close
)) { 
 652         listNode 
*ln 
= listFirst(server
.clients_to_close
); 
 653         redisClient 
*c 
= listNodeValue(ln
); 
 655         c
->flags 
&= ~REDIS_CLOSE_ASAP
; 
 657         listDelNode(server
.clients_to_close
,ln
); 
 661 void sendReplyToClient(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 662     redisClient 
*c 
= privdata
; 
 663     int nwritten 
= 0, totwritten 
= 0, objlen
; 
 669     while(c
->bufpos 
> 0 || listLength(c
->reply
)) { 
 671             if (c
->flags 
& REDIS_MASTER
) { 
 672                 /* Don't reply to a master */ 
 673                 nwritten 
= c
->bufpos 
- c
->sentlen
; 
 675                 nwritten 
= write(fd
,c
->buf
+c
->sentlen
,c
->bufpos
-c
->sentlen
); 
 676                 if (nwritten 
<= 0) break; 
 678             c
->sentlen 
+= nwritten
; 
 679             totwritten 
+= nwritten
; 
 681             /* If the buffer was sent, set bufpos to zero to continue with 
 682              * the remainder of the reply. */ 
 683             if (c
->sentlen 
== c
->bufpos
) { 
 688             o 
= listNodeValue(listFirst(c
->reply
)); 
 689             objlen 
= sdslen(o
->ptr
); 
 690             objmem 
= zmalloc_size_sds(o
->ptr
); 
 693                 listDelNode(c
->reply
,listFirst(c
->reply
)); 
 697             if (c
->flags 
& REDIS_MASTER
) { 
 698                 /* Don't reply to a master */ 
 699                 nwritten 
= objlen 
- c
->sentlen
; 
 701                 nwritten 
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
,objlen
-c
->sentlen
); 
 702                 if (nwritten 
<= 0) break; 
 704             c
->sentlen 
+= nwritten
; 
 705             totwritten 
+= nwritten
; 
 707             /* If we fully sent the object on head go to the next one */ 
 708             if (c
->sentlen 
== objlen
) { 
 709                 listDelNode(c
->reply
,listFirst(c
->reply
)); 
 711                 c
->reply_bytes 
-= objmem
; 
 714         /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT 
 715          * bytes, in a single threaded server it's a good idea to serve 
 716          * other clients as well, even if a very large request comes from 
 717          * super fast link that is always able to accept data (in real world 
 718          * scenario think about 'KEYS *' against the loopback interface). 
 720          * However if we are over the maxmemory limit we ignore that and 
 721          * just deliver as much data as it is possible to deliver. */ 
 722         if (totwritten 
> REDIS_MAX_WRITE_PER_EVENT 
&& 
 723             (server
.maxmemory 
== 0 || 
 724              zmalloc_used_memory() < server
.maxmemory
)) break; 
 726     if (nwritten 
== -1) { 
 727         if (errno 
== EAGAIN
) { 
 730             redisLog(REDIS_VERBOSE
, 
 731                 "Error writing to client: %s", strerror(errno
)); 
 736     if (totwritten 
> 0) c
->lastinteraction 
= server
.unixtime
; 
 737     if (c
->bufpos 
== 0 && listLength(c
->reply
) == 0) { 
 739         aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
); 
 741         /* Close connection after entire reply has been sent. */ 
 742         if (c
->flags 
& REDIS_CLOSE_AFTER_REPLY
) freeClient(c
); 
 746 /* resetClient prepare the client to process the next command */ 
 747 void resetClient(redisClient 
*c
) { 
 752     /* We clear the ASKING flag as well if we are not inside a MULTI. */ 
 753     if (!(c
->flags 
& REDIS_MULTI
)) c
->flags 
&= (~REDIS_ASKING
); 
 756 int processInlineBuffer(redisClient 
*c
) { 
 757     char *newline 
= strstr(c
->querybuf
,"\r\n"); 
 762     /* Nothing to do without a \r\n */ 
 763     if (newline 
== NULL
) { 
 764         if (sdslen(c
->querybuf
) > REDIS_INLINE_MAX_SIZE
) { 
 765             addReplyError(c
,"Protocol error: too big inline request"); 
 766             setProtocolError(c
,0); 
 771     /* Split the input buffer up to the \r\n */ 
 772     querylen 
= newline
-(c
->querybuf
); 
 773     argv 
= sdssplitlen(c
->querybuf
,querylen
," ",1,&argc
); 
 775     /* Leave data after the first line of the query in the buffer */ 
 776     c
->querybuf 
= sdsrange(c
->querybuf
,querylen
+2,-1); 
 778     /* Setup argv array on client structure */ 
 779     if (c
->argv
) zfree(c
->argv
); 
 780     c
->argv 
= zmalloc(sizeof(robj
*)*argc
); 
 782     /* Create redis objects for all arguments. */ 
 783     for (c
->argc 
= 0, j 
= 0; j 
< argc
; j
++) { 
 784         if (sdslen(argv
[j
])) { 
 785             c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]); 
 795 /* Helper function. Trims query buffer to make the function that processes 
 796  * multi bulk requests idempotent. */ 
 797 static void setProtocolError(redisClient 
*c
, int pos
) { 
 798     if (server
.verbosity 
>= REDIS_VERBOSE
) { 
 799         sds client 
= getClientInfoString(c
); 
 800         redisLog(REDIS_VERBOSE
, 
 801             "Protocol error from client: %s", client
); 
 804     c
->flags 
|= REDIS_CLOSE_AFTER_REPLY
; 
 805     c
->querybuf 
= sdsrange(c
->querybuf
,pos
,-1); 
 808 int processMultibulkBuffer(redisClient 
*c
) { 
 809     char *newline 
= NULL
; 
 813     if (c
->multibulklen 
== 0) { 
 814         /* The client should have been reset */ 
 815         redisAssertWithInfo(c
,NULL
,c
->argc 
== 0); 
 817         /* Multi bulk length cannot be read without a \r\n */ 
 818         newline 
= strchr(c
->querybuf
,'\r'); 
 819         if (newline 
== NULL
) { 
 820             if (sdslen(c
->querybuf
) > REDIS_INLINE_MAX_SIZE
) { 
 821                 addReplyError(c
,"Protocol error: too big mbulk count string"); 
 822                 setProtocolError(c
,0); 
 827         /* Buffer should also contain \n */ 
 828         if (newline
-(c
->querybuf
) > ((signed)sdslen(c
->querybuf
)-2)) 
 831         /* We know for sure there is a whole line since newline != NULL, 
 832          * so go ahead and find out the multi bulk length. */ 
 833         redisAssertWithInfo(c
,NULL
,c
->querybuf
[0] == '*'); 
 834         ok 
= string2ll(c
->querybuf
+1,newline
-(c
->querybuf
+1),&ll
); 
 835         if (!ok 
|| ll 
> 1024*1024) { 
 836             addReplyError(c
,"Protocol error: invalid multibulk length"); 
 837             setProtocolError(c
,pos
); 
 841         pos 
= (newline
-c
->querybuf
)+2; 
 843             c
->querybuf 
= sdsrange(c
->querybuf
,pos
,-1); 
 847         c
->multibulklen 
= ll
; 
 849         /* Setup argv array on client structure */ 
 850         if (c
->argv
) zfree(c
->argv
); 
 851         c
->argv 
= zmalloc(sizeof(robj
*)*c
->multibulklen
); 
 854     redisAssertWithInfo(c
,NULL
,c
->multibulklen 
> 0); 
 855     while(c
->multibulklen
) { 
 856         /* Read bulk length if unknown */ 
 857         if (c
->bulklen 
== -1) { 
 858             newline 
= strchr(c
->querybuf
+pos
,'\r'); 
 859             if (newline 
== NULL
) { 
 860                 if (sdslen(c
->querybuf
) > REDIS_INLINE_MAX_SIZE
) { 
 861                     addReplyError(c
,"Protocol error: too big bulk count string"); 
 862                     setProtocolError(c
,0); 
 867             /* Buffer should also contain \n */ 
 868             if (newline
-(c
->querybuf
) > ((signed)sdslen(c
->querybuf
)-2)) 
 871             if (c
->querybuf
[pos
] != '$') { 
 872                 addReplyErrorFormat(c
, 
 873                     "Protocol error: expected '$', got '%c'", 
 875                 setProtocolError(c
,pos
); 
 879             ok 
= string2ll(c
->querybuf
+pos
+1,newline
-(c
->querybuf
+pos
+1),&ll
); 
 880             if (!ok 
|| ll 
< 0 || ll 
> 512*1024*1024) { 
 881                 addReplyError(c
,"Protocol error: invalid bulk length"); 
 882                 setProtocolError(c
,pos
); 
 886             pos 
+= newline
-(c
->querybuf
+pos
)+2; 
 887             if (ll 
>= REDIS_MBULK_BIG_ARG
) { 
 888                 /* If we are going to read a large object from network 
 889                  * try to make it likely that it will start at c->querybuf 
 890                  * boundary so that we can optimized object creation 
 891                  * avoiding a large copy of data. */ 
 892                 c
->querybuf 
= sdsrange(c
->querybuf
,pos
,-1); 
 894                 /* Hint the sds library about the amount of bytes this string is 
 895                  * going to contain. */ 
 896                 c
->querybuf 
= sdsMakeRoomFor(c
->querybuf
,ll
+2); 
 901         /* Read bulk argument */ 
 902         if (sdslen(c
->querybuf
)-pos 
< (unsigned)(c
->bulklen
+2)) { 
 903             /* Not enough data (+2 == trailing \r\n) */ 
 906             /* Optimization: if the buffer contains JUST our bulk element
 907              * instead of creating a new object by *copying* the sds we
 908              * just use the current sds string. */
 910                 c
->bulklen 
>= REDIS_MBULK_BIG_ARG 
&& 
 911                 (signed) sdslen(c
->querybuf
) == c
->bulklen
+2) 
 913                 c
->argv
[c
->argc
++] = createObject(REDIS_STRING
,c
->querybuf
); 
 914                 sdsIncrLen(c
->querybuf
,-2); /* remove CRLF */ 
 915                 c
->querybuf 
= sdsempty(); 
 916                 /* Assume that if we saw a fat argument we'll see another one 
 918                 c
->querybuf 
= sdsMakeRoomFor(c
->querybuf
,c
->bulklen
+2); 
 922                     createStringObject(c
->querybuf
+pos
,c
->bulklen
); 
 931     if (pos
) c
->querybuf 
= sdsrange(c
->querybuf
,pos
,-1); 
 933     /* We're done when c->multibulklen == 0 */
 934     if (c
->multibulklen 
== 0) return REDIS_OK
; 
 936     /* Still not ready to process the command */
 940 void processInputBuffer(redisClient 
*c
) { 
 941     /* Keep processing while there is something in the input buffer */ 
 942     while(sdslen(c
->querybuf
)) { 
 943         /* Immediately abort if the client is in the middle of something. */ 
 944         if (c
->flags 
& REDIS_BLOCKED
) return; 
 946         /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is 
 947          * written to the client. Make sure to not let the reply grow after 
 948          * this flag has been set (i.e. don't process more commands). */ 
 949         if (c
->flags 
& REDIS_CLOSE_AFTER_REPLY
) return; 
 951         /* Determine request type when unknown. */ 
 953             if (c
->querybuf
[0] == '*') { 
 954                 c
->reqtype 
= REDIS_REQ_MULTIBULK
; 
 956                 c
->reqtype 
= REDIS_REQ_INLINE
; 
 960         if (c
->reqtype 
== REDIS_REQ_INLINE
) { 
 961             if (processInlineBuffer(c
) != REDIS_OK
) break; 
 962         } else if (c
->reqtype 
== REDIS_REQ_MULTIBULK
) { 
 963             if (processMultibulkBuffer(c
) != REDIS_OK
) break; 
 965             redisPanic("Unknown request type"); 
 968         /* Multibulk processing could see a <= 0 length. */ 
 972             /* Only reset the client when the command was executed. */ 
 973             if (processCommand(c
) == REDIS_OK
) 
 979 void readQueryFromClient(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 980     redisClient 
*c 
= (redisClient
*) privdata
; 
 986     server
.current_client 
= c
; 
 987     readlen 
= REDIS_IOBUF_LEN
; 
 988     /* If this is a multi bulk request, and we are processing a bulk reply 
 989      * that is large enough, try to maximize the probability that the query 
 990      * buffer contains exactly the SDS string representing the object, even 
 991      * at the risk of requiring more read(2) calls. This way the function 
 992      * processMultibulkBuffer() can avoid copying buffers to create the 
 993      * Redis Object representing the argument. */ 
 994     if (c
->reqtype 
== REDIS_REQ_MULTIBULK 
&& c
->multibulklen 
&& c
->bulklen 
!= -1 
 995         && c
->bulklen 
>= REDIS_MBULK_BIG_ARG
) 
 997         int remaining 
= (unsigned)(c
->bulklen
+2)-sdslen(c
->querybuf
); 
 999         if (remaining 
< readlen
) readlen 
= remaining
; 
1002     qblen 
= sdslen(c
->querybuf
); 
1003     if (c
->querybuf_peak 
< qblen
) c
->querybuf_peak 
= qblen
; 
1004     c
->querybuf 
= sdsMakeRoomFor(c
->querybuf
, readlen
); 
1005     nread 
= read(fd
, c
->querybuf
+qblen
, readlen
); 
1007         if (errno 
== EAGAIN
) { 
1010             redisLog(REDIS_VERBOSE
, "Reading from client: %s",strerror(errno
)); 
1014     } else if (nread 
== 0) { 
1015         redisLog(REDIS_VERBOSE
, "Client closed connection"); 
1020         sdsIncrLen(c
->querybuf
,nread
); 
1021         c
->lastinteraction 
= server
.unixtime
; 
1023         server
.current_client 
= NULL
; 
1026     if (sdslen(c
->querybuf
) > server
.client_max_querybuf_len
) { 
1027         sds ci 
= getClientInfoString(c
), bytes 
= sdsempty(); 
1029         bytes 
= sdscatrepr(bytes
,c
->querybuf
,64); 
1030         redisLog(REDIS_WARNING
,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci
, bytes
); 
1036     processInputBuffer(c
); 
1037     server
.current_client 
= NULL
; 
1040 void getClientsMaxBuffers(unsigned long *longest_output_list
, 
1041                           unsigned long *biggest_input_buffer
) { 
1045     unsigned long lol 
= 0, bib 
= 0; 
1047     listRewind(server
.clients
,&li
); 
1048     while ((ln 
= listNext(&li
)) != NULL
) { 
1049         c 
= listNodeValue(ln
); 
1051         if (listLength(c
->reply
) > lol
) lol 
= listLength(c
->reply
); 
1052         if (sdslen(c
->querybuf
) > bib
) bib 
= sdslen(c
->querybuf
); 
1054     *longest_output_list 
= lol
; 
1055     *biggest_input_buffer 
= bib
; 
1058 /* Turn a Redis client into an sds string representing its state. */ 
1059 sds 
getClientInfoString(redisClient 
*client
) { 
1060     char ip
[32], flags
[16], events
[3], *p
; 
1064     anetPeerToString(client
->fd
,ip
,&port
); 
1066     if (client
->flags 
& REDIS_SLAVE
) { 
1067         if (client
->flags 
& REDIS_MONITOR
) 
1072     if (client
->flags 
& REDIS_MASTER
) *p
++ = 'M'; 
1073     if (client
->flags 
& REDIS_MULTI
) *p
++ = 'x'; 
1074     if (client
->flags 
& REDIS_BLOCKED
) *p
++ = 'b'; 
1075     if (client
->flags 
& REDIS_DIRTY_CAS
) *p
++ = 'd'; 
1076     if (client
->flags 
& REDIS_CLOSE_AFTER_REPLY
) *p
++ = 'c'; 
1077     if (client
->flags 
& REDIS_UNBLOCKED
) *p
++ = 'u'; 
1078     if (client
->flags 
& REDIS_CLOSE_ASAP
) *p
++ = 'A'; 
1079     if (p 
== flags
) *p
++ = 'N'; 
1082     emask 
= client
->fd 
== -1 ? 0 : aeGetFileEvents(server
.el
,client
->fd
); 
1084     if (emask 
& AE_READABLE
) *p
++ = 'r'; 
1085     if (emask 
& AE_WRITABLE
) *p
++ = 'w'; 
1087     return sdscatprintf(sdsempty(), 
1088         "addr=%s:%d fd=%d age=%ld idle=%ld flags=%s db=%d sub=%d psub=%d multi=%d qbuf=%lu qbuf-free=%lu obl=%lu oll=%lu omem=%lu events=%s cmd=%s", 
1090         (long)(server
.unixtime 
- client
->ctime
), 
1091         (long)(server
.unixtime 
- client
->lastinteraction
), 
1094         (int) dictSize(client
->pubsub_channels
), 
1095         (int) listLength(client
->pubsub_patterns
), 
1096         (client
->flags 
& REDIS_MULTI
) ? client
->mstate
.count 
: -1, 
1097         (unsigned long) sdslen(client
->querybuf
), 
1098         (unsigned long) sdsavail(client
->querybuf
), 
1099         (unsigned long) client
->bufpos
, 
1100         (unsigned long) listLength(client
->reply
), 
1101         getClientOutputBufferMemoryUsage(client
), 
1103         client
->lastcmd 
? client
->lastcmd
->name 
: "NULL"); 
1106 sds 
getAllClientsInfoString(void) { 
1109     redisClient 
*client
; 
1112     listRewind(server
.clients
,&li
); 
1113     while ((ln 
= listNext(&li
)) != NULL
) { 
1116         client 
= listNodeValue(ln
); 
1117         cs 
= getClientInfoString(client
); 
1118         o 
= sdscatsds(o
,cs
); 
1120         o 
= sdscatlen(o
,"\n",1); 
1125 void clientCommand(redisClient 
*c
) { 
1128     redisClient 
*client
; 
1130     if (!strcasecmp(c
->argv
[1]->ptr
,"list") && c
->argc 
== 2) { 
1131         sds o 
= getAllClientsInfoString(); 
1132         addReplyBulkCBuffer(c
,o
,sdslen(o
)); 
1134     } else if (!strcasecmp(c
->argv
[1]->ptr
,"kill") && c
->argc 
== 3) { 
1135         listRewind(server
.clients
,&li
); 
1136         while ((ln 
= listNext(&li
)) != NULL
) { 
1137             char ip
[32], addr
[64]; 
1140             client 
= listNodeValue(ln
); 
1141             if (anetPeerToString(client
->fd
,ip
,&port
) == -1) continue; 
1142             snprintf(addr
,sizeof(addr
),"%s:%d",ip
,port
); 
1143             if (strcmp(addr
,c
->argv
[2]->ptr
) == 0) { 
1144                 addReply(c
,shared
.ok
); 
1146                     client
->flags 
|= REDIS_CLOSE_AFTER_REPLY
; 
1153         addReplyError(c
,"No such client"); 
1155         addReplyError(c
, "Syntax error, try CLIENT (LIST | KILL ip:port)"); 
1159 /* Rewrite the command vector of the client. All the new objects ref count 
1160  * is incremented. The old command vector is freed, and the old objects 
1161  * ref count is decremented. */ 
1162 void rewriteClientCommandVector(redisClient 
*c
, int argc
, ...) { 
1165     robj 
**argv
; /* The new argument vector */ 
1167     argv 
= zmalloc(sizeof(robj
*)*argc
); 
1169     for (j 
= 0; j 
< argc
; j
++) { 
1172         a 
= va_arg(ap
, robj
*); 
1176     /* We free the objects in the original vector at the end, so we are 
1177      * sure that if the same objects are reused in the new vector the 
1178      * refcount gets incremented before it gets decremented. */ 
1179     for (j 
= 0; j 
< c
->argc
; j
++) decrRefCount(c
->argv
[j
]); 
1181     /* Replace argv and argc with our new versions. */ 
1184     c
->cmd 
= lookupCommand(c
->argv
[0]->ptr
); 
1185     redisAssertWithInfo(c
,NULL
,c
->cmd 
!= NULL
); 
1189 /* Rewrite a single item in the command vector. 
1190  * The new val ref count is incremented, and the old decremented. */ 
1191 void rewriteClientCommandArgument(redisClient 
*c
, int i
, robj 
*newval
) { 
1194     redisAssertWithInfo(c
,NULL
,i 
< c
->argc
); 
1195     oldval 
= c
->argv
[i
]; 
1196     c
->argv
[i
] = newval
; 
1197     incrRefCount(newval
); 
1198     decrRefCount(oldval
); 
1200     /* If this is the command name make sure to fix c->cmd. */ 
1202         c
->cmd 
= lookupCommand(c
->argv
[0]->ptr
); 
1203         redisAssertWithInfo(c
,NULL
,c
->cmd 
!= NULL
); 
1207 /* This function returns the number of bytes that Redis is virtually 
1208  * using to store the reply still not read by the client. 
1209  * It is "virtual" since the reply output list may contain objects that 
1210  * are shared and are not really using additional memory. 
1212  * The function returns the total sum of the length of all the objects 
1213  * stored in the output list, plus the memory used to allocate every 
1214  * list node. The static reply buffer is not taken into account since it 
1215  * is allocated anyway. 
1217  * Note: this function is very fast so can be called as many time as 
1218  * the caller wishes. The main usage of this function currently is 
1219  * enforcing the client output length limits. */ 
1220 unsigned long getClientOutputBufferMemoryUsage(redisClient 
*c
) { 
1221     unsigned long list_item_size 
= sizeof(listNode
)+sizeof(robj
); 
1223     return c
->reply_bytes 
+ (list_item_size
*listLength(c
->reply
)); 
1226 /* Get the class of a client, used in order to envorce limits to different 
1227  * classes of clients. 
1229  * The function will return one of the following: 
1230  * REDIS_CLIENT_LIMIT_CLASS_NORMAL -> Normal client 
1231  * REDIS_CLIENT_LIMIT_CLASS_SLAVE  -> Slave or client executing MONITOR command 
1232  * REDIS_CLIENT_LIMIT_CLASS_PUBSUB -> Client subscribed to Pub/Sub channels 
1234 int getClientLimitClass(redisClient 
*c
) { 
1235     if (c
->flags 
& REDIS_SLAVE
) return REDIS_CLIENT_LIMIT_CLASS_SLAVE
; 
1236     if (dictSize(c
->pubsub_channels
) || listLength(c
->pubsub_patterns
)) 
1237         return REDIS_CLIENT_LIMIT_CLASS_PUBSUB
; 
1238     return REDIS_CLIENT_LIMIT_CLASS_NORMAL
; 
1241 int getClientLimitClassByName(char *name
) { 
1242     if (!strcasecmp(name
,"normal")) return REDIS_CLIENT_LIMIT_CLASS_NORMAL
; 
1243     else if (!strcasecmp(name
,"slave")) return REDIS_CLIENT_LIMIT_CLASS_SLAVE
; 
1244     else if (!strcasecmp(name
,"pubsub")) return REDIS_CLIENT_LIMIT_CLASS_PUBSUB
; 
1248 char *getClientLimitClassName(int class) { 
1250     case REDIS_CLIENT_LIMIT_CLASS_NORMAL
:   return "normal"; 
1251     case REDIS_CLIENT_LIMIT_CLASS_SLAVE
:    return "slave"; 
1252     case REDIS_CLIENT_LIMIT_CLASS_PUBSUB
:   return "pubsub"; 
1253     default:                                return NULL
; 
1257 /* The function checks if the client reached output buffer soft or hard 
1258  * limit, and also update the state needed to check the soft limit as 
1261  * Return value: non-zero if the client reached the soft or the hard limit. 
1262  *               Otherwise zero is returned. */ 
1263 int checkClientOutputBufferLimits(redisClient 
*c
) { 
1264     int soft 
= 0, hard 
= 0, class; 
1265     unsigned long used_mem 
= getClientOutputBufferMemoryUsage(c
); 
1267     class = getClientLimitClass(c
); 
1268     if (server
.client_obuf_limits
[class].hard_limit_bytes 
&& 
1269         used_mem 
>= server
.client_obuf_limits
[class].hard_limit_bytes
) 
1271     if (server
.client_obuf_limits
[class].soft_limit_bytes 
&& 
1272         used_mem 
>= server
.client_obuf_limits
[class].soft_limit_bytes
) 
1275     /* We need to check if the soft limit is reached continuously for the 
1276      * specified amount of seconds. */ 
1278         if (c
->obuf_soft_limit_reached_time 
== 0) { 
1279             c
->obuf_soft_limit_reached_time 
= server
.unixtime
; 
1280             soft 
= 0; /* First time we see the soft limit reached */ 
1282             time_t elapsed 
= server
.unixtime 
- c
->obuf_soft_limit_reached_time
; 
1285                 server
.client_obuf_limits
[class].soft_limit_seconds
) { 
1286                 soft 
= 0; /* The client still did not reached the max number of 
1287                              seconds for the soft limit to be considered 
1292         c
->obuf_soft_limit_reached_time 
= 0; 
1294     return soft 
|| hard
; 
1297 /* Asynchronously close a client if soft or hard limit is reached on the 
1298  * output buffer size. The caller can check if the client will be closed 
1299  * checking if the client REDIS_CLOSE_ASAP flag is set. 
1301  * Note: we need to close the client asynchronously because this function is 
1302  * called from contexts where the client can't be freed safely, i.e. from the 
1303  * lower level functions pushing data inside the client output buffers. */ 
1304 void asyncCloseClientOnOutputBufferLimitReached(redisClient 
*c
) { 
1305     if (c
->reply_bytes 
== 0 || c
->flags 
& REDIS_CLOSE_ASAP
) return; 
1306     if (checkClientOutputBufferLimits(c
)) { 
1307         sds client 
= getClientInfoString(c
); 
1310         redisLog(REDIS_WARNING
,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client
); 
1315 /* Helper function used by freeMemoryIfNeeded() in order to flush slaves 
1316  * output buffers without returning control to the event loop. */ 
1317 void flushSlavesOutputBuffers(void) { 
1321     listRewind(server
.slaves
,&li
); 
1322     while((ln 
= listNext(&li
))) { 
1323         redisClient 
*slave 
= listNodeValue(ln
); 
1326         events 
= aeGetFileEvents(server
.el
,slave
->fd
); 
1327         if (events 
& AE_WRITABLE 
&& 
1328             slave
->replstate 
== REDIS_REPL_ONLINE 
&& 
1329             listLength(slave
->reply
)) 
1331             sendReplyToClient(server
.el
,slave
->fd
,slave
,0);