2  * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com> 
   5  * Redistribution and use in source and binary forms, with or without 
   6  * modification, are permitted provided that the following conditions are met: 
   8  *   * Redistributions of source code must retain the above copyright notice, 
   9  *     this list of conditions and the following disclaimer. 
  10  *   * Redistributions in binary form must reproduce the above copyright 
  11  *     notice, this list of conditions and the following disclaimer in the 
  12  *     documentation and/or other materials provided with the distribution. 
  13  *   * Neither the name of Redis nor the names of its contributors may be used 
  14  *     to endorse or promote products derived from this software without 
  15  *     specific prior written permission. 
  17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
  18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
  19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
  20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
  21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
  22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
  23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
  24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
  25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
  26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
  27  * POSSIBILITY OF SUCH DAMAGE. 
  30 #define REDIS_VERSION "1.050" 
  40 #define __USE_POSIX199309 
  46 #endif /* HAVE_BACKTRACE */ 
  54 #include <arpa/inet.h> 
  58 #include <sys/resource.h> 
  63 #include "solarisfixes.h" 
  67 #include "ae.h"     /* Event driven programming library */ 
  68 #include "sds.h"    /* Dynamic safe strings */ 
  69 #include "anet.h"   /* Networking the easy way */ 
  70 #include "dict.h"   /* Hash tables */ 
  71 #include "adlist.h" /* Linked lists */ 
  72 #include "zmalloc.h" /* total memory usage aware version of malloc/free */ 
  73 #include "lzf.h"    /* LZF compression library */ 
  74 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */ 
  80 /* Static server configuration */ 
  81 #define REDIS_SERVERPORT        6379    /* TCP port */ 
  82 #define REDIS_MAXIDLETIME       (60*5)  /* default client timeout */ 
  83 #define REDIS_IOBUF_LEN         1024 
  84 #define REDIS_LOADBUF_LEN       1024 
  85 #define REDIS_STATIC_ARGS       4 
  86 #define REDIS_DEFAULT_DBNUM     16 
  87 #define REDIS_CONFIGLINE_MAX    1024 
  88 #define REDIS_OBJFREELIST_MAX   1000000 /* Max number of objects to cache */ 
  89 #define REDIS_MAX_SYNC_TIME     60      /* Slave can't take more to sync */ 
  90 #define REDIS_EXPIRELOOKUPS_PER_CRON    100 /* try to expire 100 keys/second */ 
  91 #define REDIS_MAX_WRITE_PER_EVENT (1024*64) 
  92 #define REDIS_REQUEST_MAX_SIZE  (1024*1024*256) /* max bytes in inline command */ 
  94 /* Hash table parameters */ 
  95 #define REDIS_HT_MINFILL        10      /* Minimal hash table fill 10% */ 
  98 #define REDIS_CMD_BULK          1       /* Bulk write command */ 
  99 #define REDIS_CMD_INLINE        2       /* Inline command */ 
 100 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with 
 101    this flags will return an error when the 'maxmemory' option is set in the 
 102    config file and the server is using more than maxmemory bytes of memory. 
 103    In short this commands are denied on low memory conditions. */ 
 104 #define REDIS_CMD_DENYOOM       4 
 107 #define REDIS_STRING 0 
 113 /* Objects encoding */ 
 114 #define REDIS_ENCODING_RAW 0    /* Raw representation */ 
 115 #define REDIS_ENCODING_INT 1    /* Encoded as integer */ 
 117 /* Object types only used for dumping to disk */ 
 118 #define REDIS_EXPIRETIME 253 
 119 #define REDIS_SELECTDB 254 
 120 #define REDIS_EOF 255 
 122 /* Defines related to the dump file format. To store 32 bits lengths for short 
 123  * keys requires a lot of space, so we check the most significant 2 bits of 
 124  * the first byte to interpreter the length: 
 126  * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte 
 127  * 01|000000 00000000 =>  01, the len is 14 byes, 6 bits + 8 bits of next byte 
 128  * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow 
 129  * 11|000000 this means: specially encoded object will follow. The six bits 
 130  *           number specify the kind of object that follows. 
 131  *           See the REDIS_RDB_ENC_* defines. 
 133  * Lenghts up to 63 are stored using a single byte, most DB keys, and may 
 134  * values, will fit inside. */ 
 135 #define REDIS_RDB_6BITLEN 0 
 136 #define REDIS_RDB_14BITLEN 1 
 137 #define REDIS_RDB_32BITLEN 2 
 138 #define REDIS_RDB_ENCVAL 3 
 139 #define REDIS_RDB_LENERR UINT_MAX 
 141 /* When a length of a string object stored on disk has the first two bits 
 142  * set, the remaining two bits specify a special encoding for the object 
 143  * accordingly to the following defines: */ 
 144 #define REDIS_RDB_ENC_INT8 0        /* 8 bit signed integer */ 
 145 #define REDIS_RDB_ENC_INT16 1       /* 16 bit signed integer */ 
 146 #define REDIS_RDB_ENC_INT32 2       /* 32 bit signed integer */ 
 147 #define REDIS_RDB_ENC_LZF 3         /* string compressed with FASTLZ */ 
 150 #define REDIS_CLOSE 1       /* This client connection should be closed ASAP */ 
 151 #define REDIS_SLAVE 2       /* This client is a slave server */ 
 152 #define REDIS_MASTER 4      /* This client is a master server */ 
 153 #define REDIS_MONITOR 8      /* This client is a slave monitor, see MONITOR */ 
 155 /* Slave replication state - slave side */ 
 156 #define REDIS_REPL_NONE 0   /* No active replication */ 
 157 #define REDIS_REPL_CONNECT 1    /* Must connect to master */ 
 158 #define REDIS_REPL_CONNECTED 2  /* Connected to master */ 
 160 /* Slave replication state - from the point of view of master 
 161  * Note that in SEND_BULK and ONLINE state the slave receives new updates 
 162  * in its output queue. In the WAIT_BGSAVE state instead the server is waiting 
 163  * to start the next background saving in order to send updates to it. */ 
 164 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */ 
 165 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */ 
 166 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */ 
 167 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */ 
 169 /* List related stuff */ 
 173 /* Sort operations */ 
 174 #define REDIS_SORT_GET 0 
 175 #define REDIS_SORT_DEL 1 
 176 #define REDIS_SORT_INCR 2 
 177 #define REDIS_SORT_DECR 3 
 178 #define REDIS_SORT_ASC 4 
 179 #define REDIS_SORT_DESC 5 
 180 #define REDIS_SORTKEY_MAX 1024 
 183 #define REDIS_DEBUG 0 
 184 #define REDIS_NOTICE 1 
 185 #define REDIS_WARNING 2 
 187 /* Anti-warning macro... */ 
 188 #define REDIS_NOTUSED(V) ((void) V) 
 190 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */ 
 191 #define ZSKIPLIST_P 0.25      /* Skiplist P = 1/4 */ 
 193 /* Append only defines */ 
 194 #define APPENDFSYNC_NO 0 
 195 #define APPENDFSYNC_ALWAYS 1 
 196 #define APPENDFSYNC_EVERYSEC 2 
 198 /*================================= Data types ============================== */ 
 200 /* A redis object, that is a type able to hold a string / list / set */ 
 201 typedef struct redisObject 
{ 
 204     unsigned char encoding
; 
 205     unsigned char notused
[2]; 
 209 typedef struct redisDb 
{ 
 215 /* With multiplexing we need to take per-clinet state. 
 216  * Clients are taken in a liked list. */ 
 217 typedef struct redisClient 
{ 
 222     robj 
**argv
, **mbargv
; 
 224     int bulklen
;            /* bulk read len. -1 if not in bulk read mode */ 
 225     int multibulk
;          /* multi bulk command format active */ 
 228     time_t lastinteraction
; /* time of the last interaction, used for timeout */ 
 229     int flags
;              /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */ 
 230     int slaveseldb
;         /* slave selected db, if this client is a slave */ 
 231     int authenticated
;      /* when requirepass is non-NULL */ 
 232     int replstate
;          /* replication state if this is a slave */ 
 233     int repldbfd
;           /* replication DB file descriptor */ 
 234     long repldboff
;          /* replication DB file offset */ 
 235     off_t repldbsize
;       /* replication DB file size */ 
 243 /* Global server state structure */ 
 249     unsigned int sharingpoolsize
; 
 250     long long dirty
;            /* changes to DB from the last save */ 
 252     list 
*slaves
, *monitors
; 
 253     char neterr
[ANET_ERR_LEN
]; 
 255     int cronloops
;              /* number of times the cron function run */ 
 256     list 
*objfreelist
;          /* A list of freed objects to avoid malloc() */ 
 257     time_t lastsave
;            /* Unix time of last save succeeede */ 
 258     size_t usedmemory
;             /* Used memory in megabytes */ 
 259     /* Fields used only for stats */ 
 260     time_t stat_starttime
;         /* server start time */ 
 261     long long stat_numcommands
;    /* number of processed commands */ 
 262     long long stat_numconnections
; /* number of connections received */ 
 275     int bgsaveinprogress
; 
 276     pid_t bgsavechildpid
; 
 277     struct saveparam 
*saveparams
; 
 282     char *appendfilename
; 
 285     /* Replication related */ 
 289     redisClient 
*master
;    /* client that is master for this slave */ 
 291     unsigned int maxclients
; 
 292     unsigned long maxmemory
; 
 293     /* Sort parameters - qsort_r() is only available under BSD so we 
 294      * have to take this state global, in order to pass it to sortCompare() */ 
 300 typedef void redisCommandProc(redisClient 
*c
); 
 301 struct redisCommand 
{ 
 303     redisCommandProc 
*proc
; 
 308 struct redisFunctionSym 
{ 
 310     unsigned long pointer
; 
 313 typedef struct _redisSortObject 
{ 
 321 typedef struct _redisSortOperation 
{ 
 324 } redisSortOperation
; 
 326 /* ZSETs use a specialized version of Skiplists */ 
 328 typedef struct zskiplistNode 
{ 
 329     struct zskiplistNode 
**forward
; 
 330     struct zskiplistNode 
*backward
; 
 335 typedef struct zskiplist 
{ 
 336     struct zskiplistNode 
*header
, *tail
; 
 337     unsigned long length
; 
 341 typedef struct zset 
{ 
 346 /* Our shared "common" objects */ 
 348 struct sharedObjectsStruct 
{ 
 349     robj 
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
, 
 350     *colon
, *nullbulk
, *nullmultibulk
, 
 351     *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
, 
 352     *outofrangeerr
, *plus
, 
 353     *select0
, *select1
, *select2
, *select3
, *select4
, 
 354     *select5
, *select6
, *select7
, *select8
, *select9
; 
 357 /* Global vars that are actally used as constants. The following double 
 358  * values are used for double on-disk serialization, and are initialized 
 359  * at runtime to avoid strange compiler optimizations. */ 
 361 static double R_Zero
, R_PosInf
, R_NegInf
, R_Nan
; 
 363 /*================================ Prototypes =============================== */ 
 365 static void freeStringObject(robj 
*o
); 
 366 static void freeListObject(robj 
*o
); 
 367 static void freeSetObject(robj 
*o
); 
 368 static void decrRefCount(void *o
); 
 369 static robj 
*createObject(int type
, void *ptr
); 
 370 static void freeClient(redisClient 
*c
); 
 371 static int rdbLoad(char *filename
); 
 372 static void addReply(redisClient 
*c
, robj 
*obj
); 
 373 static void addReplySds(redisClient 
*c
, sds s
); 
 374 static void incrRefCount(robj 
*o
); 
 375 static int rdbSaveBackground(char *filename
); 
 376 static robj 
*createStringObject(char *ptr
, size_t len
); 
 377 static void replicationFeedSlaves(list 
*slaves
, struct redisCommand 
*cmd
, int dictid
, robj 
**argv
, int argc
); 
 378 static void feedAppendOnlyFile(struct redisCommand 
*cmd
, int dictid
, robj 
**argv
, int argc
); 
 379 static int syncWithMaster(void); 
 380 static robj 
*tryObjectSharing(robj 
*o
); 
 381 static int tryObjectEncoding(robj 
*o
); 
 382 static robj 
*getDecodedObject(const robj 
*o
); 
 383 static int removeExpire(redisDb 
*db
, robj 
*key
); 
 384 static int expireIfNeeded(redisDb 
*db
, robj 
*key
); 
 385 static int deleteIfVolatile(redisDb 
*db
, robj 
*key
); 
 386 static int deleteKey(redisDb 
*db
, robj 
*key
); 
 387 static time_t getExpire(redisDb 
*db
, robj 
*key
); 
 388 static int setExpire(redisDb 
*db
, robj 
*key
, time_t when
); 
 389 static void updateSlavesWaitingBgsave(int bgsaveerr
); 
 390 static void freeMemoryIfNeeded(void); 
 391 static int processCommand(redisClient 
*c
); 
 392 static void setupSigSegvAction(void); 
 393 static void rdbRemoveTempFile(pid_t childpid
); 
 394 static size_t stringObjectLen(robj 
*o
); 
 395 static void processInputBuffer(redisClient 
*c
); 
 396 static zskiplist 
*zslCreate(void); 
 397 static void zslFree(zskiplist 
*zsl
); 
 398 static void zslInsert(zskiplist 
*zsl
, double score
, robj 
*obj
); 
 400 static void authCommand(redisClient 
*c
); 
 401 static void pingCommand(redisClient 
*c
); 
 402 static void echoCommand(redisClient 
*c
); 
 403 static void setCommand(redisClient 
*c
); 
 404 static void setnxCommand(redisClient 
*c
); 
 405 static void getCommand(redisClient 
*c
); 
 406 static void delCommand(redisClient 
*c
); 
 407 static void existsCommand(redisClient 
*c
); 
 408 static void incrCommand(redisClient 
*c
); 
 409 static void decrCommand(redisClient 
*c
); 
 410 static void incrbyCommand(redisClient 
*c
); 
 411 static void decrbyCommand(redisClient 
*c
); 
 412 static void selectCommand(redisClient 
*c
); 
 413 static void randomkeyCommand(redisClient 
*c
); 
 414 static void keysCommand(redisClient 
*c
); 
 415 static void dbsizeCommand(redisClient 
*c
); 
 416 static void lastsaveCommand(redisClient 
*c
); 
 417 static void saveCommand(redisClient 
*c
); 
 418 static void bgsaveCommand(redisClient 
*c
); 
 419 static void shutdownCommand(redisClient 
*c
); 
 420 static void moveCommand(redisClient 
*c
); 
 421 static void renameCommand(redisClient 
*c
); 
 422 static void renamenxCommand(redisClient 
*c
); 
 423 static void lpushCommand(redisClient 
*c
); 
 424 static void rpushCommand(redisClient 
*c
); 
 425 static void lpopCommand(redisClient 
*c
); 
 426 static void rpopCommand(redisClient 
*c
); 
 427 static void llenCommand(redisClient 
*c
); 
 428 static void lindexCommand(redisClient 
*c
); 
 429 static void lrangeCommand(redisClient 
*c
); 
 430 static void ltrimCommand(redisClient 
*c
); 
 431 static void typeCommand(redisClient 
*c
); 
 432 static void lsetCommand(redisClient 
*c
); 
 433 static void saddCommand(redisClient 
*c
); 
 434 static void sremCommand(redisClient 
*c
); 
 435 static void smoveCommand(redisClient 
*c
); 
 436 static void sismemberCommand(redisClient 
*c
); 
 437 static void scardCommand(redisClient 
*c
); 
 438 static void spopCommand(redisClient 
*c
); 
 439 static void srandmemberCommand(redisClient 
*c
); 
 440 static void sinterCommand(redisClient 
*c
); 
 441 static void sinterstoreCommand(redisClient 
*c
); 
 442 static void sunionCommand(redisClient 
*c
); 
 443 static void sunionstoreCommand(redisClient 
*c
); 
 444 static void sdiffCommand(redisClient 
*c
); 
 445 static void sdiffstoreCommand(redisClient 
*c
); 
 446 static void syncCommand(redisClient 
*c
); 
 447 static void flushdbCommand(redisClient 
*c
); 
 448 static void flushallCommand(redisClient 
*c
); 
 449 static void sortCommand(redisClient 
*c
); 
 450 static void lremCommand(redisClient 
*c
); 
 451 static void infoCommand(redisClient 
*c
); 
 452 static void mgetCommand(redisClient 
*c
); 
 453 static void monitorCommand(redisClient 
*c
); 
 454 static void expireCommand(redisClient 
*c
); 
 455 static void expireatCommand(redisClient 
*c
); 
 456 static void getsetCommand(redisClient 
*c
); 
 457 static void ttlCommand(redisClient 
*c
); 
 458 static void slaveofCommand(redisClient 
*c
); 
 459 static void debugCommand(redisClient 
*c
); 
 460 static void msetCommand(redisClient 
*c
); 
 461 static void msetnxCommand(redisClient 
*c
); 
 462 static void zaddCommand(redisClient 
*c
); 
 463 static void zrangeCommand(redisClient 
*c
); 
 464 static void zrangebyscoreCommand(redisClient 
*c
); 
 465 static void zrevrangeCommand(redisClient 
*c
); 
 466 static void zcardCommand(redisClient 
*c
); 
 467 static void zremCommand(redisClient 
*c
); 
 468 static void zscoreCommand(redisClient 
*c
); 
 469 static void zremrangebyscoreCommand(redisClient 
*c
); 
 471 /*================================= Globals ================================= */ 
 474 static struct redisServer server
; /* server global state */ 
 475 static struct redisCommand cmdTable
[] = { 
 476     {"get",getCommand
,2,REDIS_CMD_INLINE
}, 
 477     {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
}, 
 478     {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
}, 
 479     {"del",delCommand
,-2,REDIS_CMD_INLINE
}, 
 480     {"exists",existsCommand
,2,REDIS_CMD_INLINE
}, 
 481     {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 482     {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 483     {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
}, 
 484     {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
}, 
 485     {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
}, 
 486     {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
}, 
 487     {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
}, 
 488     {"llen",llenCommand
,2,REDIS_CMD_INLINE
}, 
 489     {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
}, 
 490     {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
}, 
 491     {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
}, 
 492     {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
}, 
 493     {"lrem",lremCommand
,4,REDIS_CMD_BULK
}, 
 494     {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
}, 
 495     {"srem",sremCommand
,3,REDIS_CMD_BULK
}, 
 496     {"smove",smoveCommand
,4,REDIS_CMD_BULK
}, 
 497     {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
}, 
 498     {"scard",scardCommand
,2,REDIS_CMD_INLINE
}, 
 499     {"spop",spopCommand
,2,REDIS_CMD_INLINE
}, 
 500     {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
}, 
 501     {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 502     {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 503     {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 504     {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 505     {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 506     {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 507     {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
}, 
 508     {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
}, 
 509     {"zrem",zremCommand
,3,REDIS_CMD_BULK
}, 
 510     {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
}, 
 511     {"zrange",zrangeCommand
,4,REDIS_CMD_INLINE
}, 
 512     {"zrangebyscore",zrangebyscoreCommand
,4,REDIS_CMD_INLINE
}, 
 513     {"zrevrange",zrevrangeCommand
,4,REDIS_CMD_INLINE
}, 
 514     {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
}, 
 515     {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
}, 
 516     {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 517     {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 518     {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
}, 
 519     {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
}, 
 520     {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
}, 
 521     {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
}, 
 522     {"select",selectCommand
,2,REDIS_CMD_INLINE
}, 
 523     {"move",moveCommand
,3,REDIS_CMD_INLINE
}, 
 524     {"rename",renameCommand
,3,REDIS_CMD_INLINE
}, 
 525     {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
}, 
 526     {"expire",expireCommand
,3,REDIS_CMD_INLINE
}, 
 527     {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
}, 
 528     {"keys",keysCommand
,2,REDIS_CMD_INLINE
}, 
 529     {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
}, 
 530     {"auth",authCommand
,2,REDIS_CMD_INLINE
}, 
 531     {"ping",pingCommand
,1,REDIS_CMD_INLINE
}, 
 532     {"echo",echoCommand
,2,REDIS_CMD_BULK
}, 
 533     {"save",saveCommand
,1,REDIS_CMD_INLINE
}, 
 534     {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
}, 
 535     {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
}, 
 536     {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
}, 
 537     {"type",typeCommand
,2,REDIS_CMD_INLINE
}, 
 538     {"sync",syncCommand
,1,REDIS_CMD_INLINE
}, 
 539     {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
}, 
 540     {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
}, 
 541     {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 542     {"info",infoCommand
,1,REDIS_CMD_INLINE
}, 
 543     {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
}, 
 544     {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
}, 
 545     {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
}, 
 546     {"debug",debugCommand
,-2,REDIS_CMD_INLINE
}, 
 549 /*============================ Utility functions ============================ */ 
 551 /* Glob-style pattern matching. */ 
 552 int stringmatchlen(const char *pattern
, int patternLen
, 
 553         const char *string
, int stringLen
, int nocase
) 
 558             while (pattern
[1] == '*') { 
 563                 return 1; /* match */ 
 565                 if (stringmatchlen(pattern
+1, patternLen
-1, 
 566                             string
, stringLen
, nocase
)) 
 567                     return 1; /* match */ 
 571             return 0; /* no match */ 
 575                 return 0; /* no match */ 
 585             not = pattern
[0] == '^'; 
 592                 if (pattern
[0] == '\\') { 
 595                     if (pattern
[0] == string
[0]) 
 597                 } else if (pattern
[0] == ']') { 
 599                 } else if (patternLen 
== 0) { 
 603                 } else if (pattern
[1] == '-' && patternLen 
>= 3) { 
 604                     int start 
= pattern
[0]; 
 605                     int end 
= pattern
[2]; 
 613                         start 
= tolower(start
); 
 619                     if (c 
>= start 
&& c 
<= end
) 
 623                         if (pattern
[0] == string
[0]) 
 626                         if (tolower((int)pattern
[0]) == tolower((int)string
[0])) 
 636                 return 0; /* no match */ 
 642             if (patternLen 
>= 2) { 
 649                 if (pattern
[0] != string
[0]) 
 650                     return 0; /* no match */ 
 652                 if (tolower((int)pattern
[0]) != tolower((int)string
[0])) 
 653                     return 0; /* no match */ 
 661         if (stringLen 
== 0) { 
 662             while(*pattern 
== '*') { 
 669     if (patternLen 
== 0 && stringLen 
== 0) 
 674 static void redisLog(int level
, const char *fmt
, ...) { 
 678     fp 
= (server
.logfile 
== NULL
) ? stdout 
: fopen(server
.logfile
,"a"); 
 682     if (level 
>= server
.verbosity
) { 
 688         strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
)); 
 689         fprintf(fp
,"%s %c ",buf
,c
[level
]); 
 690         vfprintf(fp
, fmt
, ap
); 
 696     if (server
.logfile
) fclose(fp
); 
 699 /*====================== Hash table type implementation  ==================== */ 
 701 /* This is an hash table type that uses the SDS dynamic strings libary as 
 702  * keys and radis objects as values (objects can hold SDS strings, 
 705 static void dictVanillaFree(void *privdata
, void *val
) 
 707     DICT_NOTUSED(privdata
); 
 711 static int sdsDictKeyCompare(void *privdata
, const void *key1
, 
 715     DICT_NOTUSED(privdata
); 
 717     l1 
= sdslen((sds
)key1
); 
 718     l2 
= sdslen((sds
)key2
); 
 719     if (l1 
!= l2
) return 0; 
 720     return memcmp(key1
, key2
, l1
) == 0; 
 723 static void dictRedisObjectDestructor(void *privdata
, void *val
) 
 725     DICT_NOTUSED(privdata
); 
 730 static int dictObjKeyCompare(void *privdata
, const void *key1
, 
 733     const robj 
*o1 
= key1
, *o2 
= key2
; 
 734     return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
); 
 737 static unsigned int dictObjHash(const void *key
) { 
 739     return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
)); 
 742 static int dictEncObjKeyCompare(void *privdata
, const void *key1
, 
 745     const robj 
*o1 
= key1
, *o2 
= key2
; 
 747     if (o1
->encoding 
== REDIS_ENCODING_RAW 
&& 
 748         o2
->encoding 
== REDIS_ENCODING_RAW
) 
 749         return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
); 
 754         dec1 
= o1
->encoding 
!= REDIS_ENCODING_RAW 
? 
 755             getDecodedObject(o1
) : (robj
*)o1
; 
 756         dec2 
= o2
->encoding 
!= REDIS_ENCODING_RAW 
? 
 757             getDecodedObject(o2
) : (robj
*)o2
; 
 758         cmp 
= sdsDictKeyCompare(privdata
,dec1
->ptr
,dec2
->ptr
); 
 759         if (dec1 
!= o1
) decrRefCount(dec1
); 
 760         if (dec2 
!= o2
) decrRefCount(dec2
); 
 765 static unsigned int dictEncObjHash(const void *key
) { 
 768     if (o
->encoding 
== REDIS_ENCODING_RAW
) 
 769         return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
)); 
 771         robj 
*dec 
= getDecodedObject(o
); 
 772         unsigned int hash 
= dictGenHashFunction(dec
->ptr
, sdslen((sds
)dec
->ptr
)); 
 778 static dictType setDictType 
= { 
 779     dictEncObjHash
,            /* hash function */ 
 782     dictEncObjKeyCompare
,      /* key compare */ 
 783     dictRedisObjectDestructor
, /* key destructor */ 
 784     NULL                       
/* val destructor */ 
 787 static dictType zsetDictType 
= { 
 788     dictEncObjHash
,            /* hash function */ 
 791     dictEncObjKeyCompare
,      /* key compare */ 
 792     dictRedisObjectDestructor
, /* key destructor */ 
 793     dictVanillaFree            
/* val destructor */ 
 796 static dictType hashDictType 
= { 
 797     dictObjHash
,                /* hash function */ 
 800     dictObjKeyCompare
,          /* key compare */ 
 801     dictRedisObjectDestructor
,  /* key destructor */ 
 802     dictRedisObjectDestructor   
/* val destructor */ 
 805 /* ========================= Random utility functions ======================= */ 
 807 /* Redis generally does not try to recover from out of memory conditions 
 808  * when allocating objects or strings, it is not clear if it will be possible 
 809  * to report this condition to the client since the networking layer itself 
 810  * is based on heap allocation for send buffers, so we simply abort. 
 811  * At least the code will be simpler to read... */ 
 812 static void oom(const char *msg
) { 
 813     fprintf(stderr
, "%s: Out of memory\n",msg
); 
 819 /* ====================== Redis server networking stuff ===================== */ 
 820 static void closeTimedoutClients(void) { 
 823     time_t now 
= time(NULL
); 
 825     listRewind(server
.clients
); 
 826     while ((ln 
= listYield(server
.clients
)) != NULL
) { 
 827         c 
= listNodeValue(ln
); 
 828         if (!(c
->flags 
& REDIS_SLAVE
) &&    /* no timeout for slaves */ 
 829             !(c
->flags 
& REDIS_MASTER
) &&   /* no timeout for masters */ 
 830              (now 
- c
->lastinteraction 
> server
.maxidletime
)) { 
 831             redisLog(REDIS_DEBUG
,"Closing idle client"); 
 837 static int htNeedsResize(dict 
*dict
) { 
 838     long long size
, used
; 
 840     size 
= dictSlots(dict
); 
 841     used 
= dictSize(dict
); 
 842     return (size 
&& used 
&& size 
> DICT_HT_INITIAL_SIZE 
&& 
 843             (used
*100/size 
< REDIS_HT_MINFILL
)); 
 846 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL 
 847  * we resize the hash table to save memory */ 
 848 static void tryResizeHashTables(void) { 
 851     for (j 
= 0; j 
< server
.dbnum
; j
++) { 
 852         if (htNeedsResize(server
.db
[j
].dict
)) { 
 853             redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
); 
 854             dictResize(server
.db
[j
].dict
); 
 855             redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
); 
 857         if (htNeedsResize(server
.db
[j
].expires
)) 
 858             dictResize(server
.db
[j
].expires
); 
 862 static int serverCron(struct aeEventLoop 
*eventLoop
, long long id
, void *clientData
) { 
 863     int j
, loops 
= server
.cronloops
++; 
 864     REDIS_NOTUSED(eventLoop
); 
 866     REDIS_NOTUSED(clientData
); 
 868     /* Update the global state with the amount of used memory */ 
 869     server
.usedmemory 
= zmalloc_used_memory(); 
 871     /* Show some info about non-empty databases */ 
 872     for (j 
= 0; j 
< server
.dbnum
; j
++) { 
 873         long long size
, used
, vkeys
; 
 875         size 
= dictSlots(server
.db
[j
].dict
); 
 876         used 
= dictSize(server
.db
[j
].dict
); 
 877         vkeys 
= dictSize(server
.db
[j
].expires
); 
 878         if (!(loops 
% 5) && (used 
|| vkeys
)) { 
 879             redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
); 
 880             /* dictPrintStats(server.dict); */ 
 884     /* We don't want to resize the hash tables while a bacground saving 
 885      * is in progress: the saving child is created using fork() that is 
 886      * implemented with a copy-on-write semantic in most modern systems, so 
 887      * if we resize the HT while there is the saving child at work actually 
 888      * a lot of memory movements in the parent will cause a lot of pages 
 890     if (!server
.bgsaveinprogress
) tryResizeHashTables(); 
 892     /* Show information about connected clients */ 
 894         redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects", 
 895             listLength(server
.clients
)-listLength(server
.slaves
), 
 896             listLength(server
.slaves
), 
 898             dictSize(server
.sharingpool
)); 
 901     /* Close connections of timedout clients */ 
 902     if (server
.maxidletime 
&& !(loops 
% 10)) 
 903         closeTimedoutClients(); 
 905     /* Check if a background saving in progress terminated */ 
 906     if (server
.bgsaveinprogress
) { 
 908         if (wait4(-1,&statloc
,WNOHANG
,NULL
)) { 
 909             int exitcode 
= WEXITSTATUS(statloc
); 
 910             int bysignal 
= WIFSIGNALED(statloc
); 
 912             if (!bysignal 
&& exitcode 
== 0) { 
 913                 redisLog(REDIS_NOTICE
, 
 914                     "Background saving terminated with success"); 
 916                 server
.lastsave 
= time(NULL
); 
 917             } else if (!bysignal 
&& exitcode 
!= 0) { 
 918                 redisLog(REDIS_WARNING
, "Background saving error"); 
 920                 redisLog(REDIS_WARNING
, 
 921                     "Background saving terminated by signal"); 
 922                 rdbRemoveTempFile(server
.bgsavechildpid
); 
 924             server
.bgsaveinprogress 
= 0; 
 925             server
.bgsavechildpid 
= -1; 
 926             updateSlavesWaitingBgsave(exitcode 
== 0 ? REDIS_OK 
: REDIS_ERR
); 
 929         /* If there is not a background saving in progress check if 
 930          * we have to save now */ 
 931          time_t now 
= time(NULL
); 
 932          for (j 
= 0; j 
< server
.saveparamslen
; j
++) { 
 933             struct saveparam 
*sp 
= server
.saveparams
+j
; 
 935             if (server
.dirty 
>= sp
->changes 
&& 
 936                 now
-server
.lastsave 
> sp
->seconds
) { 
 937                 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...", 
 938                     sp
->changes
, sp
->seconds
); 
 939                 rdbSaveBackground(server
.dbfilename
); 
 945     /* Try to expire a few timed out keys */ 
 946     for (j 
= 0; j 
< server
.dbnum
; j
++) { 
 947         redisDb 
*db 
= server
.db
+j
; 
 948         int num 
= dictSize(db
->expires
); 
 951             time_t now 
= time(NULL
); 
 953             if (num 
> REDIS_EXPIRELOOKUPS_PER_CRON
) 
 954                 num 
= REDIS_EXPIRELOOKUPS_PER_CRON
; 
 959                 if ((de 
= dictGetRandomKey(db
->expires
)) == NULL
) break; 
 960                 t 
= (time_t) dictGetEntryVal(de
); 
 962                     deleteKey(db
,dictGetEntryKey(de
)); 
 968     /* Check if we should connect to a MASTER */ 
 969     if (server
.replstate 
== REDIS_REPL_CONNECT
) { 
 970         redisLog(REDIS_NOTICE
,"Connecting to MASTER..."); 
 971         if (syncWithMaster() == REDIS_OK
) { 
 972             redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded"); 
 978 static void createSharedObjects(void) { 
 979     shared
.crlf 
= createObject(REDIS_STRING
,sdsnew("\r\n")); 
 980     shared
.ok 
= createObject(REDIS_STRING
,sdsnew("+OK\r\n")); 
 981     shared
.err 
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n")); 
 982     shared
.emptybulk 
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n")); 
 983     shared
.czero 
= createObject(REDIS_STRING
,sdsnew(":0\r\n")); 
 984     shared
.cone 
= createObject(REDIS_STRING
,sdsnew(":1\r\n")); 
 985     shared
.nullbulk 
= createObject(REDIS_STRING
,sdsnew("$-1\r\n")); 
 986     shared
.nullmultibulk 
= createObject(REDIS_STRING
,sdsnew("*-1\r\n")); 
 987     shared
.emptymultibulk 
= createObject(REDIS_STRING
,sdsnew("*0\r\n")); 
 989     shared
.pong 
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n")); 
 990     shared
.wrongtypeerr 
= createObject(REDIS_STRING
,sdsnew( 
 991         "-ERR Operation against a key holding the wrong kind of value\r\n")); 
 992     shared
.nokeyerr 
= createObject(REDIS_STRING
,sdsnew( 
 993         "-ERR no such key\r\n")); 
 994     shared
.syntaxerr 
= createObject(REDIS_STRING
,sdsnew( 
 995         "-ERR syntax error\r\n")); 
 996     shared
.sameobjecterr 
= createObject(REDIS_STRING
,sdsnew( 
 997         "-ERR source and destination objects are the same\r\n")); 
 998     shared
.outofrangeerr 
= createObject(REDIS_STRING
,sdsnew( 
 999         "-ERR index out of range\r\n")); 
1000     shared
.space 
= createObject(REDIS_STRING
,sdsnew(" ")); 
1001     shared
.colon 
= createObject(REDIS_STRING
,sdsnew(":")); 
1002     shared
.plus 
= createObject(REDIS_STRING
,sdsnew("+")); 
1003     shared
.select0 
= createStringObject("select 0\r\n",10); 
1004     shared
.select1 
= createStringObject("select 1\r\n",10); 
1005     shared
.select2 
= createStringObject("select 2\r\n",10); 
1006     shared
.select3 
= createStringObject("select 3\r\n",10); 
1007     shared
.select4 
= createStringObject("select 4\r\n",10); 
1008     shared
.select5 
= createStringObject("select 5\r\n",10); 
1009     shared
.select6 
= createStringObject("select 6\r\n",10); 
1010     shared
.select7 
= createStringObject("select 7\r\n",10); 
1011     shared
.select8 
= createStringObject("select 8\r\n",10); 
1012     shared
.select9 
= createStringObject("select 9\r\n",10); 
1015 static void appendServerSaveParams(time_t seconds
, int changes
) { 
1016     server
.saveparams 
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1)); 
1017     server
.saveparams
[server
.saveparamslen
].seconds 
= seconds
; 
1018     server
.saveparams
[server
.saveparamslen
].changes 
= changes
; 
1019     server
.saveparamslen
++; 
1022 static void ResetServerSaveParams() { 
1023     zfree(server
.saveparams
); 
1024     server
.saveparams 
= NULL
; 
1025     server
.saveparamslen 
= 0; 
1028 static void initServerConfig() { 
1029     server
.dbnum 
= REDIS_DEFAULT_DBNUM
; 
1030     server
.port 
= REDIS_SERVERPORT
; 
1031     server
.verbosity 
= REDIS_DEBUG
; 
1032     server
.maxidletime 
= REDIS_MAXIDLETIME
; 
1033     server
.saveparams 
= NULL
; 
1034     server
.logfile 
= NULL
; /* NULL = log on standard output */ 
1035     server
.bindaddr 
= NULL
; 
1036     server
.glueoutputbuf 
= 1; 
1037     server
.daemonize 
= 0; 
1038     server
.appendonly 
= 0; 
1039     server
.appendfsync 
= APPENDFSYNC_ALWAYS
; 
1040     server
.lastfsync 
= time(NULL
); 
1041     server
.appendfd 
= -1; 
1042     server
.appendseldb 
= -1; /* Make sure the first time will not match */ 
1043     server
.pidfile 
= "/var/run/redis.pid"; 
1044     server
.dbfilename 
= "dump.rdb"; 
1045     server
.appendfilename 
= "appendonly.log"; 
1046     server
.requirepass 
= NULL
; 
1047     server
.shareobjects 
= 0; 
1048     server
.sharingpoolsize 
= 1024; 
1049     server
.maxclients 
= 0; 
1050     server
.maxmemory 
= 0; 
1051     ResetServerSaveParams(); 
1053     appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */ 
1054     appendServerSaveParams(300,100);  /* save after 5 minutes and 100 changes */ 
1055     appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */ 
1056     /* Replication related */ 
1058     server
.masterhost 
= NULL
; 
1059     server
.masterport 
= 6379; 
1060     server
.master 
= NULL
; 
1061     server
.replstate 
= REDIS_REPL_NONE
; 
1063     /* Double constants initialization */ 
1065     R_PosInf 
= 1.0/R_Zero
; 
1066     R_NegInf 
= -1.0/R_Zero
; 
1067     R_Nan 
= R_Zero
/R_Zero
; 
1070 static void initServer() { 
1073     signal(SIGHUP
, SIG_IGN
); 
1074     signal(SIGPIPE
, SIG_IGN
); 
1075     setupSigSegvAction(); 
1077     server
.clients 
= listCreate(); 
1078     server
.slaves 
= listCreate(); 
1079     server
.monitors 
= listCreate(); 
1080     server
.objfreelist 
= listCreate(); 
1081     createSharedObjects(); 
1082     server
.el 
= aeCreateEventLoop(); 
1083     server
.db 
= zmalloc(sizeof(redisDb
)*server
.dbnum
); 
1084     server
.sharingpool 
= dictCreate(&setDictType
,NULL
); 
1085     server
.fd 
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
); 
1086     if (server
.fd 
== -1) { 
1087         redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
); 
1090     for (j 
= 0; j 
< server
.dbnum
; j
++) { 
1091         server
.db
[j
].dict 
= dictCreate(&hashDictType
,NULL
); 
1092         server
.db
[j
].expires 
= dictCreate(&setDictType
,NULL
); 
1093         server
.db
[j
].id 
= j
; 
1095     server
.cronloops 
= 0; 
1096     server
.bgsaveinprogress 
= 0; 
1097     server
.bgsavechildpid 
= -1; 
1098     server
.lastsave 
= time(NULL
); 
1100     server
.usedmemory 
= 0; 
1101     server
.stat_numcommands 
= 0; 
1102     server
.stat_numconnections 
= 0; 
1103     server
.stat_starttime 
= time(NULL
); 
1104     aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
); 
1106     if (server
.appendonly
) { 
1107         server
.appendfd 
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644); 
1108         if (server
.appendfd 
== -1) { 
1109             redisLog(REDIS_WARNING
, "Can't open the append-only file: %s", 
1116 /* Empty the whole database */ 
1117 static long long emptyDb() { 
1119     long long removed 
= 0; 
1121     for (j 
= 0; j 
< server
.dbnum
; j
++) { 
1122         removed 
+= dictSize(server
.db
[j
].dict
); 
1123         dictEmpty(server
.db
[j
].dict
); 
1124         dictEmpty(server
.db
[j
].expires
); 
1129 static int yesnotoi(char *s
) { 
1130     if (!strcasecmp(s
,"yes")) return 1; 
1131     else if (!strcasecmp(s
,"no")) return 0; 
1135 /* I agree, this is a very rudimental way to load a configuration... 
1136    will improve later if the config gets more complex */ 
1137 static void loadServerConfig(char *filename
) { 
1139     char buf
[REDIS_CONFIGLINE_MAX
+1], *err 
= NULL
; 
1143     if (filename
[0] == '-' && filename
[1] == '\0') 
1146         if ((fp 
= fopen(filename
,"r")) == NULL
) { 
1147             redisLog(REDIS_WARNING
,"Fatal error, can't open config file"); 
1152     while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) { 
1158         line 
= sdstrim(line
," \t\r\n"); 
1160         /* Skip comments and blank lines*/ 
1161         if (line
[0] == '#' || line
[0] == '\0') { 
1166         /* Split into arguments */ 
1167         argv 
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
); 
1168         sdstolower(argv
[0]); 
1170         /* Execute config directives */ 
1171         if (!strcasecmp(argv
[0],"timeout") && argc 
== 2) { 
1172             server
.maxidletime 
= atoi(argv
[1]); 
1173             if (server
.maxidletime 
< 0) { 
1174                 err 
= "Invalid timeout value"; goto loaderr
; 
1176         } else if (!strcasecmp(argv
[0],"port") && argc 
== 2) { 
1177             server
.port 
= atoi(argv
[1]); 
1178             if (server
.port 
< 1 || server
.port 
> 65535) { 
1179                 err 
= "Invalid port"; goto loaderr
; 
1181         } else if (!strcasecmp(argv
[0],"bind") && argc 
== 2) { 
1182             server
.bindaddr 
= zstrdup(argv
[1]); 
1183         } else if (!strcasecmp(argv
[0],"save") && argc 
== 3) { 
1184             int seconds 
= atoi(argv
[1]); 
1185             int changes 
= atoi(argv
[2]); 
1186             if (seconds 
< 1 || changes 
< 0) { 
1187                 err 
= "Invalid save parameters"; goto loaderr
; 
1189             appendServerSaveParams(seconds
,changes
); 
1190         } else if (!strcasecmp(argv
[0],"dir") && argc 
== 2) { 
1191             if (chdir(argv
[1]) == -1) { 
1192                 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s", 
1193                     argv
[1], strerror(errno
)); 
1196         } else if (!strcasecmp(argv
[0],"loglevel") && argc 
== 2) { 
1197             if (!strcasecmp(argv
[1],"debug")) server
.verbosity 
= REDIS_DEBUG
; 
1198             else if (!strcasecmp(argv
[1],"notice")) server
.verbosity 
= REDIS_NOTICE
; 
1199             else if (!strcasecmp(argv
[1],"warning")) server
.verbosity 
= REDIS_WARNING
; 
1201                 err 
= "Invalid log level. Must be one of debug, notice, warning"; 
1204         } else if (!strcasecmp(argv
[0],"logfile") && argc 
== 2) { 
1207             server
.logfile 
= zstrdup(argv
[1]); 
1208             if (!strcasecmp(server
.logfile
,"stdout")) { 
1209                 zfree(server
.logfile
); 
1210                 server
.logfile 
= NULL
; 
1212             if (server
.logfile
) { 
1213                 /* Test if we are able to open the file. The server will not 
1214                  * be able to abort just for this problem later... */ 
1215                 logfp 
= fopen(server
.logfile
,"a"); 
1216                 if (logfp 
== NULL
) { 
1217                     err 
= sdscatprintf(sdsempty(), 
1218                         "Can't open the log file: %s", strerror(errno
)); 
1223         } else if (!strcasecmp(argv
[0],"databases") && argc 
== 2) { 
1224             server
.dbnum 
= atoi(argv
[1]); 
1225             if (server
.dbnum 
< 1) { 
1226                 err 
= "Invalid number of databases"; goto loaderr
; 
1228         } else if (!strcasecmp(argv
[0],"maxclients") && argc 
== 2) { 
1229             server
.maxclients 
= atoi(argv
[1]); 
1230         } else if (!strcasecmp(argv
[0],"maxmemory") && argc 
== 2) { 
1231             server
.maxmemory 
= strtoll(argv
[1], NULL
, 10); 
1232         } else if (!strcasecmp(argv
[0],"slaveof") && argc 
== 3) { 
1233             server
.masterhost 
= sdsnew(argv
[1]); 
1234             server
.masterport 
= atoi(argv
[2]); 
1235             server
.replstate 
= REDIS_REPL_CONNECT
; 
1236         } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc 
== 2) { 
1237             if ((server
.glueoutputbuf 
= yesnotoi(argv
[1])) == -1) { 
1238                 err 
= "argument must be 'yes' or 'no'"; goto loaderr
; 
1240         } else if (!strcasecmp(argv
[0],"shareobjects") && argc 
== 2) { 
1241             if ((server
.shareobjects 
= yesnotoi(argv
[1])) == -1) { 
1242                 err 
= "argument must be 'yes' or 'no'"; goto loaderr
; 
1244         } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc 
== 2) { 
1245             server
.sharingpoolsize 
= atoi(argv
[1]); 
1246             if (server
.sharingpoolsize 
< 1) { 
1247                 err 
= "invalid object sharing pool size"; goto loaderr
; 
1249         } else if (!strcasecmp(argv
[0],"daemonize") && argc 
== 2) { 
1250             if ((server
.daemonize 
= yesnotoi(argv
[1])) == -1) { 
1251                 err 
= "argument must be 'yes' or 'no'"; goto loaderr
; 
1253         } else if (!strcasecmp(argv
[0],"appendonly") && argc 
== 2) { 
1254             if ((server
.appendonly 
= yesnotoi(argv
[1])) == -1) { 
1255                 err 
= "argument must be 'yes' or 'no'"; goto loaderr
; 
1257         } else if (!strcasecmp(argv
[0],"appendfsync") && argc 
== 2) { 
1258             if (!strcasecmp(argv
[1],"no")) { 
1259                 server
.appendfsync 
= APPENDFSYNC_NO
; 
1260             } else if (!strcasecmp(argv
[1],"always")) { 
1261                 server
.appendfsync 
= APPENDFSYNC_ALWAYS
; 
1262             } else if (!strcasecmp(argv
[1],"everysec")) { 
1263                 server
.appendfsync 
= APPENDFSYNC_EVERYSEC
; 
1265                 err 
= "argument must be 'no', 'always' or 'everysec'"; 
1268         } else if (!strcasecmp(argv
[0],"requirepass") && argc 
== 2) { 
1269           server
.requirepass 
= zstrdup(argv
[1]); 
1270         } else if (!strcasecmp(argv
[0],"pidfile") && argc 
== 2) { 
1271           server
.pidfile 
= zstrdup(argv
[1]); 
1272         } else if (!strcasecmp(argv
[0],"dbfilename") && argc 
== 2) { 
1273           server
.dbfilename 
= zstrdup(argv
[1]); 
1275             err 
= "Bad directive or wrong number of arguments"; goto loaderr
; 
1277         for (j 
= 0; j 
< argc
; j
++) 
1282     if (fp 
!= stdin
) fclose(fp
); 
1286     fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n"); 
1287     fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
); 
1288     fprintf(stderr
, ">>> '%s'\n", line
); 
1289     fprintf(stderr
, "%s\n", err
); 
1293 static void freeClientArgv(redisClient 
*c
) { 
1296     for (j 
= 0; j 
< c
->argc
; j
++) 
1297         decrRefCount(c
->argv
[j
]); 
1298     for (j 
= 0; j 
< c
->mbargc
; j
++) 
1299         decrRefCount(c
->mbargv
[j
]); 
1304 static void freeClient(redisClient 
*c
) { 
1307     aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
); 
1308     aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
); 
1309     sdsfree(c
->querybuf
); 
1310     listRelease(c
->reply
); 
1313     ln 
= listSearchKey(server
.clients
,c
); 
1315     listDelNode(server
.clients
,ln
); 
1316     if (c
->flags 
& REDIS_SLAVE
) { 
1317         if (c
->replstate 
== REDIS_REPL_SEND_BULK 
&& c
->repldbfd 
!= -1) 
1319         list 
*l 
= (c
->flags 
& REDIS_MONITOR
) ? server
.monitors 
: server
.slaves
; 
1320         ln 
= listSearchKey(l
,c
); 
1324     if (c
->flags 
& REDIS_MASTER
) { 
1325         server
.master 
= NULL
; 
1326         server
.replstate 
= REDIS_REPL_CONNECT
; 
1333 static void glueReplyBuffersIfNeeded(redisClient 
*c
) { 
1338     listRewind(c
->reply
); 
1339     while((ln 
= listYield(c
->reply
))) { 
1341         totlen 
+= sdslen(o
->ptr
); 
1342         /* This optimization makes more sense if we don't have to copy 
1344         if (totlen 
> 1024) return; 
1350         listRewind(c
->reply
); 
1351         while((ln 
= listYield(c
->reply
))) { 
1353             memcpy(buf
+copylen
,o
->ptr
,sdslen(o
->ptr
)); 
1354             copylen 
+= sdslen(o
->ptr
); 
1355             listDelNode(c
->reply
,ln
); 
1357         /* Now the output buffer is empty, add the new single element */ 
1358         o 
= createObject(REDIS_STRING
,sdsnewlen(buf
,totlen
)); 
1359         listAddNodeTail(c
->reply
,o
); 
1363 static void sendReplyToClient(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
1364     redisClient 
*c 
= privdata
; 
1365     int nwritten 
= 0, totwritten 
= 0, objlen
; 
1368     REDIS_NOTUSED(mask
); 
1370     if (server
.glueoutputbuf 
&& listLength(c
->reply
) > 1) 
1371         glueReplyBuffersIfNeeded(c
); 
1372     while(listLength(c
->reply
)) { 
1373         o 
= listNodeValue(listFirst(c
->reply
)); 
1374         objlen 
= sdslen(o
->ptr
); 
1377             listDelNode(c
->reply
,listFirst(c
->reply
)); 
1381         if (c
->flags 
& REDIS_MASTER
) { 
1382             /* Don't reply to a master */ 
1383             nwritten 
= objlen 
- c
->sentlen
; 
1385             nwritten 
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen 
- c
->sentlen
); 
1386             if (nwritten 
<= 0) break; 
1388         c
->sentlen 
+= nwritten
; 
1389         totwritten 
+= nwritten
; 
1390         /* If we fully sent the object on head go to the next one */ 
1391         if (c
->sentlen 
== objlen
) { 
1392             listDelNode(c
->reply
,listFirst(c
->reply
)); 
1395         /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT 
1396          * bytes, in a single threaded server it's a good idea to server 
1397          * other clients as well, even if a very large request comes from 
1398          * super fast link that is always able to accept data (in real world 
1399          * terms think to 'KEYS *' against the loopback interfae) */ 
1400         if (totwritten 
> REDIS_MAX_WRITE_PER_EVENT
) break; 
1402     if (nwritten 
== -1) { 
1403         if (errno 
== EAGAIN
) { 
1406             redisLog(REDIS_DEBUG
, 
1407                 "Error writing to client: %s", strerror(errno
)); 
1412     if (totwritten 
> 0) c
->lastinteraction 
= time(NULL
); 
1413     if (listLength(c
->reply
) == 0) { 
1415         aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
); 
1419 static struct redisCommand 
*lookupCommand(char *name
) { 
1421     while(cmdTable
[j
].name 
!= NULL
) { 
1422         if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
]; 
1428 /* resetClient prepare the client to process the next command */ 
1429 static void resetClient(redisClient 
*c
) { 
1435 /* If this function gets called we already read a whole 
1436  * command, argments are in the client argv/argc fields. 
1437  * processCommand() execute the command or prepare the 
1438  * server for a bulk read from the client. 
1440  * If 1 is returned the client is still alive and valid and 
1441  * and other operations can be performed by the caller. Otherwise 
1442  * if 0 is returned the client was destroied (i.e. after QUIT). */ 
1443 static int processCommand(redisClient 
*c
) { 
1444     struct redisCommand 
*cmd
; 
1447     /* Free some memory if needed (maxmemory setting) */ 
1448     if (server
.maxmemory
) freeMemoryIfNeeded(); 
1450     /* Handle the multi bulk command type. This is an alternative protocol 
1451      * supported by Redis in order to receive commands that are composed of 
1452      * multiple binary-safe "bulk" arguments. The latency of processing is 
1453      * a bit higher but this allows things like multi-sets, so if this 
1454      * protocol is used only for MSET and similar commands this is a big win. */ 
1455     if (c
->multibulk 
== 0 && c
->argc 
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') { 
1456         c
->multibulk 
= atoi(((char*)c
->argv
[0]->ptr
)+1); 
1457         if (c
->multibulk 
<= 0) { 
1461             decrRefCount(c
->argv
[c
->argc
-1]); 
1465     } else if (c
->multibulk
) { 
1466         if (c
->bulklen 
== -1) { 
1467             if (((char*)c
->argv
[0]->ptr
)[0] != '$') { 
1468                 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n")); 
1472                 int bulklen 
= atoi(((char*)c
->argv
[0]->ptr
)+1); 
1473                 decrRefCount(c
->argv
[0]); 
1474                 if (bulklen 
< 0 || bulklen 
> 1024*1024*1024) { 
1476                     addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n")); 
1481                 c
->bulklen 
= bulklen
+2; /* add two bytes for CR+LF */ 
1485             c
->mbargv 
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1)); 
1486             c
->mbargv
[c
->mbargc
] = c
->argv
[0]; 
1490             if (c
->multibulk 
== 0) { 
1494                 /* Here we need to swap the multi-bulk argc/argv with the 
1495                  * normal argc/argv of the client structure. */ 
1497                 c
->argv 
= c
->mbargv
; 
1498                 c
->mbargv 
= auxargv
; 
1501                 c
->argc 
= c
->mbargc
; 
1502                 c
->mbargc 
= auxargc
; 
1504                 /* We need to set bulklen to something different than -1 
1505                  * in order for the code below to process the command without 
1506                  * to try to read the last argument of a bulk command as 
1507                  * a special argument. */ 
1509                 /* continue below and process the command */ 
1516     /* -- end of multi bulk commands processing -- */ 
1518     /* The QUIT command is handled as a special case. Normal command 
1519      * procs are unable to close the client connection safely */ 
1520     if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) { 
1524     cmd 
= lookupCommand(c
->argv
[0]->ptr
); 
1526         addReplySds(c
,sdsnew("-ERR unknown command\r\n")); 
1529     } else if ((cmd
->arity 
> 0 && cmd
->arity 
!= c
->argc
) || 
1530                (c
->argc 
< -cmd
->arity
)) { 
1531         addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n")); 
1534     } else if (server
.maxmemory 
&& cmd
->flags 
& REDIS_CMD_DENYOOM 
&& zmalloc_used_memory() > server
.maxmemory
) { 
1535         addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n")); 
1538     } else if (cmd
->flags 
& REDIS_CMD_BULK 
&& c
->bulklen 
== -1) { 
1539         int bulklen 
= atoi(c
->argv
[c
->argc
-1]->ptr
); 
1541         decrRefCount(c
->argv
[c
->argc
-1]); 
1542         if (bulklen 
< 0 || bulklen 
> 1024*1024*1024) { 
1544             addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n")); 
1549         c
->bulklen 
= bulklen
+2; /* add two bytes for CR+LF */ 
1550         /* It is possible that the bulk read is already in the 
1551          * buffer. Check this condition and handle it accordingly. 
1552          * This is just a fast path, alternative to call processInputBuffer(). 
1553          * It's a good idea since the code is small and this condition 
1554          * happens most of the times. */ 
1555         if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) { 
1556             c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2); 
1558             c
->querybuf 
= sdsrange(c
->querybuf
,c
->bulklen
,-1); 
1563     /* Let's try to share objects on the command arguments vector */ 
1564     if (server
.shareobjects
) { 
1566         for(j 
= 1; j 
< c
->argc
; j
++) 
1567             c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]); 
1569     /* Let's try to encode the bulk object to save space. */ 
1570     if (cmd
->flags 
& REDIS_CMD_BULK
) 
1571         tryObjectEncoding(c
->argv
[c
->argc
-1]); 
1573     /* Check if the user is authenticated */ 
1574     if (server
.requirepass 
&& !c
->authenticated 
&& cmd
->proc 
!= authCommand
) { 
1575         addReplySds(c
,sdsnew("-ERR operation not permitted\r\n")); 
1580     /* Exec the command */ 
1581     dirty 
= server
.dirty
; 
1583     if (server
.appendonly 
&& server
.dirty
-dirty
) 
1584         feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
); 
1585     if (server
.dirty
-dirty 
&& listLength(server
.slaves
)) 
1586         replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
); 
1587     if (listLength(server
.monitors
)) 
1588         replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
); 
1589     server
.stat_numcommands
++; 
1591     /* Prepare the client for the next command */ 
1592     if (c
->flags 
& REDIS_CLOSE
) { 
1600 static void replicationFeedSlaves(list 
*slaves
, struct redisCommand 
*cmd
, int dictid
, robj 
**argv
, int argc
) { 
1604     /* (args*2)+1 is enough room for args, spaces, newlines */ 
1605     robj 
*static_outv
[REDIS_STATIC_ARGS
*2+1]; 
1607     if (argc 
<= REDIS_STATIC_ARGS
) { 
1610         outv 
= zmalloc(sizeof(robj
*)*(argc
*2+1)); 
1613     for (j 
= 0; j 
< argc
; j
++) { 
1614         if (j 
!= 0) outv
[outc
++] = shared
.space
; 
1615         if ((cmd
->flags 
& REDIS_CMD_BULK
) && j 
== argc
-1) { 
1618             lenobj 
= createObject(REDIS_STRING
, 
1619                 sdscatprintf(sdsempty(),"%d\r\n", 
1620                     stringObjectLen(argv
[j
]))); 
1621             lenobj
->refcount 
= 0; 
1622             outv
[outc
++] = lenobj
; 
1624         outv
[outc
++] = argv
[j
]; 
1626     outv
[outc
++] = shared
.crlf
; 
1628     /* Increment all the refcounts at start and decrement at end in order to 
1629      * be sure to free objects if there is no slave in a replication state 
1630      * able to be feed with commands */ 
1631     for (j 
= 0; j 
< outc
; j
++) incrRefCount(outv
[j
]); 
1633     while((ln 
= listYield(slaves
))) { 
1634         redisClient 
*slave 
= ln
->value
; 
1636         /* Don't feed slaves that are still waiting for BGSAVE to start */ 
1637         if (slave
->replstate 
== REDIS_REPL_WAIT_BGSAVE_START
) continue; 
1639         /* Feed all the other slaves, MONITORs and so on */ 
1640         if (slave
->slaveseldb 
!= dictid
) { 
1644             case 0: selectcmd 
= shared
.select0
; break; 
1645             case 1: selectcmd 
= shared
.select1
; break; 
1646             case 2: selectcmd 
= shared
.select2
; break; 
1647             case 3: selectcmd 
= shared
.select3
; break; 
1648             case 4: selectcmd 
= shared
.select4
; break; 
1649             case 5: selectcmd 
= shared
.select5
; break; 
1650             case 6: selectcmd 
= shared
.select6
; break; 
1651             case 7: selectcmd 
= shared
.select7
; break; 
1652             case 8: selectcmd 
= shared
.select8
; break; 
1653             case 9: selectcmd 
= shared
.select9
; break; 
1655                 selectcmd 
= createObject(REDIS_STRING
, 
1656                     sdscatprintf(sdsempty(),"select %d\r\n",dictid
)); 
1657                 selectcmd
->refcount 
= 0; 
1660             addReply(slave
,selectcmd
); 
1661             slave
->slaveseldb 
= dictid
; 
1663         for (j 
= 0; j 
< outc
; j
++) addReply(slave
,outv
[j
]); 
1665     for (j 
= 0; j 
< outc
; j
++) decrRefCount(outv
[j
]); 
1666     if (outv 
!= static_outv
) zfree(outv
); 
1669 static void processInputBuffer(redisClient 
*c
) { 
1671     if (c
->bulklen 
== -1) { 
1672         /* Read the first line of the query */ 
1673         char *p 
= strchr(c
->querybuf
,'\n'); 
1680             query 
= c
->querybuf
; 
1681             c
->querybuf 
= sdsempty(); 
1682             querylen 
= 1+(p
-(query
)); 
1683             if (sdslen(query
) > querylen
) { 
1684                 /* leave data after the first line of the query in the buffer */ 
1685                 c
->querybuf 
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
); 
1687             *p 
= '\0'; /* remove "\n" */ 
1688             if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */ 
1689             sdsupdatelen(query
); 
1691             /* Now we can split the query in arguments */ 
1692             if (sdslen(query
) == 0) { 
1693                 /* Ignore empty query */ 
1697             argv 
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
); 
1700             if (c
->argv
) zfree(c
->argv
); 
1701             c
->argv 
= zmalloc(sizeof(robj
*)*argc
); 
1703             for (j 
= 0; j 
< argc
; j
++) { 
1704                 if (sdslen(argv
[j
])) { 
1705                     c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]); 
1712             /* Execute the command. If the client is still valid 
1713              * after processCommand() return and there is something 
1714              * on the query buffer try to process the next command. */ 
1715             if (c
->argc 
&& processCommand(c
) && sdslen(c
->querybuf
)) goto again
; 
1717         } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) { 
1718             redisLog(REDIS_DEBUG
, "Client protocol error"); 
1723         /* Bulk read handling. Note that if we are at this point 
1724            the client already sent a command terminated with a newline, 
1725            we are reading the bulk data that is actually the last 
1726            argument of the command. */ 
1727         int qbl 
= sdslen(c
->querybuf
); 
1729         if (c
->bulklen 
<= qbl
) { 
1730             /* Copy everything but the final CRLF as final argument */ 
1731             c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2); 
1733             c
->querybuf 
= sdsrange(c
->querybuf
,c
->bulklen
,-1); 
1734             /* Process the command. If the client is still valid after 
1735              * the processing and there is more data in the buffer 
1736              * try to parse it. */ 
1737             if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
; 
1743 static void readQueryFromClient(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
1744     redisClient 
*c 
= (redisClient
*) privdata
; 
1745     char buf
[REDIS_IOBUF_LEN
]; 
1748     REDIS_NOTUSED(mask
); 
1750     nread 
= read(fd
, buf
, REDIS_IOBUF_LEN
); 
1752         if (errno 
== EAGAIN
) { 
1755             redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
)); 
1759     } else if (nread 
== 0) { 
1760         redisLog(REDIS_DEBUG
, "Client closed connection"); 
1765         c
->querybuf 
= sdscatlen(c
->querybuf
, buf
, nread
); 
1766         c
->lastinteraction 
= time(NULL
); 
1770     processInputBuffer(c
); 
1773 static int selectDb(redisClient 
*c
, int id
) { 
1774     if (id 
< 0 || id 
>= server
.dbnum
) 
1776     c
->db 
= &server
.db
[id
]; 
1780 static void *dupClientReplyValue(void *o
) { 
1781     incrRefCount((robj
*)o
); 
1785 static redisClient 
*createClient(int fd
) { 
1786     redisClient 
*c 
= zmalloc(sizeof(*c
)); 
1788     anetNonBlock(NULL
,fd
); 
1789     anetTcpNoDelay(NULL
,fd
); 
1790     if (!c
) return NULL
; 
1793     c
->querybuf 
= sdsempty(); 
1802     c
->lastinteraction 
= time(NULL
); 
1803     c
->authenticated 
= 0; 
1804     c
->replstate 
= REDIS_REPL_NONE
; 
1805     c
->reply 
= listCreate(); 
1806     listSetFreeMethod(c
->reply
,decrRefCount
); 
1807     listSetDupMethod(c
->reply
,dupClientReplyValue
); 
1808     if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
, 
1809         readQueryFromClient
, c
, NULL
) == AE_ERR
) { 
1813     listAddNodeTail(server
.clients
,c
); 
1817 static void addReply(redisClient 
*c
, robj 
*obj
) { 
1818     if (listLength(c
->reply
) == 0 && 
1819         (c
->replstate 
== REDIS_REPL_NONE 
|| 
1820          c
->replstate 
== REDIS_REPL_ONLINE
) && 
1821         aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
, 
1822         sendReplyToClient
, c
, NULL
) == AE_ERR
) return; 
1823     if (obj
->encoding 
!= REDIS_ENCODING_RAW
) { 
1824         obj 
= getDecodedObject(obj
); 
1828     listAddNodeTail(c
->reply
,obj
); 
1831 static void addReplySds(redisClient 
*c
, sds s
) { 
1832     robj 
*o 
= createObject(REDIS_STRING
,s
); 
1837 static void addReplyBulkLen(redisClient 
*c
, robj 
*obj
) { 
1840     if (obj
->encoding 
== REDIS_ENCODING_RAW
) { 
1841         len 
= sdslen(obj
->ptr
); 
1843         long n 
= (long)obj
->ptr
; 
1850         while((n 
= n
/10) != 0) { 
1854     addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",len
)); 
1857 static void acceptHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
1862     REDIS_NOTUSED(mask
); 
1863     REDIS_NOTUSED(privdata
); 
1865     cfd 
= anetAccept(server
.neterr
, fd
, cip
, &cport
); 
1866     if (cfd 
== AE_ERR
) { 
1867         redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
); 
1870     redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
); 
1871     if ((c 
= createClient(cfd
)) == NULL
) { 
1872         redisLog(REDIS_WARNING
,"Error allocating resoures for the client"); 
1873         close(cfd
); /* May be already closed, just ingore errors */ 
1876     /* If maxclient directive is set and this is one client more... close the 
1877      * connection. Note that we create the client instead to check before 
1878      * for this condition, since now the socket is already set in nonblocking 
1879      * mode and we can send an error for free using the Kernel I/O */ 
1880     if (server
.maxclients 
&& listLength(server
.clients
) > server
.maxclients
) { 
1881         char *err 
= "-ERR max number of clients reached\r\n"; 
1883         /* That's a best effort error message, don't check write errors */ 
1884         (void) write(c
->fd
,err
,strlen(err
)); 
1888     server
.stat_numconnections
++; 
1891 /* ======================= Redis objects implementation ===================== */ 
1893 static robj 
*createObject(int type
, void *ptr
) { 
1896     if (listLength(server
.objfreelist
)) { 
1897         listNode 
*head 
= listFirst(server
.objfreelist
); 
1898         o 
= listNodeValue(head
); 
1899         listDelNode(server
.objfreelist
,head
); 
1901         o 
= zmalloc(sizeof(*o
)); 
1904     o
->encoding 
= REDIS_ENCODING_RAW
; 
1910 static robj 
*createStringObject(char *ptr
, size_t len
) { 
1911     return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
)); 
1914 static robj 
*createListObject(void) { 
1915     list 
*l 
= listCreate(); 
1917     listSetFreeMethod(l
,decrRefCount
); 
1918     return createObject(REDIS_LIST
,l
); 
1921 static robj 
*createSetObject(void) { 
1922     dict 
*d 
= dictCreate(&setDictType
,NULL
); 
1923     return createObject(REDIS_SET
,d
); 
1926 static robj 
*createZsetObject(void) { 
1927     zset 
*zs 
= zmalloc(sizeof(*zs
)); 
1929     zs
->dict 
= dictCreate(&zsetDictType
,NULL
); 
1930     zs
->zsl 
= zslCreate(); 
1931     return createObject(REDIS_ZSET
,zs
); 
1934 static void freeStringObject(robj 
*o
) { 
1935     if (o
->encoding 
== REDIS_ENCODING_RAW
) { 
1940 static void freeListObject(robj 
*o
) { 
1941     listRelease((list
*) o
->ptr
); 
1944 static void freeSetObject(robj 
*o
) { 
1945     dictRelease((dict
*) o
->ptr
); 
1948 static void freeZsetObject(robj 
*o
) { 
1951     dictRelease(zs
->dict
); 
1956 static void freeHashObject(robj 
*o
) { 
1957     dictRelease((dict
*) o
->ptr
); 
1960 static void incrRefCount(robj 
*o
) { 
1962 #ifdef DEBUG_REFCOUNT 
1963     if (o
->type 
== REDIS_STRING
) 
1964         printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
); 
1968 static void decrRefCount(void *obj
) { 
1971 #ifdef DEBUG_REFCOUNT 
1972     if (o
->type 
== REDIS_STRING
) 
1973         printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1); 
1975     if (--(o
->refcount
) == 0) { 
1977         case REDIS_STRING
: freeStringObject(o
); break; 
1978         case REDIS_LIST
: freeListObject(o
); break; 
1979         case REDIS_SET
: freeSetObject(o
); break; 
1980         case REDIS_ZSET
: freeZsetObject(o
); break; 
1981         case REDIS_HASH
: freeHashObject(o
); break; 
1982         default: assert(0 != 0); break; 
1984         if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX 
|| 
1985             !listAddNodeHead(server
.objfreelist
,o
)) 
1990 static robj 
*lookupKey(redisDb 
*db
, robj 
*key
) { 
1991     dictEntry 
*de 
= dictFind(db
->dict
,key
); 
1992     return de 
? dictGetEntryVal(de
) : NULL
; 
1995 static robj 
*lookupKeyRead(redisDb 
*db
, robj 
*key
) { 
1996     expireIfNeeded(db
,key
); 
1997     return lookupKey(db
,key
); 
2000 static robj 
*lookupKeyWrite(redisDb 
*db
, robj 
*key
) { 
2001     deleteIfVolatile(db
,key
); 
2002     return lookupKey(db
,key
); 
2005 static int deleteKey(redisDb 
*db
, robj 
*key
) { 
2008     /* We need to protect key from destruction: after the first dictDelete() 
2009      * it may happen that 'key' is no longer valid if we don't increment 
2010      * it's count. This may happen when we get the object reference directly 
2011      * from the hash table with dictRandomKey() or dict iterators */ 
2013     if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
); 
2014     retval 
= dictDelete(db
->dict
,key
); 
2017     return retval 
== DICT_OK
; 
2020 /* Try to share an object against the shared objects pool */ 
2021 static robj 
*tryObjectSharing(robj 
*o
) { 
2022     struct dictEntry 
*de
; 
2025     if (o 
== NULL 
|| server
.shareobjects 
== 0) return o
; 
2027     assert(o
->type 
== REDIS_STRING
); 
2028     de 
= dictFind(server
.sharingpool
,o
); 
2030         robj 
*shared 
= dictGetEntryKey(de
); 
2032         c 
= ((unsigned long) dictGetEntryVal(de
))+1; 
2033         dictGetEntryVal(de
) = (void*) c
; 
2034         incrRefCount(shared
); 
2038         /* Here we are using a stream algorihtm: Every time an object is 
2039          * shared we increment its count, everytime there is a miss we 
2040          * recrement the counter of a random object. If this object reaches 
2041          * zero we remove the object and put the current object instead. */ 
2042         if (dictSize(server
.sharingpool
) >= 
2043                 server
.sharingpoolsize
) { 
2044             de 
= dictGetRandomKey(server
.sharingpool
); 
2046             c 
= ((unsigned long) dictGetEntryVal(de
))-1; 
2047             dictGetEntryVal(de
) = (void*) c
; 
2049                 dictDelete(server
.sharingpool
,de
->key
); 
2052             c 
= 0; /* If the pool is empty we want to add this object */ 
2057             retval 
= dictAdd(server
.sharingpool
,o
,(void*)1); 
2058             assert(retval 
== DICT_OK
); 
2065 /* Check if the nul-terminated string 's' can be represented by a long 
2066  * (that is, is a number that fits into long without any other space or 
2067  * character before or after the digits). 
2069  * If so, the function returns REDIS_OK and *longval is set to the value 
2070  * of the number. Otherwise REDIS_ERR is returned */ 
2071 static int isStringRepresentableAsLong(sds s
, long *longval
) { 
2072     char buf
[32], *endptr
; 
2076     value 
= strtol(s
, &endptr
, 10); 
2077     if (endptr
[0] != '\0') return REDIS_ERR
; 
2078     slen 
= snprintf(buf
,32,"%ld",value
); 
2080     /* If the number converted back into a string is not identical 
2081      * then it's not possible to encode the string as integer */ 
2082     if (sdslen(s
) != (unsigned)slen 
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
; 
2083     if (longval
) *longval 
= value
; 
2087 /* Try to encode a string object in order to save space */ 
2088 static int tryObjectEncoding(robj 
*o
) { 
2092     if (o
->encoding 
!= REDIS_ENCODING_RAW
) 
2093         return REDIS_ERR
; /* Already encoded */ 
2095     /* It's not save to encode shared objects: shared objects can be shared 
2096      * everywhere in the "object space" of Redis. Encoded objects can only 
2097      * appear as "values" (and not, for instance, as keys) */ 
2098      if (o
->refcount 
> 1) return REDIS_ERR
; 
2100     /* Currently we try to encode only strings */ 
2101     assert(o
->type 
== REDIS_STRING
); 
2103     /* Check if we can represent this string as a long integer */ 
2104     if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
; 
2106     /* Ok, this object can be encoded */ 
2107     o
->encoding 
= REDIS_ENCODING_INT
; 
2109     o
->ptr 
= (void*) value
; 
2113 /* Get a decoded version of an encoded object (returned as a new object) */ 
2114 static robj 
*getDecodedObject(const robj 
*o
) { 
2117     assert(o
->encoding 
!= REDIS_ENCODING_RAW
); 
2118     if (o
->type 
== REDIS_STRING 
&& o
->encoding 
== REDIS_ENCODING_INT
) { 
2121         snprintf(buf
,32,"%ld",(long)o
->ptr
); 
2122         dec 
= createStringObject(buf
,strlen(buf
)); 
2129 /* Compare two string objects via strcmp() or alike. 
2130  * Note that the objects may be integer-encoded. In such a case we 
2131  * use snprintf() to get a string representation of the numbers on the stack 
2132  * and compare the strings, it's much faster than calling getDecodedObject(). */ 
2133 static int compareStringObjects(robj 
*a
, robj 
*b
) { 
2134     assert(a
->type 
== REDIS_STRING 
&& b
->type 
== REDIS_STRING
); 
2135     char bufa
[128], bufb
[128], *astr
, *bstr
; 
2138     if (a 
== b
) return 0; 
2139     if (a
->encoding 
!= REDIS_ENCODING_RAW
) { 
2140         snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
); 
2146     if (b
->encoding 
!= REDIS_ENCODING_RAW
) { 
2147         snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
); 
2153     return bothsds 
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
); 
2156 static size_t stringObjectLen(robj 
*o
) { 
2157     assert(o
->type 
== REDIS_STRING
); 
2158     if (o
->encoding 
== REDIS_ENCODING_RAW
) { 
2159         return sdslen(o
->ptr
); 
2163         return snprintf(buf
,32,"%ld",(long)o
->ptr
); 
2167 /*============================ DB saving/loading ============================ */ 
2169 static int rdbSaveType(FILE *fp
, unsigned char type
) { 
2170     if (fwrite(&type
,1,1,fp
) == 0) return -1; 
2174 static int rdbSaveTime(FILE *fp
, time_t t
) { 
2175     int32_t t32 
= (int32_t) t
; 
2176     if (fwrite(&t32
,4,1,fp
) == 0) return -1; 
2180 /* check rdbLoadLen() comments for more info */ 
2181 static int rdbSaveLen(FILE *fp
, uint32_t len
) { 
2182     unsigned char buf
[2]; 
2185         /* Save a 6 bit len */ 
2186         buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6); 
2187         if (fwrite(buf
,1,1,fp
) == 0) return -1; 
2188     } else if (len 
< (1<<14)) { 
2189         /* Save a 14 bit len */ 
2190         buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6); 
2192         if (fwrite(buf
,2,1,fp
) == 0) return -1; 
2194         /* Save a 32 bit len */ 
2195         buf
[0] = (REDIS_RDB_32BITLEN
<<6); 
2196         if (fwrite(buf
,1,1,fp
) == 0) return -1; 
2198         if (fwrite(&len
,4,1,fp
) == 0) return -1; 
2203 /* String objects in the form "2391" "-100" without any space and with a 
2204  * range of values that can fit in an 8, 16 or 32 bit signed value can be 
2205  * encoded as integers to save space */ 
2206 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) { 
2208     char *endptr
, buf
[32]; 
2210     /* Check if it's possible to encode this value as a number */ 
2211     value 
= strtoll(s
, &endptr
, 10); 
2212     if (endptr
[0] != '\0') return 0; 
2213     snprintf(buf
,32,"%lld",value
); 
2215     /* If the number converted back into a string is not identical 
2216      * then it's not possible to encode the string as integer */ 
2217     if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0; 
2219     /* Finally check if it fits in our ranges */ 
2220     if (value 
>= -(1<<7) && value 
<= (1<<7)-1) { 
2221         enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
; 
2222         enc
[1] = value
&0xFF; 
2224     } else if (value 
>= -(1<<15) && value 
<= (1<<15)-1) { 
2225         enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
; 
2226         enc
[1] = value
&0xFF; 
2227         enc
[2] = (value
>>8)&0xFF; 
2229     } else if (value 
>= -((long long)1<<31) && value 
<= ((long long)1<<31)-1) { 
2230         enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
; 
2231         enc
[1] = value
&0xFF; 
2232         enc
[2] = (value
>>8)&0xFF; 
2233         enc
[3] = (value
>>16)&0xFF; 
2234         enc
[4] = (value
>>24)&0xFF; 
2241 static int rdbSaveLzfStringObject(FILE *fp
, robj 
*obj
) { 
2242     unsigned int comprlen
, outlen
; 
2246     /* We require at least four bytes compression for this to be worth it */ 
2247     outlen 
= sdslen(obj
->ptr
)-4; 
2248     if (outlen 
<= 0) return 0; 
2249     if ((out 
= zmalloc(outlen
+1)) == NULL
) return 0; 
2250     comprlen 
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
); 
2251     if (comprlen 
== 0) { 
2255     /* Data compressed! Let's save it on disk */ 
2256     byte 
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
; 
2257     if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
; 
2258     if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
; 
2259     if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
; 
2260     if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
; 
2269 /* Save a string objet as [len][data] on disk. If the object is a string 
2270  * representation of an integer value we try to safe it in a special form */ 
2271 static int rdbSaveStringObjectRaw(FILE *fp
, robj 
*obj
) { 
2275     len 
= sdslen(obj
->ptr
); 
2277     /* Try integer encoding */ 
2279         unsigned char buf
[5]; 
2280         if ((enclen 
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) { 
2281             if (fwrite(buf
,enclen
,1,fp
) == 0) return -1; 
2286     /* Try LZF compression - under 20 bytes it's unable to compress even 
2287      * aaaaaaaaaaaaaaaaaa so skip it */ 
2291         retval 
= rdbSaveLzfStringObject(fp
,obj
); 
2292         if (retval 
== -1) return -1; 
2293         if (retval 
> 0) return 0; 
2294         /* retval == 0 means data can't be compressed, save the old way */ 
2297     /* Store verbatim */ 
2298     if (rdbSaveLen(fp
,len
) == -1) return -1; 
2299     if (len 
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1; 
2303 /* Like rdbSaveStringObjectRaw() but handle encoded objects */ 
2304 static int rdbSaveStringObject(FILE *fp
, robj 
*obj
) { 
2308     if (obj
->encoding 
!= REDIS_ENCODING_RAW
) { 
2309         dec 
= getDecodedObject(obj
); 
2310         retval 
= rdbSaveStringObjectRaw(fp
,dec
); 
2314         return rdbSaveStringObjectRaw(fp
,obj
); 
2318 /* Save a double value. Doubles are saved as strings prefixed by an unsigned 
2319  * 8 bit integer specifing the length of the representation. 
2320  * This 8 bit integer has special values in order to specify the following 
2326 static int rdbSaveDoubleValue(FILE *fp
, double val
) { 
2327     unsigned char buf
[128]; 
2333     } else if (!isfinite(val
)) { 
2335         buf
[0] = (val 
< 0) ? 255 : 254; 
2337         snprintf((char*)buf
+1,sizeof(buf
)-1,"%.16g",val
); 
2338         buf
[0] = strlen((char*)buf
); 
2341     if (fwrite(buf
,len
,1,fp
) == 0) return -1; 
2345 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */ 
2346 static int rdbSave(char *filename
) { 
2347     dictIterator 
*di 
= NULL
; 
2352     time_t now 
= time(NULL
); 
2354     snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid()); 
2355     fp 
= fopen(tmpfile
,"w"); 
2357         redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
)); 
2360     if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
; 
2361     for (j 
= 0; j 
< server
.dbnum
; j
++) { 
2362         redisDb 
*db 
= server
.db
+j
; 
2364         if (dictSize(d
) == 0) continue; 
2365         di 
= dictGetIterator(d
); 
2371         /* Write the SELECT DB opcode */ 
2372         if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
; 
2373         if (rdbSaveLen(fp
,j
) == -1) goto werr
; 
2375         /* Iterate this DB writing every entry */ 
2376         while((de 
= dictNext(di
)) != NULL
) { 
2377             robj 
*key 
= dictGetEntryKey(de
); 
2378             robj 
*o 
= dictGetEntryVal(de
); 
2379             time_t expiretime 
= getExpire(db
,key
); 
2381             /* Save the expire time */ 
2382             if (expiretime 
!= -1) { 
2383                 /* If this key is already expired skip it */ 
2384                 if (expiretime 
< now
) continue; 
2385                 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
; 
2386                 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
; 
2388             /* Save the key and associated value */ 
2389             if (rdbSaveType(fp
,o
->type
) == -1) goto werr
; 
2390             if (rdbSaveStringObject(fp
,key
) == -1) goto werr
; 
2391             if (o
->type 
== REDIS_STRING
) { 
2392                 /* Save a string value */ 
2393                 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
; 
2394             } else if (o
->type 
== REDIS_LIST
) { 
2395                 /* Save a list value */ 
2396                 list 
*list 
= o
->ptr
; 
2400                 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
; 
2401                 while((ln 
= listYield(list
))) { 
2402                     robj 
*eleobj 
= listNodeValue(ln
); 
2404                     if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
; 
2406             } else if (o
->type 
== REDIS_SET
) { 
2407                 /* Save a set value */ 
2409                 dictIterator 
*di 
= dictGetIterator(set
); 
2412                 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
; 
2413                 while((de 
= dictNext(di
)) != NULL
) { 
2414                     robj 
*eleobj 
= dictGetEntryKey(de
); 
2416                     if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
; 
2418                 dictReleaseIterator(di
); 
2419             } else if (o
->type 
== REDIS_ZSET
) { 
2420                 /* Save a set value */ 
2422                 dictIterator 
*di 
= dictGetIterator(zs
->dict
); 
2425                 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) goto werr
; 
2426                 while((de 
= dictNext(di
)) != NULL
) { 
2427                     robj 
*eleobj 
= dictGetEntryKey(de
); 
2428                     double *score 
= dictGetEntryVal(de
); 
2430                     if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
; 
2431                     if (rdbSaveDoubleValue(fp
,*score
) == -1) goto werr
; 
2433                 dictReleaseIterator(di
); 
2438         dictReleaseIterator(di
); 
2441     if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
; 
2443     /* Make sure data will not remain on the OS's output buffers */ 
2448     /* Use RENAME to make sure the DB file is changed atomically only 
2449      * if the generate DB file is ok. */ 
2450     if (rename(tmpfile
,filename
) == -1) { 
2451         redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
)); 
2455     redisLog(REDIS_NOTICE
,"DB saved on disk"); 
2457     server
.lastsave 
= time(NULL
); 
2463     redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
)); 
2464     if (di
) dictReleaseIterator(di
); 
2468 static int rdbSaveBackground(char *filename
) { 
2471     if (server
.bgsaveinprogress
) return REDIS_ERR
; 
2472     if ((childpid 
= fork()) == 0) { 
2475         if (rdbSave(filename
) == REDIS_OK
) { 
2482         if (childpid 
== -1) { 
2483             redisLog(REDIS_WARNING
,"Can't save in background: fork: %s", 
2487         redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
); 
2488         server
.bgsaveinprogress 
= 1; 
2489         server
.bgsavechildpid 
= childpid
; 
2492     return REDIS_OK
; /* unreached */ 
2495 static void rdbRemoveTempFile(pid_t childpid
) { 
2498     snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
); 
2502 static int rdbLoadType(FILE *fp
) { 
2504     if (fread(&type
,1,1,fp
) == 0) return -1; 
2508 static time_t rdbLoadTime(FILE *fp
) { 
2510     if (fread(&t32
,4,1,fp
) == 0) return -1; 
2511     return (time_t) t32
; 
2514 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top 
2515  * of this file for a description of how this are stored on disk. 
2517  * isencoded is set to 1 if the readed length is not actually a length but 
2518  * an "encoding type", check the above comments for more info */ 
2519 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) { 
2520     unsigned char buf
[2]; 
2523     if (isencoded
) *isencoded 
= 0; 
2525         if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
; 
2530         if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
; 
2531         type 
= (buf
[0]&0xC0)>>6; 
2532         if (type 
== REDIS_RDB_6BITLEN
) { 
2533             /* Read a 6 bit len */ 
2535         } else if (type 
== REDIS_RDB_ENCVAL
) { 
2536             /* Read a 6 bit len encoding type */ 
2537             if (isencoded
) *isencoded 
= 1; 
2539         } else if (type 
== REDIS_RDB_14BITLEN
) { 
2540             /* Read a 14 bit len */ 
2541             if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
; 
2542             return ((buf
[0]&0x3F)<<8)|buf
[1]; 
2544             /* Read a 32 bit len */ 
2545             if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
; 
2551 static robj 
*rdbLoadIntegerObject(FILE *fp
, int enctype
) { 
2552     unsigned char enc
[4]; 
2555     if (enctype 
== REDIS_RDB_ENC_INT8
) { 
2556         if (fread(enc
,1,1,fp
) == 0) return NULL
; 
2557         val 
= (signed char)enc
[0]; 
2558     } else if (enctype 
== REDIS_RDB_ENC_INT16
) { 
2560         if (fread(enc
,2,1,fp
) == 0) return NULL
; 
2561         v 
= enc
[0]|(enc
[1]<<8); 
2563     } else if (enctype 
== REDIS_RDB_ENC_INT32
) { 
2565         if (fread(enc
,4,1,fp
) == 0) return NULL
; 
2566         v 
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24); 
2569         val 
= 0; /* anti-warning */ 
2572     return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
)); 
2575 static robj 
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) { 
2576     unsigned int len
, clen
; 
2577     unsigned char *c 
= NULL
; 
2580     if ((clen 
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
; 
2581     if ((len 
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
; 
2582     if ((c 
= zmalloc(clen
)) == NULL
) goto err
; 
2583     if ((val 
= sdsnewlen(NULL
,len
)) == NULL
) goto err
; 
2584     if (fread(c
,clen
,1,fp
) == 0) goto err
; 
2585     if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
; 
2587     return createObject(REDIS_STRING
,val
); 
2594 static robj 
*rdbLoadStringObject(FILE*fp
, int rdbver
) { 
2599     len 
= rdbLoadLen(fp
,rdbver
,&isencoded
); 
2602         case REDIS_RDB_ENC_INT8
: 
2603         case REDIS_RDB_ENC_INT16
: 
2604         case REDIS_RDB_ENC_INT32
: 
2605             return tryObjectSharing(rdbLoadIntegerObject(fp
,len
)); 
2606         case REDIS_RDB_ENC_LZF
: 
2607             return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
)); 
2613     if (len 
== REDIS_RDB_LENERR
) return NULL
; 
2614     val 
= sdsnewlen(NULL
,len
); 
2615     if (len 
&& fread(val
,len
,1,fp
) == 0) { 
2619     return tryObjectSharing(createObject(REDIS_STRING
,val
)); 
2622 /* For information about double serialization check rdbSaveDoubleValue() */ 
2623 static int rdbLoadDoubleValue(FILE *fp
, double *val
) { 
2627     if (fread(&len
,1,1,fp
) == 0) return -1; 
2629     case 255: *val 
= R_NegInf
; return 0; 
2630     case 254: *val 
= R_PosInf
; return 0; 
2631     case 253: *val 
= R_Nan
; return 0; 
2633         if (fread(buf
,len
,1,fp
) == 0) return -1; 
2634         sscanf(buf
, "%lg", val
); 
2639 static int rdbLoad(char *filename
) { 
2641     robj 
*keyobj 
= NULL
; 
2643     int type
, retval
, rdbver
; 
2644     dict 
*d 
= server
.db
[0].dict
; 
2645     redisDb 
*db 
= server
.db
+0; 
2647     time_t expiretime 
= -1, now 
= time(NULL
); 
2649     fp 
= fopen(filename
,"r"); 
2650     if (!fp
) return REDIS_ERR
; 
2651     if (fread(buf
,9,1,fp
) == 0) goto eoferr
; 
2653     if (memcmp(buf
,"REDIS",5) != 0) { 
2655         redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file"); 
2658     rdbver 
= atoi(buf
+5); 
2661         redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
); 
2668         if ((type 
= rdbLoadType(fp
)) == -1) goto eoferr
; 
2669         if (type 
== REDIS_EXPIRETIME
) { 
2670             if ((expiretime 
= rdbLoadTime(fp
)) == -1) goto eoferr
; 
2671             /* We read the time so we need to read the object type again */ 
2672             if ((type 
= rdbLoadType(fp
)) == -1) goto eoferr
; 
2674         if (type 
== REDIS_EOF
) break; 
2675         /* Handle SELECT DB opcode as a special case */ 
2676         if (type 
== REDIS_SELECTDB
) { 
2677             if ((dbid 
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) 
2679             if (dbid 
>= (unsigned)server
.dbnum
) { 
2680                 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
); 
2683             db 
= server
.db
+dbid
; 
2688         if ((keyobj 
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
; 
2690         if (type 
== REDIS_STRING
) { 
2691             /* Read string value */ 
2692             if ((o 
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
; 
2693             tryObjectEncoding(o
); 
2694         } else if (type 
== REDIS_LIST 
|| type 
== REDIS_SET
) { 
2695             /* Read list/set value */ 
2698             if ((listlen 
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) 
2700             o 
= (type 
== REDIS_LIST
) ? createListObject() : createSetObject(); 
2701             /* Load every single element of the list/set */ 
2705                 if ((ele 
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
; 
2706                 tryObjectEncoding(ele
); 
2707                 if (type 
== REDIS_LIST
) { 
2708                     listAddNodeTail((list
*)o
->ptr
,ele
); 
2710                     dictAdd((dict
*)o
->ptr
,ele
,NULL
); 
2713         } else if (type 
== REDIS_ZSET
) { 
2714             /* Read list/set value */ 
2718             if ((zsetlen 
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) 
2720             o 
= createZsetObject(); 
2722             /* Load every single element of the list/set */ 
2725                 double *score 
= zmalloc(sizeof(double)); 
2727                 if ((ele 
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
; 
2728                 tryObjectEncoding(ele
); 
2729                 if (rdbLoadDoubleValue(fp
,score
) == -1) goto eoferr
; 
2730                 dictAdd(zs
->dict
,ele
,score
); 
2731                 zslInsert(zs
->zsl
,*score
,ele
); 
2732                 incrRefCount(ele
); /* added to skiplist */ 
2737         /* Add the new object in the hash table */ 
2738         retval 
= dictAdd(d
,keyobj
,o
); 
2739         if (retval 
== DICT_ERR
) { 
2740             redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
); 
2743         /* Set the expire time if needed */ 
2744         if (expiretime 
!= -1) { 
2745             setExpire(db
,keyobj
,expiretime
); 
2746             /* Delete this key if already expired */ 
2747             if (expiretime 
< now
) deleteKey(db
,keyobj
); 
2755 eoferr
: /* unexpected end of file is handled here with a fatal exit */ 
2756     if (keyobj
) decrRefCount(keyobj
); 
2757     redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now."); 
2759     return REDIS_ERR
; /* Just to avoid warning */ 
2762 /*================================== Commands =============================== */ 
2764 static void authCommand(redisClient 
*c
) { 
2765     if (!server
.requirepass 
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) { 
2766       c
->authenticated 
= 1; 
2767       addReply(c
,shared
.ok
); 
2769       c
->authenticated 
= 0; 
2770       addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n")); 
2774 static void pingCommand(redisClient 
*c
) { 
2775     addReply(c
,shared
.pong
); 
2778 static void echoCommand(redisClient 
*c
) { 
2779     addReplyBulkLen(c
,c
->argv
[1]); 
2780     addReply(c
,c
->argv
[1]); 
2781     addReply(c
,shared
.crlf
); 
2784 /*=================================== Strings =============================== */ 
2786 static void setGenericCommand(redisClient 
*c
, int nx
) { 
2789     retval 
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]); 
2790     if (retval 
== DICT_ERR
) { 
2792             dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]); 
2793             incrRefCount(c
->argv
[2]); 
2795             addReply(c
,shared
.czero
); 
2799         incrRefCount(c
->argv
[1]); 
2800         incrRefCount(c
->argv
[2]); 
2803     removeExpire(c
->db
,c
->argv
[1]); 
2804     addReply(c
, nx 
? shared
.cone 
: shared
.ok
); 
2807 static void setCommand(redisClient 
*c
) { 
2808     setGenericCommand(c
,0); 
2811 static void setnxCommand(redisClient 
*c
) { 
2812     setGenericCommand(c
,1); 
2815 static void getCommand(redisClient 
*c
) { 
2816     robj 
*o 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
2819         addReply(c
,shared
.nullbulk
); 
2821         if (o
->type 
!= REDIS_STRING
) { 
2822             addReply(c
,shared
.wrongtypeerr
); 
2824             addReplyBulkLen(c
,o
); 
2826             addReply(c
,shared
.crlf
); 
2831 static void getsetCommand(redisClient 
*c
) { 
2833     if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) { 
2834         dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]); 
2836         incrRefCount(c
->argv
[1]); 
2838     incrRefCount(c
->argv
[2]); 
2840     removeExpire(c
->db
,c
->argv
[1]); 
2843 static void mgetCommand(redisClient 
*c
) { 
2846     addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1)); 
2847     for (j 
= 1; j 
< c
->argc
; j
++) { 
2848         robj 
*o 
= lookupKeyRead(c
->db
,c
->argv
[j
]); 
2850             addReply(c
,shared
.nullbulk
); 
2852             if (o
->type 
!= REDIS_STRING
) { 
2853                 addReply(c
,shared
.nullbulk
); 
2855                 addReplyBulkLen(c
,o
); 
2857                 addReply(c
,shared
.crlf
); 
2863 static void incrDecrCommand(redisClient 
*c
, long long incr
) { 
2868     o 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
2872         if (o
->type 
!= REDIS_STRING
) { 
2877             if (o
->encoding 
== REDIS_ENCODING_RAW
) 
2878                 value 
= strtoll(o
->ptr
, &eptr
, 10); 
2879             else if (o
->encoding 
== REDIS_ENCODING_INT
) 
2880                 value 
= (long)o
->ptr
; 
2887     o 
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
)); 
2888     tryObjectEncoding(o
); 
2889     retval 
= dictAdd(c
->db
->dict
,c
->argv
[1],o
); 
2890     if (retval 
== DICT_ERR
) { 
2891         dictReplace(c
->db
->dict
,c
->argv
[1],o
); 
2892         removeExpire(c
->db
,c
->argv
[1]); 
2894         incrRefCount(c
->argv
[1]); 
2897     addReply(c
,shared
.colon
); 
2899     addReply(c
,shared
.crlf
); 
2902 static void incrCommand(redisClient 
*c
) { 
2903     incrDecrCommand(c
,1); 
2906 static void decrCommand(redisClient 
*c
) { 
2907     incrDecrCommand(c
,-1); 
2910 static void incrbyCommand(redisClient 
*c
) { 
2911     long long incr 
= strtoll(c
->argv
[2]->ptr
, NULL
, 10); 
2912     incrDecrCommand(c
,incr
); 
2915 static void decrbyCommand(redisClient 
*c
) { 
2916     long long incr 
= strtoll(c
->argv
[2]->ptr
, NULL
, 10); 
2917     incrDecrCommand(c
,-incr
); 
2920 /* ========================= Type agnostic commands ========================= */ 
2922 static void delCommand(redisClient 
*c
) { 
2925     for (j 
= 1; j 
< c
->argc
; j
++) { 
2926         if (deleteKey(c
->db
,c
->argv
[j
])) { 
2933         addReply(c
,shared
.czero
); 
2936         addReply(c
,shared
.cone
); 
2939         addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
)); 
2944 static void existsCommand(redisClient 
*c
) { 
2945     addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone 
: shared
.czero
); 
2948 static void selectCommand(redisClient 
*c
) { 
2949     int id 
= atoi(c
->argv
[1]->ptr
); 
2951     if (selectDb(c
,id
) == REDIS_ERR
) { 
2952         addReplySds(c
,sdsnew("-ERR invalid DB index\r\n")); 
2954         addReply(c
,shared
.ok
); 
2958 static void randomkeyCommand(redisClient 
*c
) { 
2962         de 
= dictGetRandomKey(c
->db
->dict
); 
2963         if (!de 
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break; 
2966         addReply(c
,shared
.plus
); 
2967         addReply(c
,shared
.crlf
); 
2969         addReply(c
,shared
.plus
); 
2970         addReply(c
,dictGetEntryKey(de
)); 
2971         addReply(c
,shared
.crlf
); 
2975 static void keysCommand(redisClient 
*c
) { 
2978     sds pattern 
= c
->argv
[1]->ptr
; 
2979     int plen 
= sdslen(pattern
); 
2980     int numkeys 
= 0, keyslen 
= 0; 
2981     robj 
*lenobj 
= createObject(REDIS_STRING
,NULL
); 
2983     di 
= dictGetIterator(c
->db
->dict
); 
2985     decrRefCount(lenobj
); 
2986     while((de 
= dictNext(di
)) != NULL
) { 
2987         robj 
*keyobj 
= dictGetEntryKey(de
); 
2989         sds key 
= keyobj
->ptr
; 
2990         if ((pattern
[0] == '*' && pattern
[1] == '\0') || 
2991             stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) { 
2992             if (expireIfNeeded(c
->db
,keyobj
) == 0) { 
2994                     addReply(c
,shared
.space
); 
2997                 keyslen 
+= sdslen(key
); 
3001     dictReleaseIterator(di
); 
3002     lenobj
->ptr 
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys 
? (numkeys
-1) : 0)); 
3003     addReply(c
,shared
.crlf
); 
3006 static void dbsizeCommand(redisClient 
*c
) { 
3008         sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
))); 
3011 static void lastsaveCommand(redisClient 
*c
) { 
3013         sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
)); 
3016 static void typeCommand(redisClient 
*c
) { 
3020     o 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
3025         case REDIS_STRING
: type 
= "+string"; break; 
3026         case REDIS_LIST
: type 
= "+list"; break; 
3027         case REDIS_SET
: type 
= "+set"; break; 
3028         case REDIS_ZSET
: type 
= "+zset"; break; 
3029         default: type 
= "unknown"; break; 
3032     addReplySds(c
,sdsnew(type
)); 
3033     addReply(c
,shared
.crlf
); 
3036 static void saveCommand(redisClient 
*c
) { 
3037     if (server
.bgsaveinprogress
) { 
3038         addReplySds(c
,sdsnew("-ERR background save in progress\r\n")); 
3041     if (rdbSave(server
.dbfilename
) == REDIS_OK
) { 
3042         addReply(c
,shared
.ok
); 
3044         addReply(c
,shared
.err
); 
3048 static void bgsaveCommand(redisClient 
*c
) { 
3049     if (server
.bgsaveinprogress
) { 
3050         addReplySds(c
,sdsnew("-ERR background save already in progress\r\n")); 
3053     if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) { 
3054         addReply(c
,shared
.ok
); 
3056         addReply(c
,shared
.err
); 
3060 static void shutdownCommand(redisClient 
*c
) { 
3061     redisLog(REDIS_WARNING
,"User requested shutdown, saving DB..."); 
3062     /* Kill the saving child if there is a background saving in progress. 
3063        We want to avoid race conditions, for instance our saving child may 
3064        overwrite the synchronous saving did by SHUTDOWN. */ 
3065     if (server
.bgsaveinprogress
) { 
3066         redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!"); 
3067         kill(server
.bgsavechildpid
,SIGKILL
); 
3068         rdbRemoveTempFile(server
.bgsavechildpid
); 
3071     if (rdbSave(server
.dbfilename
) == REDIS_OK
) { 
3072         if (server
.daemonize
) 
3073             unlink(server
.pidfile
); 
3074         redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory()); 
3075         redisLog(REDIS_WARNING
,"Server exit now, bye bye..."); 
3078         /* Ooops.. error saving! The best we can do is to continue operating. 
3079          * Note that if there was a background saving process, in the next 
3080          * cron() Redis will be notified that the background saving aborted, 
3081          * handling special stuff like slaves pending for synchronization... */ 
3082         redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");  
3083         addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n")); 
3087 static void renameGenericCommand(redisClient 
*c
, int nx
) { 
3090     /* To use the same key as src and dst is probably an error */ 
3091     if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) { 
3092         addReply(c
,shared
.sameobjecterr
); 
3096     o 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
3098         addReply(c
,shared
.nokeyerr
); 
3102     deleteIfVolatile(c
->db
,c
->argv
[2]); 
3103     if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) { 
3106             addReply(c
,shared
.czero
); 
3109         dictReplace(c
->db
->dict
,c
->argv
[2],o
); 
3111         incrRefCount(c
->argv
[2]); 
3113     deleteKey(c
->db
,c
->argv
[1]); 
3115     addReply(c
,nx 
? shared
.cone 
: shared
.ok
); 
3118 static void renameCommand(redisClient 
*c
) { 
3119     renameGenericCommand(c
,0); 
3122 static void renamenxCommand(redisClient 
*c
) { 
3123     renameGenericCommand(c
,1); 
3126 static void moveCommand(redisClient 
*c
) { 
3131     /* Obtain source and target DB pointers */ 
3134     if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) { 
3135         addReply(c
,shared
.outofrangeerr
); 
3139     selectDb(c
,srcid
); /* Back to the source DB */ 
3141     /* If the user is moving using as target the same 
3142      * DB as the source DB it is probably an error. */ 
3144         addReply(c
,shared
.sameobjecterr
); 
3148     /* Check if the element exists and get a reference */ 
3149     o 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
3151         addReply(c
,shared
.czero
); 
3155     /* Try to add the element to the target DB */ 
3156     deleteIfVolatile(dst
,c
->argv
[1]); 
3157     if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) { 
3158         addReply(c
,shared
.czero
); 
3161     incrRefCount(c
->argv
[1]); 
3164     /* OK! key moved, free the entry in the source DB */ 
3165     deleteKey(src
,c
->argv
[1]); 
3167     addReply(c
,shared
.cone
); 
3170 /* =================================== Lists ================================ */ 
3171 static void pushGenericCommand(redisClient 
*c
, int where
) { 
3175     lobj 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
3177         lobj 
= createListObject(); 
3179         if (where 
== REDIS_HEAD
) { 
3180             listAddNodeHead(list
,c
->argv
[2]); 
3182             listAddNodeTail(list
,c
->argv
[2]); 
3184         dictAdd(c
->db
->dict
,c
->argv
[1],lobj
); 
3185         incrRefCount(c
->argv
[1]); 
3186         incrRefCount(c
->argv
[2]); 
3188         if (lobj
->type 
!= REDIS_LIST
) { 
3189             addReply(c
,shared
.wrongtypeerr
); 
3193         if (where 
== REDIS_HEAD
) { 
3194             listAddNodeHead(list
,c
->argv
[2]); 
3196             listAddNodeTail(list
,c
->argv
[2]); 
3198         incrRefCount(c
->argv
[2]); 
3201     addReply(c
,shared
.ok
); 
3204 static void lpushCommand(redisClient 
*c
) { 
3205     pushGenericCommand(c
,REDIS_HEAD
); 
3208 static void rpushCommand(redisClient 
*c
) { 
3209     pushGenericCommand(c
,REDIS_TAIL
); 
3212 static void llenCommand(redisClient 
*c
) { 
3216     o 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
3218         addReply(c
,shared
.czero
); 
3221         if (o
->type 
!= REDIS_LIST
) { 
3222             addReply(c
,shared
.wrongtypeerr
); 
3225             addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
))); 
3230 static void lindexCommand(redisClient 
*c
) { 
3232     int index 
= atoi(c
->argv
[2]->ptr
); 
3234     o 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
3236         addReply(c
,shared
.nullbulk
); 
3238         if (o
->type 
!= REDIS_LIST
) { 
3239             addReply(c
,shared
.wrongtypeerr
); 
3241             list 
*list 
= o
->ptr
; 
3244             ln 
= listIndex(list
, index
); 
3246                 addReply(c
,shared
.nullbulk
); 
3248                 robj 
*ele 
= listNodeValue(ln
); 
3249                 addReplyBulkLen(c
,ele
); 
3251                 addReply(c
,shared
.crlf
); 
3257 static void lsetCommand(redisClient 
*c
) { 
3259     int index 
= atoi(c
->argv
[2]->ptr
); 
3261     o 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
3263         addReply(c
,shared
.nokeyerr
); 
3265         if (o
->type 
!= REDIS_LIST
) { 
3266             addReply(c
,shared
.wrongtypeerr
); 
3268             list 
*list 
= o
->ptr
; 
3271             ln 
= listIndex(list
, index
); 
3273                 addReply(c
,shared
.outofrangeerr
); 
3275                 robj 
*ele 
= listNodeValue(ln
); 
3278                 listNodeValue(ln
) = c
->argv
[3]; 
3279                 incrRefCount(c
->argv
[3]); 
3280                 addReply(c
,shared
.ok
); 
3287 static void popGenericCommand(redisClient 
*c
, int where
) { 
3290     o 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
3292         addReply(c
,shared
.nullbulk
); 
3294         if (o
->type 
!= REDIS_LIST
) { 
3295             addReply(c
,shared
.wrongtypeerr
); 
3297             list 
*list 
= o
->ptr
; 
3300             if (where 
== REDIS_HEAD
) 
3301                 ln 
= listFirst(list
); 
3303                 ln 
= listLast(list
); 
3306                 addReply(c
,shared
.nullbulk
); 
3308                 robj 
*ele 
= listNodeValue(ln
); 
3309                 addReplyBulkLen(c
,ele
); 
3311                 addReply(c
,shared
.crlf
); 
3312                 listDelNode(list
,ln
); 
3319 static void lpopCommand(redisClient 
*c
) { 
3320     popGenericCommand(c
,REDIS_HEAD
); 
3323 static void rpopCommand(redisClient 
*c
) { 
3324     popGenericCommand(c
,REDIS_TAIL
); 
3327 static void lrangeCommand(redisClient 
*c
) { 
3329     int start 
= atoi(c
->argv
[2]->ptr
); 
3330     int end 
= atoi(c
->argv
[3]->ptr
); 
3332     o 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
3334         addReply(c
,shared
.nullmultibulk
); 
3336         if (o
->type 
!= REDIS_LIST
) { 
3337             addReply(c
,shared
.wrongtypeerr
); 
3339             list 
*list 
= o
->ptr
; 
3341             int llen 
= listLength(list
); 
3345             /* convert negative indexes */ 
3346             if (start 
< 0) start 
= llen
+start
; 
3347             if (end 
< 0) end 
= llen
+end
; 
3348             if (start 
< 0) start 
= 0; 
3349             if (end 
< 0) end 
= 0; 
3351             /* indexes sanity checks */ 
3352             if (start 
> end 
|| start 
>= llen
) { 
3353                 /* Out of range start or start > end result in empty list */ 
3354                 addReply(c
,shared
.emptymultibulk
); 
3357             if (end 
>= llen
) end 
= llen
-1; 
3358             rangelen 
= (end
-start
)+1; 
3360             /* Return the result in form of a multi-bulk reply */ 
3361             ln 
= listIndex(list
, start
); 
3362             addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
)); 
3363             for (j 
= 0; j 
< rangelen
; j
++) { 
3364                 ele 
= listNodeValue(ln
); 
3365                 addReplyBulkLen(c
,ele
); 
3367                 addReply(c
,shared
.crlf
); 
3374 static void ltrimCommand(redisClient 
*c
) { 
3376     int start 
= atoi(c
->argv
[2]->ptr
); 
3377     int end 
= atoi(c
->argv
[3]->ptr
); 
3379     o 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
3381         addReply(c
,shared
.nokeyerr
); 
3383         if (o
->type 
!= REDIS_LIST
) { 
3384             addReply(c
,shared
.wrongtypeerr
); 
3386             list 
*list 
= o
->ptr
; 
3388             int llen 
= listLength(list
); 
3389             int j
, ltrim
, rtrim
; 
3391             /* convert negative indexes */ 
3392             if (start 
< 0) start 
= llen
+start
; 
3393             if (end 
< 0) end 
= llen
+end
; 
3394             if (start 
< 0) start 
= 0; 
3395             if (end 
< 0) end 
= 0; 
3397             /* indexes sanity checks */ 
3398             if (start 
> end 
|| start 
>= llen
) { 
3399                 /* Out of range start or start > end result in empty list */ 
3403                 if (end 
>= llen
) end 
= llen
-1; 
3408             /* Remove list elements to perform the trim */ 
3409             for (j 
= 0; j 
< ltrim
; j
++) { 
3410                 ln 
= listFirst(list
); 
3411                 listDelNode(list
,ln
); 
3413             for (j 
= 0; j 
< rtrim
; j
++) { 
3414                 ln 
= listLast(list
); 
3415                 listDelNode(list
,ln
); 
3418             addReply(c
,shared
.ok
); 
3423 static void lremCommand(redisClient 
*c
) { 
3426     o 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
3428         addReply(c
,shared
.czero
); 
3430         if (o
->type 
!= REDIS_LIST
) { 
3431             addReply(c
,shared
.wrongtypeerr
); 
3433             list 
*list 
= o
->ptr
; 
3434             listNode 
*ln
, *next
; 
3435             int toremove 
= atoi(c
->argv
[2]->ptr
); 
3440                 toremove 
= -toremove
; 
3443             ln 
= fromtail 
? list
->tail 
: list
->head
; 
3445                 robj 
*ele 
= listNodeValue(ln
); 
3447                 next 
= fromtail 
? ln
->prev 
: ln
->next
; 
3448                 if (compareStringObjects(ele
,c
->argv
[3]) == 0) { 
3449                     listDelNode(list
,ln
); 
3452                     if (toremove 
&& removed 
== toremove
) break; 
3456             addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
)); 
3461 /* ==================================== Sets ================================ */ 
3463 static void saddCommand(redisClient 
*c
) { 
3466     set 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
3468         set 
= createSetObject(); 
3469         dictAdd(c
->db
->dict
,c
->argv
[1],set
); 
3470         incrRefCount(c
->argv
[1]); 
3472         if (set
->type 
!= REDIS_SET
) { 
3473             addReply(c
,shared
.wrongtypeerr
); 
3477     if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) { 
3478         incrRefCount(c
->argv
[2]); 
3480         addReply(c
,shared
.cone
); 
3482         addReply(c
,shared
.czero
); 
3486 static void sremCommand(redisClient 
*c
) { 
3489     set 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
3491         addReply(c
,shared
.czero
); 
3493         if (set
->type 
!= REDIS_SET
) { 
3494             addReply(c
,shared
.wrongtypeerr
); 
3497         if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) { 
3499             if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
); 
3500             addReply(c
,shared
.cone
); 
3502             addReply(c
,shared
.czero
); 
3507 static void smoveCommand(redisClient 
*c
) { 
3508     robj 
*srcset
, *dstset
; 
3510     srcset 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
3511     dstset 
= lookupKeyWrite(c
->db
,c
->argv
[2]); 
3513     /* If the source key does not exist return 0, if it's of the wrong type 
3515     if (srcset 
== NULL 
|| srcset
->type 
!= REDIS_SET
) { 
3516         addReply(c
, srcset 
? shared
.wrongtypeerr 
: shared
.czero
); 
3519     /* Error if the destination key is not a set as well */ 
3520     if (dstset 
&& dstset
->type 
!= REDIS_SET
) { 
3521         addReply(c
,shared
.wrongtypeerr
); 
3524     /* Remove the element from the source set */ 
3525     if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) { 
3526         /* Key not found in the src set! return zero */ 
3527         addReply(c
,shared
.czero
); 
3531     /* Add the element to the destination set */ 
3533         dstset 
= createSetObject(); 
3534         dictAdd(c
->db
->dict
,c
->argv
[2],dstset
); 
3535         incrRefCount(c
->argv
[2]); 
3537     if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
) 
3538         incrRefCount(c
->argv
[3]); 
3539     addReply(c
,shared
.cone
); 
3542 static void sismemberCommand(redisClient 
*c
) { 
3545     set 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
3547         addReply(c
,shared
.czero
); 
3549         if (set
->type 
!= REDIS_SET
) { 
3550             addReply(c
,shared
.wrongtypeerr
); 
3553         if (dictFind(set
->ptr
,c
->argv
[2])) 
3554             addReply(c
,shared
.cone
); 
3556             addReply(c
,shared
.czero
); 
3560 static void scardCommand(redisClient 
*c
) { 
3564     o 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
3566         addReply(c
,shared
.czero
); 
3569         if (o
->type 
!= REDIS_SET
) { 
3570             addReply(c
,shared
.wrongtypeerr
); 
3573             addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n", 
3579 static void spopCommand(redisClient 
*c
) { 
3583     set 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
3585         addReply(c
,shared
.nullbulk
); 
3587         if (set
->type 
!= REDIS_SET
) { 
3588             addReply(c
,shared
.wrongtypeerr
); 
3591         de 
= dictGetRandomKey(set
->ptr
); 
3593             addReply(c
,shared
.nullbulk
); 
3595             robj 
*ele 
= dictGetEntryKey(de
); 
3597             addReplyBulkLen(c
,ele
); 
3599             addReply(c
,shared
.crlf
); 
3600             dictDelete(set
->ptr
,ele
); 
3601             if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
); 
3607 static void srandmemberCommand(redisClient 
*c
) { 
3611     set 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
3613         addReply(c
,shared
.nullbulk
); 
3615         if (set
->type 
!= REDIS_SET
) { 
3616             addReply(c
,shared
.wrongtypeerr
); 
3619         de 
= dictGetRandomKey(set
->ptr
); 
3621             addReply(c
,shared
.nullbulk
); 
3623             robj 
*ele 
= dictGetEntryKey(de
); 
3625             addReplyBulkLen(c
,ele
); 
3627             addReply(c
,shared
.crlf
); 
3632 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) { 
3633     dict 
**d1 
= (void*) s1
, **d2 
= (void*) s2
; 
3635     return dictSize(*d1
)-dictSize(*d2
); 
3638 static void sinterGenericCommand(redisClient 
*c
, robj 
**setskeys
, int setsnum
, robj 
*dstkey
) { 
3639     dict 
**dv 
= zmalloc(sizeof(dict
*)*setsnum
); 
3642     robj 
*lenobj 
= NULL
, *dstset 
= NULL
; 
3643     int j
, cardinality 
= 0; 
3645     for (j 
= 0; j 
< setsnum
; j
++) { 
3649                     lookupKeyWrite(c
->db
,setskeys
[j
]) : 
3650                     lookupKeyRead(c
->db
,setskeys
[j
]); 
3654                 deleteKey(c
->db
,dstkey
); 
3655                 addReply(c
,shared
.ok
); 
3657                 addReply(c
,shared
.nullmultibulk
); 
3661         if (setobj
->type 
!= REDIS_SET
) { 
3663             addReply(c
,shared
.wrongtypeerr
); 
3666         dv
[j
] = setobj
->ptr
; 
3668     /* Sort sets from the smallest to largest, this will improve our 
3669      * algorithm's performace */ 
3670     qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
); 
3672     /* The first thing we should output is the total number of elements... 
3673      * since this is a multi-bulk write, but at this stage we don't know 
3674      * the intersection set size, so we use a trick, append an empty object 
3675      * to the output list and save the pointer to later modify it with the 
3678         lenobj 
= createObject(REDIS_STRING
,NULL
); 
3680         decrRefCount(lenobj
); 
3682         /* If we have a target key where to store the resulting set 
3683          * create this key with an empty set inside */ 
3684         dstset 
= createSetObject(); 
3687     /* Iterate all the elements of the first (smallest) set, and test 
3688      * the element against all the other sets, if at least one set does 
3689      * not include the element it is discarded */ 
3690     di 
= dictGetIterator(dv
[0]); 
3692     while((de 
= dictNext(di
)) != NULL
) { 
3695         for (j 
= 1; j 
< setsnum
; j
++) 
3696             if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break; 
3698             continue; /* at least one set does not contain the member */ 
3699         ele 
= dictGetEntryKey(de
); 
3701             addReplyBulkLen(c
,ele
); 
3703             addReply(c
,shared
.crlf
); 
3706             dictAdd(dstset
->ptr
,ele
,NULL
); 
3710     dictReleaseIterator(di
); 
3713         /* Store the resulting set into the target */ 
3714         deleteKey(c
->db
,dstkey
); 
3715         dictAdd(c
->db
->dict
,dstkey
,dstset
); 
3716         incrRefCount(dstkey
); 
3720         lenobj
->ptr 
= sdscatprintf(sdsempty(),"*%d\r\n",cardinality
); 
3722         addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n", 
3723             dictSize((dict
*)dstset
->ptr
))); 
3729 static void sinterCommand(redisClient 
*c
) { 
3730     sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
); 
3733 static void sinterstoreCommand(redisClient 
*c
) { 
3734     sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]); 
3737 #define REDIS_OP_UNION 0 
3738 #define REDIS_OP_DIFF 1 
3740 static void sunionDiffGenericCommand(redisClient 
*c
, robj 
**setskeys
, int setsnum
, robj 
*dstkey
, int op
) { 
3741     dict 
**dv 
= zmalloc(sizeof(dict
*)*setsnum
); 
3744     robj 
*dstset 
= NULL
; 
3745     int j
, cardinality 
= 0; 
3747     for (j 
= 0; j 
< setsnum
; j
++) { 
3751                     lookupKeyWrite(c
->db
,setskeys
[j
]) : 
3752                     lookupKeyRead(c
->db
,setskeys
[j
]); 
3757         if (setobj
->type 
!= REDIS_SET
) { 
3759             addReply(c
,shared
.wrongtypeerr
); 
3762         dv
[j
] = setobj
->ptr
; 
3765     /* We need a temp set object to store our union. If the dstkey 
3766      * is not NULL (that is, we are inside an SUNIONSTORE operation) then 
3767      * this set object will be the resulting object to set into the target key*/ 
3768     dstset 
= createSetObject(); 
3770     /* Iterate all the elements of all the sets, add every element a single 
3771      * time to the result set */ 
3772     for (j 
= 0; j 
< setsnum
; j
++) { 
3773         if (op 
== REDIS_OP_DIFF 
&& j 
== 0 && !dv
[j
]) break; /* result set is empty */ 
3774         if (!dv
[j
]) continue; /* non existing keys are like empty sets */ 
3776         di 
= dictGetIterator(dv
[j
]); 
3778         while((de 
= dictNext(di
)) != NULL
) { 
3781             /* dictAdd will not add the same element multiple times */ 
3782             ele 
= dictGetEntryKey(de
); 
3783             if (op 
== REDIS_OP_UNION 
|| j 
== 0) { 
3784                 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) { 
3788             } else if (op 
== REDIS_OP_DIFF
) { 
3789                 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) { 
3794         dictReleaseIterator(di
); 
3796         if (op 
== REDIS_OP_DIFF 
&& cardinality 
== 0) break; /* result set is empty */ 
3799     /* Output the content of the resulting set, if not in STORE mode */ 
3801         addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
)); 
3802         di 
= dictGetIterator(dstset
->ptr
); 
3803         while((de 
= dictNext(di
)) != NULL
) { 
3806             ele 
= dictGetEntryKey(de
); 
3807             addReplyBulkLen(c
,ele
); 
3809             addReply(c
,shared
.crlf
); 
3811         dictReleaseIterator(di
); 
3813         /* If we have a target key where to store the resulting set 
3814          * create this key with the result set inside */ 
3815         deleteKey(c
->db
,dstkey
); 
3816         dictAdd(c
->db
->dict
,dstkey
,dstset
); 
3817         incrRefCount(dstkey
); 
3822         decrRefCount(dstset
); 
3824         addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n", 
3825             dictSize((dict
*)dstset
->ptr
))); 
3831 static void sunionCommand(redisClient 
*c
) { 
3832     sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
); 
3835 static void sunionstoreCommand(redisClient 
*c
) { 
3836     sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
); 
3839 static void sdiffCommand(redisClient 
*c
) { 
3840     sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
); 
3843 static void sdiffstoreCommand(redisClient 
*c
) { 
3844     sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
); 
3847 /* ==================================== ZSets =============================== */ 
3849 /* ZSETs are ordered sets using two data structures to hold the same elements 
3850  * in order to get O(log(N)) INSERT and REMOVE operations into a sorted 
3853  * The elements are added to an hash table mapping Redis objects to scores. 
3854  * At the same time the elements are added to a skip list mapping scores 
3855  * to Redis objects (so objects are sorted by scores in this "view"). */ 
3857 /* This skiplist implementation is almost a C translation of the original 
3858  * algorithm described by William Pugh in "Skip Lists: A Probabilistic 
3859  * Alternative to Balanced Trees", modified in three ways: 
3860  * a) this implementation allows for repeated values. 
3861  * b) the comparison is not just by key (our 'score') but by satellite data. 
3862  * c) there is a back pointer, so it's a doubly linked list with the back 
3863  * pointers being only at "level 1". This allows to traverse the list 
3864  * from tail to head, useful for ZREVRANGE. */ 
3866 static zskiplistNode 
*zslCreateNode(int level
, double score
, robj 
*obj
) { 
3867     zskiplistNode 
*zn 
= zmalloc(sizeof(*zn
)); 
3869     zn
->forward 
= zmalloc(sizeof(zskiplistNode
*) * level
); 
3875 static zskiplist 
*zslCreate(void) { 
3879     zsl 
= zmalloc(sizeof(*zsl
)); 
3882     zsl
->header 
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
); 
3883     for (j 
= 0; j 
< ZSKIPLIST_MAXLEVEL
; j
++) 
3884         zsl
->header
->forward
[j
] = NULL
; 
3885     zsl
->header
->backward 
= NULL
; 
3890 static void zslFreeNode(zskiplistNode 
*node
) { 
3891     decrRefCount(node
->obj
); 
3892     zfree(node
->forward
); 
3896 static void zslFree(zskiplist 
*zsl
) { 
3897     zskiplistNode 
*node 
= zsl
->header
->forward
[0], *next
; 
3899     zfree(zsl
->header
->forward
); 
3902         next 
= node
->forward
[0]; 
3909 static int zslRandomLevel(void) { 
3911     while ((random()&0xFFFF) < (ZSKIPLIST_P 
* 0xFFFF)) 
3916 static void zslInsert(zskiplist 
*zsl
, double score
, robj 
*obj
) { 
3917     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
3921     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
3922         while (x
->forward
[i
] && 
3923             (x
->forward
[i
]->score 
< score 
|| 
3924                 (x
->forward
[i
]->score 
== score 
&& 
3925                 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0))) 
3929     /* we assume the key is not already inside, since we allow duplicated 
3930      * scores, and the re-insertion of score and redis object should never 
3931      * happpen since the caller of zslInsert() should test in the hash table 
3932      * if the element is already inside or not. */ 
3933     level 
= zslRandomLevel(); 
3934     if (level 
> zsl
->level
) { 
3935         for (i 
= zsl
->level
; i 
< level
; i
++) 
3936             update
[i
] = zsl
->header
; 
3939     x 
= zslCreateNode(level
,score
,obj
); 
3940     for (i 
= 0; i 
< level
; i
++) { 
3941         x
->forward
[i
] = update
[i
]->forward
[i
]; 
3942         update
[i
]->forward
[i
] = x
; 
3944     x
->backward 
= (update
[0] == zsl
->header
) ? NULL 
: update
[0]; 
3946         x
->forward
[0]->backward 
= x
; 
3952 /* Delete an element with matching score/object from the skiplist. */ 
3953 static int zslDelete(zskiplist 
*zsl
, double score
, robj 
*obj
) { 
3954     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
3958     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
3959         while (x
->forward
[i
] && 
3960             (x
->forward
[i
]->score 
< score 
|| 
3961                 (x
->forward
[i
]->score 
== score 
&& 
3962                 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0))) 
3966     /* We may have multiple elements with the same score, what we need 
3967      * is to find the element with both the right score and object. */ 
3969     if (x 
&& score 
== x
->score 
&& compareStringObjects(x
->obj
,obj
) == 0) { 
3970         for (i 
= 0; i 
< zsl
->level
; i
++) { 
3971             if (update
[i
]->forward
[i
] != x
) break; 
3972             update
[i
]->forward
[i
] = x
->forward
[i
]; 
3974         if (x
->forward
[0]) { 
3975             x
->forward
[0]->backward 
= (x
->backward 
== zsl
->header
) ? 
3978             zsl
->tail 
= x
->backward
; 
3981         while(zsl
->level 
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
) 
3986         return 0; /* not found */ 
3988     return 0; /* not found */ 
3991 /* Delete all the elements with score between min and max from the skiplist. 
3992  * Min and mx are inclusive, so a score >= min || score <= max is deleted. 
3993  * Note that this function takes the reference to the hash table view of the 
3994  * sorted set, in order to remove the elements from the hash table too. */ 
3995 static unsigned long zslDeleteRange(zskiplist 
*zsl
, double min
, double max
, dict 
*dict
) { 
3996     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
3997     unsigned long removed 
= 0; 
4001     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
4002         while (x
->forward
[i
] && x
->forward
[i
]->score 
< min
) 
4006     /* We may have multiple elements with the same score, what we need 
4007      * is to find the element with both the right score and object. */ 
4009     while (x 
&& x
->score 
<= max
) { 
4010         zskiplistNode 
*next
; 
4012         for (i 
= 0; i 
< zsl
->level
; i
++) { 
4013             if (update
[i
]->forward
[i
] != x
) break; 
4014             update
[i
]->forward
[i
] = x
->forward
[i
]; 
4016         if (x
->forward
[0]) { 
4017             x
->forward
[0]->backward 
= (x
->backward 
== zsl
->header
) ? 
4020             zsl
->tail 
= x
->backward
; 
4022         next 
= x
->forward
[0]; 
4023         dictDelete(dict
,x
->obj
); 
4025         while(zsl
->level 
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
) 
4031     return removed
; /* not found */ 
4034 /* Find the first node having a score equal or greater than the specified one. 
4035  * Returns NULL if there is no match. */ 
4036 static zskiplistNode 
*zslFirstWithScore(zskiplist 
*zsl
, double score
) { 
4041     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
4042         while (x
->forward
[i
] && x
->forward
[i
]->score 
< score
) 
4045     /* We may have multiple elements with the same score, what we need 
4046      * is to find the element with both the right score and object. */ 
4047     return x
->forward
[0]; 
4050 /* The actual Z-commands implementations */ 
4052 static void zaddCommand(redisClient 
*c
) { 
4057     zsetobj 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
4058     if (zsetobj 
== NULL
) { 
4059         zsetobj 
= createZsetObject(); 
4060         dictAdd(c
->db
->dict
,c
->argv
[1],zsetobj
); 
4061         incrRefCount(c
->argv
[1]); 
4063         if (zsetobj
->type 
!= REDIS_ZSET
) { 
4064             addReply(c
,shared
.wrongtypeerr
); 
4068     score 
= zmalloc(sizeof(double)); 
4069     *score 
= strtod(c
->argv
[2]->ptr
,NULL
); 
4071     if (dictAdd(zs
->dict
,c
->argv
[3],score
) == DICT_OK
) { 
4072         /* case 1: New element */ 
4073         incrRefCount(c
->argv
[3]); /* added to hash */ 
4074         zslInsert(zs
->zsl
,*score
,c
->argv
[3]); 
4075         incrRefCount(c
->argv
[3]); /* added to skiplist */ 
4077         addReply(c
,shared
.cone
); 
4082         /* case 2: Score update operation */ 
4083         de 
= dictFind(zs
->dict
,c
->argv
[3]); 
4085         oldscore 
= dictGetEntryVal(de
); 
4086         if (*score 
!= *oldscore
) { 
4089             deleted 
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[3]); 
4090             assert(deleted 
!= 0); 
4091             zslInsert(zs
->zsl
,*score
,c
->argv
[3]); 
4092             incrRefCount(c
->argv
[3]); 
4093             dictReplace(zs
->dict
,c
->argv
[3],score
); 
4098         addReply(c
,shared
.czero
); 
4102 static void zremCommand(redisClient 
*c
) { 
4106     zsetobj 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
4107     if (zsetobj 
== NULL
) { 
4108         addReply(c
,shared
.czero
); 
4114         if (zsetobj
->type 
!= REDIS_ZSET
) { 
4115             addReply(c
,shared
.wrongtypeerr
); 
4119         de 
= dictFind(zs
->dict
,c
->argv
[2]); 
4121             addReply(c
,shared
.czero
); 
4124         /* Delete from the skiplist */ 
4125         oldscore 
= dictGetEntryVal(de
); 
4126         deleted 
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]); 
4127         assert(deleted 
!= 0); 
4129         /* Delete from the hash table */ 
4130         dictDelete(zs
->dict
,c
->argv
[2]); 
4131         if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
); 
4133         addReply(c
,shared
.cone
); 
4137 static void zremrangebyscoreCommand(redisClient 
*c
) { 
4138     double min 
= strtod(c
->argv
[2]->ptr
,NULL
); 
4139     double max 
= strtod(c
->argv
[3]->ptr
,NULL
); 
4143     zsetobj 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
4144     if (zsetobj 
== NULL
) { 
4145         addReply(c
,shared
.czero
); 
4149         if (zsetobj
->type 
!= REDIS_ZSET
) { 
4150             addReply(c
,shared
.wrongtypeerr
); 
4154         deleted 
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
); 
4155         if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
); 
4156         server
.dirty 
+= deleted
; 
4157         addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
)); 
4161 static void zrangeGenericCommand(redisClient 
*c
, int reverse
) { 
4163     int start 
= atoi(c
->argv
[2]->ptr
); 
4164     int end 
= atoi(c
->argv
[3]->ptr
); 
4166     o 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
4168         addReply(c
,shared
.nullmultibulk
); 
4170         if (o
->type 
!= REDIS_ZSET
) { 
4171             addReply(c
,shared
.wrongtypeerr
); 
4173             zset 
*zsetobj 
= o
->ptr
; 
4174             zskiplist 
*zsl 
= zsetobj
->zsl
; 
4177             int llen 
= zsl
->length
; 
4181             /* convert negative indexes */ 
4182             if (start 
< 0) start 
= llen
+start
; 
4183             if (end 
< 0) end 
= llen
+end
; 
4184             if (start 
< 0) start 
= 0; 
4185             if (end 
< 0) end 
= 0; 
4187             /* indexes sanity checks */ 
4188             if (start 
> end 
|| start 
>= llen
) { 
4189                 /* Out of range start or start > end result in empty list */ 
4190                 addReply(c
,shared
.emptymultibulk
); 
4193             if (end 
>= llen
) end 
= llen
-1; 
4194             rangelen 
= (end
-start
)+1; 
4196             /* Return the result in form of a multi-bulk reply */ 
4202                 ln 
= zsl
->header
->forward
[0]; 
4204                     ln 
= ln
->forward
[0]; 
4207             addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
)); 
4208             for (j 
= 0; j 
< rangelen
; j
++) { 
4210                 addReplyBulkLen(c
,ele
); 
4212                 addReply(c
,shared
.crlf
); 
4213                 ln 
= reverse 
? ln
->backward 
: ln
->forward
[0]; 
4219 static void zrangeCommand(redisClient 
*c
) { 
4220     zrangeGenericCommand(c
,0); 
4223 static void zrevrangeCommand(redisClient 
*c
) { 
4224     zrangeGenericCommand(c
,1); 
4227 static void zrangebyscoreCommand(redisClient 
*c
) { 
4229     double min 
= strtod(c
->argv
[2]->ptr
,NULL
); 
4230     double max 
= strtod(c
->argv
[3]->ptr
,NULL
); 
4232     o 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
4234         addReply(c
,shared
.nullmultibulk
); 
4236         if (o
->type 
!= REDIS_ZSET
) { 
4237             addReply(c
,shared
.wrongtypeerr
); 
4239             zset 
*zsetobj 
= o
->ptr
; 
4240             zskiplist 
*zsl 
= zsetobj
->zsl
; 
4243             unsigned int rangelen 
= 0; 
4245             /* Get the first node with the score >= min */ 
4246             ln 
= zslFirstWithScore(zsl
,min
); 
4248                 /* No element matching the speciifed interval */ 
4249                 addReply(c
,shared
.emptymultibulk
); 
4253             /* We don't know in advance how many matching elements there 
4254              * are in the list, so we push this object that will represent 
4255              * the multi-bulk length in the output buffer, and will "fix" 
4257             lenobj 
= createObject(REDIS_STRING
,NULL
); 
4260             while(ln 
&& ln
->score 
<= max
) { 
4262                 addReplyBulkLen(c
,ele
); 
4264                 addReply(c
,shared
.crlf
); 
4265                 ln 
= ln
->forward
[0]; 
4268             lenobj
->ptr 
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
); 
4273 static void zcardCommand(redisClient 
*c
) { 
4277     o 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
4279         addReply(c
,shared
.czero
); 
4282         if (o
->type 
!= REDIS_ZSET
) { 
4283             addReply(c
,shared
.wrongtypeerr
); 
4286             addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",zs
->zsl
->length
)); 
4291 static void zscoreCommand(redisClient 
*c
) { 
4295     o 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
4297         addReply(c
,shared
.czero
); 
4300         if (o
->type 
!= REDIS_ZSET
) { 
4301             addReply(c
,shared
.wrongtypeerr
); 
4306             de 
= dictFind(zs
->dict
,c
->argv
[2]); 
4308                 addReply(c
,shared
.nullbulk
); 
4311                 double *score 
= dictGetEntryVal(de
); 
4313                 snprintf(buf
,sizeof(buf
),"%.16g",*score
); 
4314                 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n", 
4321 /* ========================= Non type-specific commands  ==================== */ 
4323 static void flushdbCommand(redisClient 
*c
) { 
4324     server
.dirty 
+= dictSize(c
->db
->dict
); 
4325     dictEmpty(c
->db
->dict
); 
4326     dictEmpty(c
->db
->expires
); 
4327     addReply(c
,shared
.ok
); 
4330 static void flushallCommand(redisClient 
*c
) { 
4331     server
.dirty 
+= emptyDb(); 
4332     addReply(c
,shared
.ok
); 
4333     rdbSave(server
.dbfilename
); 
4337 static redisSortOperation 
*createSortOperation(int type
, robj 
*pattern
) { 
4338     redisSortOperation 
*so 
= zmalloc(sizeof(*so
)); 
4340     so
->pattern 
= pattern
; 
4344 /* Return the value associated to the key with a name obtained 
4345  * substituting the first occurence of '*' in 'pattern' with 'subst' */ 
4346 static robj 
*lookupKeyByPattern(redisDb 
*db
, robj 
*pattern
, robj 
*subst
) { 
4350     int prefixlen
, sublen
, postfixlen
; 
4351     /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */ 
4355         char buf
[REDIS_SORTKEY_MAX
+1]; 
4358     if (subst
->encoding 
== REDIS_ENCODING_RAW
) 
4359         incrRefCount(subst
); 
4361         subst 
= getDecodedObject(subst
); 
4364     spat 
= pattern
->ptr
; 
4366     if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
; 
4367     p 
= strchr(spat
,'*'); 
4368     if (!p
) return NULL
; 
4371     sublen 
= sdslen(ssub
); 
4372     postfixlen 
= sdslen(spat
)-(prefixlen
+1); 
4373     memcpy(keyname
.buf
,spat
,prefixlen
); 
4374     memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
); 
4375     memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
); 
4376     keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0'; 
4377     keyname
.len 
= prefixlen
+sublen
+postfixlen
; 
4379     keyobj
.refcount 
= 1; 
4380     keyobj
.type 
= REDIS_STRING
; 
4381     keyobj
.ptr 
= ((char*)&keyname
)+(sizeof(long)*2); 
4383     decrRefCount(subst
); 
4385     /* printf("lookup '%s' => %p\n", keyname.buf,de); */ 
4386     return lookupKeyRead(db
,&keyobj
); 
4389 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with 
4390  * the additional parameter is not standard but a BSD-specific we have to 
4391  * pass sorting parameters via the global 'server' structure */ 
4392 static int sortCompare(const void *s1
, const void *s2
) { 
4393     const redisSortObject 
*so1 
= s1
, *so2 
= s2
; 
4396     if (!server
.sort_alpha
) { 
4397         /* Numeric sorting. Here it's trivial as we precomputed scores */ 
4398         if (so1
->u
.score 
> so2
->u
.score
) { 
4400         } else if (so1
->u
.score 
< so2
->u
.score
) { 
4406         /* Alphanumeric sorting */ 
4407         if (server
.sort_bypattern
) { 
4408             if (!so1
->u
.cmpobj 
|| !so2
->u
.cmpobj
) { 
4409                 /* At least one compare object is NULL */ 
4410                 if (so1
->u
.cmpobj 
== so2
->u
.cmpobj
) 
4412                 else if (so1
->u
.cmpobj 
== NULL
) 
4417                 /* We have both the objects, use strcoll */ 
4418                 cmp 
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
); 
4421             /* Compare elements directly */ 
4422             if (so1
->obj
->encoding 
== REDIS_ENCODING_RAW 
&& 
4423                 so2
->obj
->encoding 
== REDIS_ENCODING_RAW
) { 
4424                 cmp 
= strcoll(so1
->obj
->ptr
,so2
->obj
->ptr
); 
4428                 dec1 
= so1
->obj
->encoding 
== REDIS_ENCODING_RAW 
? 
4429                     so1
->obj 
: getDecodedObject(so1
->obj
); 
4430                 dec2 
= so2
->obj
->encoding 
== REDIS_ENCODING_RAW 
? 
4431                     so2
->obj 
: getDecodedObject(so2
->obj
); 
4432                 cmp 
= strcoll(dec1
->ptr
,dec2
->ptr
); 
4433                 if (dec1 
!= so1
->obj
) decrRefCount(dec1
); 
4434                 if (dec2 
!= so2
->obj
) decrRefCount(dec2
); 
4438     return server
.sort_desc 
? -cmp 
: cmp
; 
4441 /* The SORT command is the most complex command in Redis. Warning: this code 
4442  * is optimized for speed and a bit less for readability */ 
4443 static void sortCommand(redisClient 
*c
) { 
4446     int desc 
= 0, alpha 
= 0; 
4447     int limit_start 
= 0, limit_count 
= -1, start
, end
; 
4448     int j
, dontsort 
= 0, vectorlen
; 
4449     int getop 
= 0; /* GET operation counter */ 
4450     robj 
*sortval
, *sortby 
= NULL
; 
4451     redisSortObject 
*vector
; /* Resulting vector to sort */ 
4453     /* Lookup the key to sort. It must be of the right types */ 
4454     sortval 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
4455     if (sortval 
== NULL
) { 
4456         addReply(c
,shared
.nokeyerr
); 
4459     if (sortval
->type 
!= REDIS_SET 
&& sortval
->type 
!= REDIS_LIST
) { 
4460         addReply(c
,shared
.wrongtypeerr
); 
4464     /* Create a list of operations to perform for every sorted element. 
4465      * Operations can be GET/DEL/INCR/DECR */ 
4466     operations 
= listCreate(); 
4467     listSetFreeMethod(operations
,zfree
); 
4470     /* Now we need to protect sortval incrementing its count, in the future 
4471      * SORT may have options able to overwrite/delete keys during the sorting 
4472      * and the sorted key itself may get destroied */ 
4473     incrRefCount(sortval
); 
4475     /* The SORT command has an SQL-alike syntax, parse it */ 
4476     while(j 
< c
->argc
) { 
4477         int leftargs 
= c
->argc
-j
-1; 
4478         if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) { 
4480         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) { 
4482         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) { 
4484         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs 
>= 2) { 
4485             limit_start 
= atoi(c
->argv
[j
+1]->ptr
); 
4486             limit_count 
= atoi(c
->argv
[j
+2]->ptr
); 
4488         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs 
>= 1) { 
4489             sortby 
= c
->argv
[j
+1]; 
4490             /* If the BY pattern does not contain '*', i.e. it is constant, 
4491              * we don't need to sort nor to lookup the weight keys. */ 
4492             if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort 
= 1; 
4494         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs 
>= 1) { 
4495             listAddNodeTail(operations
,createSortOperation( 
4496                 REDIS_SORT_GET
,c
->argv
[j
+1])); 
4499         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"del") && leftargs 
>= 1) { 
4500             listAddNodeTail(operations
,createSortOperation( 
4501                 REDIS_SORT_DEL
,c
->argv
[j
+1])); 
4503         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"incr") && leftargs 
>= 1) { 
4504             listAddNodeTail(operations
,createSortOperation( 
4505                 REDIS_SORT_INCR
,c
->argv
[j
+1])); 
4507         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs 
>= 1) { 
4508             listAddNodeTail(operations
,createSortOperation( 
4509                 REDIS_SORT_DECR
,c
->argv
[j
+1])); 
4512             decrRefCount(sortval
); 
4513             listRelease(operations
); 
4514             addReply(c
,shared
.syntaxerr
); 
4520     /* Load the sorting vector with all the objects to sort */ 
4521     vectorlen 
= (sortval
->type 
== REDIS_LIST
) ? 
4522         listLength((list
*)sortval
->ptr
) : 
4523         dictSize((dict
*)sortval
->ptr
); 
4524     vector 
= zmalloc(sizeof(redisSortObject
)*vectorlen
); 
4526     if (sortval
->type 
== REDIS_LIST
) { 
4527         list 
*list 
= sortval
->ptr
; 
4531         while((ln 
= listYield(list
))) { 
4532             robj 
*ele 
= ln
->value
; 
4533             vector
[j
].obj 
= ele
; 
4534             vector
[j
].u
.score 
= 0; 
4535             vector
[j
].u
.cmpobj 
= NULL
; 
4539         dict 
*set 
= sortval
->ptr
; 
4543         di 
= dictGetIterator(set
); 
4544         while((setele 
= dictNext(di
)) != NULL
) { 
4545             vector
[j
].obj 
= dictGetEntryKey(setele
); 
4546             vector
[j
].u
.score 
= 0; 
4547             vector
[j
].u
.cmpobj 
= NULL
; 
4550         dictReleaseIterator(di
); 
4552     assert(j 
== vectorlen
); 
4554     /* Now it's time to load the right scores in the sorting vector */ 
4555     if (dontsort 
== 0) { 
4556         for (j 
= 0; j 
< vectorlen
; j
++) { 
4560                 byval 
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
); 
4561                 if (!byval 
|| byval
->type 
!= REDIS_STRING
) continue; 
4563                     if (byval
->encoding 
== REDIS_ENCODING_RAW
) { 
4564                         vector
[j
].u
.cmpobj 
= byval
; 
4565                         incrRefCount(byval
); 
4567                         vector
[j
].u
.cmpobj 
= getDecodedObject(byval
); 
4570                     if (byval
->encoding 
== REDIS_ENCODING_RAW
) { 
4571                         vector
[j
].u
.score 
= strtod(byval
->ptr
,NULL
); 
4573                         if (byval
->encoding 
== REDIS_ENCODING_INT
) { 
4574                             vector
[j
].u
.score 
= (long)byval
->ptr
; 
4581                     if (vector
[j
].obj
->encoding 
== REDIS_ENCODING_RAW
) 
4582                         vector
[j
].u
.score 
= strtod(vector
[j
].obj
->ptr
,NULL
); 
4584                         if (vector
[j
].obj
->encoding 
== REDIS_ENCODING_INT
) 
4585                             vector
[j
].u
.score 
= (long) vector
[j
].obj
->ptr
; 
4594     /* We are ready to sort the vector... perform a bit of sanity check 
4595      * on the LIMIT option too. We'll use a partial version of quicksort. */ 
4596     start 
= (limit_start 
< 0) ? 0 : limit_start
; 
4597     end 
= (limit_count 
< 0) ? vectorlen
-1 : start
+limit_count
-1; 
4598     if (start 
>= vectorlen
) { 
4599         start 
= vectorlen
-1; 
4602     if (end 
>= vectorlen
) end 
= vectorlen
-1; 
4604     if (dontsort 
== 0) { 
4605         server
.sort_desc 
= desc
; 
4606         server
.sort_alpha 
= alpha
; 
4607         server
.sort_bypattern 
= sortby 
? 1 : 0; 
4608         if (sortby 
&& (start 
!= 0 || end 
!= vectorlen
-1)) 
4609             pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
); 
4611             qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
); 
4614     /* Send command output to the output buffer, performing the specified 
4615      * GET/DEL/INCR/DECR operations if any. */ 
4616     outputlen 
= getop 
? getop
*(end
-start
+1) : end
-start
+1; 
4617     addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
)); 
4618     for (j 
= start
; j 
<= end
; j
++) { 
4621             addReplyBulkLen(c
,vector
[j
].obj
); 
4622             addReply(c
,vector
[j
].obj
); 
4623             addReply(c
,shared
.crlf
); 
4625         listRewind(operations
); 
4626         while((ln 
= listYield(operations
))) { 
4627             redisSortOperation 
*sop 
= ln
->value
; 
4628             robj 
*val 
= lookupKeyByPattern(c
->db
,sop
->pattern
, 
4631             if (sop
->type 
== REDIS_SORT_GET
) { 
4632                 if (!val 
|| val
->type 
!= REDIS_STRING
) { 
4633                     addReply(c
,shared
.nullbulk
); 
4635                     addReplyBulkLen(c
,val
); 
4637                     addReply(c
,shared
.crlf
); 
4639             } else if (sop
->type 
== REDIS_SORT_DEL
) { 
4646     decrRefCount(sortval
); 
4647     listRelease(operations
); 
4648     for (j 
= 0; j 
< vectorlen
; j
++) { 
4649         if (sortby 
&& alpha 
&& vector
[j
].u
.cmpobj
) 
4650             decrRefCount(vector
[j
].u
.cmpobj
); 
4655 static void infoCommand(redisClient 
*c
) { 
4657     time_t uptime 
= time(NULL
)-server
.stat_starttime
; 
4660     info 
= sdscatprintf(sdsempty(), 
4661         "redis_version:%s\r\n" 
4663         "uptime_in_seconds:%d\r\n" 
4664         "uptime_in_days:%d\r\n" 
4665         "connected_clients:%d\r\n" 
4666         "connected_slaves:%d\r\n" 
4667         "used_memory:%zu\r\n" 
4668         "changes_since_last_save:%lld\r\n" 
4669         "bgsave_in_progress:%d\r\n" 
4670         "last_save_time:%d\r\n" 
4671         "total_connections_received:%lld\r\n" 
4672         "total_commands_processed:%lld\r\n" 
4675         (sizeof(long) == 8) ? "64" : "32", 
4678         listLength(server
.clients
)-listLength(server
.slaves
), 
4679         listLength(server
.slaves
), 
4682         server
.bgsaveinprogress
, 
4684         server
.stat_numconnections
, 
4685         server
.stat_numcommands
, 
4686         server
.masterhost 
== NULL 
? "master" : "slave" 
4688     if (server
.masterhost
) { 
4689         info 
= sdscatprintf(info
, 
4690             "master_host:%s\r\n" 
4691             "master_port:%d\r\n" 
4692             "master_link_status:%s\r\n" 
4693             "master_last_io_seconds_ago:%d\r\n" 
4696             (server
.replstate 
== REDIS_REPL_CONNECTED
) ? 
4698             server
.master 
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1 
4701     for (j 
= 0; j 
< server
.dbnum
; j
++) { 
4702         long long keys
, vkeys
; 
4704         keys 
= dictSize(server
.db
[j
].dict
); 
4705         vkeys 
= dictSize(server
.db
[j
].expires
); 
4706         if (keys 
|| vkeys
) { 
4707             info 
= sdscatprintf(info
, "db%d: keys=%lld,expires=%lld\r\n", 
4711     addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info
))); 
4712     addReplySds(c
,info
); 
4713     addReply(c
,shared
.crlf
); 
4716 static void monitorCommand(redisClient 
*c
) { 
4717     /* ignore MONITOR if aleady slave or in monitor mode */ 
4718     if (c
->flags 
& REDIS_SLAVE
) return; 
4720     c
->flags 
|= (REDIS_SLAVE
|REDIS_MONITOR
); 
4722     listAddNodeTail(server
.monitors
,c
); 
4723     addReply(c
,shared
.ok
); 
4726 /* ================================= Expire ================================= */ 
4727 static int removeExpire(redisDb 
*db
, robj 
*key
) { 
4728     if (dictDelete(db
->expires
,key
) == DICT_OK
) { 
4735 static int setExpire(redisDb 
*db
, robj 
*key
, time_t when
) { 
4736     if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) { 
4744 /* Return the expire time of the specified key, or -1 if no expire 
4745  * is associated with this key (i.e. the key is non volatile) */ 
4746 static time_t getExpire(redisDb 
*db
, robj 
*key
) { 
4749     /* No expire? return ASAP */ 
4750     if (dictSize(db
->expires
) == 0 || 
4751        (de 
= dictFind(db
->expires
,key
)) == NULL
) return -1; 
4753     return (time_t) dictGetEntryVal(de
); 
4756 static int expireIfNeeded(redisDb 
*db
, robj 
*key
) { 
4760     /* No expire? return ASAP */ 
4761     if (dictSize(db
->expires
) == 0 || 
4762        (de 
= dictFind(db
->expires
,key
)) == NULL
) return 0; 
4764     /* Lookup the expire */ 
4765     when 
= (time_t) dictGetEntryVal(de
); 
4766     if (time(NULL
) <= when
) return 0; 
4768     /* Delete the key */ 
4769     dictDelete(db
->expires
,key
); 
4770     return dictDelete(db
->dict
,key
) == DICT_OK
; 
4773 static int deleteIfVolatile(redisDb 
*db
, robj 
*key
) { 
4776     /* No expire? return ASAP */ 
4777     if (dictSize(db
->expires
) == 0 || 
4778        (de 
= dictFind(db
->expires
,key
)) == NULL
) return 0; 
4780     /* Delete the key */ 
4782     dictDelete(db
->expires
,key
); 
4783     return dictDelete(db
->dict
,key
) == DICT_OK
; 
4786 static void expireGenericCommand(redisClient 
*c
, robj 
*key
, time_t seconds
) { 
4789     de 
= dictFind(c
->db
->dict
,key
); 
4791         addReply(c
,shared
.czero
); 
4795         if (deleteKey(c
->db
,key
)) server
.dirty
++; 
4796         addReply(c
, shared
.cone
); 
4799         time_t when 
= time(NULL
)+seconds
; 
4800         if (setExpire(c
->db
,key
,when
)) { 
4801             addReply(c
,shared
.cone
); 
4804             addReply(c
,shared
.czero
); 
4810 static void expireCommand(redisClient 
*c
) { 
4811     expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)); 
4814 static void expireatCommand(redisClient 
*c
) { 
4815     expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
)); 
4818 static void ttlCommand(redisClient 
*c
) { 
4822     expire 
= getExpire(c
->db
,c
->argv
[1]); 
4824         ttl 
= (int) (expire
-time(NULL
)); 
4825         if (ttl 
< 0) ttl 
= -1; 
4827     addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
)); 
4830 static void msetGenericCommand(redisClient 
*c
, int nx
) { 
4833     if ((c
->argc 
% 2) == 0) { 
4834         addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n")); 
4837     /* Handle the NX flag. The MSETNX semantic is to return zero and don't 
4838      * set nothing at all if at least one already key exists. */ 
4840         for (j 
= 1; j 
< c
->argc
; j 
+= 2) { 
4841             if (dictFind(c
->db
->dict
,c
->argv
[j
]) != NULL
) { 
4842                 addReply(c
, shared
.czero
); 
4848     for (j 
= 1; j 
< c
->argc
; j 
+= 2) { 
4851         retval 
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]); 
4852         if (retval 
== DICT_ERR
) { 
4853             dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]); 
4854             incrRefCount(c
->argv
[j
+1]); 
4856             incrRefCount(c
->argv
[j
]); 
4857             incrRefCount(c
->argv
[j
+1]); 
4859         removeExpire(c
->db
,c
->argv
[j
]); 
4861     server
.dirty 
+= (c
->argc
-1)/2; 
4862     addReply(c
, nx 
? shared
.cone 
: shared
.ok
); 
4865 static void msetCommand(redisClient 
*c
) { 
4866     msetGenericCommand(c
,0); 
4869 static void msetnxCommand(redisClient 
*c
) { 
4870     msetGenericCommand(c
,1); 
4873 /* =============================== Replication  ============================= */ 
4875 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) { 
4876     ssize_t nwritten
, ret 
= size
; 
4877     time_t start 
= time(NULL
); 
4881         if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) { 
4882             nwritten 
= write(fd
,ptr
,size
); 
4883             if (nwritten 
== -1) return -1; 
4887         if ((time(NULL
)-start
) > timeout
) { 
4895 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) { 
4896     ssize_t nread
, totread 
= 0; 
4897     time_t start 
= time(NULL
); 
4901         if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) { 
4902             nread 
= read(fd
,ptr
,size
); 
4903             if (nread 
== -1) return -1; 
4908         if ((time(NULL
)-start
) > timeout
) { 
4916 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) { 
4923         if (syncRead(fd
,&c
,1,timeout
) == -1) return -1; 
4926             if (nread 
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0'; 
4937 static void syncCommand(redisClient 
*c
) { 
4938     /* ignore SYNC if aleady slave or in monitor mode */ 
4939     if (c
->flags 
& REDIS_SLAVE
) return; 
4941     /* SYNC can't be issued when the server has pending data to send to 
4942      * the client about already issued commands. We need a fresh reply 
4943      * buffer registering the differences between the BGSAVE and the current 
4944      * dataset, so that we can copy to other slaves if needed. */ 
4945     if (listLength(c
->reply
) != 0) { 
4946         addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n")); 
4950     redisLog(REDIS_NOTICE
,"Slave ask for synchronization"); 
4951     /* Here we need to check if there is a background saving operation 
4952      * in progress, or if it is required to start one */ 
4953     if (server
.bgsaveinprogress
) { 
4954         /* Ok a background save is in progress. Let's check if it is a good 
4955          * one for replication, i.e. if there is another slave that is 
4956          * registering differences since the server forked to save */ 
4960         listRewind(server
.slaves
); 
4961         while((ln 
= listYield(server
.slaves
))) { 
4963             if (slave
->replstate 
== REDIS_REPL_WAIT_BGSAVE_END
) break; 
4966             /* Perfect, the server is already registering differences for 
4967              * another slave. Set the right state, and copy the buffer. */ 
4968             listRelease(c
->reply
); 
4969             c
->reply 
= listDup(slave
->reply
); 
4970             c
->replstate 
= REDIS_REPL_WAIT_BGSAVE_END
; 
4971             redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC"); 
4973             /* No way, we need to wait for the next BGSAVE in order to 
4974              * register differences */ 
4975             c
->replstate 
= REDIS_REPL_WAIT_BGSAVE_START
; 
4976             redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC"); 
4979         /* Ok we don't have a BGSAVE in progress, let's start one */ 
4980         redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC"); 
4981         if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) { 
4982             redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE"); 
4983             addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n")); 
4986         c
->replstate 
= REDIS_REPL_WAIT_BGSAVE_END
; 
4989     c
->flags 
|= REDIS_SLAVE
; 
4991     listAddNodeTail(server
.slaves
,c
); 
4995 static void sendBulkToSlave(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
4996     redisClient 
*slave 
= privdata
; 
4998     REDIS_NOTUSED(mask
); 
4999     char buf
[REDIS_IOBUF_LEN
]; 
5000     ssize_t nwritten
, buflen
; 
5002     if (slave
->repldboff 
== 0) { 
5003         /* Write the bulk write count before to transfer the DB. In theory here 
5004          * we don't know how much room there is in the output buffer of the 
5005          * socket, but in pratice SO_SNDLOWAT (the minimum count for output 
5006          * operations) will never be smaller than the few bytes we need. */ 
5009         bulkcount 
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long) 
5011         if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
)) 
5019     lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
); 
5020     buflen 
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
); 
5022         redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s", 
5023             (buflen 
== 0) ? "premature EOF" : strerror(errno
)); 
5027     if ((nwritten 
= write(fd
,buf
,buflen
)) == -1) { 
5028         redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s", 
5033     slave
->repldboff 
+= nwritten
; 
5034     if (slave
->repldboff 
== slave
->repldbsize
) { 
5035         close(slave
->repldbfd
); 
5036         slave
->repldbfd 
= -1; 
5037         aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
); 
5038         slave
->replstate 
= REDIS_REPL_ONLINE
; 
5039         if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, 
5040             sendReplyToClient
, slave
, NULL
) == AE_ERR
) { 
5044         addReplySds(slave
,sdsempty()); 
5045         redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded"); 
5049 /* This function is called at the end of every backgrond saving. 
5050  * The argument bgsaveerr is REDIS_OK if the background saving succeeded 
5051  * otherwise REDIS_ERR is passed to the function. 
5053  * The goal of this function is to handle slaves waiting for a successful 
5054  * background saving in order to perform non-blocking synchronization. */ 
5055 static void updateSlavesWaitingBgsave(int bgsaveerr
) { 
5057     int startbgsave 
= 0; 
5059     listRewind(server
.slaves
); 
5060     while((ln 
= listYield(server
.slaves
))) { 
5061         redisClient 
*slave 
= ln
->value
; 
5063         if (slave
->replstate 
== REDIS_REPL_WAIT_BGSAVE_START
) { 
5065             slave
->replstate 
= REDIS_REPL_WAIT_BGSAVE_END
; 
5066         } else if (slave
->replstate 
== REDIS_REPL_WAIT_BGSAVE_END
) { 
5067             struct redis_stat buf
; 
5069             if (bgsaveerr 
!= REDIS_OK
) { 
5071                 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error"); 
5074             if ((slave
->repldbfd 
= open(server
.dbfilename
,O_RDONLY
)) == -1 || 
5075                 redis_fstat(slave
->repldbfd
,&buf
) == -1) { 
5077                 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
)); 
5080             slave
->repldboff 
= 0; 
5081             slave
->repldbsize 
= buf
.st_size
; 
5082             slave
->replstate 
= REDIS_REPL_SEND_BULK
; 
5083             aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
); 
5084             if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
, NULL
) == AE_ERR
) { 
5091         if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) { 
5092             listRewind(server
.slaves
); 
5093             redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed"); 
5094             while((ln 
= listYield(server
.slaves
))) { 
5095                 redisClient 
*slave 
= ln
->value
; 
5097                 if (slave
->replstate 
== REDIS_REPL_WAIT_BGSAVE_START
) 
5104 static int syncWithMaster(void) { 
5105     char buf
[1024], tmpfile
[256]; 
5107     int fd 
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
); 
5111         redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s", 
5115     /* Issue the SYNC command */ 
5116     if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) { 
5118         redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s", 
5122     /* Read the bulk write count */ 
5123     if (syncReadLine(fd
,buf
,1024,3600) == -1) { 
5125         redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s", 
5129     if (buf
[0] != '$') { 
5131         redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?"); 
5134     dumpsize 
= atoi(buf
+1); 
5135     redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
); 
5136     /* Read the bulk write data on a temp file */ 
5137     snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random()); 
5138     dfd 
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644); 
5141         redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
)); 
5145         int nread
, nwritten
; 
5147         nread 
= read(fd
,buf
,(dumpsize 
< 1024)?dumpsize
:1024); 
5149             redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s", 
5155         nwritten 
= write(dfd
,buf
,nread
); 
5156         if (nwritten 
== -1) { 
5157             redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
)); 
5165     if (rename(tmpfile
,server
.dbfilename
) == -1) { 
5166         redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
)); 
5172     if (rdbLoad(server
.dbfilename
) != REDIS_OK
) { 
5173         redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk"); 
5177     server
.master 
= createClient(fd
); 
5178     server
.master
->flags 
|= REDIS_MASTER
; 
5179     server
.replstate 
= REDIS_REPL_CONNECTED
; 
5183 static void slaveofCommand(redisClient 
*c
) { 
5184     if (!strcasecmp(c
->argv
[1]->ptr
,"no") && 
5185         !strcasecmp(c
->argv
[2]->ptr
,"one")) { 
5186         if (server
.masterhost
) { 
5187             sdsfree(server
.masterhost
); 
5188             server
.masterhost 
= NULL
; 
5189             if (server
.master
) freeClient(server
.master
); 
5190             server
.replstate 
= REDIS_REPL_NONE
; 
5191             redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)"); 
5194         sdsfree(server
.masterhost
); 
5195         server
.masterhost 
= sdsdup(c
->argv
[1]->ptr
); 
5196         server
.masterport 
= atoi(c
->argv
[2]->ptr
); 
5197         if (server
.master
) freeClient(server
.master
); 
5198         server
.replstate 
= REDIS_REPL_CONNECT
; 
5199         redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)", 
5200             server
.masterhost
, server
.masterport
); 
5202     addReply(c
,shared
.ok
); 
5205 /* ============================ Maxmemory directive  ======================== */ 
5207 /* This function gets called when 'maxmemory' is set on the config file to limit 
5208  * the max memory used by the server, and we are out of memory. 
5209  * This function will try to, in order: 
5211  * - Free objects from the free list 
5212  * - Try to remove keys with an EXPIRE set 
5214  * It is not possible to free enough memory to reach used-memory < maxmemory 
5215  * the server will start refusing commands that will enlarge even more the 
5218 static void freeMemoryIfNeeded(void) { 
5219     while (server
.maxmemory 
&& zmalloc_used_memory() > server
.maxmemory
) { 
5220         if (listLength(server
.objfreelist
)) { 
5223             listNode 
*head 
= listFirst(server
.objfreelist
); 
5224             o 
= listNodeValue(head
); 
5225             listDelNode(server
.objfreelist
,head
); 
5228             int j
, k
, freed 
= 0; 
5230             for (j 
= 0; j 
< server
.dbnum
; j
++) { 
5232                 robj 
*minkey 
= NULL
; 
5233                 struct dictEntry 
*de
; 
5235                 if (dictSize(server
.db
[j
].expires
)) { 
5237                     /* From a sample of three keys drop the one nearest to 
5238                      * the natural expire */ 
5239                     for (k 
= 0; k 
< 3; k
++) { 
5242                         de 
= dictGetRandomKey(server
.db
[j
].expires
); 
5243                         t 
= (time_t) dictGetEntryVal(de
); 
5244                         if (minttl 
== -1 || t 
< minttl
) { 
5245                             minkey 
= dictGetEntryKey(de
); 
5249                     deleteKey(server
.db
+j
,minkey
); 
5252             if (!freed
) return; /* nothing to free... */ 
5257 /* ============================== Append Only file ========================== */ 
5259 static void feedAppendOnlyFile(struct redisCommand 
*cmd
, int dictid
, robj 
**argv
, int argc
) { 
5260     sds buf 
= sdsempty(); 
5266     /* The DB this command was targetting is not the same as the last command 
5267      * we appendend. To issue a SELECT command is needed. */ 
5268     if (dictid 
!= server
.appendseldb
) { 
5271         snprintf(seldb
,sizeof(seldb
),"%d",dictid
); 
5272         buf 
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", 
5273             strlen(seldb
),seldb
); 
5274         server
.appendseldb 
= dictid
; 
5277     /* "Fix" the argv vector if the command is EXPIRE. We want to translate 
5278      * EXPIREs into EXPIREATs calls */ 
5279     if (cmd
->proc 
== expireCommand
) { 
5282         tmpargv
[0] = createStringObject("EXPIREAT",8); 
5283         tmpargv
[1] = argv
[1]; 
5284         incrRefCount(argv
[1]); 
5285         when 
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10); 
5286         tmpargv
[2] = createObject(REDIS_STRING
, 
5287             sdscatprintf(sdsempty(),"%ld",when
)); 
5291     /* Append the actual command */ 
5292     buf 
= sdscatprintf(buf
,"*%d\r\n",argc
); 
5293     for (j 
= 0; j 
< argc
; j
++) { 
5296         if (o
->encoding 
!= REDIS_ENCODING_RAW
) 
5297             o 
= getDecodedObject(o
); 
5298         buf 
= sdscatprintf(buf
,"$%d\r\n",sdslen(o
->ptr
)); 
5299         buf 
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
)); 
5300         buf 
= sdscatlen(buf
,"\r\n",2); 
5305     /* Free the objects from the modified argv for EXPIREAT */ 
5306     if (cmd
->proc 
== expireCommand
) { 
5307         for (j 
= 0; j 
< 3; j
++) 
5308             decrRefCount(argv
[j
]); 
5311     /* We want to perform a single write. This should be guaranteed atomic 
5312      * at least if the filesystem we are writing is a real physical one. 
5313      * While this will save us against the server being killed I don't think 
5314      * there is much to do about the whole server stopping for power problems 
5316      nwritten 
= write(server
.appendfd
,buf
,sdslen(buf
)); 
5317      if (nwritten 
!= (signed)sdslen(buf
)) { 
5318         /* Ooops, we are in troubles. The best thing to do for now is 
5319          * to simply exit instead to give the illusion that everything is 
5320          * working as expected. */ 
5321          if (nwritten 
== -1) { 
5322             redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
)); 
5324             redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
)); 
5329     if (server
.appendfsync 
== APPENDFSYNC_ALWAYS 
|| 
5330         (server
.appendfsync 
== APPENDFSYNC_EVERYSEC 
&& 
5331          now
-server
.lastfsync 
> 1)) 
5333         fsync(server
.appendfd
); /* Let's try to get this data on the disk */ 
5334         server
.lastfsync 
= now
; 
5338 /* In Redis commands are always executed in the context of a client, so in 
5339  * order to load the append only file we need to create a fake client. */ 
5340 static struct redisClient 
*createFakeClient(void) { 
5341     struct redisClient 
*c 
= zmalloc(sizeof(*c
)); 
5345     c
->querybuf 
= sdsempty(); 
5349     /* We set the fake client as a slave waiting for the synchronization 
5350      * so that Redis will not try to send replies to this client. */ 
5351     c
->replstate 
= REDIS_REPL_WAIT_BGSAVE_START
; 
5352     c
->reply 
= listCreate(); 
5353     listSetFreeMethod(c
->reply
,decrRefCount
); 
5354     listSetDupMethod(c
->reply
,dupClientReplyValue
); 
5358 static void freeFakeClient(struct redisClient 
*c
) { 
5359     sdsfree(c
->querybuf
); 
5360     listRelease(c
->reply
); 
5364 /* Replay the append log file. On error REDIS_OK is returned. On non fatal 
5365  * error (the append only file is zero-length) REDIS_ERR is returned. On 
5366  * fatal error an error message is logged and the program exists. */ 
5367 int loadAppendOnlyFile(char *filename
) { 
5368     struct redisClient 
*fakeClient
; 
5369     FILE *fp 
= fopen(filename
,"r"); 
5370     struct redis_stat sb
; 
5372     if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size 
== 0) 
5376         redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
)); 
5380     fakeClient 
= createFakeClient(); 
5387         struct redisCommand 
*cmd
; 
5389         if (fgets(buf
,sizeof(buf
),fp
) == NULL
) { 
5395         if (buf
[0] != '*') goto fmterr
; 
5397         argv 
= zmalloc(sizeof(robj
*)*argc
); 
5398         for (j 
= 0; j 
< argc
; j
++) { 
5399             if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
; 
5400             if (buf
[0] != '$') goto fmterr
; 
5401             len 
= strtol(buf
+1,NULL
,10); 
5402             argsds 
= sdsnewlen(NULL
,len
); 
5403             if (fread(argsds
,len
,1,fp
) == 0) goto fmterr
; 
5404             argv
[j
] = createObject(REDIS_STRING
,argsds
); 
5405             if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */ 
5408         /* Command lookup */ 
5409         cmd 
= lookupCommand(argv
[0]->ptr
); 
5411             redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
); 
5414         /* Try object sharing and encoding */ 
5415         if (server
.shareobjects
) { 
5417             for(j 
= 1; j 
< argc
; j
++) 
5418                 argv
[j
] = tryObjectSharing(argv
[j
]); 
5420         if (cmd
->flags 
& REDIS_CMD_BULK
) 
5421             tryObjectEncoding(argv
[argc
-1]); 
5422         /* Run the command in the context of a fake client */ 
5423         fakeClient
->argc 
= argc
; 
5424         fakeClient
->argv 
= argv
; 
5425         cmd
->proc(fakeClient
); 
5426         /* Discard the reply objects list from the fake client */ 
5427         while(listLength(fakeClient
->reply
)) 
5428             listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
)); 
5429         /* Clean up, ready for the next command */ 
5430         for (j 
= 0; j 
< argc
; j
++) decrRefCount(argv
[j
]); 
5434     freeFakeClient(fakeClient
); 
5439         redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file"); 
5441         redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
)); 
5445     redisLog(REDIS_WARNING
,"Bad file format reading the append only file"); 
5449 /* ================================= Debugging ============================== */ 
5451 static void debugCommand(redisClient 
*c
) { 
5452     if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) { 
5454     } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc 
== 3) { 
5455         dictEntry 
*de 
= dictFind(c
->db
->dict
,c
->argv
[2]); 
5459             addReply(c
,shared
.nokeyerr
); 
5462         key 
= dictGetEntryKey(de
); 
5463         val 
= dictGetEntryVal(de
); 
5464         addReplySds(c
,sdscatprintf(sdsempty(), 
5465             "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n", 
5466                 key
, key
->refcount
, val
, val
->refcount
, val
->encoding
)); 
5468         addReplySds(c
,sdsnew( 
5469             "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n")); 
5473 #ifdef HAVE_BACKTRACE 
5474 static struct redisFunctionSym symsTable
[] = { 
5475 {"compareStringObjects", (unsigned long)compareStringObjects
}, 
5476 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong
}, 
5477 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare
}, 
5478 {"dictEncObjHash", (unsigned long)dictEncObjHash
}, 
5479 {"incrDecrCommand", (unsigned long)incrDecrCommand
}, 
5480 {"freeStringObject", (unsigned long)freeStringObject
}, 
5481 {"freeListObject", (unsigned long)freeListObject
}, 
5482 {"freeSetObject", (unsigned long)freeSetObject
}, 
5483 {"decrRefCount", (unsigned long)decrRefCount
}, 
5484 {"createObject", (unsigned long)createObject
}, 
5485 {"freeClient", (unsigned long)freeClient
}, 
5486 {"rdbLoad", (unsigned long)rdbLoad
}, 
5487 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject
}, 
5488 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw
}, 
5489 {"addReply", (unsigned long)addReply
}, 
5490 {"addReplySds", (unsigned long)addReplySds
}, 
5491 {"incrRefCount", (unsigned long)incrRefCount
}, 
5492 {"rdbSaveBackground", (unsigned long)rdbSaveBackground
}, 
5493 {"createStringObject", (unsigned long)createStringObject
}, 
5494 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves
}, 
5495 {"syncWithMaster", (unsigned long)syncWithMaster
}, 
5496 {"tryObjectSharing", (unsigned long)tryObjectSharing
}, 
5497 {"tryObjectEncoding", (unsigned long)tryObjectEncoding
}, 
5498 {"getDecodedObject", (unsigned long)getDecodedObject
}, 
5499 {"removeExpire", (unsigned long)removeExpire
}, 
5500 {"expireIfNeeded", (unsigned long)expireIfNeeded
}, 
5501 {"deleteIfVolatile", (unsigned long)deleteIfVolatile
}, 
5502 {"deleteKey", (unsigned long)deleteKey
}, 
5503 {"getExpire", (unsigned long)getExpire
}, 
5504 {"setExpire", (unsigned long)setExpire
}, 
5505 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave
}, 
5506 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded
}, 
5507 {"authCommand", (unsigned long)authCommand
}, 
5508 {"pingCommand", (unsigned long)pingCommand
}, 
5509 {"echoCommand", (unsigned long)echoCommand
}, 
5510 {"setCommand", (unsigned long)setCommand
}, 
5511 {"setnxCommand", (unsigned long)setnxCommand
}, 
5512 {"getCommand", (unsigned long)getCommand
}, 
5513 {"delCommand", (unsigned long)delCommand
}, 
5514 {"existsCommand", (unsigned long)existsCommand
}, 
5515 {"incrCommand", (unsigned long)incrCommand
}, 
5516 {"decrCommand", (unsigned long)decrCommand
}, 
5517 {"incrbyCommand", (unsigned long)incrbyCommand
}, 
5518 {"decrbyCommand", (unsigned long)decrbyCommand
}, 
5519 {"selectCommand", (unsigned long)selectCommand
}, 
5520 {"randomkeyCommand", (unsigned long)randomkeyCommand
}, 
5521 {"keysCommand", (unsigned long)keysCommand
}, 
5522 {"dbsizeCommand", (unsigned long)dbsizeCommand
}, 
5523 {"lastsaveCommand", (unsigned long)lastsaveCommand
}, 
5524 {"saveCommand", (unsigned long)saveCommand
}, 
5525 {"bgsaveCommand", (unsigned long)bgsaveCommand
}, 
5526 {"shutdownCommand", (unsigned long)shutdownCommand
}, 
5527 {"moveCommand", (unsigned long)moveCommand
}, 
5528 {"renameCommand", (unsigned long)renameCommand
}, 
5529 {"renamenxCommand", (unsigned long)renamenxCommand
}, 
5530 {"lpushCommand", (unsigned long)lpushCommand
}, 
5531 {"rpushCommand", (unsigned long)rpushCommand
}, 
5532 {"lpopCommand", (unsigned long)lpopCommand
}, 
5533 {"rpopCommand", (unsigned long)rpopCommand
}, 
5534 {"llenCommand", (unsigned long)llenCommand
}, 
5535 {"lindexCommand", (unsigned long)lindexCommand
}, 
5536 {"lrangeCommand", (unsigned long)lrangeCommand
}, 
5537 {"ltrimCommand", (unsigned long)ltrimCommand
}, 
5538 {"typeCommand", (unsigned long)typeCommand
}, 
5539 {"lsetCommand", (unsigned long)lsetCommand
}, 
5540 {"saddCommand", (unsigned long)saddCommand
}, 
5541 {"sremCommand", (unsigned long)sremCommand
}, 
5542 {"smoveCommand", (unsigned long)smoveCommand
}, 
5543 {"sismemberCommand", (unsigned long)sismemberCommand
}, 
5544 {"scardCommand", (unsigned long)scardCommand
}, 
5545 {"spopCommand", (unsigned long)spopCommand
}, 
5546 {"srandmemberCommand", (unsigned long)srandmemberCommand
}, 
5547 {"sinterCommand", (unsigned long)sinterCommand
}, 
5548 {"sinterstoreCommand", (unsigned long)sinterstoreCommand
}, 
5549 {"sunionCommand", (unsigned long)sunionCommand
}, 
5550 {"sunionstoreCommand", (unsigned long)sunionstoreCommand
}, 
5551 {"sdiffCommand", (unsigned long)sdiffCommand
}, 
5552 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand
}, 
5553 {"syncCommand", (unsigned long)syncCommand
}, 
5554 {"flushdbCommand", (unsigned long)flushdbCommand
}, 
5555 {"flushallCommand", (unsigned long)flushallCommand
}, 
5556 {"sortCommand", (unsigned long)sortCommand
}, 
5557 {"lremCommand", (unsigned long)lremCommand
}, 
5558 {"infoCommand", (unsigned long)infoCommand
}, 
5559 {"mgetCommand", (unsigned long)mgetCommand
}, 
5560 {"monitorCommand", (unsigned long)monitorCommand
}, 
5561 {"expireCommand", (unsigned long)expireCommand
}, 
5562 {"expireatCommand", (unsigned long)expireatCommand
}, 
5563 {"getsetCommand", (unsigned long)getsetCommand
}, 
5564 {"ttlCommand", (unsigned long)ttlCommand
}, 
5565 {"slaveofCommand", (unsigned long)slaveofCommand
}, 
5566 {"debugCommand", (unsigned long)debugCommand
}, 
5567 {"processCommand", (unsigned long)processCommand
}, 
5568 {"setupSigSegvAction", (unsigned long)setupSigSegvAction
}, 
5569 {"readQueryFromClient", (unsigned long)readQueryFromClient
}, 
5570 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile
}, 
5571 {"msetGenericCommand", (unsigned long)msetGenericCommand
}, 
5572 {"msetCommand", (unsigned long)msetCommand
}, 
5573 {"msetnxCommand", (unsigned long)msetnxCommand
}, 
5574 {"zslCreateNode", (unsigned long)zslCreateNode
}, 
5575 {"zslCreate", (unsigned long)zslCreate
}, 
5576 {"zslFreeNode",(unsigned long)zslFreeNode
}, 
5577 {"zslFree",(unsigned long)zslFree
}, 
5578 {"zslRandomLevel",(unsigned long)zslRandomLevel
}, 
5579 {"zslInsert",(unsigned long)zslInsert
}, 
5580 {"zslDelete",(unsigned long)zslDelete
}, 
5581 {"createZsetObject",(unsigned long)createZsetObject
}, 
5582 {"zaddCommand",(unsigned long)zaddCommand
}, 
5583 {"zrangeGenericCommand",(unsigned long)zrangeGenericCommand
}, 
5584 {"zrangeCommand",(unsigned long)zrangeCommand
}, 
5585 {"zrevrangeCommand",(unsigned long)zrevrangeCommand
}, 
5586 {"zremCommand",(unsigned long)zremCommand
}, 
5587 {"rdbSaveDoubleValue",(unsigned long)rdbSaveDoubleValue
}, 
5588 {"rdbLoadDoubleValue",(unsigned long)rdbLoadDoubleValue
}, 
5589 {"feedAppendOnlyFile",(unsigned long)feedAppendOnlyFile
}, 
5593 /* This function try to convert a pointer into a function name. It's used in 
5594  * oreder to provide a backtrace under segmentation fault that's able to 
5595  * display functions declared as static (otherwise the backtrace is useless). */ 
5596 static char *findFuncName(void *pointer
, unsigned long *offset
){ 
5598     unsigned long off
, minoff 
= 0; 
5600     /* Try to match against the Symbol with the smallest offset */ 
5601     for (i
=0; symsTable
[i
].pointer
; i
++) { 
5602         unsigned long lp 
= (unsigned long) pointer
; 
5604         if (lp 
!= (unsigned long)-1 && lp 
>= symsTable
[i
].pointer
) { 
5605             off
=lp
-symsTable
[i
].pointer
; 
5606             if (ret 
< 0 || off 
< minoff
) { 
5612     if (ret 
== -1) return NULL
; 
5614     return symsTable
[ret
].name
; 
5617 static void *getMcontextEip(ucontext_t 
*uc
) { 
5618 #if defined(__FreeBSD__) 
5619     return (void*) uc
->uc_mcontext
.mc_eip
; 
5620 #elif defined(__dietlibc__) 
5621     return (void*) uc
->uc_mcontext
.eip
; 
5622 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6) 
5623     return (void*) uc
->uc_mcontext
->__ss
.__eip
; 
5624 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6) 
5625   #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__) 
5626     return (void*) uc
->uc_mcontext
->__ss
.__rip
; 
5628     return (void*) uc
->uc_mcontext
->__ss
.__eip
; 
5630 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */ 
5631     return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
]; 
5632 #elif defined(__ia64__) /* Linux IA64 */ 
5633     return (void*) uc
->uc_mcontext
.sc_ip
; 
5639 static void segvHandler(int sig
, siginfo_t 
*info
, void *secret
) { 
5641     char **messages 
= NULL
; 
5642     int i
, trace_size 
= 0; 
5643     unsigned long offset
=0; 
5644     time_t uptime 
= time(NULL
)-server
.stat_starttime
; 
5645     ucontext_t 
*uc 
= (ucontext_t
*) secret
; 
5646     REDIS_NOTUSED(info
); 
5648     redisLog(REDIS_WARNING
, 
5649         "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
); 
5650     redisLog(REDIS_WARNING
, "%s", sdscatprintf(sdsempty(), 
5651         "redis_version:%s; " 
5652         "uptime_in_seconds:%d; " 
5653         "connected_clients:%d; " 
5654         "connected_slaves:%d; " 
5656         "changes_since_last_save:%lld; " 
5657         "bgsave_in_progress:%d; " 
5658         "last_save_time:%d; " 
5659         "total_connections_received:%lld; " 
5660         "total_commands_processed:%lld; " 
5664         listLength(server
.clients
)-listLength(server
.slaves
), 
5665         listLength(server
.slaves
), 
5668         server
.bgsaveinprogress
, 
5670         server
.stat_numconnections
, 
5671         server
.stat_numcommands
, 
5672         server
.masterhost 
== NULL 
? "master" : "slave" 
5675     trace_size 
= backtrace(trace
, 100); 
5676     /* overwrite sigaction with caller's address */ 
5677     if (getMcontextEip(uc
) != NULL
) { 
5678         trace
[1] = getMcontextEip(uc
); 
5680     messages 
= backtrace_symbols(trace
, trace_size
); 
5682     for (i
=1; i
<trace_size
; ++i
) { 
5683         char *fn 
= findFuncName(trace
[i
], &offset
), *p
; 
5685         p 
= strchr(messages
[i
],'+'); 
5686         if (!fn 
|| (p 
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) { 
5687             redisLog(REDIS_WARNING
,"%s", messages
[i
]); 
5689             redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
); 
5696 static void setupSigSegvAction(void) { 
5697     struct sigaction act
; 
5699     sigemptyset (&act
.sa_mask
); 
5700     /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction 
5701      * is used. Otherwise, sa_handler is used */ 
5702     act
.sa_flags 
= SA_NODEFER 
| SA_ONSTACK 
| SA_RESETHAND 
| SA_SIGINFO
; 
5703     act
.sa_sigaction 
= segvHandler
; 
5704     sigaction (SIGSEGV
, &act
, NULL
); 
5705     sigaction (SIGBUS
, &act
, NULL
); 
5706     sigaction (SIGFPE
, &act
, NULL
); 
5707     sigaction (SIGILL
, &act
, NULL
); 
5708     sigaction (SIGBUS
, &act
, NULL
); 
5711 #else /* HAVE_BACKTRACE */ 
5712 static void setupSigSegvAction(void) { 
5714 #endif /* HAVE_BACKTRACE */ 
5716 /* =================================== Main! ================================ */ 
5719 int linuxOvercommitMemoryValue(void) { 
5720     FILE *fp 
= fopen("/proc/sys/vm/overcommit_memory","r"); 
5724     if (fgets(buf
,64,fp
) == NULL
) { 
5733 void linuxOvercommitMemoryWarning(void) { 
5734     if (linuxOvercommitMemoryValue() == 0) { 
5735         redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect."); 
5738 #endif /* __linux__ */ 
5740 static void daemonize(void) { 
5744     if (fork() != 0) exit(0); /* parent exits */ 
5745     setsid(); /* create a new session */ 
5747     /* Every output goes to /dev/null. If Redis is daemonized but 
5748      * the 'logfile' is set to 'stdout' in the configuration file 
5749      * it will not log at all. */ 
5750     if ((fd 
= open("/dev/null", O_RDWR
, 0)) != -1) { 
5751         dup2(fd
, STDIN_FILENO
); 
5752         dup2(fd
, STDOUT_FILENO
); 
5753         dup2(fd
, STDERR_FILENO
); 
5754         if (fd 
> STDERR_FILENO
) close(fd
); 
5756     /* Try to write the pid file */ 
5757     fp 
= fopen(server
.pidfile
,"w"); 
5759         fprintf(fp
,"%d\n",getpid()); 
5764 int main(int argc
, char **argv
) { 
5767         ResetServerSaveParams(); 
5768         loadServerConfig(argv
[1]); 
5769     } else if (argc 
> 2) { 
5770         fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n"); 
5773         redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'"); 
5776     if (server
.daemonize
) daemonize(); 
5777     redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
); 
5779     linuxOvercommitMemoryWarning(); 
5781     if (server
.appendonly
) { 
5782         if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
) 
5783             redisLog(REDIS_NOTICE
,"DB loaded from append only file"); 
5785         if (rdbLoad(server
.dbfilename
) == REDIS_OK
) 
5786             redisLog(REDIS_NOTICE
,"DB loaded from disk"); 
5788     if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
, 
5789         acceptHandler
, NULL
, NULL
) == AE_ERR
) oom("creating file event"); 
5790     redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
); 
5792     aeDeleteEventLoop(server
.el
);