]> git.saurik.com Git - redis.git/blobdiff - src/networking.c
added diskstore.c, currently just a stub
[redis.git] / src / networking.c
index 6181799aabf191660858356dcd8d773add450374..26ee46f63cfbbfa1566321ac3ccd60a38289179c 100644 (file)
@@ -41,8 +41,10 @@ redisClient *createClient(int fd) {
     c->reply = listCreate();
     listSetFreeMethod(c->reply,decrRefCount);
     listSetDupMethod(c->reply,dupClientReplyValue);
     c->reply = listCreate();
     listSetFreeMethod(c->reply,decrRefCount);
     listSetDupMethod(c->reply,dupClientReplyValue);
-    c->blocking_keys = NULL;
-    c->blocking_keys_num = 0;
+    c->bpop.keys = NULL;
+    c->bpop.count = 0;
+    c->bpop.timeout = 0;
+    c->bpop.target = NULL;
     c->io_keys = listCreate();
     c->watched_keys = listCreate();
     listSetFreeMethod(c->io_keys,decrRefCount);
     c->io_keys = listCreate();
     c->watched_keys = listCreate();
     listSetFreeMethod(c->io_keys,decrRefCount);
@@ -178,6 +180,9 @@ void addReply(redisClient *c, robj *obj) {
         if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
             _addReplyObjectToList(c,obj);
     } else {
         if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
             _addReplyObjectToList(c,obj);
     } else {
+        /* FIXME: convert the long into string and use _addReplyToBuffer()
+         * instead of calling getDecodedObject. As this place in the
+         * code is too performance critical. */
         obj = getDecodedObject(obj);
         if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
             _addReplyObjectToList(c,obj);
         obj = getDecodedObject(obj);
         if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
             _addReplyObjectToList(c,obj);
@@ -275,6 +280,7 @@ void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
     }
 }
 
     }
 }
 
