]> git.saurik.com Git - redis.git/commitdiff
Merge branch 'zipmap' of git://github.com/pietern/redis
author antirez <antirez@gmail.com>
Thu, 1 Apr 2010 16:48:14 +0000 (18:48 +0200)
committer antirez <antirez@gmail.com>
Thu, 1 Apr 2010 16:48:14 +0000 (18:48 +0200)
1  2 
redis.c

diff --combined redis.c
index bb0e200408545c8ad90020c9f4d28461cb71cda9,e69d3a3abf89b99aaff9cf2fdca493c13c5077eb..3f75037daa22c8f665b9bef4cb6a71f84ff360b4
+++ b/redis.c
@@@ -27,7 -27,7 +27,7 @@@
   * POSSIBILITY OF SUCH DAMAGE.
   */
  
 -#define REDIS_VERSION "1.3.7"
 +#define REDIS_VERSION "1.3.8"
  
  #include "fmacros.h"
  #include "config.h"
@@@ -327,8 -327,6 +327,8 @@@ typedef struct redisClient 
                               * is >= blockingto then the operation timed out. */
      list *io_keys;          /* Keys this client is waiting to be loaded from the
                               * swap file in order to continue. */
 +    dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
 +    list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
  } redisClient;
  
  struct saveparam {
@@@ -437,18 -435,9 +437,18 @@@ struct redisServer 
      unsigned long long vm_stats_swapped_objects;
      unsigned long long vm_stats_swapouts;
      unsigned long long vm_stats_swapins;
 +    /* Pubsub */
 +    dict *pubsub_channels; /* Map channels to list of subscribed clients */
 +    list *pubsub_patterns; /* A list of pubsub_patterns */
 +    /* Misc */
      FILE *devnull;
  };
  
 +typedef struct pubsubPattern { /* one entry of server.pubsub_patterns */
 +    redisClient *client; /* client that subscribed to the pattern */
 +    robj *pattern; /* pattern object; released by freePubsubPattern() */
 +} pubsubPattern;
 +
  typedef void redisCommandProc(redisClient *c);
  struct redisCommand {
      char *name;
@@@ -512,9 -501,7 +512,9 @@@ struct sharedObjectsStruct 
      *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
      *outofrangeerr, *plus,
      *select0, *select1, *select2, *select3, *select4,
 -    *select5, *select6, *select7, *select8, *select9;
 +    *select5, *select6, *select7, *select8, *select9,
 +    *messagebulk, *subscribebulk, *unsubscribebulk, *mbulk3,
 +    *psubscribebulk, *punsubscribebulk;
  } shared;
  
  /* Global vars that are actually used as constants. The following double
@@@ -614,12 -601,6 +614,12 @@@ static struct redisCommand *lookupComma
  static void call(redisClient *c, struct redisCommand *cmd);
  static void resetClient(redisClient *c);
  static void convertToRealHash(robj *o);
 +static int pubsubUnsubscribeAllChannels(redisClient *c, int notify);
 +static int pubsubUnsubscribeAllPatterns(redisClient *c, int notify);
 +static void freePubsubPattern(void *p);
 +static int listMatchPubsubPattern(void *a, void *b);
 +static int compareStringObjects(robj *a, robj *b);
 +static void usage();
  
  static void authCommand(redisClient *c);
  static void pingCommand(redisClient *c);
@@@ -717,11 -698,6 +717,11 @@@ static void hgetallCommand(redisClient 
  static void hexistsCommand(redisClient *c);
  static void configCommand(redisClient *c);
  static void hincrbyCommand(redisClient *c);
 +static void subscribeCommand(redisClient *c);
 +static void unsubscribeCommand(redisClient *c);
 +static void psubscribeCommand(redisClient *c);
 +static void punsubscribeCommand(redisClient *c);
 +static void publishCommand(redisClient *c);
  
  /*================================= Globals ================================= */
  
