]> git.saurik.com Git - redis.git/blobdiff - redis.c
multi bulk input protocol fixed
[redis.git] / redis.c
diff --git a/redis.c b/redis.c
index 2edc186f438e2ae36a299a4870887b563bed0b34..74986528c3573e690dfd158a852d506794ea5249 100644 (file)
--- a/redis.c
+++ b/redis.c
@@ -206,9 +206,10 @@ typedef struct redisClient {
     redisDb *db;
     int dictid;
     sds querybuf;
-    robj **argv;
-    int argc;
+    robj **argv, **mbargv;
+    int argc, mbargc;
     int bulklen;            /* bulk read len. -1 if not in bulk read mode */
+    int multibulk;          /* multi bulk command format active */
     list *reply;
     int sentlen;
     time_t lastinteraction; /* time of the last interaction, used for timeout */
@@ -343,6 +344,7 @@ static int processCommand(redisClient *c);
 static void setupSigSegvAction(void);
 static void rdbRemoveTempFile(pid_t childpid);
 static size_t stringObjectLen(robj *o);
+static void processInputBuffer(redisClient *c);
 
 static void authCommand(redisClient *c);
 static void pingCommand(redisClient *c);
@@ -398,10 +400,13 @@ static void infoCommand(redisClient *c);
 static void mgetCommand(redisClient *c);
 static void monitorCommand(redisClient *c);
 static void expireCommand(redisClient *c);
-static void getSetCommand(redisClient *c);
+static void getsetCommand(redisClient *c);
 static void ttlCommand(redisClient *c);
 static void slaveofCommand(redisClient *c);
 static void debugCommand(redisClient *c);
+static void msetCommand(redisClient *c);
+static void msetnxCommand(redisClient *c);
+
 /*================================= Globals ================================= */
 
 /* Global vars */
@@ -440,7 +445,9 @@ static struct redisCommand cmdTable[] = {
     {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
     {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
     {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
-    {"getset",getSetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
+    {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
+    {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
+    {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
     {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
     {"select",selectCommand,2,REDIS_CMD_INLINE},
     {"move",moveCommand,3,REDIS_CMD_INLINE},
@@ -1169,7 +1176,10 @@ static void freeClientArgv(redisClient *c) {
 
     for (j = 0; j < c->argc; j++)
         decrRefCount(c->argv[j]);
+    for (j = 0; j < c->mbargc; j++)
+        decrRefCount(c->mbargv[j]);
     c->argc = 0;
+    c->mbargc = 0;
 }
 
 static void freeClient(redisClient *c) {
@@ -1197,6 +1207,7 @@ static void freeClient(redisClient *c) {
         server.replstate = REDIS_REPL_CONNECT;
     }
     zfree(c->argv);
+    zfree(c->mbargv);
     zfree(c);
 }
 
@@ -1299,6 +1310,7 @@ static struct redisCommand *lookupCommand(char *name) {
 static void resetClient(redisClient *c) {
     freeClientArgv(c);
     c->bulklen = -1;
+    c->multibulk = 0;
 }
 
 /* If this function gets called we already read a whole
@@ -1316,6 +1328,82 @@ static int processCommand(redisClient *c) {
     /* Free some memory if needed (maxmemory setting) */
     if (server.maxmemory) freeMemoryIfNeeded();
 
+    /* Handle the multi bulk command type. This is an alternative protocol
+     * supported by Redis in order to receive commands that are composed of
+     * multiple binary-safe "bulk" arguments. The latency of processing is
+     * a bit higher but this allows things like multi-sets, so if this
+     * protocol is used only for MSET and similar commands this is a big win. */
+    if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
+        c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
+        if (c->multibulk <= 0) {
+            resetClient(c);
+            return 1;
+        } else {
+            decrRefCount(c->argv[c->argc-1]);
+            c->argc--;
+            return 1;
+        }
+    } else if (c->multibulk) {
+        if (c->bulklen == -1) {
+            if (((char*)c->argv[0]->ptr)[0] != '$') {
+                addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
+                resetClient(c);
+                return 1;
+            } else {
+                int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
+                decrRefCount(c->argv[0]);
+                if (bulklen < 0 || bulklen > 1024*1024*1024) {
+                    c->argc--;
+                    addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
+                    resetClient(c);
+                    return 1;
+                }
+                c->argc--;
+                c->bulklen = bulklen+2; /* add two bytes for CR+LF */
+                /*
+                if (sdslen(c->querybuf) > 0)
+                    processInputBuffer(c);
+                    */
+                return 1;
+            }
+        } else {
+            c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
+            c->mbargv[c->mbargc] = c->argv[0];
+            c->mbargc++;
+            c->argc--;
+            c->multibulk--;
+            if (c->multibulk == 0) {
+                robj **auxargv;
+                int auxargc;
+
+                /* Here we need to swap the multi-bulk argc/argv with the
+                 * normal argc/argv of the client structure. */
+                auxargv = c->argv;
+                c->argv = c->mbargv;
+                c->mbargv = auxargv;
+
+                auxargc = c->argc;
+                c->argc = c->mbargc;
+                c->mbargc = auxargc;
+
+                /* We need to set bulklen to something different than -1
+                 * in order for the code below to process the command without
+                 * to try to read the last argument of a bulk command as
+                 * a special argument. */
+                c->bulklen = 0;
+                /* continue below and process the command */
+            } else {
+                c->bulklen = -1;
+                /*
+                if (sdslen(c->querybuf) > 0)
+                    processInputBuffer(c);
+                    */
+                return 1;
+            }
+        }
+    }
+    /* -- end of multi bulk commands processing -- */
+
     /* The QUIT command is handled as a special case. Normal command
      * procs are unable to close the client connection safely */
     if (!strcasecmp(c->argv[0]->ptr,"quit")) {
@@ -1463,34 +1551,7 @@ static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int di
     if (outv != static_outv) zfree(outv);
 }
 
-static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
-    redisClient *c = (redisClient*) privdata;
-    char buf[REDIS_IOBUF_LEN];
-    int nread;
-    REDIS_NOTUSED(el);
-    REDIS_NOTUSED(mask);
-
-    nread = read(fd, buf, REDIS_IOBUF_LEN);
-    if (nread == -1) {
-        if (errno == EAGAIN) {
-            nread = 0;
-        } else {
-            redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
-            freeClient(c);
-            return;
-        }
-    } else if (nread == 0) {
-        redisLog(REDIS_DEBUG, "Client closed connection");
-        freeClient(c);
-        return;
-    }
-    if (nread) {
-        c->querybuf = sdscatlen(c->querybuf, buf, nread);
-        c->lastinteraction = time(NULL);
-    } else {
-        return;
-    }
-
+static void processInputBuffer(redisClient *c) {
 again:
     if (c->bulklen == -1) {
         /* Read the first line of the query */
@@ -1557,12 +1618,45 @@ again:
             c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
             c->argc++;
             c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
-            processCommand(c);
+            /* Process the command. If the client is still valid after
+             * the processing and there is more data in the buffer
+             * try to parse it. */
+            if (processCommand(c) && sdslen(c->querybuf)) goto again;
             return;
         }
     }
 }
 
+static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
+    redisClient *c = (redisClient*) privdata;
+    char buf[REDIS_IOBUF_LEN];
+    int nread;
+    REDIS_NOTUSED(el);
+    REDIS_NOTUSED(mask);
+
+    nread = read(fd, buf, REDIS_IOBUF_LEN);
+    if (nread == -1) {
+        if (errno == EAGAIN) {
+            nread = 0;
+        } else {
+            redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
+            freeClient(c);
+            return;
+        }
+    } else if (nread == 0) {
+        redisLog(REDIS_DEBUG, "Client closed connection");
+        freeClient(c);
+        return;
+    }
+    if (nread) {
+        c->querybuf = sdscatlen(c->querybuf, buf, nread);
+        c->lastinteraction = time(NULL);
+    } else {
+        return;
+    }
+    processInputBuffer(c);
+}
+
 static int selectDb(redisClient *c, int id) {
     if (id < 0 || id >= server.dbnum)
         return REDIS_ERR;
@@ -1587,6 +1681,9 @@ static redisClient *createClient(int fd) {
     c->argc = 0;
     c->argv = NULL;
     c->bulklen = -1;
+    c->multibulk = 0;
+    c->mbargc = 0;
+    c->mbargv = NULL;
     c->sentlen = 0;
     c->flags = 0;
     c->lastinteraction = time(NULL);
@@ -2519,7 +2616,7 @@ static void getCommand(redisClient *c) {
     }
 }
 
-static void getSetCommand(redisClient *c) {
+static void getsetCommand(redisClient *c) {
     getCommand(c);
     if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
         dictReplace(c->db->dict,c->argv[1],c->argv[2]);
@@ -3546,9 +3643,8 @@ static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
     int prefixlen, sublen, postfixlen;
     /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
     struct {
-        int len;
-        unsigned short free;
-        unsigned short _len; /* not used here */
+        long len;
+        long free;
         char buf[REDIS_SORTKEY_MAX+1];
     } keyname;
 
@@ -3572,7 +3668,6 @@ static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
     memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
     keyname.buf[prefixlen+sublen+postfixlen] = '\0';
     keyname.len = prefixlen+sublen+postfixlen;
-    keyname._len = USHRT_MAX;
 
     keyobj.refcount = 1;
     keyobj.type = REDIS_STRING;
@@ -3770,9 +3865,9 @@ static void sortCommand(redisClient *c) {
                     if (byval->encoding == REDIS_ENCODING_RAW) {
                         vector[j].u.score = strtod(byval->ptr,NULL);
                     } else {
-                        if (byval->encoding == REDIS_ENCODING_INT)
+                        if (byval->encoding == REDIS_ENCODING_INT) {
                             vector[j].u.score = (long)byval->ptr;
-                        else
+                        else
                             assert(1 != 1);
                     }
                 }
@@ -3859,6 +3954,7 @@ static void infoCommand(redisClient *c) {
     
     info = sdscatprintf(sdsempty(),
         "redis_version:%s\r\n"
+        "arch_bits:%s\r\n"
         "uptime_in_seconds:%d\r\n"
         "uptime_in_days:%d\r\n"
         "connected_clients:%d\r\n"
@@ -3871,6 +3967,7 @@ static void infoCommand(redisClient *c) {
         "total_commands_processed:%lld\r\n"
         "role:%s\r\n"
         ,REDIS_VERSION,
+        (sizeof(long) == 8) ? "64" : "32",
         uptime,
         uptime/(3600*24),
         listLength(server.clients)-listLength(server.slaves),
@@ -4017,6 +4114,42 @@ static void ttlCommand(redisClient *c) {
     addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
 }
 
+static void msetGenericCommand(redisClient *c, int nx) {
+    int j;
+
+    if ((c->argc % 2) == 0) {
+        addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
+        return;
+    }
+    /* Handle the NX flag. The MSETNX semantic is to return zero and don't
+     * set nothing at all if at least one already key exists. */
+    if (nx) {
+        for (j = 1; j < c->argc; j += 2) {
+            if (dictFind(c->db->dict,c->argv[j]) != NULL) {
+                addReply(c, shared.czero);
+                return;
+            }
+        }
+    }
+
+    for (j = 1; j < c->argc; j += 2) {
+        dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
+        incrRefCount(c->argv[j]);
+        incrRefCount(c->argv[j+1]);
+        removeExpire(c->db,c->argv[j]);
+    }
+    server.dirty += (c->argc-1)/2;
+    addReply(c, nx ? shared.cone : shared.ok);
+}
+
+static void msetCommand(redisClient *c) {
+    msetGenericCommand(c,0);
+}
+
+static void msetnxCommand(redisClient *c) {
+    msetGenericCommand(c,1);
+}
+
 /* =============================== Replication  ============================= */
 
 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
@@ -4509,7 +4642,7 @@ static struct redisFunctionSym symsTable[] = {
 {"mgetCommand", (unsigned long)mgetCommand},
 {"monitorCommand", (unsigned long)monitorCommand},
 {"expireCommand", (unsigned long)expireCommand},
-{"getSetCommand", (unsigned long)getSetCommand},
+{"getsetCommand", (unsigned long)getsetCommand},
 {"ttlCommand", (unsigned long)ttlCommand},
 {"slaveofCommand", (unsigned long)slaveofCommand},
 {"debugCommand", (unsigned long)debugCommand},
@@ -4517,6 +4650,9 @@ static struct redisFunctionSym symsTable[] = {
 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
 {"readQueryFromClient", (unsigned long)readQueryFromClient},
 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
+{"msetGenericCommand", (unsigned long)msetGenericCommand},
+{"msetCommand", (unsigned long)msetCommand},
+{"msetnxCommand", (unsigned long)msetnxCommand},
 {NULL,0}
 };