c->reply = listCreate();
listSetFreeMethod(c->reply,decrRefCount);
listSetDupMethod(c->reply,dupClientReplyValue);
- c->blocking_keys = NULL;
- c->blocking_keys_num = 0;
+ c->bpop.keys = NULL;
+ c->bpop.count = 0;
+ c->bpop.timeout = 0;
+ c->bpop.target = NULL;
c->io_keys = listCreate();
c->watched_keys = listCreate();
listSetFreeMethod(c->io_keys,decrRefCount);
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
_addReplyObjectToList(c,obj);
} else {
+ /* FIXME: convert the long into string and use _addReplyToBuffer()
+ * instead of calling getDecodedObject. As this place in the
+ * code is too performance critical. */
obj = getDecodedObject(obj);
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
_addReplyObjectToList(c,obj);
}
}
+/* Add a duble as a bulk reply */
void addReplyDouble(redisClient *c, double d) {
char dbuf[128], sbuf[128];
int dlen, slen;
addReplyString(c,sbuf,slen);
}
+/* Add a long long as integer reply or bulk len / multi bulk count.
+ * Basically this is used to output <prefix><long long><crlf>. */
void _addReplyLongLong(redisClient *c, long long ll, char prefix) {
char buf[128];
int len;
_addReplyLongLong(c,length,'*');
}
+/* Create the length prefix of a bulk reply, example: $2234 */
void addReplyBulkLen(redisClient *c, robj *obj) {
size_t len;
_addReplyLongLong(c,len,'$');
}
+/* Add a Redis Object as a bulk reply */
void addReplyBulk(redisClient *c, robj *obj) {
addReplyBulkLen(c,obj);
addReply(c,obj);
addReply(c,shared.crlf);
}
-/* In the CONFIG command we need to add vanilla C string as bulk replies */
+/* Add a C buffer as bulk reply */
+void addReplyBulkCBuffer(redisClient *c, void *p, size_t len) {
+ _addReplyLongLong(c,len,'$');
+ addReplyString(c,p,len);
+ addReply(c,shared.crlf);
+}
+
+/* Add a C nul term string as bulk reply */
void addReplyBulkCString(redisClient *c, char *s) {
if (s == NULL) {
addReply(c,shared.nullbulk);
} else {
- robj *o = createStringObject(s,strlen(s));
- addReplyBulk(c,o);
- decrRefCount(o);
+ addReplyBulkCBuffer(c,s,strlen(s));
}
}
-void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
- int cport, cfd;
- char cip[128];
- redisClient *c;
- REDIS_NOTUSED(el);
- REDIS_NOTUSED(mask);
- REDIS_NOTUSED(privdata);
+/* Add a long long as a bulk reply */
+void addReplyBulkLongLong(redisClient *c, long long ll) {
+ char buf[64];
+ int len;
- cfd = anetAccept(server.neterr, fd, cip, &cport);
- if (cfd == AE_ERR) {
- redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
- return;
- }
- redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
- if ((c = createClient(cfd)) == NULL) {
+ len = ll2string(buf,64,ll);
+ addReplyBulkCBuffer(c,buf,len);
+}
+
+static void acceptCommonHandler(int fd) {
+ redisClient *c;
+ if ((c = createClient(fd)) == NULL) {
redisLog(REDIS_WARNING,"Error allocating resoures for the client");
- close(cfd); /* May be already closed, just ingore errors */
+ close(fd); /* May be already closed, just ingore errors */
return;
}
/* If maxclient directive is set and this is one client more... close the
server.stat_numconnections++;
}
+void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
+ int cport, cfd;
+ char cip[128];
+ REDIS_NOTUSED(el);
+ REDIS_NOTUSED(mask);
+ REDIS_NOTUSED(privdata);
+
+ cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
+ if (cfd == AE_ERR) {
+ redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
+ return;
+ }
+ redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
+ acceptCommonHandler(cfd);
+}
+
+void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
+ int cfd;
+ REDIS_NOTUSED(el);
+ REDIS_NOTUSED(mask);
+ REDIS_NOTUSED(privdata);
+
+ cfd = anetUnixAccept(server.neterr, fd);
+ if (cfd == AE_ERR) {
+ redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
+ return;
+ }
+ redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket);
+ acceptCommonHandler(cfd);
+}
+
+
static void freeClientArgv(redisClient *c) {
int j;
for (j = 0; j < c->argc; j++)
/* Case 2: we lost the connection with the master. */
if (c->flags & REDIS_MASTER) {
server.master = NULL;
+ /* FIXME */
server.replstate = REDIS_REPL_CONNECT;
/* Since we lost the connection with the master, we should also
* close the connection with all our slaves if we have any, so
redisLog(REDIS_VERBOSE,"Closing idle client");
freeClient(c);
} else if (c->flags & REDIS_BLOCKED) {
- if (c->blockingto != 0 && c->blockingto < now) {
+ if (c->bpop.timeout != 0 && c->bpop.timeout < now) {
addReply(c,shared.nullmultibulk);
unblockClientWaitingData(c);
}
bulklen = strtol(c->querybuf+pos+1,&eptr,10);
tolerr = (eptr[0] != '\r');
if (tolerr || bulklen == LONG_MIN || bulklen == LONG_MAX ||
- bulklen < 0 || bulklen > 1024*1024*1024)
+ bulklen < 0 || bulklen > 512*1024*1024)
{
addReplyError(c,"Protocol error: invalid bulk length");
setProtocolError(c,pos);