@@@ -813,7 -789,7 +813,7 @@@ static struct redisCommand cmdTable[] 
      {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
      {"type",typeCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
      {"multi",multiCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
 -    {"exec",execCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
 +    {"exec",execCommand,1,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,0,0,0},
      {"discard",discardCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
      {"sync",syncCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
      {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
      {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,NULL,0,0,0},
      {"debug",debugCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
      {"config",configCommand,-2,REDIS_CMD_BULK,NULL,0,0,0},
 +    {"subscribe",subscribeCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
 +    {"unsubscribe",unsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0},
 +    {"psubscribe",psubscribeCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
 +    {"punsubscribe",punsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0},
 +    {"publish",publishCommand,3,REDIS_CMD_BULK,NULL,0,0,0},
      {NULL,NULL,0,0,NULL,0,0,0}
  };
  
 -static void usage();
 -
  /*============================ Utility functions ============================ */
  
  /* Glob-style pattern matching. */
@@@ -1168,9 -1141,7 +1168,9 @@@ static void closeTimedoutClients(void) 
          if (server.maxidletime &&
              !(c->flags & REDIS_SLAVE) &&    /* no timeout for slaves */
              !(c->flags & REDIS_MASTER) &&   /* no timeout for masters */
 -             (now - c->lastinteraction > server.maxidletime))
 +            dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
 +            listLength(c->pubsub_patterns) == 0 &&
 +            (now - c->lastinteraction > server.maxidletime))
          {
              redisLog(REDIS_VERBOSE,"Closing idle client");
              freeClient(c);
@@@ -1502,12 -1473,6 +1502,12 @@@ static void createSharedObjects(void) 
      shared.select7 = createStringObject("select 7\r\n",10);
      shared.select8 = createStringObject("select 8\r\n",10);
      shared.select9 = createStringObject("select 9\r\n",10);
 +    shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13);
 +    shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15);
 +    shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
 +    shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
 +    shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
 +    shared.mbulk3 = createStringObject("*3\r\n",4);
  }
  
  static void appendServerSaveParams(time_t seconds, int changes) {
@@@ -1611,10 -1576,6 +1611,10 @@@ static void initServer() 
              server.db[j].io_keys = dictCreate(&keylistDictType,NULL);
          server.db[j].id = j;
      }
 +    server.pubsub_channels = dictCreate(&keylistDictType,NULL);
 +    server.pubsub_patterns = listCreate();
 +    listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
 +    listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
      server.cronloops = 0;
      server.bgsavechildpid = -1;
      server.bgrewritechildpid = -1;
@@@ -1878,12 -1839,6 +1878,12 @@@ static void freeClient(redisClient *c) 
      if (c->flags & REDIS_BLOCKED)
          unblockClientWaitingData(c);
  
 +    /* Unsubscribe from all the pubsub channels */
 +    pubsubUnsubscribeAllChannels(c,0);
 +    pubsubUnsubscribeAllPatterns(c,0);
 +    dictRelease(c->pubsub_channels);
 +    listRelease(c->pubsub_patterns);
 +    /* Obvious cleanup */
      aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
      aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
      listRelease(c->reply);
          dontWaitForSwappedKey(c,ln->value);
      }
      listRelease(c->io_keys);
 -    /* Other cleanup */
 +    /* Master/slave cleanup */
      if (c->flags & REDIS_SLAVE) {
          if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
              close(c->repldbfd);
          server.master = NULL;
          server.replstate = REDIS_REPL_CONNECT;
      }
 +    /* Release memory */
      zfree(c->argv);
      zfree(c->mbargv);
      freeClientMultiState(c);
@@@ -2289,15 -2243,6 +2289,15 @@@ static int processCommand(redisClient *
          return 1;
      }
  
 +    /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
 +    if (dictSize(c->pubsub_channels) > 0 &&
 +        cmd->proc != subscribeCommand && cmd->proc != unsubscribeCommand &&
 +        cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand) {
 +        addReplySds(c,sdsnew("-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context\r\n"));
 +        resetClient(c);
 +        return 1;
 +    }
 +
      /* Exec the command */
      if (c->flags & REDIS_MULTI && cmd->proc != execCommand && cmd->proc != discardCommand) {
          queueMultiCommand(c,cmd);
@@@ -2509,10 -2454,6 +2509,10 @@@ static void *dupClientReplyValue(void *
      return o;
  }
  
 +static int listMatchObjects(void *a, void *b) {
 +    return compareStringObjects(a,b) == 0;
 +}
 +
  static redisClient *createClient(int fd) {
      redisClient *c = zmalloc(sizeof(*c));
  
      c->blockingkeysnum = 0;
      c->io_keys = listCreate();
      listSetFreeMethod(c->io_keys,decrRefCount);
 +    c->pubsub_channels = dictCreate(&setDictType,NULL);
 +    c->pubsub_patterns = listCreate();
 +    listSetFreeMethod(c->pubsub_patterns,decrRefCount);
 +    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
      if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
          readQueryFromClient, c) == AE_ERR) {
          freeClient(c);
@@@ -6000,10 -5937,8 +6000,8 @@@ static void hsetCommand(redisClient *c
          decrRefCount(valobj);
          o->ptr = zm;
  
-         /* And here there is the second check for hash conversion...
-          * we want to do it only if the operation was not just an update as
-          * zipmapLen() is O(N). */
-         if (!update && zipmapLen(zm) > server.hash_max_zipmap_entries)
+         /* And here there is the second check for hash conversion. */
+         if (zipmapLen(zm) > server.hash_max_zipmap_entries)
              convertToRealHash(o);
      } else {
          tryObjectEncoding(c->argv[2]);
  }
  
  static void hincrbyCommand(redisClient *c) {
-     int update = 0;
      long long value = 0, incr = 0;
      robj *o = lookupKeyWrite(c->db,c->argv[1]);
  
          value += incr;
          sds svalue = sdscatprintf(sdsempty(),"%lld",value);
          zm = zipmapSet(zm,c->argv[2]->ptr,sdslen(c->argv[2]->ptr),
-             (unsigned char*)svalue,sdslen(svalue),&update);
+             (unsigned char*)svalue,sdslen(svalue),NULL);
          sdsfree(svalue);
          o->ptr = zm;
  
-         /* Check if the zipmap needs to be converted
-          * if this was not an update. */
-         if (!update && zipmapLen(zm) > server.hash_max_zipmap_entries)
+         /* Check if the zipmap needs to be converted. */
+         if (zipmapLen(zm) > server.hash_max_zipmap_entries)
              convertToRealHash(o);
      } else {
          robj *hval;
@@@ -6707,8 -6640,6 +6703,8 @@@ static sds genRedisInfoString(void) 
          "expired_keys:%lld\r\n"
          "hash_max_zipmap_entries:%ld\r\n"
          "hash_max_zipmap_value:%ld\r\n"
 +        "pubsub_channels:%ld\r\n"
 +        "pubsub_patterns:%u\r\n"
          "vm_enabled:%d\r\n"
          "role:%s\r\n"
          ,REDIS_VERSION,
          server.stat_expiredkeys,
          server.hash_max_zipmap_entries,
          server.hash_max_zipmap_value,
 +        dictSize(server.pubsub_channels),
 +        listLength(server.pubsub_patterns),
          server.vm_enabled != 0,
          server.masterhost == NULL ? "master" : "slave"
      );
@@@ -9303,265 -9232,6 +9299,265 @@@ badarity
          (char*) c->argv[1]->ptr));
  }
  
 +/* =========================== Pubsub implementation ======================== */
 +
 +static void freePubsubPattern(void *p) {
 +    pubsubPattern *pat = p;
 +
 +    decrRefCount(pat->pattern);
 +    zfree(pat);
 +}
 +
 +static int listMatchPubsubPattern(void *a, void *b) {
 +    pubsubPattern *pa = a, *pb = b;
 +
 +    return (pa->client == pb->client) &&
 +           (compareStringObjects(pa->pattern,pb->pattern) == 0);
 +}
 +
 +/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
 + * 0 if the client was already subscribed to that channel.
 + *
 + * In both cases a reply is queued for the client: the shared "*3" header,
 + * the "subscribe" bulk, the channel name and the client's total number of
 + * channel plus pattern subscriptions. */
 +static int pubsubSubscribeChannel(redisClient *c, robj *channel) {
 +    struct dictEntry *de;
 +    list *clients = NULL;
 +    int retval = 0;
 +
 +    /* Add the channel to the client -> channels hash table */
 +    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
 +        retval = 1;
 +        incrRefCount(channel); /* channel is now a key of the client dict */
 +        /* Add the client to the channel -> list of clients hash table */
 +        de = dictFind(server.pubsub_channels,channel);
 +        if (de == NULL) { /* first subscriber: create the clients list */
 +            clients = listCreate();
 +            dictAdd(server.pubsub_channels,channel,clients);
 +            incrRefCount(channel); /* ...and a key of the server dict too */
 +        } else {
 +            clients = dictGetEntryVal(de);
 +        }
 +        listAddNodeTail(clients,c);
 +    }
 +    /* Notify the client (also when it was already subscribed) */
 +    addReply(c,shared.mbulk3);
 +    addReply(c,shared.subscribebulk);
 +    addReplyBulk(c,channel);
 +    addReplyLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
 +    return retval;
 +}
 +
 +/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
 + * 0 if the client was not subscribed to the specified channel.
 + *
 + * When 'notify' is non-zero an "unsubscribe" reply (channel name plus the
 + * remaining number of channel and pattern subscriptions) is queued for the
 + * client, whether or not a subscription was actually removed. */
 +static int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
 +    struct dictEntry *de;
 +    list *clients;
 +    listNode *ln;
 +    int retval = 0;
 +
 +    /* Remove the channel from the client -> channels hash table */
 +    incrRefCount(channel); /* channel may be just a pointer to the same object
 +                            we have in the hash tables. Protect it, since
 +                            it is still used below for the lookups and for
 +                            the reply. */
 +    if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
 +        retval = 1;
 +        /* Remove the client from the channel -> clients list hash table */
 +        de = dictFind(server.pubsub_channels,channel);
 +        assert(de != NULL);
 +        clients = dictGetEntryVal(de);
 +        ln = listSearchKey(clients,c);
 +        assert(ln != NULL);
 +        listDelNode(clients,ln);
 +        if (listLength(clients) == 0) {
 +            /* Free the list and the associated hash entry altogether if
 +             * this was the last client, so that it will not be possible to
 +             * abuse Redis PUBSUB by creating millions of channels. */
 +            dictDelete(server.pubsub_channels,channel);
 +        }
 +    }
 +    /* Notify the client */
 +    if (notify) {
 +        addReply(c,shared.mbulk3);
 +        addReply(c,shared.unsubscribebulk);
 +        addReplyBulk(c,channel);
 +        addReplyLong(c,dictSize(c->pubsub_channels)+
 +                       listLength(c->pubsub_patterns));
 +
 +    }
 +    decrRefCount(channel); /* it is finally safe to release it */
 +    return retval;
 +}
 +
 +/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the clinet was already subscribed to that pattern. */
 +static int pubsubSubscribePattern(redisClient *c, robj *pattern) {
 +    int retval = 0;
 +
 +    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
 +        retval = 1;
 +        pubsubPattern *pat;
 +        listAddNodeTail(c->pubsub_patterns,pattern);
 +        incrRefCount(pattern);
 +        pat = zmalloc(sizeof(*pat));
 +        pat->pattern = getDecodedObject(pattern);
 +        pat->client = c;
 +        listAddNodeTail(server.pubsub_patterns,pat);
 +    }
 +    /* Notify the client */
 +    addReply(c,shared.mbulk3);
 +    addReply(c,shared.psubscribebulk);
 +    addReplyBulk(c,pattern);
 +    addReplyLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
 +    return retval;
 +}
 +
 +/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
 + * 0 if the client was not subscribed to the specified channel. */
 +static int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) {
 +    listNode *ln;
 +    pubsubPattern pat;
 +    int retval = 0;
 +
 +    incrRefCount(pattern); /* Protect the object. May be the same we remove */
 +    if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
 +        retval = 1;
 +        listDelNode(c->pubsub_patterns,ln);
 +        pat.client = c;
 +        pat.pattern = pattern;
 +        ln = listSearchKey(server.pubsub_patterns,&pat);
 +        listDelNode(server.pubsub_patterns,ln);
 +    }
 +    /* Notify the client */
 +    if (notify) {
 +        addReply(c,shared.mbulk3);
 +        addReply(c,shared.punsubscribebulk);
 +        addReplyBulk(c,pattern);
 +        addReplyLong(c,dictSize(c->pubsub_channels)+
 +                       listLength(c->pubsub_patterns));
 +    }
 +    decrRefCount(pattern);
 +    return retval;
 +}
 +
 +/* Unsubscribe from all the channels. Return the number of channels the
 + * client was subscribed to. 'notify' is passed through to
 + * pubsubUnsubscribeChannel() for every channel; if the client has no
 + * channel subscriptions nothing is called and no reply is queued.
 + *
 + * NOTE(review): every call deletes from c->pubsub_channels the entry just
 + * returned by dictNext(); this presumably relies on the dict iterator
 + * tolerating removal of the current entry -- confirm against dict.c. */
 +static int pubsubUnsubscribeAllChannels(redisClient *c, int notify) {
 +    dictIterator *di = dictGetIterator(c->pubsub_channels);
 +    dictEntry *de;
 +    int count = 0;
 +
 +    while((de = dictNext(di)) != NULL) {
 +        robj *channel = dictGetEntryKey(de);
 +
 +        count += pubsubUnsubscribeChannel(c,channel,notify);
 +    }
 +    dictReleaseIterator(di);
 +    return count;
 +}
 +
 +/* Unsubscribe from all the patterns. Return the number of patterns the
 + * client was subscribed from. */
 +static int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) {
 +    listNode *ln;
 +    listIter li;
 +    int count = 0;
 +
 +    listRewind(c->pubsub_patterns,&li);
 +    while ((ln = listNext(&li)) != NULL) {
 +        robj *pattern = ln->value;
 +
 +        count += pubsubUnsubscribePattern(c,pattern,notify);
 +    }
 +    return count;
 +}
 +
 +/* Publish a message */
 +static int pubsubPublishMessage(robj *channel, robj *message) {
 +    int receivers = 0;
 +    struct dictEntry *de;
 +    listNode *ln;
 +    listIter li;
 +
 +    /* Send to clients listening for that channel */
 +    de = dictFind(server.pubsub_channels,channel);
 +    if (de) {
 +        list *list = dictGetEntryVal(de);
 +        listNode *ln;
 +        listIter li;
 +
 +        listRewind(list,&li);
 +        while ((ln = listNext(&li)) != NULL) {
 +            redisClient *c = ln->value;
 +
 +            addReply(c,shared.mbulk3);
 +            addReply(c,shared.messagebulk);
 +            addReplyBulk(c,channel);
 +            addReplyBulk(c,message);
 +            receivers++;
 +        }
 +    }
 +    /* Send to clients listening to matching channels */
 +    if (listLength(server.pubsub_patterns)) {
 +        listRewind(server.pubsub_patterns,&li);
 +        channel = getDecodedObject(channel);
 +        while ((ln = listNext(&li)) != NULL) {
 +            pubsubPattern *pat = ln->value;
 +
 +            if (stringmatchlen((char*)pat->pattern->ptr,
 +                                sdslen(pat->pattern->ptr),
 +                                (char*)channel->ptr,
 +                                sdslen(channel->ptr),0)) {
 +                addReply(pat->client,shared.mbulk3);
 +                addReply(pat->client,shared.messagebulk);
 +                addReplyBulk(pat->client,channel);
 +                addReplyBulk(pat->client,message);
 +                receivers++;
 +            }
 +        }
 +        decrRefCount(channel);
 +    }
 +    return receivers;
 +}
 +
 +static void subscribeCommand(redisClient *c) {
 +    int j;
 +
 +    for (j = 1; j < c->argc; j++)
 +        pubsubSubscribeChannel(c,c->argv[j]);
 +}
 +
 +static void unsubscribeCommand(redisClient *c) {
 +    if (c->argc == 1) {
 +        pubsubUnsubscribeAllChannels(c,1);
 +        return;
 +    } else {
 +        int j;
 +
 +        for (j = 1; j < c->argc; j++)
 +            pubsubUnsubscribeChannel(c,c->argv[j],1);
 +    }
 +}
 +
 +static void psubscribeCommand(redisClient *c) {
 +    int j;
 +
 +    for (j = 1; j < c->argc; j++)
 +        pubsubSubscribePattern(c,c->argv[j]);
 +}
 +
 +static void punsubscribeCommand(redisClient *c) {
 +    if (c->argc == 1) {
 +        pubsubUnsubscribeAllPatterns(c,1);
 +        return;
 +    } else {
 +        int j;
 +
 +        for (j = 1; j < c->argc; j++)
 +            pubsubUnsubscribePattern(c,c->argv[j],1);
 +    }
 +}
 +
 +static void publishCommand(redisClient *c) {
 +    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
 +    addReplyLong(c,receivers);
 +}
 +
  /* ================================= Debugging ============================== */
  
  static void debugCommand(redisClient *c) {