+/* Add a duble as a bulk reply */
 void addReplyDouble(redisClient *c, double d) {
     char dbuf[128], sbuf[128];
     int dlen, slen;
 void addReplyDouble(redisClient *c, double d) {
     char dbuf[128], sbuf[128];
     int dlen, slen;
@@ -283,6 +289,8 @@ void addReplyDouble(redisClient *c, double d) {
     addReplyString(c,sbuf,slen);
 }
 
     addReplyString(c,sbuf,slen);
 }
 
+/* Add a long long as integer reply or bulk len / multi bulk count.
+ * Basically this is used to output <prefix><long long><crlf>. */
 void _addReplyLongLong(redisClient *c, long long ll, char prefix) {
     char buf[128];
     int len;
 void _addReplyLongLong(redisClient *c, long long ll, char prefix) {
     char buf[128];
     int len;
@@ -301,6 +309,7 @@ void addReplyMultiBulkLen(redisClient *c, long length) {
     _addReplyLongLong(c,length,'*');
 }
 
     _addReplyLongLong(c,length,'*');
 }
 
+/* Create the length prefix of a bulk reply, example: $2234 */
 void addReplyBulkLen(redisClient *c, robj *obj) {
     size_t len;
 
 void addReplyBulkLen(redisClient *c, robj *obj) {
     size_t len;
 
@@ -322,40 +331,43 @@ void addReplyBulkLen(redisClient *c, robj *obj) {
     _addReplyLongLong(c,len,'$');
 }
 
     _addReplyLongLong(c,len,'$');
 }
 
+/* Add a Redis Object as a bulk reply */
 void addReplyBulk(redisClient *c, robj *obj) {
     addReplyBulkLen(c,obj);
     addReply(c,obj);
     addReply(c,shared.crlf);
 }
 
 void addReplyBulk(redisClient *c, robj *obj) {
     addReplyBulkLen(c,obj);
     addReply(c,obj);
     addReply(c,shared.crlf);
 }
 
-/* In the CONFIG command we need to add vanilla C string as bulk replies */
+/* Add a C buffer as bulk reply */
+void addReplyBulkCBuffer(redisClient *c, void *p, size_t len) {
+    _addReplyLongLong(c,len,'$');
+    addReplyString(c,p,len);
+    addReply(c,shared.crlf);
+}
+
+/* Add a C nul term string as bulk reply */
 void addReplyBulkCString(redisClient *c, char *s) {
     if (s == NULL) {
         addReply(c,shared.nullbulk);
     } else {
 void addReplyBulkCString(redisClient *c, char *s) {
     if (s == NULL) {
         addReply(c,shared.nullbulk);
     } else {
-        robj *o = createStringObject(s,strlen(s));
-        addReplyBulk(c,o);
-        decrRefCount(o);
+        addReplyBulkCBuffer(c,s,strlen(s));
     }
 }
 
     }
 }
 
-void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
-    int cport, cfd;
-    char cip[128];
-    redisClient *c;
-    REDIS_NOTUSED(el);
-    REDIS_NOTUSED(mask);
-    REDIS_NOTUSED(privdata);
+/* Add a long long as a bulk reply */
+void addReplyBulkLongLong(redisClient *c, long long ll) {
+    char buf[64];
+    int len;
 
 
-    cfd = anetAccept(server.neterr, fd, cip, &cport);
-    if (cfd == AE_ERR) {
-        redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
-        return;
-    }
-    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
-    if ((c = createClient(cfd)) == NULL) {
+    len = ll2string(buf,64,ll);
+    addReplyBulkCBuffer(c,buf,len);
+}
+
+static void acceptCommonHandler(int fd) {
+    redisClient *c;
+    if ((c = createClient(fd)) == NULL) {
         redisLog(REDIS_WARNING,"Error allocating resoures for the client");
         redisLog(REDIS_WARNING,"Error allocating resoures for the client");
-        close(cfd); /* May be already closed, just ingore errors */
+        close(fd); /* May be already closed, just ingore errors */
         return;
     }
     /* If maxclient directive is set and this is one client more... close the
         return;
     }
     /* If maxclient directive is set and this is one client more... close the
@@ -375,6 +387,38 @@ void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
     server.stat_numconnections++;
 }
 
     server.stat_numconnections++;
 }
 
+void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
+    int cport, cfd;
+    char cip[128];
+    REDIS_NOTUSED(el);
+    REDIS_NOTUSED(mask);
+    REDIS_NOTUSED(privdata);
+
+    cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
+    if (cfd == AE_ERR) {
+        redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
+        return;
+    }
+    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
+    acceptCommonHandler(cfd);
+}
+
+void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
+    int cfd;
+    REDIS_NOTUSED(el);
+    REDIS_NOTUSED(mask);
+    REDIS_NOTUSED(privdata);
+
+    cfd = anetUnixAccept(server.neterr, fd);
+    if (cfd == AE_ERR) {
+        redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
+        return;
+    }
+    redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket);
+    acceptCommonHandler(cfd);
+}
+
+
 static void freeClientArgv(redisClient *c) {
     int j;
     for (j = 0; j < c->argc; j++)
 static void freeClientArgv(redisClient *c) {
     int j;
     for (j = 0; j < c->argc; j++)
@@ -447,6 +491,7 @@ void freeClient(redisClient *c) {
     /* Case 2: we lost the connection with the master. */
     if (c->flags & REDIS_MASTER) {
         server.master = NULL;
     /* Case 2: we lost the connection with the master. */
     if (c->flags & REDIS_MASTER) {
         server.master = NULL;
+        /* FIXME */
         server.replstate = REDIS_REPL_CONNECT;
         /* Since we lost the connection with the master, we should also
          * close the connection with all our slaves if we have any, so
         server.replstate = REDIS_REPL_CONNECT;
         /* Since we lost the connection with the master, we should also
          * close the connection with all our slaves if we have any, so
@@ -656,7 +701,7 @@ void closeTimedoutClients(void) {
             redisLog(REDIS_VERBOSE,"Closing idle client");
             freeClient(c);
         } else if (c->flags & REDIS_BLOCKED) {
             redisLog(REDIS_VERBOSE,"Closing idle client");
             freeClient(c);
         } else if (c->flags & REDIS_BLOCKED) {
-            if (c->blockingto != 0 && c->blockingto < now) {
+            if (c->bpop.timeout != 0 && c->bpop.timeout < now) {
                 addReply(c,shared.nullmultibulk);
                 unblockClientWaitingData(c);
             }
                 addReply(c,shared.nullmultibulk);
                 unblockClientWaitingData(c);
             }
@@ -759,7 +804,7 @@ int processMultibulkBuffer(redisClient *c) {
                 bulklen = strtol(c->querybuf+pos+1,&eptr,10);
                 tolerr = (eptr[0] != '\r');
                 if (tolerr || bulklen == LONG_MIN || bulklen == LONG_MAX ||
                 bulklen = strtol(c->querybuf+pos+1,&eptr,10);
                 tolerr = (eptr[0] != '\r');
                 if (tolerr || bulklen == LONG_MIN || bulklen == LONG_MAX ||
-                    bulklen < 0 || bulklen > 1024*1024*1024)
+                    bulklen < 0 || bulklen > 512*1024*1024)
                 {
                     addReplyError(c,"Protocol error: invalid bulk length");
                     setProtocolError(c,pos);
                 {
                     addReplyError(c,"Protocol error: invalid bulk length");
                     setProtocolError(c,pos);