/* Set the event loop to listen for write events on the client's socket.
* Typically gets called every time a reply is built. */
int _installWriteEvent(redisClient *c) {
- /* When CLOSE_AFTER_REPLY is set, no more replies may be added! */
- redisAssert(!(c->flags & REDIS_CLOSE_AFTER_REPLY));
-
if (c->fd <= 0) return REDIS_ERR;
if (c->bufpos == 0 && listLength(c->reply) == 0 &&
(c->replstate == REDIS_REPL_NONE ||
return listNodeValue(ln);
}
+/* -----------------------------------------------------------------------------
+ * Low level functions to add more data to output buffers.
+ * -------------------------------------------------------------------------- */
+
int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
size_t available = sizeof(c->buf)-c->bufpos;
+ if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK;
+
/* If there already are entries in the reply list, we cannot
* add anything more to the static buffer. */
if (listLength(c->reply) > 0) return REDIS_ERR;
void _addReplyObjectToList(redisClient *c, robj *o) {
robj *tail;
+
+ if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
+
if (listLength(c->reply) == 0) {
incrRefCount(o);
listAddNodeTail(c->reply,o);
* needed it will be free'd, otherwise it ends up in a robj. */
void _addReplySdsToList(redisClient *c, sds s) {
robj *tail;
+
+ if (c->flags & REDIS_CLOSE_AFTER_REPLY) {
+ sdsfree(s);
+ return;
+ }
+
if (listLength(c->reply) == 0) {
listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
} else {
void _addReplyStringToList(redisClient *c, char *s, size_t len) {
robj *tail;
+
+ if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
+
if (listLength(c->reply) == 0) {
listAddNodeTail(c->reply,createStringObject(s,len));
} else {
}
}
+/* -----------------------------------------------------------------------------
+ * Higher level functions to queue data on the client output buffer.
+ * The following functions are the ones that command implementations will call.
+ * -------------------------------------------------------------------------- */
+
void addReply(redisClient *c, robj *obj) {
if (_installWriteEvent(c) != REDIS_OK) return;
- redisAssert(!server.ds_enabled || obj->storage != REDIS_DS_SAVING);
/* This is an important place where we can avoid copy-on-write
* when there is a saving child running, avoiding touching the
ln = listSearchKey(server.clients,c);
redisAssert(ln != NULL);
listDelNode(server.clients,ln);
+ /* When client was just unblocked because of a blocking operation,
+ * remove it from the list with unblocked clients. */
+ if (c->flags & REDIS_UNBLOCKED) {
+ ln = listSearchKey(server.unblocked_clients,c);
+ redisAssert(ln != NULL);
+ listDelNode(server.unblocked_clients,ln);
+ }
/* Remove from the list of clients waiting for swapped keys, or ready
* to be restarted, but not yet woken up again. */
if (c->flags & REDIS_IO_WAIT) {
/* Case 2: we lost the connection with the master. */
if (c->flags & REDIS_MASTER) {
server.master = NULL;
- /* FIXME */
server.replstate = REDIS_REPL_CONNECT;
/* Since we lost the connection with the master, we should also
* close the connection with all our slaves if we have any, so
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
- /* Use writev() if we have enough buffers to send */
- if (!server.glueoutputbuf &&
- listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
- !(c->flags & REDIS_MASTER))
- {
- sendReplyToClientWritev(el, fd, privdata, mask);
- return;
- }
-
while(c->bufpos > 0 || listLength(c->reply)) {
if (c->bufpos > 0) {
if (c->flags & REDIS_MASTER) {
}
}
-void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
-{
- redisClient *c = privdata;
- int nwritten = 0, totwritten = 0, objlen, willwrite;
- robj *o;
- struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
- int offset, ion = 0;
- REDIS_NOTUSED(el);
- REDIS_NOTUSED(mask);
-
- listNode *node;
- while (listLength(c->reply)) {
- offset = c->sentlen;
- ion = 0;
- willwrite = 0;
-
- /* fill-in the iov[] array */
- for(node = listFirst(c->reply); node; node = listNextNode(node)) {
- o = listNodeValue(node);
- objlen = sdslen(o->ptr);
-
- if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
- break;
-
- if(ion == REDIS_WRITEV_IOVEC_COUNT)
- break; /* no more iovecs */
-
- iov[ion].iov_base = ((char*)o->ptr) + offset;
- iov[ion].iov_len = objlen - offset;
- willwrite += objlen - offset;
- offset = 0; /* just for the first item */
- ion++;
- }
-
- if(willwrite == 0)
- break;
-
- /* write all collected blocks at once */
- if((nwritten = writev(fd, iov, ion)) < 0) {
- if (errno != EAGAIN) {
- redisLog(REDIS_VERBOSE,
- "Error writing to client: %s", strerror(errno));
- freeClient(c);
- return;
- }
- break;
- }
-
- totwritten += nwritten;
- offset = c->sentlen;
-
- /* remove written robjs from c->reply */
- while (nwritten && listLength(c->reply)) {
- o = listNodeValue(listFirst(c->reply));
- objlen = sdslen(o->ptr);
-
- if(nwritten >= objlen - offset) {
- listDelNode(c->reply, listFirst(c->reply));
- nwritten -= objlen - offset;
- c->sentlen = 0;
- } else {
- /* partial write */
- c->sentlen += nwritten;
- break;
- }
- offset = 0;
- }
- }
-
- if (totwritten > 0)
- c->lastinteraction = time(NULL);
-
- if (listLength(c->reply) == 0) {
- c->sentlen = 0;
- aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
- }
-}
-
/* resetClient prepare the client to process the next command */
void resetClient(redisClient *c) {
freeClientArgv(c);
}
processInputBuffer(c);
}
+/* Scan every client in server.clients and report the two largest per-client
+ * buffer figures found.
+ *
+ * On return, *longest_output_list holds the highest listLength(c->reply) seen
+ * (a count of queued reply-list entries, not a byte total), and
+ * *biggest_input_buffer holds the highest sdslen(c->querybuf) seen.
+ *
+ * Both outputs are written unconditionally, so neither pointer may be NULL.
+ * Both are set to 0 when the client list is empty.
+ *
+ * NOTE(review): sdslen() is called on every client's querybuf without a NULL
+ * check; this assumes querybuf is always allocated for a listed client -
+ * confirm against the client creation path. */
+void getClientsMaxBuffers(unsigned long *longest_output_list,
+ unsigned long *biggest_input_buffer) {
+ redisClient *c;
+ listNode *ln;
+ listIter li;
+ unsigned long lol = 0, bib = 0; /* running maxima: output list, input buffer */
+
+ listRewind(server.clients,&li);
+ while ((ln = listNext(&li)) != NULL) {
+ c = listNodeValue(ln);
+
+ if (listLength(c->reply) > lol) lol = listLength(c->reply);
+ if (sdslen(c->querybuf) > bib) bib = sdslen(c->querybuf);
+ }
+ *longest_output_list = lol;
+ *biggest_input_buffer = bib;
+}
+