anetNonBlock(NULL,fd);
anetTcpNoDelay(NULL,fd);
- if (!c) return NULL;
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
/* Set the event loop to listen for write events on the client's socket.
* Typically gets called every time a reply is built. */
int _installWriteEvent(redisClient *c) {
- /* When CLOSE_AFTER_REPLY is set, no more replies may be added! */
- redisAssert(!(c->flags & REDIS_CLOSE_AFTER_REPLY));
-
if (c->fd <= 0) return REDIS_ERR;
if (c->bufpos == 0 && listLength(c->reply) == 0 &&
(c->replstate == REDIS_REPL_NONE ||
return listNodeValue(ln);
}
+/* -----------------------------------------------------------------------------
+ * Low level functions to add more data to output buffers.
+ * -------------------------------------------------------------------------- */
+
int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
size_t available = sizeof(c->buf)-c->bufpos;
+ if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK;
+
/* If there already are entries in the reply list, we cannot
* add anything more to the static buffer. */
if (listLength(c->reply) > 0) return REDIS_ERR;
void _addReplyObjectToList(redisClient *c, robj *o) {
robj *tail;
+
+ if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
+
if (listLength(c->reply) == 0) {
incrRefCount(o);
listAddNodeTail(c->reply,o);
* needed it will be free'd, otherwise it ends up in a robj. */
void _addReplySdsToList(redisClient *c, sds s) {
robj *tail;
+
+ if (c->flags & REDIS_CLOSE_AFTER_REPLY) {
+ sdsfree(s);
+ return;
+ }
+
if (listLength(c->reply) == 0) {
listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
} else {
void _addReplyStringToList(redisClient *c, char *s, size_t len) {
robj *tail;
+
+ if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
+
if (listLength(c->reply) == 0) {
listAddNodeTail(c->reply,createStringObject(s,len));
} else {
}
}
+/* -----------------------------------------------------------------------------
+ * Higher level functions to queue data on the client output buffer.
+ * The following functions are the ones that command implementations will call.
+ * -------------------------------------------------------------------------- */
+
void addReply(redisClient *c, robj *obj) {
if (_installWriteEvent(c) != REDIS_OK) return;
- redisAssert(!server.vm_enabled || obj->storage == REDIS_VM_MEMORY);
/* This is an important place where we can avoid copy-on-write
* when there is a saving child running, avoiding touching the
}
void addReplyLongLong(redisClient *c, long long ll) {
- _addReplyLongLong(c,ll,':');
+ if (ll == 0) /* 0 and 1 are answered through addReply() with shared.czero / shared.cone */
+ addReply(c,shared.czero);
+ else if (ll == 1)
+ addReply(c,shared.cone);
+ else /* every other value still goes through _addReplyLongLong() with the ':' prefix */
+ _addReplyLongLong(c,ll,':');
}
void addReplyMultiBulkLen(redisClient *c, long length) {
ln = listSearchKey(server.clients,c);
redisAssert(ln != NULL);
listDelNode(server.clients,ln);
+ /* If the client was just unblocked after a blocking operation, also
+ * remove it from the list of unblocked clients. */
+ if (c->flags & REDIS_UNBLOCKED) {
+ ln = listSearchKey(server.unblocked_clients,c);
+ redisAssert(ln != NULL);
+ listDelNode(server.unblocked_clients,ln);
+ }
/* Remove from the list of clients waiting for swapped keys, or ready
* to be restarted, but not yet woken up again. */
if (c->flags & REDIS_IO_WAIT) {
- redisAssert(server.vm_enabled);
+ redisAssert(server.ds_enabled);
if (listLength(c->io_keys) == 0) {
ln = listSearchKey(server.io_ready_clients,c);
dontWaitForSwappedKey(c,ln->value);
}
}
- server.vm_blocked_clients--;
+ server.cache_blocked_clients--;
}
listRelease(c->io_keys);
/* Master/slave cleanup.
/* Case 2: we lost the connection with the master. */
if (c->flags & REDIS_MASTER) {
server.master = NULL;
- /* FIXME */
server.replstate = REDIS_REPL_CONNECT;
/* Since we lost the connection with the master, we should also
* close the connection with all our slaves if we have any, so
* when we'll resync with the master the other slaves will sync again
* with us as well. Note that also when the slave is not connected
- * to the master it will keep refusing connections by other slaves. */
- while (listLength(server.slaves)) {
- ln = listFirst(server.slaves);
- freeClient((redisClient*)ln->value);
+ * to the master it will keep refusing connections by other slaves.
+ *
+ * We do this only if server.masterhost != NULL. If it is NULL, the
+ * user called SLAVEOF NO ONE and we are just freeing our link with the
+ * master, so there is no need to close the links with the slaves. */
+ if (server.masterhost != NULL) {
+ while (listLength(server.slaves)) {
+ ln = listFirst(server.slaves);
+ freeClient((redisClient*)ln->value);
+ }
}
}
/* Release memory */
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
- /* Use writev() if we have enough buffers to send */
- if (!server.glueoutputbuf &&
- listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
- !(c->flags & REDIS_MASTER))
- {
- sendReplyToClientWritev(el, fd, privdata, mask);
- return;
- }
-
while(c->bufpos > 0 || listLength(c->reply)) {
if (c->bufpos > 0) {
if (c->flags & REDIS_MASTER) {
}
}
-void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
-{
- redisClient *c = privdata;
- int nwritten = 0, totwritten = 0, objlen, willwrite;
- robj *o;
- struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
- int offset, ion = 0;
- REDIS_NOTUSED(el);
- REDIS_NOTUSED(mask);
-
- listNode *node;
- while (listLength(c->reply)) {
- offset = c->sentlen;
- ion = 0;
- willwrite = 0;
-
- /* fill-in the iov[] array */
- for(node = listFirst(c->reply); node; node = listNextNode(node)) {
- o = listNodeValue(node);
- objlen = sdslen(o->ptr);
-
- if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
- break;
-
- if(ion == REDIS_WRITEV_IOVEC_COUNT)
- break; /* no more iovecs */
-
- iov[ion].iov_base = ((char*)o->ptr) + offset;
- iov[ion].iov_len = objlen - offset;
- willwrite += objlen - offset;
- offset = 0; /* just for the first item */
- ion++;
- }
-
- if(willwrite == 0)
- break;
-
- /* write all collected blocks at once */
- if((nwritten = writev(fd, iov, ion)) < 0) {
- if (errno != EAGAIN) {
- redisLog(REDIS_VERBOSE,
- "Error writing to client: %s", strerror(errno));
- freeClient(c);
- return;
- }
- break;
- }
-
- totwritten += nwritten;
- offset = c->sentlen;
-
- /* remove written robjs from c->reply */
- while (nwritten && listLength(c->reply)) {
- o = listNodeValue(listFirst(c->reply));
- objlen = sdslen(o->ptr);
-
- if(nwritten >= objlen - offset) {
- listDelNode(c->reply, listFirst(c->reply));
- nwritten -= objlen - offset;
- c->sentlen = 0;
- } else {
- /* partial write */
- c->sentlen += nwritten;
- break;
- }
- offset = 0;
- }
- }
-
- if (totwritten > 0)
- c->lastinteraction = time(NULL);
-
- if (listLength(c->reply) == 0) {
- c->sentlen = 0;
- aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
- }
-}
-
/* resetClient prepare the client to process the next command */
void resetClient(redisClient *c) {
freeClientArgv(c);
bulklen = strtol(c->querybuf+pos+1,&eptr,10);
tolerr = (eptr[0] != '\r');
if (tolerr || bulklen == LONG_MIN || bulklen == LONG_MAX ||
- bulklen < 0 || bulklen > 1024*1024*1024)
+ bulklen < 0 || bulklen > 512*1024*1024)
{
addReplyError(c,"Protocol error: invalid bulk length");
setProtocolError(c,pos);
}
processInputBuffer(c);
}
+/* Walk server.clients and report, through the two output pointers, the
+ * largest listLength(c->reply) and the largest sdslen(c->querybuf) found on
+ * any client. Both outputs are set to 0 when no client yields a larger value
+ * (for instance when the client list is empty).
+ *
+ * Only c->reply and c->querybuf are inspected; c->buf / c->bufpos are not
+ * looked at by this function. */
+void getClientsMaxBuffers(unsigned long *longest_output_list,
+ unsigned long *biggest_input_buffer) {
+ redisClient *c;
+ listNode *ln;
+ listIter li;
+ unsigned long lol = 0, bib = 0; /* running maxima: reply list length, query buffer length */
+
+ listRewind(server.clients,&li);
+ while ((ln = listNext(&li)) != NULL) {
+ c = listNodeValue(ln);
+
+ if (listLength(c->reply) > lol) lol = listLength(c->reply);
+ if (sdslen(c->querybuf) > bib) bib = sdslen(c->querybuf);
+ }
+ *longest_output_list = lol; /* results are stored only once the scan is complete */
+ *biggest_input_buffer = bib;
+}
+/* CLIENT command implementation. Two forms are recognized, matched
+ * case-insensitively on c->argv[1]:
+ *
+ *   CLIENT LIST          (argc == 2) replies with a single bulk containing
+ *                        one text line per client in server.clients.
+ *   CLIENT KILL ip:port  (argc == 3) acts on the first client whose
+ *                        "ip:port" peer address equals c->argv[2].
+ *
+ * Anything else gets the syntax error reply at the bottom.
+ *
+ * NOTE(review): c->argv[1] is dereferenced before c->argc is tested, so this
+ * assumes the caller never dispatches here with fewer than two arguments --
+ * TODO confirm against the command table arity. */
+void clientCommand(redisClient *c) {
+ listNode *ln;
+ listIter li;
+ redisClient *client;
+
+ if (!strcasecmp(c->argv[1]->ptr,"list") && c->argc == 2) {
+ sds o = sdsempty(); /* the whole listing is accumulated here */
+ time_t now = time(NULL);
+
+ listRewind(server.clients,&li);
+ while ((ln = listNext(&li)) != NULL) {
+ char ip[32], flags[16], *p; /* at most 8 flag characters plus the NUL are written to flags[] */
+ int port;
+
+ client = listNodeValue(ln);
+ if (anetPeerToString(client->fd,ip,&port) == -1) continue; /* no line is emitted for this client */
+ p = flags; /* one character is appended per condition below */
+ if (client->flags & REDIS_SLAVE) {
+ if (client->flags & REDIS_MONITOR) /* slave flag + monitor flag -> 'O', slave flag alone -> 'S' */
+ *p++ = 'O';
+ else
+ *p++ = 'S';
+ }
+ if (client->flags & REDIS_MASTER) *p++ = 'M';
+ if (p == flags) *p++ = 'N'; /* nothing written so far: neither slave nor master */
+ if (client->flags & REDIS_MULTI) *p++ = 'x';
+ if (client->flags & REDIS_BLOCKED) *p++ = 'b';
+ if (client->flags & REDIS_IO_WAIT) *p++ = 'i';
+ if (client->flags & REDIS_DIRTY_CAS) *p++ = 'd';
+ if (client->flags & REDIS_CLOSE_AFTER_REPLY) *p++ = 'c';
+ if (client->flags & REDIS_UNBLOCKED) *p++ = 'u';
+ *p++ = '\0';
+ o = sdscatprintf(o,
+ "addr=%s:%d fd=%d idle=%ld flags=%s db=%d sub=%d psub=%d\n",
+ ip,port,client->fd,
+ (long)(now - client->lastinteraction), /* idle = time since lastinteraction, in time() units */
+ flags,
+ client->db->id,
+ (int) dictSize(client->pubsub_channels),
+ (int) listLength(client->pubsub_patterns));
+ }
+ addReplyBulkCBuffer(c,o,sdslen(o));
+ sdsfree(o); /* the local buffer is released right after the reply call */
+ } else if (!strcasecmp(c->argv[1]->ptr,"kill") && c->argc == 3) {
+ listRewind(server.clients,&li);
+ while ((ln = listNext(&li)) != NULL) {
+ char ip[32], addr[64];
+ int port;
+
+ client = listNodeValue(ln);
+ if (anetPeerToString(client->fd,ip,&port) == -1) continue;
+ snprintf(addr,sizeof(addr),"%s:%d",ip,port); /* same "ip:port" form that is compared with argv[2] */
+ if (strcmp(addr,c->argv[2]->ptr) == 0) {
+ addReply(c,shared.ok); /* the reply is queued before the target is touched */
+ if (c == client) {
+ client->flags |= REDIS_CLOSE_AFTER_REPLY; /* the caller targeted itself: flag it instead of freeing it here */
+ } else {
+ freeClient(client);
+ }
+ return; /* only the first matching client is handled */
+ }
+ }
+ addReplyError(c,"No such client"); /* reached only when no address matched */
+ } else {
+ addReplyError(c, "Syntax error, try CLIENT (LIST | KILL ip:port)");
+ }
+}