2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
33 static void setProtocolError(redisClient
*c
, int pos
);
35 /* To evaluate the output buffer size of a client we need to get size of
36 * allocated objects, however we can't used zmalloc_size() directly on sds
37 * strings because of the trick they use to work (the header is before the
38 * returned pointer), so we use this helper function. */
39 size_t zmalloc_size_sds(sds s
) {
40 return zmalloc_size(s
-sizeof(struct sdshdr
));
43 void *dupClientReplyValue(void *o
) {
44 incrRefCount((robj
*)o
);
48 int listMatchObjects(void *a
, void *b
) {
49 return equalStringObjects(a
,b
);
52 redisClient
*createClient(int fd
) {
53 redisClient
*c
= zmalloc(sizeof(redisClient
));
55 /* passing -1 as fd it is possible to create a non connected client.
56 * This is useful since all the Redis commands needs to be executed
57 * in the context of a client. When commands are executed in other
58 * contexts (for instance a Lua script) we need a non connected client. */
60 anetNonBlock(NULL
,fd
);
61 anetTcpNoDelay(NULL
,fd
);
62 if (aeCreateFileEvent(server
.el
,fd
,AE_READABLE
,
63 readQueryFromClient
, c
) == AE_ERR
)
74 c
->querybuf
= sdsempty();
79 c
->cmd
= c
->lastcmd
= NULL
;
84 c
->ctime
= c
->lastinteraction
= server
.unixtime
;
86 c
->replstate
= REDIS_REPL_NONE
;
87 c
->slave_listening_port
= 0;
88 c
->reply
= listCreate();
90 c
->obuf_soft_limit_reached_time
= 0;
91 listSetFreeMethod(c
->reply
,decrRefCount
);
92 listSetDupMethod(c
->reply
,dupClientReplyValue
);
96 c
->bpop
.target
= NULL
;
97 c
->io_keys
= listCreate();
98 c
->watched_keys
= listCreate();
99 listSetFreeMethod(c
->io_keys
,decrRefCount
);
100 c
->pubsub_channels
= dictCreate(&setDictType
,NULL
);
101 c
->pubsub_patterns
= listCreate();
102 listSetFreeMethod(c
->pubsub_patterns
,decrRefCount
);
103 listSetMatchMethod(c
->pubsub_patterns
,listMatchObjects
);
104 if (fd
!= -1) listAddNodeTail(server
.clients
,c
);
105 initClientMultiState(c
);
109 /* This function is called every time we are going to transmit new data
110 * to the client. The behavior is the following:
112 * If the client should receive new data (normal clients will) the function
113 * returns REDIS_OK, and make sure to install the write handler in our event
114 * loop so that when the socket is writable new data gets written.
116 * If the client should not receive new data, because it is a fake client
117 * or a slave, or because the setup of the write handler failed, the function
120 * Typically gets called every time a reply is built, before adding more
121 * data to the clients output buffers. If the function returns REDIS_ERR no
122 * data should be appended to the output buffers. */
123 int prepareClientToWrite(redisClient
*c
) {
124 if (c
->flags
& REDIS_LUA_CLIENT
) return REDIS_OK
;
125 if (c
->fd
<= 0) return REDIS_ERR
; /* Fake client */
126 if (c
->bufpos
== 0 && listLength(c
->reply
) == 0 &&
127 (c
->replstate
== REDIS_REPL_NONE
||
128 c
->replstate
== REDIS_REPL_ONLINE
) &&
129 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
130 sendReplyToClient
, c
) == AE_ERR
) return REDIS_ERR
;
134 /* Create a duplicate of the last object in the reply list when
135 * it is not exclusively owned by the reply list. */
136 robj
*dupLastObjectIfNeeded(list
*reply
) {
139 redisAssert(listLength(reply
) > 0);
140 ln
= listLast(reply
);
141 cur
= listNodeValue(ln
);
142 if (cur
->refcount
> 1) {
143 new = dupStringObject(cur
);
145 listNodeValue(ln
) = new;
147 return listNodeValue(ln
);
150 /* -----------------------------------------------------------------------------
151 * Low level functions to add more data to output buffers.
152 * -------------------------------------------------------------------------- */
154 int _addReplyToBuffer(redisClient
*c
, char *s
, size_t len
) {
155 size_t available
= sizeof(c
->buf
)-c
->bufpos
;
157 if (c
->flags
& REDIS_CLOSE_AFTER_REPLY
) return REDIS_OK
;
159 /* If there already are entries in the reply list, we cannot
160 * add anything more to the static buffer. */
161 if (listLength(c
->reply
) > 0) return REDIS_ERR
;
163 /* Check that the buffer has enough space available for this string. */
164 if (len
> available
) return REDIS_ERR
;
166 memcpy(c
->buf
+c
->bufpos
,s
,len
);
171 void _addReplyObjectToList(redisClient
*c
, robj
*o
) {
174 if (c
->flags
& REDIS_CLOSE_AFTER_REPLY
) return;
176 if (listLength(c
->reply
) == 0) {
178 listAddNodeTail(c
->reply
,o
);
179 c
->reply_bytes
+= zmalloc_size_sds(o
->ptr
);
181 tail
= listNodeValue(listLast(c
->reply
));
183 /* Append to this object when possible. */
184 if (tail
->ptr
!= NULL
&&
185 sdslen(tail
->ptr
)+sdslen(o
->ptr
) <= REDIS_REPLY_CHUNK_BYTES
)
187 c
->reply_bytes
-= zmalloc_size_sds(tail
->ptr
);
188 tail
= dupLastObjectIfNeeded(c
->reply
);
189 tail
->ptr
= sdscatlen(tail
->ptr
,o
->ptr
,sdslen(o
->ptr
));
190 c
->reply_bytes
+= zmalloc_size_sds(tail
->ptr
);
193 listAddNodeTail(c
->reply
,o
);
194 c
->reply_bytes
+= zmalloc_size_sds(o
->ptr
);
197 asyncCloseClientOnOutputBufferLimitReached(c
);
200 /* This method takes responsibility over the sds. When it is no longer
201 * needed it will be free'd, otherwise it ends up in a robj. */
202 void _addReplySdsToList(redisClient
*c
, sds s
) {
205 if (c
->flags
& REDIS_CLOSE_AFTER_REPLY
) {
210 if (listLength(c
->reply
) == 0) {
211 listAddNodeTail(c
->reply
,createObject(REDIS_STRING
,s
));
212 c
->reply_bytes
+= zmalloc_size_sds(s
);
214 tail
= listNodeValue(listLast(c
->reply
));
216 /* Append to this object when possible. */
217 if (tail
->ptr
!= NULL
&&
218 sdslen(tail
->ptr
)+sdslen(s
) <= REDIS_REPLY_CHUNK_BYTES
)
220 c
->reply_bytes
-= zmalloc_size_sds(tail
->ptr
);
221 tail
= dupLastObjectIfNeeded(c
->reply
);
222 tail
->ptr
= sdscatlen(tail
->ptr
,s
,sdslen(s
));
223 c
->reply_bytes
+= zmalloc_size_sds(tail
->ptr
);
226 listAddNodeTail(c
->reply
,createObject(REDIS_STRING
,s
));
227 c
->reply_bytes
+= zmalloc_size_sds(s
);
230 asyncCloseClientOnOutputBufferLimitReached(c
);
233 void _addReplyStringToList(redisClient
*c
, char *s
, size_t len
) {
236 if (c
->flags
& REDIS_CLOSE_AFTER_REPLY
) return;
238 if (listLength(c
->reply
) == 0) {
239 robj
*o
= createStringObject(s
,len
);
241 listAddNodeTail(c
->reply
,o
);
242 c
->reply_bytes
+= zmalloc_size_sds(o
->ptr
);
244 tail
= listNodeValue(listLast(c
->reply
));
246 /* Append to this object when possible. */
247 if (tail
->ptr
!= NULL
&&
248 sdslen(tail
->ptr
)+len
<= REDIS_REPLY_CHUNK_BYTES
)
250 c
->reply_bytes
-= zmalloc_size_sds(tail
->ptr
);
251 tail
= dupLastObjectIfNeeded(c
->reply
);
252 tail
->ptr
= sdscatlen(tail
->ptr
,s
,len
);
253 c
->reply_bytes
+= zmalloc_size_sds(tail
->ptr
);
255 robj
*o
= createStringObject(s
,len
);
257 listAddNodeTail(c
->reply
,o
);
258 c
->reply_bytes
+= zmalloc_size_sds(o
->ptr
);
261 asyncCloseClientOnOutputBufferLimitReached(c
);
264 /* -----------------------------------------------------------------------------
265 * Higher level functions to queue data on the client output buffer.
266 * The following functions are the ones that commands implementations will call.
267 * -------------------------------------------------------------------------- */
269 void addReply(redisClient
*c
, robj
*obj
) {
270 if (prepareClientToWrite(c
) != REDIS_OK
) return;
272 /* This is an important place where we can avoid copy-on-write
273 * when there is a saving child running, avoiding touching the
274 * refcount field of the object if it's not needed.
276 * If the encoding is RAW and there is room in the static buffer
277 * we'll be able to send the object to the client without
278 * messing with its page. */
279 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
280 if (_addReplyToBuffer(c
,obj
->ptr
,sdslen(obj
->ptr
)) != REDIS_OK
)
281 _addReplyObjectToList(c
,obj
);
282 } else if (obj
->encoding
== REDIS_ENCODING_INT
) {
283 /* Optimization: if there is room in the static buffer for 32 bytes
284 * (more than the max chars a 64 bit integer can take as string) we
285 * avoid decoding the object and go for the lower level approach. */
286 if (listLength(c
->reply
) == 0 && (sizeof(c
->buf
) - c
->bufpos
) >= 32) {
290 len
= ll2string(buf
,sizeof(buf
),(long)obj
->ptr
);
291 if (_addReplyToBuffer(c
,buf
,len
) == REDIS_OK
)
293 /* else... continue with the normal code path, but should never
294 * happen actually since we verified there is room. */
296 obj
= getDecodedObject(obj
);
297 if (_addReplyToBuffer(c
,obj
->ptr
,sdslen(obj
->ptr
)) != REDIS_OK
)
298 _addReplyObjectToList(c
,obj
);
301 redisPanic("Wrong obj->encoding in addReply()");
305 void addReplySds(redisClient
*c
, sds s
) {
306 if (prepareClientToWrite(c
) != REDIS_OK
) {
307 /* The caller expects the sds to be free'd. */
311 if (_addReplyToBuffer(c
,s
,sdslen(s
)) == REDIS_OK
) {
314 /* This method free's the sds when it is no longer needed. */
315 _addReplySdsToList(c
,s
);
319 void addReplyString(redisClient
*c
, char *s
, size_t len
) {
320 if (prepareClientToWrite(c
) != REDIS_OK
) return;
321 if (_addReplyToBuffer(c
,s
,len
) != REDIS_OK
)
322 _addReplyStringToList(c
,s
,len
);
325 void addReplyErrorLength(redisClient
*c
, char *s
, size_t len
) {
326 addReplyString(c
,"-ERR ",5);
327 addReplyString(c
,s
,len
);
328 addReplyString(c
,"\r\n",2);
331 void addReplyError(redisClient
*c
, char *err
) {
332 addReplyErrorLength(c
,err
,strlen(err
));
335 void addReplyErrorFormat(redisClient
*c
, const char *fmt
, ...) {
339 sds s
= sdscatvprintf(sdsempty(),fmt
,ap
);
341 /* Make sure there are no newlines in the string, otherwise invalid protocol
344 for (j
= 0; j
< l
; j
++) {
345 if (s
[j
] == '\r' || s
[j
] == '\n') s
[j
] = ' ';
347 addReplyErrorLength(c
,s
,sdslen(s
));
351 void addReplyStatusLength(redisClient
*c
, char *s
, size_t len
) {
352 addReplyString(c
,"+",1);
353 addReplyString(c
,s
,len
);
354 addReplyString(c
,"\r\n",2);
357 void addReplyStatus(redisClient
*c
, char *status
) {
358 addReplyStatusLength(c
,status
,strlen(status
));
361 void addReplyStatusFormat(redisClient
*c
, const char *fmt
, ...) {
364 sds s
= sdscatvprintf(sdsempty(),fmt
,ap
);
366 addReplyStatusLength(c
,s
,sdslen(s
));
370 /* Adds an empty object to the reply list that will contain the multi bulk
371 * length, which is not known when this function is called. */
372 void *addDeferredMultiBulkLength(redisClient
*c
) {
373 /* Note that we install the write event here even if the object is not
374 * ready to be sent, since we are sure that before returning to the
375 * event loop setDeferredMultiBulkLength() will be called. */
376 if (prepareClientToWrite(c
) != REDIS_OK
) return NULL
;
377 listAddNodeTail(c
->reply
,createObject(REDIS_STRING
,NULL
));
378 return listLast(c
->reply
);
381 /* Populate the length object and try glueing it to the next chunk. */
382 void setDeferredMultiBulkLength(redisClient
*c
, void *node
, long length
) {
383 listNode
*ln
= (listNode
*)node
;
386 /* Abort when *node is NULL (see addDeferredMultiBulkLength). */
387 if (node
== NULL
) return;
389 len
= listNodeValue(ln
);
390 len
->ptr
= sdscatprintf(sdsempty(),"*%ld\r\n",length
);
391 c
->reply_bytes
+= zmalloc_size_sds(len
->ptr
);
392 if (ln
->next
!= NULL
) {
393 next
= listNodeValue(ln
->next
);
395 /* Only glue when the next node is non-NULL (an sds in this case) */
396 if (next
->ptr
!= NULL
) {
397 c
->reply_bytes
-= zmalloc_size_sds(len
->ptr
);
398 c
->reply_bytes
-= zmalloc_size_sds(next
->ptr
);
399 len
->ptr
= sdscatlen(len
->ptr
,next
->ptr
,sdslen(next
->ptr
));
400 c
->reply_bytes
+= zmalloc_size_sds(len
->ptr
);
401 listDelNode(c
->reply
,ln
->next
);
404 asyncCloseClientOnOutputBufferLimitReached(c
);
407 /* Add a duble as a bulk reply */
408 void addReplyDouble(redisClient
*c
, double d
) {
409 char dbuf
[128], sbuf
[128];
411 dlen
= snprintf(dbuf
,sizeof(dbuf
),"%.17g",d
);
412 slen
= snprintf(sbuf
,sizeof(sbuf
),"$%d\r\n%s\r\n",dlen
,dbuf
);
413 addReplyString(c
,sbuf
,slen
);
416 /* Add a long long as integer reply or bulk len / multi bulk count.
417 * Basically this is used to output <prefix><long long><crlf>. */
418 void addReplyLongLongWithPrefix(redisClient
*c
, long long ll
, char prefix
) {
422 /* Things like $3\r\n or *2\r\n are emitted very often by the protocol
423 * so we have a few shared objects to use if the integer is small
424 * like it is most of the times. */
425 if (prefix
== '*' && ll
< REDIS_SHARED_BULKHDR_LEN
) {
426 addReply(c
,shared
.mbulkhdr
[ll
]);
428 } else if (prefix
== '$' && ll
< REDIS_SHARED_BULKHDR_LEN
) {
429 addReply(c
,shared
.bulkhdr
[ll
]);
434 len
= ll2string(buf
+1,sizeof(buf
)-1,ll
);
437 addReplyString(c
,buf
,len
+3);
440 void addReplyLongLong(redisClient
*c
, long long ll
) {
442 addReply(c
,shared
.czero
);
444 addReply(c
,shared
.cone
);
446 addReplyLongLongWithPrefix(c
,ll
,':');
449 void addReplyMultiBulkLen(redisClient
*c
, long length
) {
450 addReplyLongLongWithPrefix(c
,length
,'*');
453 /* Create the length prefix of a bulk reply, example: $2234 */
454 void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
457 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
458 len
= sdslen(obj
->ptr
);
460 long n
= (long)obj
->ptr
;
462 /* Compute how many bytes will take this integer as a radix 10 string */
468 while((n
= n
/10) != 0) {
472 addReplyLongLongWithPrefix(c
,len
,'$');
475 /* Add a Redis Object as a bulk reply */
476 void addReplyBulk(redisClient
*c
, robj
*obj
) {
477 addReplyBulkLen(c
,obj
);
479 addReply(c
,shared
.crlf
);
482 /* Add a C buffer as bulk reply */
483 void addReplyBulkCBuffer(redisClient
*c
, void *p
, size_t len
) {
484 addReplyLongLongWithPrefix(c
,len
,'$');
485 addReplyString(c
,p
,len
);
486 addReply(c
,shared
.crlf
);
489 /* Add a C nul term string as bulk reply */
490 void addReplyBulkCString(redisClient
*c
, char *s
) {
492 addReply(c
,shared
.nullbulk
);
494 addReplyBulkCBuffer(c
,s
,strlen(s
));
498 /* Add a long long as a bulk reply */
499 void addReplyBulkLongLong(redisClient
*c
, long long ll
) {
503 len
= ll2string(buf
,64,ll
);
504 addReplyBulkCBuffer(c
,buf
,len
);
507 /* Copy 'src' client output buffers into 'dst' client output buffers.
508 * The function takes care of freeing the old output buffers of the
509 * destination client. */
510 void copyClientOutputBuffer(redisClient
*dst
, redisClient
*src
) {
511 listRelease(dst
->reply
);
512 dst
->reply
= listDup(src
->reply
);
513 memcpy(dst
->buf
,src
->buf
,src
->bufpos
);
514 dst
->bufpos
= src
->bufpos
;
515 dst
->reply_bytes
= src
->reply_bytes
;
518 static void acceptCommonHandler(int fd
, int flags
) {
520 if ((c
= createClient(fd
)) == NULL
) {
521 redisLog(REDIS_WARNING
,"Error allocating resources for the client");
522 close(fd
); /* May be already closed, just ignore errors */
525 /* If maxclient directive is set and this is one client more... close the
526 * connection. Note that we create the client instead to check before
527 * for this condition, since now the socket is already set in nonblocking
528 * mode and we can send an error for free using the Kernel I/O */
529 if (listLength(server
.clients
) > server
.maxclients
) {
530 char *err
= "-ERR max number of clients reached\r\n";
532 /* That's a best effort error message, don't check write errors */
533 if (write(c
->fd
,err
,strlen(err
)) == -1) {
534 /* Nothing to do, Just to avoid the warning... */
536 server
.stat_rejected_conn
++;
540 server
.stat_numconnections
++;
544 void acceptTcpHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
549 REDIS_NOTUSED(privdata
);
551 cfd
= anetTcpAccept(server
.neterr
, fd
, cip
, &cport
);
553 redisLog(REDIS_WARNING
,"Accepting client connection: %s", server
.neterr
);
556 redisLog(REDIS_VERBOSE
,"Accepted %s:%d", cip
, cport
);
557 acceptCommonHandler(cfd
,0);
560 void acceptUnixHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
564 REDIS_NOTUSED(privdata
);
566 cfd
= anetUnixAccept(server
.neterr
, fd
);
568 redisLog(REDIS_WARNING
,"Accepting client connection: %s", server
.neterr
);
571 redisLog(REDIS_VERBOSE
,"Accepted connection to %s", server
.unixsocket
);
572 acceptCommonHandler(cfd
,REDIS_UNIX_SOCKET
);
576 static void freeClientArgv(redisClient
*c
) {
578 for (j
= 0; j
< c
->argc
; j
++)
579 decrRefCount(c
->argv
[j
]);
584 /* Close all the slaves connections. This is useful in chained replication
585 * when we resync with our own master and want to force all our slaves to
586 * resync with us as well. */
587 void disconnectSlaves(void) {
588 while (listLength(server
.slaves
)) {
589 listNode
*ln
= listFirst(server
.slaves
);
590 freeClient((redisClient
*)ln
->value
);
594 void freeClient(redisClient
*c
) {
597 /* If this is marked as current client unset it */
598 if (server
.current_client
== c
) server
.current_client
= NULL
;
600 /* Note that if the client we are freeing is blocked into a blocking
601 * call, we have to set querybuf to NULL *before* to call
602 * unblockClientWaitingData() to avoid processInputBuffer() will get
603 * called. Also it is important to remove the file events after
604 * this, because this call adds the READABLE event. */
605 sdsfree(c
->querybuf
);
607 if (c
->flags
& REDIS_BLOCKED
)
608 unblockClientWaitingData(c
);
610 /* UNWATCH all the keys */
612 listRelease(c
->watched_keys
);
613 /* Unsubscribe from all the pubsub channels */
614 pubsubUnsubscribeAllChannels(c
,0);
615 pubsubUnsubscribeAllPatterns(c
,0);
616 dictRelease(c
->pubsub_channels
);
617 listRelease(c
->pubsub_patterns
);
618 /* Obvious cleanup */
619 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
620 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
621 listRelease(c
->reply
);
624 /* Remove from the list of clients */
625 ln
= listSearchKey(server
.clients
,c
);
626 redisAssert(ln
!= NULL
);
627 listDelNode(server
.clients
,ln
);
628 /* When client was just unblocked because of a blocking operation,
629 * remove it from the list with unblocked clients. */
630 if (c
->flags
& REDIS_UNBLOCKED
) {
631 ln
= listSearchKey(server
.unblocked_clients
,c
);
632 redisAssert(ln
!= NULL
);
633 listDelNode(server
.unblocked_clients
,ln
);
635 listRelease(c
->io_keys
);
636 /* Master/slave cleanup.
637 * Case 1: we lost the connection with a slave. */
638 if (c
->flags
& REDIS_SLAVE
) {
639 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
641 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
642 ln
= listSearchKey(l
,c
);
643 redisAssert(ln
!= NULL
);
647 /* Case 2: we lost the connection with the master. */
648 if (c
->flags
& REDIS_MASTER
) {
649 server
.master
= NULL
;
650 server
.repl_state
= REDIS_REPL_CONNECT
;
651 server
.repl_down_since
= server
.unixtime
;
652 /* We lost connection with our master, force our slaves to resync
653 * with us as well to load the new data set.
655 * If server.masterhost is NULL the user called SLAVEOF NO ONE so
656 * slave resync is not needed. */
657 if (server
.masterhost
!= NULL
) disconnectSlaves();
660 /* If this client was scheduled for async freeing we need to remove it
662 if (c
->flags
& REDIS_CLOSE_ASAP
) {
663 ln
= listSearchKey(server
.clients_to_close
,c
);
664 redisAssert(ln
!= NULL
);
665 listDelNode(server
.clients_to_close
,ln
);
670 freeClientMultiState(c
);
674 /* Schedule a client to free it at a safe time in the serverCron() function.
675 * This function is useful when we need to terminate a client but we are in
676 * a context where calling freeClient() is not possible, because the client
677 * should be valid for the continuation of the flow of the program. */
678 void freeClientAsync(redisClient
*c
) {
679 if (c
->flags
& REDIS_CLOSE_ASAP
) return;
680 c
->flags
|= REDIS_CLOSE_ASAP
;
681 listAddNodeTail(server
.clients_to_close
,c
);
684 void freeClientsInAsyncFreeQueue(void) {
685 while (listLength(server
.clients_to_close
)) {
686 listNode
*ln
= listFirst(server
.clients_to_close
);
687 redisClient
*c
= listNodeValue(ln
);
689 c
->flags
&= ~REDIS_CLOSE_ASAP
;
691 listDelNode(server
.clients_to_close
,ln
);
695 void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
696 redisClient
*c
= privdata
;
697 int nwritten
= 0, totwritten
= 0, objlen
;
703 while(c
->bufpos
> 0 || listLength(c
->reply
)) {
705 if (c
->flags
& REDIS_MASTER
) {
706 /* Don't reply to a master */
707 nwritten
= c
->bufpos
- c
->sentlen
;
709 nwritten
= write(fd
,c
->buf
+c
->sentlen
,c
->bufpos
-c
->sentlen
);
710 if (nwritten
<= 0) break;
712 c
->sentlen
+= nwritten
;
713 totwritten
+= nwritten
;
715 /* If the buffer was sent, set bufpos to zero to continue with
716 * the remainder of the reply. */
717 if (c
->sentlen
== c
->bufpos
) {
722 o
= listNodeValue(listFirst(c
->reply
));
723 objlen
= sdslen(o
->ptr
);
724 objmem
= zmalloc_size_sds(o
->ptr
);
727 listDelNode(c
->reply
,listFirst(c
->reply
));
731 if (c
->flags
& REDIS_MASTER
) {
732 /* Don't reply to a master */
733 nwritten
= objlen
- c
->sentlen
;
735 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
,objlen
-c
->sentlen
);
736 if (nwritten
<= 0) break;
738 c
->sentlen
+= nwritten
;
739 totwritten
+= nwritten
;
741 /* If we fully sent the object on head go to the next one */
742 if (c
->sentlen
== objlen
) {
743 listDelNode(c
->reply
,listFirst(c
->reply
));
745 c
->reply_bytes
-= objmem
;
748 /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT
749 * bytes, in a single threaded server it's a good idea to serve
750 * other clients as well, even if a very large request comes from
751 * super fast link that is always able to accept data (in real world
752 * scenario think about 'KEYS *' against the loopback interface).
754 * However if we are over the maxmemory limit we ignore that and
755 * just deliver as much data as it is possible to deliver. */
756 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
&&
757 (server
.maxmemory
== 0 ||
758 zmalloc_used_memory() < server
.maxmemory
)) break;
760 if (nwritten
== -1) {
761 if (errno
== EAGAIN
) {
764 redisLog(REDIS_VERBOSE
,
765 "Error writing to client: %s", strerror(errno
));
770 if (totwritten
> 0) c
->lastinteraction
= server
.unixtime
;
771 if (c
->bufpos
== 0 && listLength(c
->reply
) == 0) {
773 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
775 /* Close connection after entire reply has been sent. */
776 if (c
->flags
& REDIS_CLOSE_AFTER_REPLY
) freeClient(c
);
780 /* resetClient prepare the client to process the next command */
781 void resetClient(redisClient
*c
) {
786 /* We clear the ASKING flag as well if we are not inside a MULTI. */
787 if (!(c
->flags
& REDIS_MULTI
)) c
->flags
&= (~REDIS_ASKING
);
790 int processInlineBuffer(redisClient
*c
) {
791 char *newline
= strstr(c
->querybuf
,"\r\n");
796 /* Nothing to do without a \r\n */
797 if (newline
== NULL
) {
798 if (sdslen(c
->querybuf
) > REDIS_INLINE_MAX_SIZE
) {
799 addReplyError(c
,"Protocol error: too big inline request");
800 setProtocolError(c
,0);
805 /* Split the input buffer up to the \r\n */
806 querylen
= newline
-(c
->querybuf
);
807 argv
= sdssplitlen(c
->querybuf
,querylen
," ",1,&argc
);
809 /* Leave data after the first line of the query in the buffer */
810 c
->querybuf
= sdsrange(c
->querybuf
,querylen
+2,-1);
812 /* Setup argv array on client structure */
813 if (c
->argv
) zfree(c
->argv
);
814 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
816 /* Create redis objects for all arguments. */
817 for (c
->argc
= 0, j
= 0; j
< argc
; j
++) {
818 if (sdslen(argv
[j
])) {
819 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
829 /* Helper function. Trims query buffer to make the function that processes
830 * multi bulk requests idempotent. */
831 static void setProtocolError(redisClient
*c
, int pos
) {
832 if (server
.verbosity
>= REDIS_VERBOSE
) {
833 sds client
= getClientInfoString(c
);
834 redisLog(REDIS_VERBOSE
,
835 "Protocol error from client: %s", client
);
838 c
->flags
|= REDIS_CLOSE_AFTER_REPLY
;
839 c
->querybuf
= sdsrange(c
->querybuf
,pos
,-1);
842 int processMultibulkBuffer(redisClient
*c
) {
843 char *newline
= NULL
;
847 if (c
->multibulklen
== 0) {
848 /* The client should have been reset */
849 redisAssertWithInfo(c
,NULL
,c
->argc
== 0);
851 /* Multi bulk length cannot be read without a \r\n */
852 newline
= strchr(c
->querybuf
,'\r');
853 if (newline
== NULL
) {
854 if (sdslen(c
->querybuf
) > REDIS_INLINE_MAX_SIZE
) {
855 addReplyError(c
,"Protocol error: too big mbulk count string");
856 setProtocolError(c
,0);
861 /* Buffer should also contain \n */
862 if (newline
-(c
->querybuf
) > ((signed)sdslen(c
->querybuf
)-2))
865 /* We know for sure there is a whole line since newline != NULL,
866 * so go ahead and find out the multi bulk length. */
867 redisAssertWithInfo(c
,NULL
,c
->querybuf
[0] == '*');
868 ok
= string2ll(c
->querybuf
+1,newline
-(c
->querybuf
+1),&ll
);
869 if (!ok
|| ll
> 1024*1024) {
870 addReplyError(c
,"Protocol error: invalid multibulk length");
871 setProtocolError(c
,pos
);
875 pos
= (newline
-c
->querybuf
)+2;
877 c
->querybuf
= sdsrange(c
->querybuf
,pos
,-1);
881 c
->multibulklen
= ll
;
883 /* Setup argv array on client structure */
884 if (c
->argv
) zfree(c
->argv
);
885 c
->argv
= zmalloc(sizeof(robj
*)*c
->multibulklen
);
888 redisAssertWithInfo(c
,NULL
,c
->multibulklen
> 0);
889 while(c
->multibulklen
) {
890 /* Read bulk length if unknown */
891 if (c
->bulklen
== -1) {
892 newline
= strchr(c
->querybuf
+pos
,'\r');
893 if (newline
== NULL
) {
894 if (sdslen(c
->querybuf
) > REDIS_INLINE_MAX_SIZE
) {
895 addReplyError(c
,"Protocol error: too big bulk count string");
896 setProtocolError(c
,0);
901 /* Buffer should also contain \n */
902 if (newline
-(c
->querybuf
) > ((signed)sdslen(c
->querybuf
)-2))
905 if (c
->querybuf
[pos
] != '$') {
906 addReplyErrorFormat(c
,
907 "Protocol error: expected '$', got '%c'",
909 setProtocolError(c
,pos
);
913 ok
= string2ll(c
->querybuf
+pos
+1,newline
-(c
->querybuf
+pos
+1),&ll
);
914 if (!ok
|| ll
< 0 || ll
> 512*1024*1024) {
915 addReplyError(c
,"Protocol error: invalid bulk length");
916 setProtocolError(c
,pos
);
920 pos
+= newline
-(c
->querybuf
+pos
)+2;
921 if (ll
>= REDIS_MBULK_BIG_ARG
) {
922 /* If we are going to read a large object from network
923 * try to make it likely that it will start at c->querybuf
924 * boundary so that we can optimized object creation
925 * avoiding a large copy of data. */
926 c
->querybuf
= sdsrange(c
->querybuf
,pos
,-1);
928 /* Hint the sds library about the amount of bytes this string is
929 * going to contain. */
930 c
->querybuf
= sdsMakeRoomFor(c
->querybuf
,ll
+2);
935 /* Read bulk argument */
936 if (sdslen(c
->querybuf
)-pos
< (unsigned)(c
->bulklen
+2)) {
937 /* Not enough data (+2 == trailing \r\n) */
940 /* Optimization: if the buffer contanins JUST our bulk element
941 * instead of creating a new object by *copying* the sds we
942 * just use the current sds string. */
944 c
->bulklen
>= REDIS_MBULK_BIG_ARG
&&
945 (signed) sdslen(c
->querybuf
) == c
->bulklen
+2)
947 c
->argv
[c
->argc
++] = createObject(REDIS_STRING
,c
->querybuf
);
948 sdsIncrLen(c
->querybuf
,-2); /* remove CRLF */
949 c
->querybuf
= sdsempty();
950 /* Assume that if we saw a fat argument we'll see another one
952 c
->querybuf
= sdsMakeRoomFor(c
->querybuf
,c
->bulklen
+2);
956 createStringObject(c
->querybuf
+pos
,c
->bulklen
);
965 if (pos
) c
->querybuf
= sdsrange(c
->querybuf
,pos
,-1);
967 /* We're done when c->multibulk == 0 */
968 if (c
->multibulklen
== 0) return REDIS_OK
;
970 /* Still not read to process the command */
974 void processInputBuffer(redisClient
*c
) {
975 /* Keep processing while there is something in the input buffer */
976 while(sdslen(c
->querybuf
)) {
977 /* Immediately abort if the client is in the middle of something. */
978 if (c
->flags
& REDIS_BLOCKED
) return;
980 /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
981 * written to the client. Make sure to not let the reply grow after
982 * this flag has been set (i.e. don't process more commands). */
983 if (c
->flags
& REDIS_CLOSE_AFTER_REPLY
) return;
985 /* Determine request type when unknown. */
987 if (c
->querybuf
[0] == '*') {
988 c
->reqtype
= REDIS_REQ_MULTIBULK
;
990 c
->reqtype
= REDIS_REQ_INLINE
;
994 if (c
->reqtype
== REDIS_REQ_INLINE
) {
995 if (processInlineBuffer(c
) != REDIS_OK
) break;
996 } else if (c
->reqtype
== REDIS_REQ_MULTIBULK
) {
997 if (processMultibulkBuffer(c
) != REDIS_OK
) break;
999 redisPanic("Unknown request type");
1002 /* Multibulk processing could see a <= 0 length. */
1006 /* Only reset the client when the command was executed. */
1007 if (processCommand(c
) == REDIS_OK
)
1013 void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1014 redisClient
*c
= (redisClient
*) privdata
;
1018 REDIS_NOTUSED(mask
);
1020 server
.current_client
= c
;
1021 readlen
= REDIS_IOBUF_LEN
;
1022 /* If this is a multi bulk request, and we are processing a bulk reply
1023 * that is large enough, try to maximize the probability that the query
1024 * buffer contains exactly the SDS string representing the object, even
1025 * at the risk of requiring more read(2) calls. This way the function
1026 * processMultiBulkBuffer() can avoid copying buffers to create the
1027 * Redis Object representing the argument. */
1028 if (c
->reqtype
== REDIS_REQ_MULTIBULK
&& c
->multibulklen
&& c
->bulklen
!= -1
1029 && c
->bulklen
>= REDIS_MBULK_BIG_ARG
)
1031 int remaining
= (unsigned)(c
->bulklen
+2)-sdslen(c
->querybuf
);
1033 if (remaining
< readlen
) readlen
= remaining
;
1036 qblen
= sdslen(c
->querybuf
);
1037 if (c
->querybuf_peak
< qblen
) c
->querybuf_peak
= qblen
;
1038 c
->querybuf
= sdsMakeRoomFor(c
->querybuf
, readlen
);
1039 nread
= read(fd
, c
->querybuf
+qblen
, readlen
);
1041 if (errno
== EAGAIN
) {
1044 redisLog(REDIS_VERBOSE
, "Reading from client: %s",strerror(errno
));
1048 } else if (nread
== 0) {
1049 redisLog(REDIS_VERBOSE
, "Client closed connection");
1054 sdsIncrLen(c
->querybuf
,nread
);
1055 c
->lastinteraction
= server
.unixtime
;
1057 server
.current_client
= NULL
;
1060 if (sdslen(c
->querybuf
) > server
.client_max_querybuf_len
) {
1061 sds ci
= getClientInfoString(c
), bytes
= sdsempty();
1063 bytes
= sdscatrepr(bytes
,c
->querybuf
,64);
1064 redisLog(REDIS_WARNING
,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci
, bytes
);
1070 processInputBuffer(c
);
1071 server
.current_client
= NULL
;
1074 void getClientsMaxBuffers(unsigned long *longest_output_list
,
1075 unsigned long *biggest_input_buffer
) {
1079 unsigned long lol
= 0, bib
= 0;
1081 listRewind(server
.clients
,&li
);
1082 while ((ln
= listNext(&li
)) != NULL
) {
1083 c
= listNodeValue(ln
);
1085 if (listLength(c
->reply
) > lol
) lol
= listLength(c
->reply
);
1086 if (sdslen(c
->querybuf
) > bib
) bib
= sdslen(c
->querybuf
);
1088 *longest_output_list
= lol
;
1089 *biggest_input_buffer
= bib
;
1092 /* Turn a Redis client into an sds string representing its state. */
1093 sds
getClientInfoString(redisClient
*client
) {
1094 char ip
[32], flags
[16], events
[3], *p
;
1095 int port
= 0; /* initialized to zero for the unix socket case. */
1098 if (!(client
->flags
& REDIS_UNIX_SOCKET
))
1099 anetPeerToString(client
->fd
,ip
,&port
);
1101 if (client
->flags
& REDIS_SLAVE
) {
1102 if (client
->flags
& REDIS_MONITOR
)
1107 if (client
->flags
& REDIS_MASTER
) *p
++ = 'M';
1108 if (client
->flags
& REDIS_MULTI
) *p
++ = 'x';
1109 if (client
->flags
& REDIS_BLOCKED
) *p
++ = 'b';
1110 if (client
->flags
& REDIS_DIRTY_CAS
) *p
++ = 'd';
1111 if (client
->flags
& REDIS_CLOSE_AFTER_REPLY
) *p
++ = 'c';
1112 if (client
->flags
& REDIS_UNBLOCKED
) *p
++ = 'u';
1113 if (client
->flags
& REDIS_CLOSE_ASAP
) *p
++ = 'A';
1114 if (client
->flags
& REDIS_UNIX_SOCKET
) *p
++ = 'U';
1115 if (p
== flags
) *p
++ = 'N';
1118 emask
= client
->fd
== -1 ? 0 : aeGetFileEvents(server
.el
,client
->fd
);
1120 if (emask
& AE_READABLE
) *p
++ = 'r';
1121 if (emask
& AE_WRITABLE
) *p
++ = 'w';
1123 return sdscatprintf(sdsempty(),
1124 "addr=%s:%d fd=%d age=%ld idle=%ld flags=%s db=%d sub=%d psub=%d multi=%d qbuf=%lu qbuf-free=%lu obl=%lu oll=%lu omem=%lu events=%s cmd=%s",
1125 (client
->flags
& REDIS_UNIX_SOCKET
) ? server
.unixsocket
: ip
,
1127 (long)(server
.unixtime
- client
->ctime
),
1128 (long)(server
.unixtime
- client
->lastinteraction
),
1131 (int) dictSize(client
->pubsub_channels
),
1132 (int) listLength(client
->pubsub_patterns
),
1133 (client
->flags
& REDIS_MULTI
) ? client
->mstate
.count
: -1,
1134 (unsigned long) sdslen(client
->querybuf
),
1135 (unsigned long) sdsavail(client
->querybuf
),
1136 (unsigned long) client
->bufpos
,
1137 (unsigned long) listLength(client
->reply
),
1138 getClientOutputBufferMemoryUsage(client
),
1140 client
->lastcmd
? client
->lastcmd
->name
: "NULL");
1143 sds
getAllClientsInfoString(void) {
1146 redisClient
*client
;
1149 listRewind(server
.clients
,&li
);
1150 while ((ln
= listNext(&li
)) != NULL
) {
1153 client
= listNodeValue(ln
);
1154 cs
= getClientInfoString(client
);
1155 o
= sdscatsds(o
,cs
);
1157 o
= sdscatlen(o
,"\n",1);
1162 void clientCommand(redisClient
*c
) {
1165 redisClient
*client
;
1167 if (!strcasecmp(c
->argv
[1]->ptr
,"list") && c
->argc
== 2) {
1168 sds o
= getAllClientsInfoString();
1169 addReplyBulkCBuffer(c
,o
,sdslen(o
));
1171 } else if (!strcasecmp(c
->argv
[1]->ptr
,"kill") && c
->argc
== 3) {
1172 listRewind(server
.clients
,&li
);
1173 while ((ln
= listNext(&li
)) != NULL
) {
1174 char ip
[32], addr
[64];
1177 client
= listNodeValue(ln
);
1178 if (anetPeerToString(client
->fd
,ip
,&port
) == -1) continue;
1179 snprintf(addr
,sizeof(addr
),"%s:%d",ip
,port
);
1180 if (strcmp(addr
,c
->argv
[2]->ptr
) == 0) {
1181 addReply(c
,shared
.ok
);
1183 client
->flags
|= REDIS_CLOSE_AFTER_REPLY
;
1190 addReplyError(c
,"No such client");
1192 addReplyError(c
, "Syntax error, try CLIENT (LIST | KILL ip:port)");
1196 /* Rewrite the command vector of the client. All the new objects ref count
1197 * is incremented. The old command vector is freed, and the old objects
1198 * ref count is decremented. */
1199 void rewriteClientCommandVector(redisClient
*c
, int argc
, ...) {
1202 robj
**argv
; /* The new argument vector */
1204 argv
= zmalloc(sizeof(robj
*)*argc
);
1206 for (j
= 0; j
< argc
; j
++) {
1209 a
= va_arg(ap
, robj
*);
1213 /* We free the objects in the original vector at the end, so we are
1214 * sure that if the same objects are reused in the new vector the
1215 * refcount gets incremented before it gets decremented. */
1216 for (j
= 0; j
< c
->argc
; j
++) decrRefCount(c
->argv
[j
]);
1218 /* Replace argv and argc with our new versions. */
1221 c
->cmd
= lookupCommand(c
->argv
[0]->ptr
);
1222 redisAssertWithInfo(c
,NULL
,c
->cmd
!= NULL
);
1226 /* Rewrite a single item in the command vector.
1227 * The new val ref count is incremented, and the old decremented. */
1228 void rewriteClientCommandArgument(redisClient
*c
, int i
, robj
*newval
) {
1231 redisAssertWithInfo(c
,NULL
,i
< c
->argc
);
1232 oldval
= c
->argv
[i
];
1233 c
->argv
[i
] = newval
;
1234 incrRefCount(newval
);
1235 decrRefCount(oldval
);
1237 /* If this is the command name make sure to fix c->cmd. */
1239 c
->cmd
= lookupCommand(c
->argv
[0]->ptr
);
1240 redisAssertWithInfo(c
,NULL
,c
->cmd
!= NULL
);
1244 /* This function returns the number of bytes that Redis is virtually
1245 * using to store the reply still not read by the client.
1246 * It is "virtual" since the reply output list may contain objects that
1247 * are shared and are not really using additional memory.
1249 * The function returns the total sum of the length of all the objects
1250 * stored in the output list, plus the memory used to allocate every
1251 * list node. The static reply buffer is not taken into account since it
1252 * is allocated anyway.
1254 * Note: this function is very fast so can be called as many time as
1255 * the caller wishes. The main usage of this function currently is
1256 * enforcing the client output length limits. */
1257 unsigned long getClientOutputBufferMemoryUsage(redisClient
*c
) {
1258 unsigned long list_item_size
= sizeof(listNode
)+sizeof(robj
);
1260 return c
->reply_bytes
+ (list_item_size
*listLength(c
->reply
));
1263 /* Get the class of a client, used in order to enforce limits to different
1264 * classes of clients.
1266 * The function will return one of the following:
1267 * REDIS_CLIENT_LIMIT_CLASS_NORMAL -> Normal client
1268 * REDIS_CLIENT_LIMIT_CLASS_SLAVE -> Slave or client executing MONITOR command
1269 * REDIS_CLIENT_LIMIT_CLASS_PUBSUB -> Client subscribed to Pub/Sub channels
1271 int getClientLimitClass(redisClient
*c
) {
1272 if (c
->flags
& REDIS_SLAVE
) return REDIS_CLIENT_LIMIT_CLASS_SLAVE
;
1273 if (dictSize(c
->pubsub_channels
) || listLength(c
->pubsub_patterns
))
1274 return REDIS_CLIENT_LIMIT_CLASS_PUBSUB
;
1275 return REDIS_CLIENT_LIMIT_CLASS_NORMAL
;
1278 int getClientLimitClassByName(char *name
) {
1279 if (!strcasecmp(name
,"normal")) return REDIS_CLIENT_LIMIT_CLASS_NORMAL
;
1280 else if (!strcasecmp(name
,"slave")) return REDIS_CLIENT_LIMIT_CLASS_SLAVE
;
1281 else if (!strcasecmp(name
,"pubsub")) return REDIS_CLIENT_LIMIT_CLASS_PUBSUB
;
1285 char *getClientLimitClassName(int class) {
1287 case REDIS_CLIENT_LIMIT_CLASS_NORMAL
: return "normal";
1288 case REDIS_CLIENT_LIMIT_CLASS_SLAVE
: return "slave";
1289 case REDIS_CLIENT_LIMIT_CLASS_PUBSUB
: return "pubsub";
1290 default: return NULL
;
1294 /* The function checks if the client reached output buffer soft or hard
1295 * limit, and also update the state needed to check the soft limit as
1298 * Return value: non-zero if the client reached the soft or the hard limit.
1299 * Otherwise zero is returned. */
1300 int checkClientOutputBufferLimits(redisClient
*c
) {
1301 int soft
= 0, hard
= 0, class;
1302 unsigned long used_mem
= getClientOutputBufferMemoryUsage(c
);
1304 class = getClientLimitClass(c
);
1305 if (server
.client_obuf_limits
[class].hard_limit_bytes
&&
1306 used_mem
>= server
.client_obuf_limits
[class].hard_limit_bytes
)
1308 if (server
.client_obuf_limits
[class].soft_limit_bytes
&&
1309 used_mem
>= server
.client_obuf_limits
[class].soft_limit_bytes
)
1312 /* We need to check if the soft limit is reached continuously for the
1313 * specified amount of seconds. */
1315 if (c
->obuf_soft_limit_reached_time
== 0) {
1316 c
->obuf_soft_limit_reached_time
= server
.unixtime
;
1317 soft
= 0; /* First time we see the soft limit reached */
1319 time_t elapsed
= server
.unixtime
- c
->obuf_soft_limit_reached_time
;
1322 server
.client_obuf_limits
[class].soft_limit_seconds
) {
1323 soft
= 0; /* The client still did not reached the max number of
1324 seconds for the soft limit to be considered
1329 c
->obuf_soft_limit_reached_time
= 0;
1331 return soft
|| hard
;
1334 /* Asynchronously close a client if soft or hard limit is reached on the
1335 * output buffer size. The caller can check if the client will be closed
1336 * checking if the client REDIS_CLOSE_ASAP flag is set.
1338 * Note: we need to close the client asynchronously because this function is
1339 * called from contexts where the client can't be freed safely, i.e. from the
1340 * lower level functions pushing data inside the client output buffers. */
1341 void asyncCloseClientOnOutputBufferLimitReached(redisClient
*c
) {
1342 redisAssert(c
->reply_bytes
< ULONG_MAX
-(1024*64));
1343 if (c
->reply_bytes
== 0 || c
->flags
& REDIS_CLOSE_ASAP
) return;
1344 if (checkClientOutputBufferLimits(c
)) {
1345 sds client
= getClientInfoString(c
);
1348 redisLog(REDIS_WARNING
,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client
);
1353 /* Helper function used by freeMemoryIfNeeded() in order to flush slaves
1354 * output buffers without returning control to the event loop. */
1355 void flushSlavesOutputBuffers(void) {
1359 listRewind(server
.slaves
,&li
);
1360 while((ln
= listNext(&li
))) {
1361 redisClient
*slave
= listNodeValue(ln
);
1364 events
= aeGetFileEvents(server
.el
,slave
->fd
);
1365 if (events
& AE_WRITABLE
&&
1366 slave
->replstate
== REDIS_REPL_ONLINE
&&
1367 listLength(slave
->reply
))
1369 sendReplyToClient(server
.el
,slave
->fd
,slave
,0);