2  * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com> 
   5  * Redistribution and use in source and binary forms, with or without 
   6  * modification, are permitted provided that the following conditions are met: 
   8  *   * Redistributions of source code must retain the above copyright notice, 
   9  *     this list of conditions and the following disclaimer. 
  10  *   * Redistributions in binary form must reproduce the above copyright 
  11  *     notice, this list of conditions and the following disclaimer in the 
  12  *     documentation and/or other materials provided with the distribution. 
  13  *   * Neither the name of Redis nor the names of its contributors may be used 
  14  *     to endorse or promote products derived from this software without 
  15  *     specific prior written permission. 
  17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
  18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
  19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
  20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
  21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
  22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
  23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
  24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
  25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
  26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
  27  * POSSIBILITY OF SUCH DAMAGE. 
  30 #define REDIS_VERSION "1.050" 
  40 #define __USE_POSIX199309 
  46 #endif /* HAVE_BACKTRACE */ 
  54 #include <arpa/inet.h> 
  58 #include <sys/resource.h> 
  62 #include "ae.h"     /* Event driven programming library */ 
  63 #include "sds.h"    /* Dynamic safe strings */ 
  64 #include "anet.h"   /* Networking the easy way */ 
  65 #include "dict.h"   /* Hash tables */ 
  66 #include "adlist.h" /* Linked lists */ 
  67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */ 
  68 #include "lzf.h"    /* LZF compression library */ 
  69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */ 
  75 /* Static server configuration */ 
  76 #define REDIS_SERVERPORT        6379    /* TCP port */ 
  77 #define REDIS_MAXIDLETIME       (60*5)  /* default client timeout */ 
  78 #define REDIS_IOBUF_LEN         1024 
  79 #define REDIS_LOADBUF_LEN       1024 
  80 #define REDIS_STATIC_ARGS       4 
  81 #define REDIS_DEFAULT_DBNUM     16 
  82 #define REDIS_CONFIGLINE_MAX    1024 
  83 #define REDIS_OBJFREELIST_MAX   1000000 /* Max number of objects to cache */ 
  84 #define REDIS_MAX_SYNC_TIME     60      /* Slave can't take more to sync */ 
  85 #define REDIS_EXPIRELOOKUPS_PER_CRON    100 /* try to expire 100 keys/second */ 
  86 #define REDIS_MAX_WRITE_PER_EVENT (1024*64) 
  87 #define REDIS_REQUEST_MAX_SIZE  (1024*1024*256) /* max bytes in inline command */ 
  89 /* Hash table parameters */ 
  90 #define REDIS_HT_MINFILL        10      /* Minimal hash table fill 10% */ 
  93 #define REDIS_CMD_BULK          1       /* Bulk write command */ 
  94 #define REDIS_CMD_INLINE        2       /* Inline command */ 
  95 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with 
  96    this flags will return an error when the 'maxmemory' option is set in the 
  97    config file and the server is using more than maxmemory bytes of memory. 
  98    In short this commands are denied on low memory conditions. */ 
  99 #define REDIS_CMD_DENYOOM       4 
 102 #define REDIS_STRING 0 
 108 /* Objects encoding */ 
 109 #define REDIS_ENCODING_RAW 0    /* Raw representation */ 
 110 #define REDIS_ENCODING_INT 1    /* Encoded as integer */ 
 112 /* Object types only used for dumping to disk */ 
 113 #define REDIS_EXPIRETIME 253 
 114 #define REDIS_SELECTDB 254 
 115 #define REDIS_EOF 255 
 117 /* Defines related to the dump file format. To store 32 bits lengths for short 
 118  * keys requires a lot of space, so we check the most significant 2 bits of 
 119  * the first byte to interpreter the length: 
 121  * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte 
 122  * 01|000000 00000000 =>  01, the len is 14 byes, 6 bits + 8 bits of next byte 
 123  * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow 
 124  * 11|000000 this means: specially encoded object will follow. The six bits 
 125  *           number specify the kind of object that follows. 
 126  *           See the REDIS_RDB_ENC_* defines. 
 128  * Lenghts up to 63 are stored using a single byte, most DB keys, and may 
 129  * values, will fit inside. */ 
 130 #define REDIS_RDB_6BITLEN 0 
 131 #define REDIS_RDB_14BITLEN 1 
 132 #define REDIS_RDB_32BITLEN 2 
 133 #define REDIS_RDB_ENCVAL 3 
 134 #define REDIS_RDB_LENERR UINT_MAX 
 136 /* When a length of a string object stored on disk has the first two bits 
 137  * set, the remaining two bits specify a special encoding for the object 
 138  * accordingly to the following defines: */ 
 139 #define REDIS_RDB_ENC_INT8 0        /* 8 bit signed integer */ 
 140 #define REDIS_RDB_ENC_INT16 1       /* 16 bit signed integer */ 
 141 #define REDIS_RDB_ENC_INT32 2       /* 32 bit signed integer */ 
 142 #define REDIS_RDB_ENC_LZF 3         /* string compressed with FASTLZ */ 
 145 #define REDIS_CLOSE 1       /* This client connection should be closed ASAP */ 
 146 #define REDIS_SLAVE 2       /* This client is a slave server */ 
 147 #define REDIS_MASTER 4      /* This client is a master server */ 
 148 #define REDIS_MONITOR 8      /* This client is a slave monitor, see MONITOR */ 
 150 /* Slave replication state - slave side */ 
 151 #define REDIS_REPL_NONE 0   /* No active replication */ 
 152 #define REDIS_REPL_CONNECT 1    /* Must connect to master */ 
 153 #define REDIS_REPL_CONNECTED 2  /* Connected to master */ 
 155 /* Slave replication state - from the point of view of master 
 156  * Note that in SEND_BULK and ONLINE state the slave receives new updates 
 157  * in its output queue. In the WAIT_BGSAVE state instead the server is waiting 
 158  * to start the next background saving in order to send updates to it. */ 
 159 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */ 
 160 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */ 
 161 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */ 
 162 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */ 
 164 /* List related stuff */ 
 168 /* Sort operations */ 
 169 #define REDIS_SORT_GET 0 
 170 #define REDIS_SORT_DEL 1 
 171 #define REDIS_SORT_INCR 2 
 172 #define REDIS_SORT_DECR 3 
 173 #define REDIS_SORT_ASC 4 
 174 #define REDIS_SORT_DESC 5 
 175 #define REDIS_SORTKEY_MAX 1024 
 178 #define REDIS_DEBUG 0 
 179 #define REDIS_NOTICE 1 
 180 #define REDIS_WARNING 2 
 182 /* Anti-warning macro... */ 
 183 #define REDIS_NOTUSED(V) ((void) V) 
 185 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */ 
 186 #define ZSKIPLIST_P 0.25      /* Skiplist P = 1/4 */ 
 188 /*================================= Data types ============================== */ 
 190 /* A redis object, that is a type able to hold a string / list / set */ 
 191 typedef struct redisObject 
{ 
 194     unsigned char encoding
; 
 195     unsigned char notused
[2]; 
 199 typedef struct redisDb 
{ 
 205 /* With multiplexing we need to take per-clinet state. 
 206  * Clients are taken in a liked list. */ 
 207 typedef struct redisClient 
{ 
 212     robj 
**argv
, **mbargv
; 
 214     int bulklen
;            /* bulk read len. -1 if not in bulk read mode */ 
 215     int multibulk
;          /* multi bulk command format active */ 
 218     time_t lastinteraction
; /* time of the last interaction, used for timeout */ 
 219     int flags
;              /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */ 
 220     int slaveseldb
;         /* slave selected db, if this client is a slave */ 
 221     int authenticated
;      /* when requirepass is non-NULL */ 
 222     int replstate
;          /* replication state if this is a slave */ 
 223     int repldbfd
;           /* replication DB file descriptor */ 
 224     long repldboff
;          /* replication DB file offset */ 
 225     off_t repldbsize
;       /* replication DB file size */ 
 233 /* Global server state structure */ 
 239     unsigned int sharingpoolsize
; 
 240     long long dirty
;            /* changes to DB from the last save */ 
 242     list 
*slaves
, *monitors
; 
 243     char neterr
[ANET_ERR_LEN
]; 
 245     int cronloops
;              /* number of times the cron function run */ 
 246     list 
*objfreelist
;          /* A list of freed objects to avoid malloc() */ 
 247     time_t lastsave
;            /* Unix time of last save succeeede */ 
 248     size_t usedmemory
;             /* Used memory in megabytes */ 
 249     /* Fields used only for stats */ 
 250     time_t stat_starttime
;         /* server start time */ 
 251     long long stat_numcommands
;    /* number of processed commands */ 
 252     long long stat_numconnections
; /* number of connections received */ 
 260     int bgsaveinprogress
; 
 261     pid_t bgsavechildpid
; 
 262     struct saveparam 
*saveparams
; 
 269     /* Replication related */ 
 273     redisClient 
*master
;    /* client that is master for this slave */ 
 275     unsigned int maxclients
; 
 276     unsigned long maxmemory
; 
 277     /* Sort parameters - qsort_r() is only available under BSD so we 
 278      * have to take this state global, in order to pass it to sortCompare() */ 
 284 typedef void redisCommandProc(redisClient 
*c
); 
 285 struct redisCommand 
{ 
 287     redisCommandProc 
*proc
; 
 292 struct redisFunctionSym 
{ 
 294     unsigned long pointer
; 
 297 typedef struct _redisSortObject 
{ 
 305 typedef struct _redisSortOperation 
{ 
 308 } redisSortOperation
; 
 310 /* ZSETs use a specialized version of Skiplists */ 
 312 typedef struct zskiplistNode 
{ 
 313     struct zskiplistNode 
**forward
; 
 318 typedef struct zskiplist 
{ 
 319     struct zskiplistNode 
*header
; 
 324 typedef struct zset 
{ 
 329 /* Our shared "common" objects */ 
 331 struct sharedObjectsStruct 
{ 
 332     robj 
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
, 
 333     *colon
, *nullbulk
, *nullmultibulk
, 
 334     *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
, 
 335     *outofrangeerr
, *plus
, 
 336     *select0
, *select1
, *select2
, *select3
, *select4
, 
 337     *select5
, *select6
, *select7
, *select8
, *select9
; 
 340 /*================================ Prototypes =============================== */ 
 342 static void freeStringObject(robj 
*o
); 
 343 static void freeListObject(robj 
*o
); 
 344 static void freeSetObject(robj 
*o
); 
 345 static void decrRefCount(void *o
); 
 346 static robj 
*createObject(int type
, void *ptr
); 
 347 static void freeClient(redisClient 
*c
); 
 348 static int rdbLoad(char *filename
); 
 349 static void addReply(redisClient 
*c
, robj 
*obj
); 
 350 static void addReplySds(redisClient 
*c
, sds s
); 
 351 static void incrRefCount(robj 
*o
); 
 352 static int rdbSaveBackground(char *filename
); 
 353 static robj 
*createStringObject(char *ptr
, size_t len
); 
 354 static void replicationFeedSlaves(list 
*slaves
, struct redisCommand 
*cmd
, int dictid
, robj 
**argv
, int argc
); 
 355 static int syncWithMaster(void); 
 356 static robj 
*tryObjectSharing(robj 
*o
); 
 357 static int tryObjectEncoding(robj 
*o
); 
 358 static robj 
*getDecodedObject(const robj 
*o
); 
 359 static int removeExpire(redisDb 
*db
, robj 
*key
); 
 360 static int expireIfNeeded(redisDb 
*db
, robj 
*key
); 
 361 static int deleteIfVolatile(redisDb 
*db
, robj 
*key
); 
 362 static int deleteKey(redisDb 
*db
, robj 
*key
); 
 363 static time_t getExpire(redisDb 
*db
, robj 
*key
); 
 364 static int setExpire(redisDb 
*db
, robj 
*key
, time_t when
); 
 365 static void updateSlavesWaitingBgsave(int bgsaveerr
); 
 366 static void freeMemoryIfNeeded(void); 
 367 static int processCommand(redisClient 
*c
); 
 368 static void setupSigSegvAction(void); 
 369 static void rdbRemoveTempFile(pid_t childpid
); 
 370 static size_t stringObjectLen(robj 
*o
); 
 371 static void processInputBuffer(redisClient 
*c
); 
 372 static zskiplist 
*zslCreate(void); 
 373 static void zslFree(zskiplist 
*zsl
); 
 375 static void authCommand(redisClient 
*c
); 
 376 static void pingCommand(redisClient 
*c
); 
 377 static void echoCommand(redisClient 
*c
); 
 378 static void setCommand(redisClient 
*c
); 
 379 static void setnxCommand(redisClient 
*c
); 
 380 static void getCommand(redisClient 
*c
); 
 381 static void delCommand(redisClient 
*c
); 
 382 static void existsCommand(redisClient 
*c
); 
 383 static void incrCommand(redisClient 
*c
); 
 384 static void decrCommand(redisClient 
*c
); 
 385 static void incrbyCommand(redisClient 
*c
); 
 386 static void decrbyCommand(redisClient 
*c
); 
 387 static void selectCommand(redisClient 
*c
); 
 388 static void randomkeyCommand(redisClient 
*c
); 
 389 static void keysCommand(redisClient 
*c
); 
 390 static void dbsizeCommand(redisClient 
*c
); 
 391 static void lastsaveCommand(redisClient 
*c
); 
 392 static void saveCommand(redisClient 
*c
); 
 393 static void bgsaveCommand(redisClient 
*c
); 
 394 static void shutdownCommand(redisClient 
*c
); 
 395 static void moveCommand(redisClient 
*c
); 
 396 static void renameCommand(redisClient 
*c
); 
 397 static void renamenxCommand(redisClient 
*c
); 
 398 static void lpushCommand(redisClient 
*c
); 
 399 static void rpushCommand(redisClient 
*c
); 
 400 static void lpopCommand(redisClient 
*c
); 
 401 static void rpopCommand(redisClient 
*c
); 
 402 static void llenCommand(redisClient 
*c
); 
 403 static void lindexCommand(redisClient 
*c
); 
 404 static void lrangeCommand(redisClient 
*c
); 
 405 static void ltrimCommand(redisClient 
*c
); 
 406 static void typeCommand(redisClient 
*c
); 
 407 static void lsetCommand(redisClient 
*c
); 
 408 static void saddCommand(redisClient 
*c
); 
 409 static void sremCommand(redisClient 
*c
); 
 410 static void smoveCommand(redisClient 
*c
); 
 411 static void sismemberCommand(redisClient 
*c
); 
 412 static void scardCommand(redisClient 
*c
); 
 413 static void spopCommand(redisClient 
*c
); 
 414 static void srandmemberCommand(redisClient 
*c
); 
 415 static void sinterCommand(redisClient 
*c
); 
 416 static void sinterstoreCommand(redisClient 
*c
); 
 417 static void sunionCommand(redisClient 
*c
); 
 418 static void sunionstoreCommand(redisClient 
*c
); 
 419 static void sdiffCommand(redisClient 
*c
); 
 420 static void sdiffstoreCommand(redisClient 
*c
); 
 421 static void syncCommand(redisClient 
*c
); 
 422 static void flushdbCommand(redisClient 
*c
); 
 423 static void flushallCommand(redisClient 
*c
); 
 424 static void sortCommand(redisClient 
*c
); 
 425 static void lremCommand(redisClient 
*c
); 
 426 static void infoCommand(redisClient 
*c
); 
 427 static void mgetCommand(redisClient 
*c
); 
 428 static void monitorCommand(redisClient 
*c
); 
 429 static void expireCommand(redisClient 
*c
); 
 430 static void getsetCommand(redisClient 
*c
); 
 431 static void ttlCommand(redisClient 
*c
); 
 432 static void slaveofCommand(redisClient 
*c
); 
 433 static void debugCommand(redisClient 
*c
); 
 434 static void msetCommand(redisClient 
*c
); 
 435 static void msetnxCommand(redisClient 
*c
); 
 436 static void zaddCommand(redisClient 
*c
); 
 437 static void zrangeCommand(redisClient 
*c
); 
 439 /*================================= Globals ================================= */ 
 442 static struct redisServer server
; /* server global state */ 
 443 static struct redisCommand cmdTable
[] = { 
 444     {"get",getCommand
,2,REDIS_CMD_INLINE
}, 
 445     {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
}, 
 446     {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
}, 
 447     {"del",delCommand
,-2,REDIS_CMD_INLINE
}, 
 448     {"exists",existsCommand
,2,REDIS_CMD_INLINE
}, 
 449     {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 450     {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 451     {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
}, 
 452     {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
}, 
 453     {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
}, 
 454     {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
}, 
 455     {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
}, 
 456     {"llen",llenCommand
,2,REDIS_CMD_INLINE
}, 
 457     {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
}, 
 458     {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
}, 
 459     {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
}, 
 460     {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
}, 
 461     {"lrem",lremCommand
,4,REDIS_CMD_BULK
}, 
 462     {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
}, 
 463     {"srem",sremCommand
,3,REDIS_CMD_BULK
}, 
 464     {"smove",smoveCommand
,4,REDIS_CMD_BULK
}, 
 465     {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
}, 
 466     {"scard",scardCommand
,2,REDIS_CMD_INLINE
}, 
 467     {"spop",spopCommand
,2,REDIS_CMD_INLINE
}, 
 468     {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
}, 
 469     {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 470     {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 471     {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 472     {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 473     {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 474     {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 475     {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
}, 
 476     {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
}, 
 477     {"zrange",zrangeCommand
,4,REDIS_CMD_INLINE
}, 
 478     {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 479     {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 480     {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
}, 
 481     {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
}, 
 482     {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
}, 
 483     {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
}, 
 484     {"select",selectCommand
,2,REDIS_CMD_INLINE
}, 
 485     {"move",moveCommand
,3,REDIS_CMD_INLINE
}, 
 486     {"rename",renameCommand
,3,REDIS_CMD_INLINE
}, 
 487     {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
}, 
 488     {"expire",expireCommand
,3,REDIS_CMD_INLINE
}, 
 489     {"keys",keysCommand
,2,REDIS_CMD_INLINE
}, 
 490     {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
}, 
 491     {"auth",authCommand
,2,REDIS_CMD_INLINE
}, 
 492     {"ping",pingCommand
,1,REDIS_CMD_INLINE
}, 
 493     {"echo",echoCommand
,2,REDIS_CMD_BULK
}, 
 494     {"save",saveCommand
,1,REDIS_CMD_INLINE
}, 
 495     {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
}, 
 496     {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
}, 
 497     {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
}, 
 498     {"type",typeCommand
,2,REDIS_CMD_INLINE
}, 
 499     {"sync",syncCommand
,1,REDIS_CMD_INLINE
}, 
 500     {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
}, 
 501     {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
}, 
 502     {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
}, 
 503     {"info",infoCommand
,1,REDIS_CMD_INLINE
}, 
 504     {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
}, 
 505     {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
}, 
 506     {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
}, 
 507     {"debug",debugCommand
,-2,REDIS_CMD_INLINE
}, 
 510 /*============================ Utility functions ============================ */ 
 512 /* Glob-style pattern matching. */ 
 513 int stringmatchlen(const char *pattern
, int patternLen
, 
 514         const char *string
, int stringLen
, int nocase
) 
 519             while (pattern
[1] == '*') { 
 524                 return 1; /* match */ 
 526                 if (stringmatchlen(pattern
+1, patternLen
-1, 
 527                             string
, stringLen
, nocase
)) 
 528                     return 1; /* match */ 
 532             return 0; /* no match */ 
 536                 return 0; /* no match */ 
 546             not = pattern
[0] == '^'; 
 553                 if (pattern
[0] == '\\') { 
 556                     if (pattern
[0] == string
[0]) 
 558                 } else if (pattern
[0] == ']') { 
 560                 } else if (patternLen 
== 0) { 
 564                 } else if (pattern
[1] == '-' && patternLen 
>= 3) { 
 565                     int start 
= pattern
[0]; 
 566                     int end 
= pattern
[2]; 
 574                         start 
= tolower(start
); 
 580                     if (c 
>= start 
&& c 
<= end
) 
 584                         if (pattern
[0] == string
[0]) 
 587                         if (tolower((int)pattern
[0]) == tolower((int)string
[0])) 
 597                 return 0; /* no match */ 
 603             if (patternLen 
>= 2) { 
 610                 if (pattern
[0] != string
[0]) 
 611                     return 0; /* no match */ 
 613                 if (tolower((int)pattern
[0]) != tolower((int)string
[0])) 
 614                     return 0; /* no match */ 
 622         if (stringLen 
== 0) { 
 623             while(*pattern 
== '*') { 
 630     if (patternLen 
== 0 && stringLen 
== 0) 
 635 static void redisLog(int level
, const char *fmt
, ...) { 
 639     fp 
= (server
.logfile 
== NULL
) ? stdout 
: fopen(server
.logfile
,"a"); 
 643     if (level 
>= server
.verbosity
) { 
 649         strftime(buf
,64,"%d %b %H:%M:%S",gmtime(&now
)); 
 650         fprintf(fp
,"%s %c ",buf
,c
[level
]); 
 651         vfprintf(fp
, fmt
, ap
); 
 657     if (server
.logfile
) fclose(fp
); 
 660 /*====================== Hash table type implementation  ==================== */ 
 662 /* This is an hash table type that uses the SDS dynamic strings libary as 
 663  * keys and radis objects as values (objects can hold SDS strings, 
 666 static void dictVanillaFree(void *privdata
, void *val
) 
 668     DICT_NOTUSED(privdata
); 
 672 static int sdsDictKeyCompare(void *privdata
, const void *key1
, 
 676     DICT_NOTUSED(privdata
); 
 678     l1 
= sdslen((sds
)key1
); 
 679     l2 
= sdslen((sds
)key2
); 
 680     if (l1 
!= l2
) return 0; 
 681     return memcmp(key1
, key2
, l1
) == 0; 
 684 static void dictRedisObjectDestructor(void *privdata
, void *val
) 
 686     DICT_NOTUSED(privdata
); 
 691 static int dictObjKeyCompare(void *privdata
, const void *key1
, 
 694     const robj 
*o1 
= key1
, *o2 
= key2
; 
 695     return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
); 
 698 static unsigned int dictObjHash(const void *key
) { 
 700     return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
)); 
 703 static int dictEncObjKeyCompare(void *privdata
, const void *key1
, 
 706     const robj 
*o1 
= key1
, *o2 
= key2
; 
 708     if (o1
->encoding 
== REDIS_ENCODING_RAW 
&& 
 709         o2
->encoding 
== REDIS_ENCODING_RAW
) 
 710         return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
); 
 715         dec1 
= o1
->encoding 
!= REDIS_ENCODING_RAW 
? 
 716             getDecodedObject(o1
) : (robj
*)o1
; 
 717         dec2 
= o2
->encoding 
!= REDIS_ENCODING_RAW 
? 
 718             getDecodedObject(o2
) : (robj
*)o2
; 
 719         cmp 
= sdsDictKeyCompare(privdata
,dec1
->ptr
,dec2
->ptr
); 
 720         if (dec1 
!= o1
) decrRefCount(dec1
); 
 721         if (dec2 
!= o2
) decrRefCount(dec2
); 
 726 static unsigned int dictEncObjHash(const void *key
) { 
 729     if (o
->encoding 
== REDIS_ENCODING_RAW
) 
 730         return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
)); 
 732         robj 
*dec 
= getDecodedObject(o
); 
 733         unsigned int hash 
= dictGenHashFunction(dec
->ptr
, sdslen((sds
)dec
->ptr
)); 
 739 static dictType setDictType 
= { 
 740     dictEncObjHash
,            /* hash function */ 
 743     dictEncObjKeyCompare
,      /* key compare */ 
 744     dictRedisObjectDestructor
, /* key destructor */ 
 745     NULL                       
/* val destructor */ 
 748 static dictType zsetDictType 
= { 
 749     dictEncObjHash
,            /* hash function */ 
 752     dictEncObjKeyCompare
,      /* key compare */ 
 753     dictRedisObjectDestructor
, /* key destructor */ 
 754     dictVanillaFree            
/* val destructor */ 
 757 static dictType hashDictType 
= { 
 758     dictObjHash
,                /* hash function */ 
 761     dictObjKeyCompare
,          /* key compare */ 
 762     dictRedisObjectDestructor
,  /* key destructor */ 
 763     dictRedisObjectDestructor   
/* val destructor */ 
 766 /* ========================= Random utility functions ======================= */ 
 768 /* Redis generally does not try to recover from out of memory conditions 
 769  * when allocating objects or strings, it is not clear if it will be possible 
 770  * to report this condition to the client since the networking layer itself 
 771  * is based on heap allocation for send buffers, so we simply abort. 
 772  * At least the code will be simpler to read... */ 
 773 static void oom(const char *msg
) { 
 774     fprintf(stderr
, "%s: Out of memory\n",msg
); 
 780 /* ====================== Redis server networking stuff ===================== */ 
 781 static void closeTimedoutClients(void) { 
 784     time_t now 
= time(NULL
); 
 786     listRewind(server
.clients
); 
 787     while ((ln 
= listYield(server
.clients
)) != NULL
) { 
 788         c 
= listNodeValue(ln
); 
 789         if (!(c
->flags 
& REDIS_SLAVE
) &&    /* no timeout for slaves */ 
 790             !(c
->flags 
& REDIS_MASTER
) &&   /* no timeout for masters */ 
 791              (now 
- c
->lastinteraction 
> server
.maxidletime
)) { 
 792             redisLog(REDIS_DEBUG
,"Closing idle client"); 
 798 static int htNeedsResize(dict 
*dict
) { 
 799     long long size
, used
; 
 801     size 
= dictSlots(dict
); 
 802     used 
= dictSize(dict
); 
 803     return (size 
&& used 
&& size 
> DICT_HT_INITIAL_SIZE 
&& 
 804             (used
*100/size 
< REDIS_HT_MINFILL
)); 
 807 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL 
 808  * we resize the hash table to save memory */ 
 809 static void tryResizeHashTables(void) { 
 812     for (j 
= 0; j 
< server
.dbnum
; j
++) { 
 813         if (htNeedsResize(server
.db
[j
].dict
)) { 
 814             redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
); 
 815             dictResize(server
.db
[j
].dict
); 
 816             redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
); 
 818         if (htNeedsResize(server
.db
[j
].expires
)) 
 819             dictResize(server
.db
[j
].expires
); 
 823 static int serverCron(struct aeEventLoop 
*eventLoop
, long long id
, void *clientData
) { 
 824     int j
, loops 
= server
.cronloops
++; 
 825     REDIS_NOTUSED(eventLoop
); 
 827     REDIS_NOTUSED(clientData
); 
 829     /* Update the global state with the amount of used memory */ 
 830     server
.usedmemory 
= zmalloc_used_memory(); 
 832     /* Show some info about non-empty databases */ 
 833     for (j 
= 0; j 
< server
.dbnum
; j
++) { 
 834         long long size
, used
, vkeys
; 
 836         size 
= dictSlots(server
.db
[j
].dict
); 
 837         used 
= dictSize(server
.db
[j
].dict
); 
 838         vkeys 
= dictSize(server
.db
[j
].expires
); 
 839         if (!(loops 
% 5) && (used 
|| vkeys
)) { 
 840             redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
); 
 841             /* dictPrintStats(server.dict); */ 
 845     /* We don't want to resize the hash tables while a bacground saving 
 846      * is in progress: the saving child is created using fork() that is 
 847      * implemented with a copy-on-write semantic in most modern systems, so 
 848      * if we resize the HT while there is the saving child at work actually 
 849      * a lot of memory movements in the parent will cause a lot of pages 
 851     if (!server
.bgsaveinprogress
) tryResizeHashTables(); 
 853     /* Show information about connected clients */ 
 855         redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects", 
 856             listLength(server
.clients
)-listLength(server
.slaves
), 
 857             listLength(server
.slaves
), 
 859             dictSize(server
.sharingpool
)); 
 862     /* Close connections of timedout clients */ 
 863     if (server
.maxidletime 
&& !(loops 
% 10)) 
 864         closeTimedoutClients(); 
 866     /* Check if a background saving in progress terminated */ 
 867     if (server
.bgsaveinprogress
) { 
 869         if (wait4(-1,&statloc
,WNOHANG
,NULL
)) { 
 870             int exitcode 
= WEXITSTATUS(statloc
); 
 871             int bysignal 
= WIFSIGNALED(statloc
); 
 873             if (!bysignal 
&& exitcode 
== 0) { 
 874                 redisLog(REDIS_NOTICE
, 
 875                     "Background saving terminated with success"); 
 877                 server
.lastsave 
= time(NULL
); 
 878             } else if (!bysignal 
&& exitcode 
!= 0) { 
 879                 redisLog(REDIS_WARNING
, "Background saving error"); 
 881                 redisLog(REDIS_WARNING
, 
 882                     "Background saving terminated by signal"); 
 883                 rdbRemoveTempFile(server
.bgsavechildpid
); 
 885             server
.bgsaveinprogress 
= 0; 
 886             server
.bgsavechildpid 
= -1; 
 887             updateSlavesWaitingBgsave(exitcode 
== 0 ? REDIS_OK 
: REDIS_ERR
); 
 890         /* If there is not a background saving in progress check if 
 891          * we have to save now */ 
 892          time_t now 
= time(NULL
); 
 893          for (j 
= 0; j 
< server
.saveparamslen
; j
++) { 
 894             struct saveparam 
*sp 
= server
.saveparams
+j
; 
 896             if (server
.dirty 
>= sp
->changes 
&& 
 897                 now
-server
.lastsave 
> sp
->seconds
) { 
 898                 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...", 
 899                     sp
->changes
, sp
->seconds
); 
 900                 rdbSaveBackground(server
.dbfilename
); 
 906     /* Try to expire a few timed out keys */ 
 907     for (j 
= 0; j 
< server
.dbnum
; j
++) { 
 908         redisDb 
*db 
= server
.db
+j
; 
 909         int num 
= dictSize(db
->expires
); 
 912             time_t now 
= time(NULL
); 
 914             if (num 
> REDIS_EXPIRELOOKUPS_PER_CRON
) 
 915                 num 
= REDIS_EXPIRELOOKUPS_PER_CRON
; 
 920                 if ((de 
= dictGetRandomKey(db
->expires
)) == NULL
) break; 
 921                 t 
= (time_t) dictGetEntryVal(de
); 
 923                     deleteKey(db
,dictGetEntryKey(de
)); 
 929     /* Check if we should connect to a MASTER */ 
 930     if (server
.replstate 
== REDIS_REPL_CONNECT
) { 
 931         redisLog(REDIS_NOTICE
,"Connecting to MASTER..."); 
 932         if (syncWithMaster() == REDIS_OK
) { 
 933             redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded"); 
 939 static void createSharedObjects(void) { 
 940     shared
.crlf 
= createObject(REDIS_STRING
,sdsnew("\r\n")); 
 941     shared
.ok 
= createObject(REDIS_STRING
,sdsnew("+OK\r\n")); 
 942     shared
.err 
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n")); 
 943     shared
.emptybulk 
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n")); 
 944     shared
.czero 
= createObject(REDIS_STRING
,sdsnew(":0\r\n")); 
 945     shared
.cone 
= createObject(REDIS_STRING
,sdsnew(":1\r\n")); 
 946     shared
.nullbulk 
= createObject(REDIS_STRING
,sdsnew("$-1\r\n")); 
 947     shared
.nullmultibulk 
= createObject(REDIS_STRING
,sdsnew("*-1\r\n")); 
 948     shared
.emptymultibulk 
= createObject(REDIS_STRING
,sdsnew("*0\r\n")); 
 950     shared
.pong 
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n")); 
 951     shared
.wrongtypeerr 
= createObject(REDIS_STRING
,sdsnew( 
 952         "-ERR Operation against a key holding the wrong kind of value\r\n")); 
 953     shared
.nokeyerr 
= createObject(REDIS_STRING
,sdsnew( 
 954         "-ERR no such key\r\n")); 
 955     shared
.syntaxerr 
= createObject(REDIS_STRING
,sdsnew( 
 956         "-ERR syntax error\r\n")); 
 957     shared
.sameobjecterr 
= createObject(REDIS_STRING
,sdsnew( 
 958         "-ERR source and destination objects are the same\r\n")); 
 959     shared
.outofrangeerr 
= createObject(REDIS_STRING
,sdsnew( 
 960         "-ERR index out of range\r\n")); 
 961     shared
.space 
= createObject(REDIS_STRING
,sdsnew(" ")); 
 962     shared
.colon 
= createObject(REDIS_STRING
,sdsnew(":")); 
 963     shared
.plus 
= createObject(REDIS_STRING
,sdsnew("+")); 
 964     shared
.select0 
= createStringObject("select 0\r\n",10); 
 965     shared
.select1 
= createStringObject("select 1\r\n",10); 
 966     shared
.select2 
= createStringObject("select 2\r\n",10); 
 967     shared
.select3 
= createStringObject("select 3\r\n",10); 
 968     shared
.select4 
= createStringObject("select 4\r\n",10); 
 969     shared
.select5 
= createStringObject("select 5\r\n",10); 
 970     shared
.select6 
= createStringObject("select 6\r\n",10); 
 971     shared
.select7 
= createStringObject("select 7\r\n",10); 
 972     shared
.select8 
= createStringObject("select 8\r\n",10); 
 973     shared
.select9 
= createStringObject("select 9\r\n",10); 
 976 static void appendServerSaveParams(time_t seconds
, int changes
) { 
 977     server
.saveparams 
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1)); 
 978     server
.saveparams
[server
.saveparamslen
].seconds 
= seconds
; 
 979     server
.saveparams
[server
.saveparamslen
].changes 
= changes
; 
 980     server
.saveparamslen
++; 
 983 static void ResetServerSaveParams() { 
 984     zfree(server
.saveparams
); 
 985     server
.saveparams 
= NULL
; 
 986     server
.saveparamslen 
= 0; 
 989 static void initServerConfig() { 
 990     server
.dbnum 
= REDIS_DEFAULT_DBNUM
; 
 991     server
.port 
= REDIS_SERVERPORT
; 
 992     server
.verbosity 
= REDIS_DEBUG
; 
 993     server
.maxidletime 
= REDIS_MAXIDLETIME
; 
 994     server
.saveparams 
= NULL
; 
 995     server
.logfile 
= NULL
; /* NULL = log on standard output */ 
 996     server
.bindaddr 
= NULL
; 
 997     server
.glueoutputbuf 
= 1; 
 998     server
.daemonize 
= 0; 
 999     server
.pidfile 
= "/var/run/redis.pid"; 
1000     server
.dbfilename 
= "dump.rdb"; 
1001     server
.requirepass 
= NULL
; 
1002     server
.shareobjects 
= 0; 
1003     server
.sharingpoolsize 
= 1024; 
1004     server
.maxclients 
= 0; 
1005     server
.maxmemory 
= 0; 
1006     ResetServerSaveParams(); 
1008     appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */ 
1009     appendServerSaveParams(300,100);  /* save after 5 minutes and 100 changes */ 
1010     appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */ 
1011     /* Replication related */ 
1013     server
.masterhost 
= NULL
; 
1014     server
.masterport 
= 6379; 
1015     server
.master 
= NULL
; 
1016     server
.replstate 
= REDIS_REPL_NONE
; 
1019 static void initServer() { 
1022     signal(SIGHUP
, SIG_IGN
); 
1023     signal(SIGPIPE
, SIG_IGN
); 
1024     setupSigSegvAction(); 
1026     server
.clients 
= listCreate(); 
1027     server
.slaves 
= listCreate(); 
1028     server
.monitors 
= listCreate(); 
1029     server
.objfreelist 
= listCreate(); 
1030     createSharedObjects(); 
1031     server
.el 
= aeCreateEventLoop(); 
1032     server
.db 
= zmalloc(sizeof(redisDb
)*server
.dbnum
); 
1033     server
.sharingpool 
= dictCreate(&setDictType
,NULL
); 
1034     server
.fd 
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
); 
1035     if (server
.fd 
== -1) { 
1036         redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
); 
1039     for (j 
= 0; j 
< server
.dbnum
; j
++) { 
1040         server
.db
[j
].dict 
= dictCreate(&hashDictType
,NULL
); 
1041         server
.db
[j
].expires 
= dictCreate(&setDictType
,NULL
); 
1042         server
.db
[j
].id 
= j
; 
1044     server
.cronloops 
= 0; 
1045     server
.bgsaveinprogress 
= 0; 
1046     server
.bgsavechildpid 
= -1; 
1047     server
.lastsave 
= time(NULL
); 
1049     server
.usedmemory 
= 0; 
1050     server
.stat_numcommands 
= 0; 
1051     server
.stat_numconnections 
= 0; 
1052     server
.stat_starttime 
= time(NULL
); 
1053     aeCreateTimeEvent(server
.el
, 1000, serverCron
, NULL
, NULL
); 
1056 /* Empty the whole database */ 
1057 static long long emptyDb() { 
1059     long long removed 
= 0; 
1061     for (j 
= 0; j 
< server
.dbnum
; j
++) { 
1062         removed 
+= dictSize(server
.db
[j
].dict
); 
1063         dictEmpty(server
.db
[j
].dict
); 
1064         dictEmpty(server
.db
[j
].expires
); 
1069 static int yesnotoi(char *s
) { 
1070     if (!strcasecmp(s
,"yes")) return 1; 
1071     else if (!strcasecmp(s
,"no")) return 0; 
1075 /* I agree, this is a very rudimental way to load a configuration... 
1076    will improve later if the config gets more complex */ 
1077 static void loadServerConfig(char *filename
) { 
1079     char buf
[REDIS_CONFIGLINE_MAX
+1], *err 
= NULL
; 
1083     if (filename
[0] == '-' && filename
[1] == '\0') 
1086         if ((fp 
= fopen(filename
,"r")) == NULL
) { 
1087             redisLog(REDIS_WARNING
,"Fatal error, can't open config file"); 
1092     while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) { 
1098         line 
= sdstrim(line
," \t\r\n"); 
1100         /* Skip comments and blank lines*/ 
1101         if (line
[0] == '#' || line
[0] == '\0') { 
1106         /* Split into arguments */ 
1107         argv 
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
); 
1108         sdstolower(argv
[0]); 
1110         /* Execute config directives */ 
1111         if (!strcasecmp(argv
[0],"timeout") && argc 
== 2) { 
1112             server
.maxidletime 
= atoi(argv
[1]); 
1113             if (server
.maxidletime 
< 0) { 
1114                 err 
= "Invalid timeout value"; goto loaderr
; 
1116         } else if (!strcasecmp(argv
[0],"port") && argc 
== 2) { 
1117             server
.port 
= atoi(argv
[1]); 
1118             if (server
.port 
< 1 || server
.port 
> 65535) { 
1119                 err 
= "Invalid port"; goto loaderr
; 
1121         } else if (!strcasecmp(argv
[0],"bind") && argc 
== 2) { 
1122             server
.bindaddr 
= zstrdup(argv
[1]); 
1123         } else if (!strcasecmp(argv
[0],"save") && argc 
== 3) { 
1124             int seconds 
= atoi(argv
[1]); 
1125             int changes 
= atoi(argv
[2]); 
1126             if (seconds 
< 1 || changes 
< 0) { 
1127                 err 
= "Invalid save parameters"; goto loaderr
; 
1129             appendServerSaveParams(seconds
,changes
); 
1130         } else if (!strcasecmp(argv
[0],"dir") && argc 
== 2) { 
1131             if (chdir(argv
[1]) == -1) { 
1132                 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s", 
1133                     argv
[1], strerror(errno
)); 
1136         } else if (!strcasecmp(argv
[0],"loglevel") && argc 
== 2) { 
1137             if (!strcasecmp(argv
[1],"debug")) server
.verbosity 
= REDIS_DEBUG
; 
1138             else if (!strcasecmp(argv
[1],"notice")) server
.verbosity 
= REDIS_NOTICE
; 
1139             else if (!strcasecmp(argv
[1],"warning")) server
.verbosity 
= REDIS_WARNING
; 
1141                 err 
= "Invalid log level. Must be one of debug, notice, warning"; 
1144         } else if (!strcasecmp(argv
[0],"logfile") && argc 
== 2) { 
1147             server
.logfile 
= zstrdup(argv
[1]); 
1148             if (!strcasecmp(server
.logfile
,"stdout")) { 
1149                 zfree(server
.logfile
); 
1150                 server
.logfile 
= NULL
; 
1152             if (server
.logfile
) { 
1153                 /* Test if we are able to open the file. The server will not 
1154                  * be able to abort just for this problem later... */ 
1155                 logfp 
= fopen(server
.logfile
,"a"); 
1156                 if (logfp 
== NULL
) { 
1157                     err 
= sdscatprintf(sdsempty(), 
1158                         "Can't open the log file: %s", strerror(errno
)); 
1163         } else if (!strcasecmp(argv
[0],"databases") && argc 
== 2) { 
1164             server
.dbnum 
= atoi(argv
[1]); 
1165             if (server
.dbnum 
< 1) { 
1166                 err 
= "Invalid number of databases"; goto loaderr
; 
1168         } else if (!strcasecmp(argv
[0],"maxclients") && argc 
== 2) { 
1169             server
.maxclients 
= atoi(argv
[1]); 
1170         } else if (!strcasecmp(argv
[0],"maxmemory") && argc 
== 2) { 
1171             server
.maxmemory 
= strtoll(argv
[1], NULL
, 10); 
1172         } else if (!strcasecmp(argv
[0],"slaveof") && argc 
== 3) { 
1173             server
.masterhost 
= sdsnew(argv
[1]); 
1174             server
.masterport 
= atoi(argv
[2]); 
1175             server
.replstate 
= REDIS_REPL_CONNECT
; 
1176         } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc 
== 2) { 
1177             if ((server
.glueoutputbuf 
= yesnotoi(argv
[1])) == -1) { 
1178                 err 
= "argument must be 'yes' or 'no'"; goto loaderr
; 
1180         } else if (!strcasecmp(argv
[0],"shareobjects") && argc 
== 2) { 
1181             if ((server
.shareobjects 
= yesnotoi(argv
[1])) == -1) { 
1182                 err 
= "argument must be 'yes' or 'no'"; goto loaderr
; 
1184         } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc 
== 2) { 
1185             server
.sharingpoolsize 
= atoi(argv
[1]); 
1186             if (server
.sharingpoolsize 
< 1) { 
1187                 err 
= "invalid object sharing pool size"; goto loaderr
; 
1189         } else if (!strcasecmp(argv
[0],"daemonize") && argc 
== 2) { 
1190             if ((server
.daemonize 
= yesnotoi(argv
[1])) == -1) { 
1191                 err 
= "argument must be 'yes' or 'no'"; goto loaderr
; 
1193         } else if (!strcasecmp(argv
[0],"requirepass") && argc 
== 2) { 
1194           server
.requirepass 
= zstrdup(argv
[1]); 
1195         } else if (!strcasecmp(argv
[0],"pidfile") && argc 
== 2) { 
1196           server
.pidfile 
= zstrdup(argv
[1]); 
1197         } else if (!strcasecmp(argv
[0],"dbfilename") && argc 
== 2) { 
1198           server
.dbfilename 
= zstrdup(argv
[1]); 
1200             err 
= "Bad directive or wrong number of arguments"; goto loaderr
; 
1202         for (j 
= 0; j 
< argc
; j
++) 
1207     if (fp 
!= stdin
) fclose(fp
); 
1211     fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n"); 
1212     fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
); 
1213     fprintf(stderr
, ">>> '%s'\n", line
); 
1214     fprintf(stderr
, "%s\n", err
); 
1218 static void freeClientArgv(redisClient 
*c
) { 
1221     for (j 
= 0; j 
< c
->argc
; j
++) 
1222         decrRefCount(c
->argv
[j
]); 
1223     for (j 
= 0; j 
< c
->mbargc
; j
++) 
1224         decrRefCount(c
->mbargv
[j
]); 
1229 static void freeClient(redisClient 
*c
) { 
1232     aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
); 
1233     aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
); 
1234     sdsfree(c
->querybuf
); 
1235     listRelease(c
->reply
); 
1238     ln 
= listSearchKey(server
.clients
,c
); 
1240     listDelNode(server
.clients
,ln
); 
1241     if (c
->flags 
& REDIS_SLAVE
) { 
1242         if (c
->replstate 
== REDIS_REPL_SEND_BULK 
&& c
->repldbfd 
!= -1) 
1244         list 
*l 
= (c
->flags 
& REDIS_MONITOR
) ? server
.monitors 
: server
.slaves
; 
1245         ln 
= listSearchKey(l
,c
); 
1249     if (c
->flags 
& REDIS_MASTER
) { 
1250         server
.master 
= NULL
; 
1251         server
.replstate 
= REDIS_REPL_CONNECT
; 
1258 static void glueReplyBuffersIfNeeded(redisClient 
*c
) { 
1263     listRewind(c
->reply
); 
1264     while((ln 
= listYield(c
->reply
))) { 
1266         totlen 
+= sdslen(o
->ptr
); 
1267         /* This optimization makes more sense if we don't have to copy 
1269         if (totlen 
> 1024) return; 
1275         listRewind(c
->reply
); 
1276         while((ln 
= listYield(c
->reply
))) { 
1278             memcpy(buf
+copylen
,o
->ptr
,sdslen(o
->ptr
)); 
1279             copylen 
+= sdslen(o
->ptr
); 
1280             listDelNode(c
->reply
,ln
); 
1282         /* Now the output buffer is empty, add the new single element */ 
1283         o 
= createObject(REDIS_STRING
,sdsnewlen(buf
,totlen
)); 
1284         listAddNodeTail(c
->reply
,o
); 
1288 static void sendReplyToClient(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
1289     redisClient 
*c 
= privdata
; 
1290     int nwritten 
= 0, totwritten 
= 0, objlen
; 
1293     REDIS_NOTUSED(mask
); 
1295     if (server
.glueoutputbuf 
&& listLength(c
->reply
) > 1) 
1296         glueReplyBuffersIfNeeded(c
); 
1297     while(listLength(c
->reply
)) { 
1298         o 
= listNodeValue(listFirst(c
->reply
)); 
1299         objlen 
= sdslen(o
->ptr
); 
1302             listDelNode(c
->reply
,listFirst(c
->reply
)); 
1306         if (c
->flags 
& REDIS_MASTER
) { 
1307             /* Don't reply to a master */ 
1308             nwritten 
= objlen 
- c
->sentlen
; 
1310             nwritten 
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen 
- c
->sentlen
); 
1311             if (nwritten 
<= 0) break; 
1313         c
->sentlen 
+= nwritten
; 
1314         totwritten 
+= nwritten
; 
1315         /* If we fully sent the object on head go to the next one */ 
1316         if (c
->sentlen 
== objlen
) { 
1317             listDelNode(c
->reply
,listFirst(c
->reply
)); 
1320         /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT 
1321          * bytes, in a single threaded server it's a good idea to server 
1322          * other clients as well, even if a very large request comes from 
1323          * super fast link that is always able to accept data (in real world 
1324          * terms think to 'KEYS *' against the loopback interfae) */ 
1325         if (totwritten 
> REDIS_MAX_WRITE_PER_EVENT
) break; 
1327     if (nwritten 
== -1) { 
1328         if (errno 
== EAGAIN
) { 
1331             redisLog(REDIS_DEBUG
, 
1332                 "Error writing to client: %s", strerror(errno
)); 
1337     if (totwritten 
> 0) c
->lastinteraction 
= time(NULL
); 
1338     if (listLength(c
->reply
) == 0) { 
1340         aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
); 
1344 static struct redisCommand 
*lookupCommand(char *name
) { 
1346     while(cmdTable
[j
].name 
!= NULL
) { 
1347         if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
]; 
1353 /* resetClient prepare the client to process the next command */ 
1354 static void resetClient(redisClient 
*c
) { 
1360 /* If this function gets called we already read a whole 
1361  * command, argments are in the client argv/argc fields. 
1362  * processCommand() execute the command or prepare the 
1363  * server for a bulk read from the client. 
1365  * If 1 is returned the client is still alive and valid and 
1366  * and other operations can be performed by the caller. Otherwise 
1367  * if 0 is returned the client was destroied (i.e. after QUIT). */ 
1368 static int processCommand(redisClient 
*c
) { 
1369     struct redisCommand 
*cmd
; 
1372     /* Free some memory if needed (maxmemory setting) */ 
1373     if (server
.maxmemory
) freeMemoryIfNeeded(); 
1375     /* Handle the multi bulk command type. This is an alternative protocol 
1376      * supported by Redis in order to receive commands that are composed of 
1377      * multiple binary-safe "bulk" arguments. The latency of processing is 
1378      * a bit higher but this allows things like multi-sets, so if this 
1379      * protocol is used only for MSET and similar commands this is a big win. */ 
1380     if (c
->multibulk 
== 0 && c
->argc 
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') { 
1381         c
->multibulk 
= atoi(((char*)c
->argv
[0]->ptr
)+1); 
1382         if (c
->multibulk 
<= 0) { 
1386             decrRefCount(c
->argv
[c
->argc
-1]); 
1390     } else if (c
->multibulk
) { 
1391         if (c
->bulklen 
== -1) { 
1392             if (((char*)c
->argv
[0]->ptr
)[0] != '$') { 
1393                 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n")); 
1397                 int bulklen 
= atoi(((char*)c
->argv
[0]->ptr
)+1); 
1398                 decrRefCount(c
->argv
[0]); 
1399                 if (bulklen 
< 0 || bulklen 
> 1024*1024*1024) { 
1401                     addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n")); 
1406                 c
->bulklen 
= bulklen
+2; /* add two bytes for CR+LF */ 
1410             c
->mbargv 
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1)); 
1411             c
->mbargv
[c
->mbargc
] = c
->argv
[0]; 
1415             if (c
->multibulk 
== 0) { 
1419                 /* Here we need to swap the multi-bulk argc/argv with the 
1420                  * normal argc/argv of the client structure. */ 
1422                 c
->argv 
= c
->mbargv
; 
1423                 c
->mbargv 
= auxargv
; 
1426                 c
->argc 
= c
->mbargc
; 
1427                 c
->mbargc 
= auxargc
; 
1429                 /* We need to set bulklen to something different than -1 
1430                  * in order for the code below to process the command without 
1431                  * to try to read the last argument of a bulk command as 
1432                  * a special argument. */ 
1434                 /* continue below and process the command */ 
1441     /* -- end of multi bulk commands processing -- */ 
1443     /* The QUIT command is handled as a special case. Normal command 
1444      * procs are unable to close the client connection safely */ 
1445     if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) { 
1449     cmd 
= lookupCommand(c
->argv
[0]->ptr
); 
1451         addReplySds(c
,sdsnew("-ERR unknown command\r\n")); 
1454     } else if ((cmd
->arity 
> 0 && cmd
->arity 
!= c
->argc
) || 
1455                (c
->argc 
< -cmd
->arity
)) { 
1456         addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n")); 
1459     } else if (server
.maxmemory 
&& cmd
->flags 
& REDIS_CMD_DENYOOM 
&& zmalloc_used_memory() > server
.maxmemory
) { 
1460         addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n")); 
1463     } else if (cmd
->flags 
& REDIS_CMD_BULK 
&& c
->bulklen 
== -1) { 
1464         int bulklen 
= atoi(c
->argv
[c
->argc
-1]->ptr
); 
1466         decrRefCount(c
->argv
[c
->argc
-1]); 
1467         if (bulklen 
< 0 || bulklen 
> 1024*1024*1024) { 
1469             addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n")); 
1474         c
->bulklen 
= bulklen
+2; /* add two bytes for CR+LF */ 
1475         /* It is possible that the bulk read is already in the 
1476          * buffer. Check this condition and handle it accordingly. 
1477          * This is just a fast path, alternative to call processInputBuffer(). 
1478          * It's a good idea since the code is small and this condition 
1479          * happens most of the times. */ 
1480         if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) { 
1481             c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2); 
1483             c
->querybuf 
= sdsrange(c
->querybuf
,c
->bulklen
,-1); 
1488     /* Let's try to share objects on the command arguments vector */ 
1489     if (server
.shareobjects
) { 
1491         for(j 
= 1; j 
< c
->argc
; j
++) 
1492             c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]); 
1494     /* Let's try to encode the bulk object to save space. */ 
1495     if (cmd
->flags 
& REDIS_CMD_BULK
) 
1496         tryObjectEncoding(c
->argv
[c
->argc
-1]); 
1498     /* Check if the user is authenticated */ 
1499     if (server
.requirepass 
&& !c
->authenticated 
&& cmd
->proc 
!= authCommand
) { 
1500         addReplySds(c
,sdsnew("-ERR operation not permitted\r\n")); 
1505     /* Exec the command */ 
1506     dirty 
= server
.dirty
; 
1508     if (server
.dirty
-dirty 
!= 0 && listLength(server
.slaves
)) 
1509         replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
); 
1510     if (listLength(server
.monitors
)) 
1511         replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
); 
1512     server
.stat_numcommands
++; 
1514     /* Prepare the client for the next command */ 
1515     if (c
->flags 
& REDIS_CLOSE
) { 
1523 static void replicationFeedSlaves(list 
*slaves
, struct redisCommand 
*cmd
, int dictid
, robj 
**argv
, int argc
) { 
1527     /* (args*2)+1 is enough room for args, spaces, newlines */ 
1528     robj 
*static_outv
[REDIS_STATIC_ARGS
*2+1]; 
1530     if (argc 
<= REDIS_STATIC_ARGS
) { 
1533         outv 
= zmalloc(sizeof(robj
*)*(argc
*2+1)); 
1536     for (j 
= 0; j 
< argc
; j
++) { 
1537         if (j 
!= 0) outv
[outc
++] = shared
.space
; 
1538         if ((cmd
->flags 
& REDIS_CMD_BULK
) && j 
== argc
-1) { 
1541             lenobj 
= createObject(REDIS_STRING
, 
1542                 sdscatprintf(sdsempty(),"%d\r\n", 
1543                     stringObjectLen(argv
[j
]))); 
1544             lenobj
->refcount 
= 0; 
1545             outv
[outc
++] = lenobj
; 
1547         outv
[outc
++] = argv
[j
]; 
1549     outv
[outc
++] = shared
.crlf
; 
1551     /* Increment all the refcounts at start and decrement at end in order to 
1552      * be sure to free objects if there is no slave in a replication state 
1553      * able to be feed with commands */ 
1554     for (j 
= 0; j 
< outc
; j
++) incrRefCount(outv
[j
]); 
1556     while((ln 
= listYield(slaves
))) { 
1557         redisClient 
*slave 
= ln
->value
; 
1559         /* Don't feed slaves that are still waiting for BGSAVE to start */ 
1560         if (slave
->replstate 
== REDIS_REPL_WAIT_BGSAVE_START
) continue; 
1562         /* Feed all the other slaves, MONITORs and so on */ 
1563         if (slave
->slaveseldb 
!= dictid
) { 
1567             case 0: selectcmd 
= shared
.select0
; break; 
1568             case 1: selectcmd 
= shared
.select1
; break; 
1569             case 2: selectcmd 
= shared
.select2
; break; 
1570             case 3: selectcmd 
= shared
.select3
; break; 
1571             case 4: selectcmd 
= shared
.select4
; break; 
1572             case 5: selectcmd 
= shared
.select5
; break; 
1573             case 6: selectcmd 
= shared
.select6
; break; 
1574             case 7: selectcmd 
= shared
.select7
; break; 
1575             case 8: selectcmd 
= shared
.select8
; break; 
1576             case 9: selectcmd 
= shared
.select9
; break; 
1578                 selectcmd 
= createObject(REDIS_STRING
, 
1579                     sdscatprintf(sdsempty(),"select %d\r\n",dictid
)); 
1580                 selectcmd
->refcount 
= 0; 
1583             addReply(slave
,selectcmd
); 
1584             slave
->slaveseldb 
= dictid
; 
1586         for (j 
= 0; j 
< outc
; j
++) addReply(slave
,outv
[j
]); 
1588     for (j 
= 0; j 
< outc
; j
++) decrRefCount(outv
[j
]); 
1589     if (outv 
!= static_outv
) zfree(outv
); 
1592 static void processInputBuffer(redisClient 
*c
) { 
1594     if (c
->bulklen 
== -1) { 
1595         /* Read the first line of the query */ 
1596         char *p 
= strchr(c
->querybuf
,'\n'); 
1603             query 
= c
->querybuf
; 
1604             c
->querybuf 
= sdsempty(); 
1605             querylen 
= 1+(p
-(query
)); 
1606             if (sdslen(query
) > querylen
) { 
1607                 /* leave data after the first line of the query in the buffer */ 
1608                 c
->querybuf 
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
); 
1610             *p 
= '\0'; /* remove "\n" */ 
1611             if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */ 
1612             sdsupdatelen(query
); 
1614             /* Now we can split the query in arguments */ 
1615             if (sdslen(query
) == 0) { 
1616                 /* Ignore empty query */ 
1620             argv 
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
); 
1623             if (c
->argv
) zfree(c
->argv
); 
1624             c
->argv 
= zmalloc(sizeof(robj
*)*argc
); 
1626             for (j 
= 0; j 
< argc
; j
++) { 
1627                 if (sdslen(argv
[j
])) { 
1628                     c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]); 
1635             /* Execute the command. If the client is still valid 
1636              * after processCommand() return and there is something 
1637              * on the query buffer try to process the next command. */ 
1638             if (c
->argc 
&& processCommand(c
) && sdslen(c
->querybuf
)) goto again
; 
1640         } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) { 
1641             redisLog(REDIS_DEBUG
, "Client protocol error"); 
1646         /* Bulk read handling. Note that if we are at this point 
1647            the client already sent a command terminated with a newline, 
1648            we are reading the bulk data that is actually the last 
1649            argument of the command. */ 
1650         int qbl 
= sdslen(c
->querybuf
); 
1652         if (c
->bulklen 
<= qbl
) { 
1653             /* Copy everything but the final CRLF as final argument */ 
1654             c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2); 
1656             c
->querybuf 
= sdsrange(c
->querybuf
,c
->bulklen
,-1); 
1657             /* Process the command. If the client is still valid after 
1658              * the processing and there is more data in the buffer 
1659              * try to parse it. */ 
1660             if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
; 
1666 static void readQueryFromClient(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
1667     redisClient 
*c 
= (redisClient
*) privdata
; 
1668     char buf
[REDIS_IOBUF_LEN
]; 
1671     REDIS_NOTUSED(mask
); 
1673     nread 
= read(fd
, buf
, REDIS_IOBUF_LEN
); 
1675         if (errno 
== EAGAIN
) { 
1678             redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
)); 
1682     } else if (nread 
== 0) { 
1683         redisLog(REDIS_DEBUG
, "Client closed connection"); 
1688         c
->querybuf 
= sdscatlen(c
->querybuf
, buf
, nread
); 
1689         c
->lastinteraction 
= time(NULL
); 
1693     processInputBuffer(c
); 
1696 static int selectDb(redisClient 
*c
, int id
) { 
1697     if (id 
< 0 || id 
>= server
.dbnum
) 
1699     c
->db 
= &server
.db
[id
]; 
1703 static void *dupClientReplyValue(void *o
) { 
1704     incrRefCount((robj
*)o
); 
1708 static redisClient 
*createClient(int fd
) { 
1709     redisClient 
*c 
= zmalloc(sizeof(*c
)); 
1711     anetNonBlock(NULL
,fd
); 
1712     anetTcpNoDelay(NULL
,fd
); 
1713     if (!c
) return NULL
; 
1716     c
->querybuf 
= sdsempty(); 
1725     c
->lastinteraction 
= time(NULL
); 
1726     c
->authenticated 
= 0; 
1727     c
->replstate 
= REDIS_REPL_NONE
; 
1728     c
->reply 
= listCreate(); 
1729     listSetFreeMethod(c
->reply
,decrRefCount
); 
1730     listSetDupMethod(c
->reply
,dupClientReplyValue
); 
1731     if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
, 
1732         readQueryFromClient
, c
, NULL
) == AE_ERR
) { 
1736     listAddNodeTail(server
.clients
,c
); 
1740 static void addReply(redisClient 
*c
, robj 
*obj
) { 
1741     if (listLength(c
->reply
) == 0 && 
1742         (c
->replstate 
== REDIS_REPL_NONE 
|| 
1743          c
->replstate 
== REDIS_REPL_ONLINE
) && 
1744         aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
, 
1745         sendReplyToClient
, c
, NULL
) == AE_ERR
) return; 
1746     if (obj
->encoding 
!= REDIS_ENCODING_RAW
) { 
1747         obj 
= getDecodedObject(obj
); 
1751     listAddNodeTail(c
->reply
,obj
); 
1754 static void addReplySds(redisClient 
*c
, sds s
) { 
1755     robj 
*o 
= createObject(REDIS_STRING
,s
); 
1760 static void addReplyBulkLen(redisClient 
*c
, robj 
*obj
) { 
1763     if (obj
->encoding 
== REDIS_ENCODING_RAW
) { 
1764         len 
= sdslen(obj
->ptr
); 
1766         long n 
= (long)obj
->ptr
; 
1773         while((n 
= n
/10) != 0) { 
1777     addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",len
)); 
1780 static void acceptHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
1785     REDIS_NOTUSED(mask
); 
1786     REDIS_NOTUSED(privdata
); 
1788     cfd 
= anetAccept(server
.neterr
, fd
, cip
, &cport
); 
1789     if (cfd 
== AE_ERR
) { 
1790         redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
); 
1793     redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
); 
1794     if ((c 
= createClient(cfd
)) == NULL
) { 
1795         redisLog(REDIS_WARNING
,"Error allocating resoures for the client"); 
1796         close(cfd
); /* May be already closed, just ingore errors */ 
1799     /* If maxclient directive is set and this is one client more... close the 
1800      * connection. Note that we create the client instead to check before 
1801      * for this condition, since now the socket is already set in nonblocking 
1802      * mode and we can send an error for free using the Kernel I/O */ 
1803     if (server
.maxclients 
&& listLength(server
.clients
) > server
.maxclients
) { 
1804         char *err 
= "-ERR max number of clients reached\r\n"; 
1806         /* That's a best effort error message, don't check write errors */ 
1807         (void) write(c
->fd
,err
,strlen(err
)); 
1811     server
.stat_numconnections
++; 
1814 /* ======================= Redis objects implementation ===================== */ 
1816 static robj 
*createObject(int type
, void *ptr
) { 
1819     if (listLength(server
.objfreelist
)) { 
1820         listNode 
*head 
= listFirst(server
.objfreelist
); 
1821         o 
= listNodeValue(head
); 
1822         listDelNode(server
.objfreelist
,head
); 
1824         o 
= zmalloc(sizeof(*o
)); 
1827     o
->encoding 
= REDIS_ENCODING_RAW
; 
1833 static robj 
*createStringObject(char *ptr
, size_t len
) { 
1834     return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
)); 
1837 static robj 
*createListObject(void) { 
1838     list 
*l 
= listCreate(); 
1840     listSetFreeMethod(l
,decrRefCount
); 
1841     return createObject(REDIS_LIST
,l
); 
1844 static robj 
*createSetObject(void) { 
1845     dict 
*d 
= dictCreate(&setDictType
,NULL
); 
1846     return createObject(REDIS_SET
,d
); 
1849 static robj 
*createZsetObject(void) { 
1850     zset 
*zs 
= zmalloc(sizeof(*zs
)); 
1852     zs
->dict 
= dictCreate(&zsetDictType
,NULL
); 
1853     zs
->zsl 
= zslCreate(); 
1854     return createObject(REDIS_ZSET
,zs
); 
1857 static void freeStringObject(robj 
*o
) { 
1858     if (o
->encoding 
== REDIS_ENCODING_RAW
) { 
1863 static void freeListObject(robj 
*o
) { 
1864     listRelease((list
*) o
->ptr
); 
1867 static void freeSetObject(robj 
*o
) { 
1868     dictRelease((dict
*) o
->ptr
); 
1871 static void freeZsetObject(robj 
*o
) { 
1874     dictRelease(zs
->dict
); 
1879 static void freeHashObject(robj 
*o
) { 
1880     dictRelease((dict
*) o
->ptr
); 
1883 static void incrRefCount(robj 
*o
) { 
1885 #ifdef DEBUG_REFCOUNT 
1886     if (o
->type 
== REDIS_STRING
) 
1887         printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
); 
1891 static void decrRefCount(void *obj
) { 
1894 #ifdef DEBUG_REFCOUNT 
1895     if (o
->type 
== REDIS_STRING
) 
1896         printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1); 
1898     if (--(o
->refcount
) == 0) { 
1900         case REDIS_STRING
: freeStringObject(o
); break; 
1901         case REDIS_LIST
: freeListObject(o
); break; 
1902         case REDIS_SET
: freeSetObject(o
); break; 
1903         case REDIS_ZSET
: freeZsetObject(o
); break; 
1904         case REDIS_HASH
: freeHashObject(o
); break; 
1905         default: assert(0 != 0); break; 
1907         if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX 
|| 
1908             !listAddNodeHead(server
.objfreelist
,o
)) 
1913 static robj 
*lookupKey(redisDb 
*db
, robj 
*key
) { 
1914     dictEntry 
*de 
= dictFind(db
->dict
,key
); 
1915     return de 
? dictGetEntryVal(de
) : NULL
; 
1918 static robj 
*lookupKeyRead(redisDb 
*db
, robj 
*key
) { 
1919     expireIfNeeded(db
,key
); 
1920     return lookupKey(db
,key
); 
1923 static robj 
*lookupKeyWrite(redisDb 
*db
, robj 
*key
) { 
1924     deleteIfVolatile(db
,key
); 
1925     return lookupKey(db
,key
); 
1928 static int deleteKey(redisDb 
*db
, robj 
*key
) { 
1931     /* We need to protect key from destruction: after the first dictDelete() 
1932      * it may happen that 'key' is no longer valid if we don't increment 
1933      * it's count. This may happen when we get the object reference directly 
1934      * from the hash table with dictRandomKey() or dict iterators */ 
1936     if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
); 
1937     retval 
= dictDelete(db
->dict
,key
); 
1940     return retval 
== DICT_OK
; 
1943 /* Try to share an object against the shared objects pool */ 
1944 static robj 
*tryObjectSharing(robj 
*o
) { 
1945     struct dictEntry 
*de
; 
1948     if (o 
== NULL 
|| server
.shareobjects 
== 0) return o
; 
1950     assert(o
->type 
== REDIS_STRING
); 
1951     de 
= dictFind(server
.sharingpool
,o
); 
1953         robj 
*shared 
= dictGetEntryKey(de
); 
1955         c 
= ((unsigned long) dictGetEntryVal(de
))+1; 
1956         dictGetEntryVal(de
) = (void*) c
; 
1957         incrRefCount(shared
); 
1961         /* Here we are using a stream algorihtm: Every time an object is 
1962          * shared we increment its count, everytime there is a miss we 
1963          * recrement the counter of a random object. If this object reaches 
1964          * zero we remove the object and put the current object instead. */ 
1965         if (dictSize(server
.sharingpool
) >= 
1966                 server
.sharingpoolsize
) { 
1967             de 
= dictGetRandomKey(server
.sharingpool
); 
1969             c 
= ((unsigned long) dictGetEntryVal(de
))-1; 
1970             dictGetEntryVal(de
) = (void*) c
; 
1972                 dictDelete(server
.sharingpool
,de
->key
); 
1975             c 
= 0; /* If the pool is empty we want to add this object */ 
1980             retval 
= dictAdd(server
.sharingpool
,o
,(void*)1); 
1981             assert(retval 
== DICT_OK
); 
1988 /* Check if the nul-terminated string 's' can be represented by a long 
1989  * (that is, is a number that fits into long without any other space or 
1990  * character before or after the digits). 
1992  * If so, the function returns REDIS_OK and *longval is set to the value 
1993  * of the number. Otherwise REDIS_ERR is returned */ 
1994 static int isStringRepresentableAsLong(sds s
, long *longval
) { 
1995     char buf
[32], *endptr
; 
1999     value 
= strtol(s
, &endptr
, 10); 
2000     if (endptr
[0] != '\0') return REDIS_ERR
; 
2001     slen 
= snprintf(buf
,32,"%ld",value
); 
2003     /* If the number converted back into a string is not identical 
2004      * then it's not possible to encode the string as integer */ 
2005     if (sdslen(s
) != (unsigned)slen 
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
; 
2006     if (longval
) *longval 
= value
; 
2010 /* Try to encode a string object in order to save space */ 
2011 static int tryObjectEncoding(robj 
*o
) { 
2015     if (o
->encoding 
!= REDIS_ENCODING_RAW
) 
2016         return REDIS_ERR
; /* Already encoded */ 
2018     /* It's not save to encode shared objects: shared objects can be shared 
2019      * everywhere in the "object space" of Redis. Encoded objects can only 
2020      * appear as "values" (and not, for instance, as keys) */ 
2021      if (o
->refcount 
> 1) return REDIS_ERR
; 
2023     /* Currently we try to encode only strings */ 
2024     assert(o
->type 
== REDIS_STRING
); 
2026     /* Check if we can represent this string as a long integer */ 
2027     if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
; 
2029     /* Ok, this object can be encoded */ 
2030     o
->encoding 
= REDIS_ENCODING_INT
; 
2032     o
->ptr 
= (void*) value
; 
2036 /* Get a decoded version of an encoded object (returned as a new object) */ 
2037 static robj 
*getDecodedObject(const robj 
*o
) { 
2040     assert(o
->encoding 
!= REDIS_ENCODING_RAW
); 
2041     if (o
->type 
== REDIS_STRING 
&& o
->encoding 
== REDIS_ENCODING_INT
) { 
2044         snprintf(buf
,32,"%ld",(long)o
->ptr
); 
2045         dec 
= createStringObject(buf
,strlen(buf
)); 
2052 static int compareStringObjects(robj 
*a
, robj 
*b
) { 
2053     assert(a
->type 
== REDIS_STRING 
&& b
->type 
== REDIS_STRING
); 
2055     if (a
->encoding 
== REDIS_ENCODING_INT 
&& b
->encoding 
== REDIS_ENCODING_INT
){ 
2056         return (long)a
->ptr 
- (long)b
->ptr
; 
2062         if (a
->encoding 
!= REDIS_ENCODING_RAW
) a 
= getDecodedObject(a
); 
2063         if (b
->encoding 
!= REDIS_ENCODING_RAW
) b 
= getDecodedObject(a
); 
2064         retval 
= sdscmp(a
->ptr
,b
->ptr
); 
2071 static size_t stringObjectLen(robj 
*o
) { 
2072     assert(o
->type 
== REDIS_STRING
); 
2073     if (o
->encoding 
== REDIS_ENCODING_RAW
) { 
2074         return sdslen(o
->ptr
); 
2078         return snprintf(buf
,32,"%ld",(long)o
->ptr
); 
2082 /*============================ DB saving/loading ============================ */ 
2084 static int rdbSaveType(FILE *fp
, unsigned char type
) { 
2085     if (fwrite(&type
,1,1,fp
) == 0) return -1; 
2089 static int rdbSaveTime(FILE *fp
, time_t t
) { 
2090     int32_t t32 
= (int32_t) t
; 
2091     if (fwrite(&t32
,4,1,fp
) == 0) return -1; 
2095 /* check rdbLoadLen() comments for more info */ 
2096 static int rdbSaveLen(FILE *fp
, uint32_t len
) { 
2097     unsigned char buf
[2]; 
2100         /* Save a 6 bit len */ 
2101         buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6); 
2102         if (fwrite(buf
,1,1,fp
) == 0) return -1; 
2103     } else if (len 
< (1<<14)) { 
2104         /* Save a 14 bit len */ 
2105         buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6); 
2107         if (fwrite(buf
,2,1,fp
) == 0) return -1; 
2109         /* Save a 32 bit len */ 
2110         buf
[0] = (REDIS_RDB_32BITLEN
<<6); 
2111         if (fwrite(buf
,1,1,fp
) == 0) return -1; 
2113         if (fwrite(&len
,4,1,fp
) == 0) return -1; 
2118 /* String objects in the form "2391" "-100" without any space and with a 
2119  * range of values that can fit in an 8, 16 or 32 bit signed value can be 
2120  * encoded as integers to save space */ 
2121 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) { 
2123     char *endptr
, buf
[32]; 
2125     /* Check if it's possible to encode this value as a number */ 
2126     value 
= strtoll(s
, &endptr
, 10); 
2127     if (endptr
[0] != '\0') return 0; 
2128     snprintf(buf
,32,"%lld",value
); 
2130     /* If the number converted back into a string is not identical 
2131      * then it's not possible to encode the string as integer */ 
2132     if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0; 
2134     /* Finally check if it fits in our ranges */ 
2135     if (value 
>= -(1<<7) && value 
<= (1<<7)-1) { 
2136         enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
; 
2137         enc
[1] = value
&0xFF; 
2139     } else if (value 
>= -(1<<15) && value 
<= (1<<15)-1) { 
2140         enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
; 
2141         enc
[1] = value
&0xFF; 
2142         enc
[2] = (value
>>8)&0xFF; 
2144     } else if (value 
>= -((long long)1<<31) && value 
<= ((long long)1<<31)-1) { 
2145         enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
; 
2146         enc
[1] = value
&0xFF; 
2147         enc
[2] = (value
>>8)&0xFF; 
2148         enc
[3] = (value
>>16)&0xFF; 
2149         enc
[4] = (value
>>24)&0xFF; 
2156 static int rdbSaveLzfStringObject(FILE *fp
, robj 
*obj
) { 
2157     unsigned int comprlen
, outlen
; 
2161     /* We require at least four bytes compression for this to be worth it */ 
2162     outlen 
= sdslen(obj
->ptr
)-4; 
2163     if (outlen 
<= 0) return 0; 
2164     if ((out 
= zmalloc(outlen
+1)) == NULL
) return 0; 
2165     comprlen 
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
); 
2166     if (comprlen 
== 0) { 
2170     /* Data compressed! Let's save it on disk */ 
2171     byte 
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
; 
2172     if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
; 
2173     if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
; 
2174     if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
; 
2175     if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
; 
2184 /* Save a string objet as [len][data] on disk. If the object is a string 
2185  * representation of an integer value we try to safe it in a special form */ 
2186 static int rdbSaveStringObjectRaw(FILE *fp
, robj 
*obj
) { 
2190     len 
= sdslen(obj
->ptr
); 
2192     /* Try integer encoding */ 
2194         unsigned char buf
[5]; 
2195         if ((enclen 
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) { 
2196             if (fwrite(buf
,enclen
,1,fp
) == 0) return -1; 
2201     /* Try LZF compression - under 20 bytes it's unable to compress even 
2202      * aaaaaaaaaaaaaaaaaa so skip it */ 
2206         retval 
= rdbSaveLzfStringObject(fp
,obj
); 
2207         if (retval 
== -1) return -1; 
2208         if (retval 
> 0) return 0; 
2209         /* retval == 0 means data can't be compressed, save the old way */ 
2212     /* Store verbatim */ 
2213     if (rdbSaveLen(fp
,len
) == -1) return -1; 
2214     if (len 
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1; 
2218 /* Like rdbSaveStringObjectRaw() but handle encoded objects */ 
2219 static int rdbSaveStringObject(FILE *fp
, robj 
*obj
) { 
2223     if (obj
->encoding 
!= REDIS_ENCODING_RAW
) { 
2224         dec 
= getDecodedObject(obj
); 
2225         retval 
= rdbSaveStringObjectRaw(fp
,dec
); 
2229         return rdbSaveStringObjectRaw(fp
,obj
); 
2233 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */ 
2234 static int rdbSave(char *filename
) { 
2235     dictIterator 
*di 
= NULL
; 
2240     time_t now 
= time(NULL
); 
2242     snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid()); 
2243     fp 
= fopen(tmpfile
,"w"); 
2245         redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
)); 
2248     if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
; 
2249     for (j 
= 0; j 
< server
.dbnum
; j
++) { 
2250         redisDb 
*db 
= server
.db
+j
; 
2252         if (dictSize(d
) == 0) continue; 
2253         di 
= dictGetIterator(d
); 
2259         /* Write the SELECT DB opcode */ 
2260         if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
; 
2261         if (rdbSaveLen(fp
,j
) == -1) goto werr
; 
2263         /* Iterate this DB writing every entry */ 
2264         while((de 
= dictNext(di
)) != NULL
) { 
2265             robj 
*key 
= dictGetEntryKey(de
); 
2266             robj 
*o 
= dictGetEntryVal(de
); 
2267             time_t expiretime 
= getExpire(db
,key
); 
2269             /* Save the expire time */ 
2270             if (expiretime 
!= -1) { 
2271                 /* If this key is already expired skip it */ 
2272                 if (expiretime 
< now
) continue; 
2273                 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
; 
2274                 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
; 
2276             /* Save the key and associated value */ 
2277             if (rdbSaveType(fp
,o
->type
) == -1) goto werr
; 
2278             if (rdbSaveStringObject(fp
,key
) == -1) goto werr
; 
2279             if (o
->type 
== REDIS_STRING
) { 
2280                 /* Save a string value */ 
2281                 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
; 
2282             } else if (o
->type 
== REDIS_LIST
) { 
2283                 /* Save a list value */ 
2284                 list 
*list 
= o
->ptr
; 
2288                 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
; 
2289                 while((ln 
= listYield(list
))) { 
2290                     robj 
*eleobj 
= listNodeValue(ln
); 
2292                     if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
; 
2294             } else if (o
->type 
== REDIS_SET
) { 
2295                 /* Save a set value */ 
2297                 dictIterator 
*di 
= dictGetIterator(set
); 
2300                 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
; 
2301                 while((de 
= dictNext(di
)) != NULL
) { 
2302                     robj 
*eleobj 
= dictGetEntryKey(de
); 
2304                     if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
; 
2306                 dictReleaseIterator(di
); 
2311         dictReleaseIterator(di
); 
2314     if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
; 
2316     /* Make sure data will not remain on the OS's output buffers */ 
2321     /* Use RENAME to make sure the DB file is changed atomically only 
2322      * if the generate DB file is ok. */ 
2323     if (rename(tmpfile
,filename
) == -1) { 
2324         redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destionation: %s", strerror(errno
)); 
2328     redisLog(REDIS_NOTICE
,"DB saved on disk"); 
2330     server
.lastsave 
= time(NULL
); 
2336     redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
)); 
2337     if (di
) dictReleaseIterator(di
); 
2341 static int rdbSaveBackground(char *filename
) { 
2344     if (server
.bgsaveinprogress
) return REDIS_ERR
; 
2345     if ((childpid 
= fork()) == 0) { 
2348         if (rdbSave(filename
) == REDIS_OK
) { 
2355         if (childpid 
== -1) { 
2356             redisLog(REDIS_WARNING
,"Can't save in background: fork: %s", 
2360         redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
); 
2361         server
.bgsaveinprogress 
= 1; 
2362         server
.bgsavechildpid 
= childpid
; 
2365     return REDIS_OK
; /* unreached */ 
2368 static void rdbRemoveTempFile(pid_t childpid
) { 
2371     snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
); 
2375 static int rdbLoadType(FILE *fp
) { 
2377     if (fread(&type
,1,1,fp
) == 0) return -1; 
2381 static time_t rdbLoadTime(FILE *fp
) { 
2383     if (fread(&t32
,4,1,fp
) == 0) return -1; 
2384     return (time_t) t32
; 
2387 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top 
2388  * of this file for a description of how this are stored on disk. 
2390  * isencoded is set to 1 if the readed length is not actually a length but 
2391  * an "encoding type", check the above comments for more info */ 
2392 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) { 
2393     unsigned char buf
[2]; 
2396     if (isencoded
) *isencoded 
= 0; 
2398         if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
; 
2403         if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
; 
2404         type 
= (buf
[0]&0xC0)>>6; 
2405         if (type 
== REDIS_RDB_6BITLEN
) { 
2406             /* Read a 6 bit len */ 
2408         } else if (type 
== REDIS_RDB_ENCVAL
) { 
2409             /* Read a 6 bit len encoding type */ 
2410             if (isencoded
) *isencoded 
= 1; 
2412         } else if (type 
== REDIS_RDB_14BITLEN
) { 
2413             /* Read a 14 bit len */ 
2414             if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
; 
2415             return ((buf
[0]&0x3F)<<8)|buf
[1]; 
2417             /* Read a 32 bit len */ 
2418             if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
; 
2424 static robj 
*rdbLoadIntegerObject(FILE *fp
, int enctype
) { 
2425     unsigned char enc
[4]; 
2428     if (enctype 
== REDIS_RDB_ENC_INT8
) { 
2429         if (fread(enc
,1,1,fp
) == 0) return NULL
; 
2430         val 
= (signed char)enc
[0]; 
2431     } else if (enctype 
== REDIS_RDB_ENC_INT16
) { 
2433         if (fread(enc
,2,1,fp
) == 0) return NULL
; 
2434         v 
= enc
[0]|(enc
[1]<<8); 
2436     } else if (enctype 
== REDIS_RDB_ENC_INT32
) { 
2438         if (fread(enc
,4,1,fp
) == 0) return NULL
; 
2439         v 
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24); 
2442         val 
= 0; /* anti-warning */ 
2445     return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
)); 
2448 static robj 
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) { 
2449     unsigned int len
, clen
; 
2450     unsigned char *c 
= NULL
; 
2453     if ((clen 
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
; 
2454     if ((len 
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
; 
2455     if ((c 
= zmalloc(clen
)) == NULL
) goto err
; 
2456     if ((val 
= sdsnewlen(NULL
,len
)) == NULL
) goto err
; 
2457     if (fread(c
,clen
,1,fp
) == 0) goto err
; 
2458     if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
; 
2460     return createObject(REDIS_STRING
,val
); 
2467 static robj 
*rdbLoadStringObject(FILE*fp
, int rdbver
) { 
2472     len 
= rdbLoadLen(fp
,rdbver
,&isencoded
); 
2475         case REDIS_RDB_ENC_INT8
: 
2476         case REDIS_RDB_ENC_INT16
: 
2477         case REDIS_RDB_ENC_INT32
: 
2478             return tryObjectSharing(rdbLoadIntegerObject(fp
,len
)); 
2479         case REDIS_RDB_ENC_LZF
: 
2480             return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
)); 
2486     if (len 
== REDIS_RDB_LENERR
) return NULL
; 
2487     val 
= sdsnewlen(NULL
,len
); 
2488     if (len 
&& fread(val
,len
,1,fp
) == 0) { 
2492     return tryObjectSharing(createObject(REDIS_STRING
,val
)); 
2495 static int rdbLoad(char *filename
) { 
2497     robj 
*keyobj 
= NULL
; 
2499     int type
, retval
, rdbver
; 
2500     dict 
*d 
= server
.db
[0].dict
; 
2501     redisDb 
*db 
= server
.db
+0; 
2503     time_t expiretime 
= -1, now 
= time(NULL
); 
2505     fp 
= fopen(filename
,"r"); 
2506     if (!fp
) return REDIS_ERR
; 
2507     if (fread(buf
,9,1,fp
) == 0) goto eoferr
; 
2509     if (memcmp(buf
,"REDIS",5) != 0) { 
2511         redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file"); 
2514     rdbver 
= atoi(buf
+5); 
2517         redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
); 
2524         if ((type 
= rdbLoadType(fp
)) == -1) goto eoferr
; 
2525         if (type 
== REDIS_EXPIRETIME
) { 
2526             if ((expiretime 
= rdbLoadTime(fp
)) == -1) goto eoferr
; 
2527             /* We read the time so we need to read the object type again */ 
2528             if ((type 
= rdbLoadType(fp
)) == -1) goto eoferr
; 
2530         if (type 
== REDIS_EOF
) break; 
2531         /* Handle SELECT DB opcode as a special case */ 
2532         if (type 
== REDIS_SELECTDB
) { 
2533             if ((dbid 
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) 
2535             if (dbid 
>= (unsigned)server
.dbnum
) { 
2536                 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
); 
2539             db 
= server
.db
+dbid
; 
2544         if ((keyobj 
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
; 
2546         if (type 
== REDIS_STRING
) { 
2547             /* Read string value */ 
2548             if ((o 
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
; 
2549             tryObjectEncoding(o
); 
2550         } else if (type 
== REDIS_LIST 
|| type 
== REDIS_SET
) { 
2551             /* Read list/set value */ 
2554             if ((listlen 
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) 
2556             o 
= (type 
== REDIS_LIST
) ? createListObject() : createSetObject(); 
2557             /* Load every single element of the list/set */ 
2561                 if ((ele 
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
; 
2562                 tryObjectEncoding(ele
); 
2563                 if (type 
== REDIS_LIST
) { 
2564                     listAddNodeTail((list
*)o
->ptr
,ele
); 
2566                     dictAdd((dict
*)o
->ptr
,ele
,NULL
); 
2572         /* Add the new object in the hash table */ 
2573         retval 
= dictAdd(d
,keyobj
,o
); 
2574         if (retval 
== DICT_ERR
) { 
2575             redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
); 
2578         /* Set the expire time if needed */ 
2579         if (expiretime 
!= -1) { 
2580             setExpire(db
,keyobj
,expiretime
); 
2581             /* Delete this key if already expired */ 
2582             if (expiretime 
< now
) deleteKey(db
,keyobj
); 
2590 eoferr
: /* unexpected end of file is handled here with a fatal exit */ 
2591     if (keyobj
) decrRefCount(keyobj
); 
2592     redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, exiting now."); 
2594     return REDIS_ERR
; /* Just to avoid warning */ 
2597 /*================================== Commands =============================== */ 
2599 static void authCommand(redisClient 
*c
) { 
2600     if (!server
.requirepass 
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) { 
2601       c
->authenticated 
= 1; 
2602       addReply(c
,shared
.ok
); 
2604       c
->authenticated 
= 0; 
2605       addReply(c
,shared
.err
); 
2609 static void pingCommand(redisClient 
*c
) { 
2610     addReply(c
,shared
.pong
); 
2613 static void echoCommand(redisClient 
*c
) { 
2614     addReplyBulkLen(c
,c
->argv
[1]); 
2615     addReply(c
,c
->argv
[1]); 
2616     addReply(c
,shared
.crlf
); 
2619 /*=================================== Strings =============================== */ 
2621 static void setGenericCommand(redisClient 
*c
, int nx
) { 
2624     retval 
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]); 
2625     if (retval 
== DICT_ERR
) { 
2627             dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]); 
2628             incrRefCount(c
->argv
[2]); 
2630             addReply(c
,shared
.czero
); 
2634         incrRefCount(c
->argv
[1]); 
2635         incrRefCount(c
->argv
[2]); 
2638     removeExpire(c
->db
,c
->argv
[1]); 
2639     addReply(c
, nx 
? shared
.cone 
: shared
.ok
); 
2642 static void setCommand(redisClient 
*c
) { 
2643     setGenericCommand(c
,0); 
2646 static void setnxCommand(redisClient 
*c
) { 
2647     setGenericCommand(c
,1); 
2650 static void getCommand(redisClient 
*c
) { 
2651     robj 
*o 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
2654         addReply(c
,shared
.nullbulk
); 
2656         if (o
->type 
!= REDIS_STRING
) { 
2657             addReply(c
,shared
.wrongtypeerr
); 
2659             addReplyBulkLen(c
,o
); 
2661             addReply(c
,shared
.crlf
); 
2666 static void getsetCommand(redisClient 
*c
) { 
2668     if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) { 
2669         dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]); 
2671         incrRefCount(c
->argv
[1]); 
2673     incrRefCount(c
->argv
[2]); 
2675     removeExpire(c
->db
,c
->argv
[1]); 
2678 static void mgetCommand(redisClient 
*c
) { 
2681     addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1)); 
2682     for (j 
= 1; j 
< c
->argc
; j
++) { 
2683         robj 
*o 
= lookupKeyRead(c
->db
,c
->argv
[j
]); 
2685             addReply(c
,shared
.nullbulk
); 
2687             if (o
->type 
!= REDIS_STRING
) { 
2688                 addReply(c
,shared
.nullbulk
); 
2690                 addReplyBulkLen(c
,o
); 
2692                 addReply(c
,shared
.crlf
); 
2698 static void incrDecrCommand(redisClient 
*c
, long long incr
) { 
2703     o 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
2707         if (o
->type 
!= REDIS_STRING
) { 
2712             if (o
->encoding 
== REDIS_ENCODING_RAW
) 
2713                 value 
= strtoll(o
->ptr
, &eptr
, 10); 
2714             else if (o
->encoding 
== REDIS_ENCODING_INT
) 
2715                 value 
= (long)o
->ptr
; 
2722     o 
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
)); 
2723     tryObjectEncoding(o
); 
2724     retval 
= dictAdd(c
->db
->dict
,c
->argv
[1],o
); 
2725     if (retval 
== DICT_ERR
) { 
2726         dictReplace(c
->db
->dict
,c
->argv
[1],o
); 
2727         removeExpire(c
->db
,c
->argv
[1]); 
2729         incrRefCount(c
->argv
[1]); 
2732     addReply(c
,shared
.colon
); 
2734     addReply(c
,shared
.crlf
); 
2737 static void incrCommand(redisClient 
*c
) { 
2738     incrDecrCommand(c
,1); 
2741 static void decrCommand(redisClient 
*c
) { 
2742     incrDecrCommand(c
,-1); 
2745 static void incrbyCommand(redisClient 
*c
) { 
2746     long long incr 
= strtoll(c
->argv
[2]->ptr
, NULL
, 10); 
2747     incrDecrCommand(c
,incr
); 
2750 static void decrbyCommand(redisClient 
*c
) { 
2751     long long incr 
= strtoll(c
->argv
[2]->ptr
, NULL
, 10); 
2752     incrDecrCommand(c
,-incr
); 
2755 /* ========================= Type agnostic commands ========================= */ 
2757 static void delCommand(redisClient 
*c
) { 
2760     for (j 
= 1; j 
< c
->argc
; j
++) { 
2761         if (deleteKey(c
->db
,c
->argv
[j
])) { 
2768         addReply(c
,shared
.czero
); 
2771         addReply(c
,shared
.cone
); 
2774         addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
)); 
2779 static void existsCommand(redisClient 
*c
) { 
2780     addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone 
: shared
.czero
); 
2783 static void selectCommand(redisClient 
*c
) { 
2784     int id 
= atoi(c
->argv
[1]->ptr
); 
2786     if (selectDb(c
,id
) == REDIS_ERR
) { 
2787         addReplySds(c
,sdsnew("-ERR invalid DB index\r\n")); 
2789         addReply(c
,shared
.ok
); 
2793 static void randomkeyCommand(redisClient 
*c
) { 
2797         de 
= dictGetRandomKey(c
->db
->dict
); 
2798         if (!de 
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break; 
2801         addReply(c
,shared
.plus
); 
2802         addReply(c
,shared
.crlf
); 
2804         addReply(c
,shared
.plus
); 
2805         addReply(c
,dictGetEntryKey(de
)); 
2806         addReply(c
,shared
.crlf
); 
2810 static void keysCommand(redisClient 
*c
) { 
2813     sds pattern 
= c
->argv
[1]->ptr
; 
2814     int plen 
= sdslen(pattern
); 
2815     int numkeys 
= 0, keyslen 
= 0; 
2816     robj 
*lenobj 
= createObject(REDIS_STRING
,NULL
); 
2818     di 
= dictGetIterator(c
->db
->dict
); 
2820     decrRefCount(lenobj
); 
2821     while((de 
= dictNext(di
)) != NULL
) { 
2822         robj 
*keyobj 
= dictGetEntryKey(de
); 
2824         sds key 
= keyobj
->ptr
; 
2825         if ((pattern
[0] == '*' && pattern
[1] == '\0') || 
2826             stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) { 
2827             if (expireIfNeeded(c
->db
,keyobj
) == 0) { 
2829                     addReply(c
,shared
.space
); 
2832                 keyslen 
+= sdslen(key
); 
2836     dictReleaseIterator(di
); 
2837     lenobj
->ptr 
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys 
? (numkeys
-1) : 0)); 
2838     addReply(c
,shared
.crlf
); 
2841 static void dbsizeCommand(redisClient 
*c
) { 
2843         sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
))); 
2846 static void lastsaveCommand(redisClient 
*c
) { 
2848         sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
)); 
2851 static void typeCommand(redisClient 
*c
) { 
2855     o 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
2860         case REDIS_STRING
: type 
= "+string"; break; 
2861         case REDIS_LIST
: type 
= "+list"; break; 
2862         case REDIS_SET
: type 
= "+set"; break; 
2863         default: type 
= "unknown"; break; 
2866     addReplySds(c
,sdsnew(type
)); 
2867     addReply(c
,shared
.crlf
); 
2870 static void saveCommand(redisClient 
*c
) { 
2871     if (server
.bgsaveinprogress
) { 
2872         addReplySds(c
,sdsnew("-ERR background save in progress\r\n")); 
2875     if (rdbSave(server
.dbfilename
) == REDIS_OK
) { 
2876         addReply(c
,shared
.ok
); 
2878         addReply(c
,shared
.err
); 
2882 static void bgsaveCommand(redisClient 
*c
) { 
2883     if (server
.bgsaveinprogress
) { 
2884         addReplySds(c
,sdsnew("-ERR background save already in progress\r\n")); 
2887     if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) { 
2888         addReply(c
,shared
.ok
); 
2890         addReply(c
,shared
.err
); 
2894 static void shutdownCommand(redisClient 
*c
) { 
2895     redisLog(REDIS_WARNING
,"User requested shutdown, saving DB..."); 
2896     /* Kill the saving child if there is a background saving in progress. 
2897        We want to avoid race conditions, for instance our saving child may 
2898        overwrite the synchronous saving did by SHUTDOWN. */ 
2899     if (server
.bgsaveinprogress
) { 
2900         redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!"); 
2901         kill(server
.bgsavechildpid
,SIGKILL
); 
2902         rdbRemoveTempFile(server
.bgsavechildpid
); 
2905     if (rdbSave(server
.dbfilename
) == REDIS_OK
) { 
2906         if (server
.daemonize
) 
2907             unlink(server
.pidfile
); 
2908         redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory()); 
2909         redisLog(REDIS_WARNING
,"Server exit now, bye bye..."); 
2912         /* Ooops.. error saving! The best we can do is to continue operating. 
2913          * Note that if there was a background saving process, in the next 
2914          * cron() Redis will be notified that the background saving aborted, 
2915          * handling special stuff like slaves pending for synchronization... */ 
2916         redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");  
2917         addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n")); 
2921 static void renameGenericCommand(redisClient 
*c
, int nx
) { 
2924     /* To use the same key as src and dst is probably an error */ 
2925     if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) { 
2926         addReply(c
,shared
.sameobjecterr
); 
2930     o 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
2932         addReply(c
,shared
.nokeyerr
); 
2936     deleteIfVolatile(c
->db
,c
->argv
[2]); 
2937     if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) { 
2940             addReply(c
,shared
.czero
); 
2943         dictReplace(c
->db
->dict
,c
->argv
[2],o
); 
2945         incrRefCount(c
->argv
[2]); 
2947     deleteKey(c
->db
,c
->argv
[1]); 
2949     addReply(c
,nx 
? shared
.cone 
: shared
.ok
); 
2952 static void renameCommand(redisClient 
*c
) { 
2953     renameGenericCommand(c
,0); 
2956 static void renamenxCommand(redisClient 
*c
) { 
2957     renameGenericCommand(c
,1); 
2960 static void moveCommand(redisClient 
*c
) { 
2965     /* Obtain source and target DB pointers */ 
2968     if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) { 
2969         addReply(c
,shared
.outofrangeerr
); 
2973     selectDb(c
,srcid
); /* Back to the source DB */ 
2975     /* If the user is moving using as target the same 
2976      * DB as the source DB it is probably an error. */ 
2978         addReply(c
,shared
.sameobjecterr
); 
2982     /* Check if the element exists and get a reference */ 
2983     o 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
2985         addReply(c
,shared
.czero
); 
2989     /* Try to add the element to the target DB */ 
2990     deleteIfVolatile(dst
,c
->argv
[1]); 
2991     if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) { 
2992         addReply(c
,shared
.czero
); 
2995     incrRefCount(c
->argv
[1]); 
2998     /* OK! key moved, free the entry in the source DB */ 
2999     deleteKey(src
,c
->argv
[1]); 
3001     addReply(c
,shared
.cone
); 
3004 /* =================================== Lists ================================ */ 
3005 static void pushGenericCommand(redisClient 
*c
, int where
) { 
3009     lobj 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
3011         lobj 
= createListObject(); 
3013         if (where 
== REDIS_HEAD
) { 
3014             listAddNodeHead(list
,c
->argv
[2]); 
3016             listAddNodeTail(list
,c
->argv
[2]); 
3018         dictAdd(c
->db
->dict
,c
->argv
[1],lobj
); 
3019         incrRefCount(c
->argv
[1]); 
3020         incrRefCount(c
->argv
[2]); 
3022         if (lobj
->type 
!= REDIS_LIST
) { 
3023             addReply(c
,shared
.wrongtypeerr
); 
3027         if (where 
== REDIS_HEAD
) { 
3028             listAddNodeHead(list
,c
->argv
[2]); 
3030             listAddNodeTail(list
,c
->argv
[2]); 
3032         incrRefCount(c
->argv
[2]); 
3035     addReply(c
,shared
.ok
); 
3038 static void lpushCommand(redisClient 
*c
) { 
3039     pushGenericCommand(c
,REDIS_HEAD
); 
3042 static void rpushCommand(redisClient 
*c
) { 
3043     pushGenericCommand(c
,REDIS_TAIL
); 
3046 static void llenCommand(redisClient 
*c
) { 
3050     o 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
3052         addReply(c
,shared
.czero
); 
3055         if (o
->type 
!= REDIS_LIST
) { 
3056             addReply(c
,shared
.wrongtypeerr
); 
3059             addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
))); 
3064 static void lindexCommand(redisClient 
*c
) { 
3066     int index 
= atoi(c
->argv
[2]->ptr
); 
3068     o 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
3070         addReply(c
,shared
.nullbulk
); 
3072         if (o
->type 
!= REDIS_LIST
) { 
3073             addReply(c
,shared
.wrongtypeerr
); 
3075             list 
*list 
= o
->ptr
; 
3078             ln 
= listIndex(list
, index
); 
3080                 addReply(c
,shared
.nullbulk
); 
3082                 robj 
*ele 
= listNodeValue(ln
); 
3083                 addReplyBulkLen(c
,ele
); 
3085                 addReply(c
,shared
.crlf
); 
3091 static void lsetCommand(redisClient 
*c
) { 
3093     int index 
= atoi(c
->argv
[2]->ptr
); 
3095     o 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
3097         addReply(c
,shared
.nokeyerr
); 
3099         if (o
->type 
!= REDIS_LIST
) { 
3100             addReply(c
,shared
.wrongtypeerr
); 
3102             list 
*list 
= o
->ptr
; 
3105             ln 
= listIndex(list
, index
); 
3107                 addReply(c
,shared
.outofrangeerr
); 
3109                 robj 
*ele 
= listNodeValue(ln
); 
3112                 listNodeValue(ln
) = c
->argv
[3]; 
3113                 incrRefCount(c
->argv
[3]); 
3114                 addReply(c
,shared
.ok
); 
3121 static void popGenericCommand(redisClient 
*c
, int where
) { 
3124     o 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
3126         addReply(c
,shared
.nullbulk
); 
3128         if (o
->type 
!= REDIS_LIST
) { 
3129             addReply(c
,shared
.wrongtypeerr
); 
3131             list 
*list 
= o
->ptr
; 
3134             if (where 
== REDIS_HEAD
) 
3135                 ln 
= listFirst(list
); 
3137                 ln 
= listLast(list
); 
3140                 addReply(c
,shared
.nullbulk
); 
3142                 robj 
*ele 
= listNodeValue(ln
); 
3143                 addReplyBulkLen(c
,ele
); 
3145                 addReply(c
,shared
.crlf
); 
3146                 listDelNode(list
,ln
); 
3153 static void lpopCommand(redisClient 
*c
) { 
3154     popGenericCommand(c
,REDIS_HEAD
); 
3157 static void rpopCommand(redisClient 
*c
) { 
3158     popGenericCommand(c
,REDIS_TAIL
); 
3161 static void lrangeCommand(redisClient 
*c
) { 
3163     int start 
= atoi(c
->argv
[2]->ptr
); 
3164     int end 
= atoi(c
->argv
[3]->ptr
); 
3166     o 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
3168         addReply(c
,shared
.nullmultibulk
); 
3170         if (o
->type 
!= REDIS_LIST
) { 
3171             addReply(c
,shared
.wrongtypeerr
); 
3173             list 
*list 
= o
->ptr
; 
3175             int llen 
= listLength(list
); 
3179             /* convert negative indexes */ 
3180             if (start 
< 0) start 
= llen
+start
; 
3181             if (end 
< 0) end 
= llen
+end
; 
3182             if (start 
< 0) start 
= 0; 
3183             if (end 
< 0) end 
= 0; 
3185             /* indexes sanity checks */ 
3186             if (start 
> end 
|| start 
>= llen
) { 
3187                 /* Out of range start or start > end result in empty list */ 
3188                 addReply(c
,shared
.emptymultibulk
); 
3191             if (end 
>= llen
) end 
= llen
-1; 
3192             rangelen 
= (end
-start
)+1; 
3194             /* Return the result in form of a multi-bulk reply */ 
3195             ln 
= listIndex(list
, start
); 
3196             addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
)); 
3197             for (j 
= 0; j 
< rangelen
; j
++) { 
3198                 ele 
= listNodeValue(ln
); 
3199                 addReplyBulkLen(c
,ele
); 
3201                 addReply(c
,shared
.crlf
); 
3208 static void ltrimCommand(redisClient 
*c
) { 
3210     int start 
= atoi(c
->argv
[2]->ptr
); 
3211     int end 
= atoi(c
->argv
[3]->ptr
); 
3213     o 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
3215         addReply(c
,shared
.nokeyerr
); 
3217         if (o
->type 
!= REDIS_LIST
) { 
3218             addReply(c
,shared
.wrongtypeerr
); 
3220             list 
*list 
= o
->ptr
; 
3222             int llen 
= listLength(list
); 
3223             int j
, ltrim
, rtrim
; 
3225             /* convert negative indexes */ 
3226             if (start 
< 0) start 
= llen
+start
; 
3227             if (end 
< 0) end 
= llen
+end
; 
3228             if (start 
< 0) start 
= 0; 
3229             if (end 
< 0) end 
= 0; 
3231             /* indexes sanity checks */ 
3232             if (start 
> end 
|| start 
>= llen
) { 
3233                 /* Out of range start or start > end result in empty list */ 
3237                 if (end 
>= llen
) end 
= llen
-1; 
3242             /* Remove list elements to perform the trim */ 
3243             for (j 
= 0; j 
< ltrim
; j
++) { 
3244                 ln 
= listFirst(list
); 
3245                 listDelNode(list
,ln
); 
3247             for (j 
= 0; j 
< rtrim
; j
++) { 
3248                 ln 
= listLast(list
); 
3249                 listDelNode(list
,ln
); 
3252             addReply(c
,shared
.ok
); 
3257 static void lremCommand(redisClient 
*c
) { 
3260     o 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
3262         addReply(c
,shared
.czero
); 
3264         if (o
->type 
!= REDIS_LIST
) { 
3265             addReply(c
,shared
.wrongtypeerr
); 
3267             list 
*list 
= o
->ptr
; 
3268             listNode 
*ln
, *next
; 
3269             int toremove 
= atoi(c
->argv
[2]->ptr
); 
3274                 toremove 
= -toremove
; 
3277             ln 
= fromtail 
? list
->tail 
: list
->head
; 
3279                 robj 
*ele 
= listNodeValue(ln
); 
3281                 next 
= fromtail 
? ln
->prev 
: ln
->next
; 
3282                 if (compareStringObjects(ele
,c
->argv
[3]) == 0) { 
3283                     listDelNode(list
,ln
); 
3286                     if (toremove 
&& removed 
== toremove
) break; 
3290             addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
)); 
3295 /* ==================================== Sets ================================ */ 
3297 static void saddCommand(redisClient 
*c
) { 
3300     set 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
3302         set 
= createSetObject(); 
3303         dictAdd(c
->db
->dict
,c
->argv
[1],set
); 
3304         incrRefCount(c
->argv
[1]); 
3306         if (set
->type 
!= REDIS_SET
) { 
3307             addReply(c
,shared
.wrongtypeerr
); 
3311     if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) { 
3312         incrRefCount(c
->argv
[2]); 
3314         addReply(c
,shared
.cone
); 
3316         addReply(c
,shared
.czero
); 
3320 static void sremCommand(redisClient 
*c
) { 
3323     set 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
3325         addReply(c
,shared
.czero
); 
3327         if (set
->type 
!= REDIS_SET
) { 
3328             addReply(c
,shared
.wrongtypeerr
); 
3331         if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) { 
3333             if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
); 
3334             addReply(c
,shared
.cone
); 
3336             addReply(c
,shared
.czero
); 
3341 static void smoveCommand(redisClient 
*c
) { 
3342     robj 
*srcset
, *dstset
; 
3344     srcset 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
3345     dstset 
= lookupKeyWrite(c
->db
,c
->argv
[2]); 
3347     /* If the source key does not exist return 0, if it's of the wrong type 
3349     if (srcset 
== NULL 
|| srcset
->type 
!= REDIS_SET
) { 
3350         addReply(c
, srcset 
? shared
.wrongtypeerr 
: shared
.czero
); 
3353     /* Error if the destination key is not a set as well */ 
3354     if (dstset 
&& dstset
->type 
!= REDIS_SET
) { 
3355         addReply(c
,shared
.wrongtypeerr
); 
3358     /* Remove the element from the source set */ 
3359     if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) { 
3360         /* Key not found in the src set! return zero */ 
3361         addReply(c
,shared
.czero
); 
3365     /* Add the element to the destination set */ 
3367         dstset 
= createSetObject(); 
3368         dictAdd(c
->db
->dict
,c
->argv
[2],dstset
); 
3369         incrRefCount(c
->argv
[2]); 
3371     if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
) 
3372         incrRefCount(c
->argv
[3]); 
3373     addReply(c
,shared
.cone
); 
3376 static void sismemberCommand(redisClient 
*c
) { 
3379     set 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
3381         addReply(c
,shared
.czero
); 
3383         if (set
->type 
!= REDIS_SET
) { 
3384             addReply(c
,shared
.wrongtypeerr
); 
3387         if (dictFind(set
->ptr
,c
->argv
[2])) 
3388             addReply(c
,shared
.cone
); 
3390             addReply(c
,shared
.czero
); 
3394 static void scardCommand(redisClient 
*c
) { 
3398     o 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
3400         addReply(c
,shared
.czero
); 
3403         if (o
->type 
!= REDIS_SET
) { 
3404             addReply(c
,shared
.wrongtypeerr
); 
3407             addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n", 
3413 static void spopCommand(redisClient 
*c
) { 
3417     set 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
3419         addReply(c
,shared
.nullbulk
); 
3421         if (set
->type 
!= REDIS_SET
) { 
3422             addReply(c
,shared
.wrongtypeerr
); 
3425         de 
= dictGetRandomKey(set
->ptr
); 
3427             addReply(c
,shared
.nullbulk
); 
3429             robj 
*ele 
= dictGetEntryKey(de
); 
3431             addReplyBulkLen(c
,ele
); 
3433             addReply(c
,shared
.crlf
); 
3434             dictDelete(set
->ptr
,ele
); 
3435             if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
); 
3441 static void srandmemberCommand(redisClient 
*c
) { 
3445     set 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
3447         addReply(c
,shared
.nullbulk
); 
3449         if (set
->type 
!= REDIS_SET
) { 
3450             addReply(c
,shared
.wrongtypeerr
); 
3453         de 
= dictGetRandomKey(set
->ptr
); 
3455             addReply(c
,shared
.nullbulk
); 
3457             robj 
*ele 
= dictGetEntryKey(de
); 
3459             addReplyBulkLen(c
,ele
); 
3461             addReply(c
,shared
.crlf
); 
3466 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) { 
3467     dict 
**d1 
= (void*) s1
, **d2 
= (void*) s2
; 
3469     return dictSize(*d1
)-dictSize(*d2
); 
3472 static void sinterGenericCommand(redisClient 
*c
, robj 
**setskeys
, int setsnum
, robj 
*dstkey
) { 
3473     dict 
**dv 
= zmalloc(sizeof(dict
*)*setsnum
); 
3476     robj 
*lenobj 
= NULL
, *dstset 
= NULL
; 
3477     int j
, cardinality 
= 0; 
3479     for (j 
= 0; j 
< setsnum
; j
++) { 
3483                     lookupKeyWrite(c
->db
,setskeys
[j
]) : 
3484                     lookupKeyRead(c
->db
,setskeys
[j
]); 
3488                 deleteKey(c
->db
,dstkey
); 
3489                 addReply(c
,shared
.ok
); 
3491                 addReply(c
,shared
.nullmultibulk
); 
3495         if (setobj
->type 
!= REDIS_SET
) { 
3497             addReply(c
,shared
.wrongtypeerr
); 
3500         dv
[j
] = setobj
->ptr
; 
3502     /* Sort sets from the smallest to largest, this will improve our 
3503      * algorithm's performace */ 
3504     qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
); 
3506     /* The first thing we should output is the total number of elements... 
3507      * since this is a multi-bulk write, but at this stage we don't know 
3508      * the intersection set size, so we use a trick, append an empty object 
3509      * to the output list and save the pointer to later modify it with the 
3512         lenobj 
= createObject(REDIS_STRING
,NULL
); 
3514         decrRefCount(lenobj
); 
3516         /* If we have a target key where to store the resulting set 
3517          * create this key with an empty set inside */ 
3518         dstset 
= createSetObject(); 
3521     /* Iterate all the elements of the first (smallest) set, and test 
3522      * the element against all the other sets, if at least one set does 
3523      * not include the element it is discarded */ 
3524     di 
= dictGetIterator(dv
[0]); 
3526     while((de 
= dictNext(di
)) != NULL
) { 
3529         for (j 
= 1; j 
< setsnum
; j
++) 
3530             if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break; 
3532             continue; /* at least one set does not contain the member */ 
3533         ele 
= dictGetEntryKey(de
); 
3535             addReplyBulkLen(c
,ele
); 
3537             addReply(c
,shared
.crlf
); 
3540             dictAdd(dstset
->ptr
,ele
,NULL
); 
3544     dictReleaseIterator(di
); 
3547         /* Store the resulting set into the target */ 
3548         deleteKey(c
->db
,dstkey
); 
3549         dictAdd(c
->db
->dict
,dstkey
,dstset
); 
3550         incrRefCount(dstkey
); 
3554         lenobj
->ptr 
= sdscatprintf(sdsempty(),"*%d\r\n",cardinality
); 
3556         addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n", 
3557             dictSize((dict
*)dstset
->ptr
))); 
3563 static void sinterCommand(redisClient 
*c
) { 
3564     sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
); 
3567 static void sinterstoreCommand(redisClient 
*c
) { 
3568     sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]); 
3571 #define REDIS_OP_UNION 0 
3572 #define REDIS_OP_DIFF 1 
3574 static void sunionDiffGenericCommand(redisClient 
*c
, robj 
**setskeys
, int setsnum
, robj 
*dstkey
, int op
) { 
3575     dict 
**dv 
= zmalloc(sizeof(dict
*)*setsnum
); 
3578     robj 
*dstset 
= NULL
; 
3579     int j
, cardinality 
= 0; 
3581     for (j 
= 0; j 
< setsnum
; j
++) { 
3585                     lookupKeyWrite(c
->db
,setskeys
[j
]) : 
3586                     lookupKeyRead(c
->db
,setskeys
[j
]); 
3591         if (setobj
->type 
!= REDIS_SET
) { 
3593             addReply(c
,shared
.wrongtypeerr
); 
3596         dv
[j
] = setobj
->ptr
; 
3599     /* We need a temp set object to store our union. If the dstkey 
3600      * is not NULL (that is, we are inside an SUNIONSTORE operation) then 
3601      * this set object will be the resulting object to set into the target key*/ 
3602     dstset 
= createSetObject(); 
3604     /* Iterate all the elements of all the sets, add every element a single 
3605      * time to the result set */ 
3606     for (j 
= 0; j 
< setsnum
; j
++) { 
3607         if (op 
== REDIS_OP_DIFF 
&& j 
== 0 && !dv
[j
]) break; /* result set is empty */ 
3608         if (!dv
[j
]) continue; /* non existing keys are like empty sets */ 
3610         di 
= dictGetIterator(dv
[j
]); 
3612         while((de 
= dictNext(di
)) != NULL
) { 
3615             /* dictAdd will not add the same element multiple times */ 
3616             ele 
= dictGetEntryKey(de
); 
3617             if (op 
== REDIS_OP_UNION 
|| j 
== 0) { 
3618                 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) { 
3622             } else if (op 
== REDIS_OP_DIFF
) { 
3623                 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) { 
3628         dictReleaseIterator(di
); 
3630         if (op 
== REDIS_OP_DIFF 
&& cardinality 
== 0) break; /* result set is empty */ 
3633     /* Output the content of the resulting set, if not in STORE mode */ 
3635         addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
)); 
3636         di 
= dictGetIterator(dstset
->ptr
); 
3637         while((de 
= dictNext(di
)) != NULL
) { 
3640             ele 
= dictGetEntryKey(de
); 
3641             addReplyBulkLen(c
,ele
); 
3643             addReply(c
,shared
.crlf
); 
3645         dictReleaseIterator(di
); 
3647         /* If we have a target key where to store the resulting set 
3648          * create this key with the result set inside */ 
3649         deleteKey(c
->db
,dstkey
); 
3650         dictAdd(c
->db
->dict
,dstkey
,dstset
); 
3651         incrRefCount(dstkey
); 
3656         decrRefCount(dstset
); 
3658         addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n", 
3659             dictSize((dict
*)dstset
->ptr
))); 
3665 static void sunionCommand(redisClient 
*c
) { 
3666     sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
); 
3669 static void sunionstoreCommand(redisClient 
*c
) { 
3670     sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
); 
3673 static void sdiffCommand(redisClient 
*c
) { 
3674     sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
); 
3677 static void sdiffstoreCommand(redisClient 
*c
) { 
3678     sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
); 
3681 /* ==================================== ZSets =============================== */ 
3683 /* ZSETs are ordered sets using two data structures to hold the same elements 
3684  * in order to get O(log(N)) INSERT and REMOVE operations into a sorted 
3687  * The elements are added to an hash table mapping Redis objects to scores. 
3688  * At the same time the elements are added to a skip list mapping scores 
3689  * to Redis objects (so objects are sorted by scores in this "view"). */ 
3691 /* This skiplist implementation is almost a C translation of the original 
3692  * algorithm described by William Pugh in "Skip Lists: A Probabilistic 
3693  * Alternative to Balanced Trees", modified in three ways: 
3694  * a) this implementation allows for repeated values. 
3695  * b) the comparison is not just by key (our 'score') but by satellite data. 
3696  * c) there is a back pointer, so it's a doubly linked list with the back 
3697  * pointers being only at "level 1". This allows to traverse the list 
3698  * from tail to head, useful for ZREVRANGE. */ 
3700 static zskiplistNode 
*zslCreateNode(int level
, double score
, robj 
*obj
) { 
3701     zskiplistNode 
*zn 
= zmalloc(sizeof(*zn
)); 
3703     zn
->forward 
= zmalloc(sizeof(zskiplistNode
*) * level
); 
3709 static zskiplist 
*zslCreate(void) { 
3713     zsl 
= zmalloc(sizeof(*zsl
)); 
3716     zsl
->header 
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
); 
3717     for (j 
= 0; j 
< ZSKIPLIST_MAXLEVEL
; j
++) 
3718         zsl
->header
->forward
[j
] = NULL
; 
3722 static void zslFreeNode(zskiplistNode 
*node
) { 
3723     decrRefCount(node
->obj
); 
3727 static void zslFree(zskiplist 
*zsl
) { 
3728     zskiplistNode 
*node 
= zsl
->header
->forward
[1], *next
; 
3731         next 
= node
->forward
[1]; 
3737 static int zslRandomLevel(void) { 
3739     while ((random()&0xFFFF) < (ZSKIPLIST_P 
* 0xFFFF)) 
3744 static void zslInsert(zskiplist 
*zsl
, double score
, robj 
*obj
) { 
3745     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
3749     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
3750         while (x
->forward
[i
] && x
->forward
[i
]->score 
< score
) 
3755     /* we assume the key is not already inside, since we allow duplicated 
3756      * scores, and the re-insertion of score and redis object should never 
3757      * happpen since the caller of zslInsert() should test in the hash table 
3758      * if the element is already inside or not. */ 
3759     level 
= zslRandomLevel(); 
3760     if (level 
> zsl
->level
) { 
3761         for (i 
= zsl
->level
; i 
< level
; i
++) 
3762             update
[i
] = zsl
->header
; 
3765     x 
= zslCreateNode(level
,score
,obj
); 
3766     for (i 
= 0; i 
< level
; i
++) { 
3767         x
->forward
[i
] = update
[i
]->forward
[i
]; 
3768         update
[i
]->forward
[i
] = x
; 
3773 static int zslDelete(zskiplist 
*zsl
, double score
, robj 
*obj
) { 
3777 /* The actual Z-commands implementations */ 
3779 static void zaddCommand(redisClient 
*c
) { 
3784     zsetobj 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
3785     if (zsetobj 
== NULL
) { 
3786         zsetobj 
= createZsetObject(); 
3787         dictAdd(c
->db
->dict
,c
->argv
[1],zsetobj
); 
3788         incrRefCount(c
->argv
[1]); 
3790         if (zsetobj
->type 
!= REDIS_ZSET
) { 
3791             addReply(c
,shared
.wrongtypeerr
); 
3795     score 
= zmalloc(sizeof(double)); 
3796     *score 
= strtod(c
->argv
[2]->ptr
,NULL
); 
3798     if (dictAdd(zs
->dict
,c
->argv
[3],score
) == DICT_OK
) { 
3799         /* case 1: New element */ 
3800         incrRefCount(c
->argv
[3]); /* added to hash */ 
3801         zslInsert(zs
->zsl
,*score
,c
->argv
[3]); 
3802         incrRefCount(c
->argv
[3]); /* added to skiplist */ 
3804         addReply(c
,shared
.cone
); 
3809         /* case 2: Score update operation */ 
3810         de 
= dictFind(zs
->dict
,c
->argv
[3]); 
3812         oldscore 
= dictGetEntryVal(de
); 
3813         if (*score 
!= *oldscore
) { 
3816             deleted 
= zslDelete(zs
->zsl
,*score
,c
->argv
[3]); 
3817             assert(deleted 
!= 0); 
3818             zslInsert(zs
->zsl
,*score
,c
->argv
[3]); 
3819             incrRefCount(c
->argv
[3]); 
3822         addReply(c
,shared
.czero
); 
3826 static void zrangeCommand(redisClient 
*c
) { 
3828     int start 
= atoi(c
->argv
[2]->ptr
); 
3829     int end 
= atoi(c
->argv
[3]->ptr
); 
3831     o 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
3833         addReply(c
,shared
.nullmultibulk
); 
3835         if (o
->type 
!= REDIS_ZSET
) { 
3836             addReply(c
,shared
.wrongtypeerr
); 
3838             zset 
*zsetobj 
= o
->ptr
; 
3839             zskiplist 
*zsl 
= zsetobj
->zsl
; 
3842             int llen 
= zsl
->length
; 
3846             /* convert negative indexes */ 
3847             if (start 
< 0) start 
= llen
+start
; 
3848             if (end 
< 0) end 
= llen
+end
; 
3849             if (start 
< 0) start 
= 0; 
3850             if (end 
< 0) end 
= 0; 
3852             /* indexes sanity checks */ 
3853             if (start 
> end 
|| start 
>= llen
) { 
3854                 /* Out of range start or start > end result in empty list */ 
3855                 addReply(c
,shared
.emptymultibulk
); 
3858             if (end 
>= llen
) end 
= llen
-1; 
3859             rangelen 
= (end
-start
)+1; 
3861             /* Return the result in form of a multi-bulk reply */ 
3862             ln 
= zsl
->header
->forward
[0]; 
3864                 ln 
= ln
->forward
[0]; 
3866             addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
)); 
3867             for (j 
= 0; j 
< rangelen
; j
++) { 
3869                 addReplyBulkLen(c
,ele
); 
3871                 addReply(c
,shared
.crlf
); 
3872                 ln 
= ln
->forward
[0]; 
3878 /* ========================= Non type-specific commands  ==================== */ 
3880 static void flushdbCommand(redisClient 
*c
) { 
3881     server
.dirty 
+= dictSize(c
->db
->dict
); 
3882     dictEmpty(c
->db
->dict
); 
3883     dictEmpty(c
->db
->expires
); 
3884     addReply(c
,shared
.ok
); 
3887 static void flushallCommand(redisClient 
*c
) { 
3888     server
.dirty 
+= emptyDb(); 
3889     addReply(c
,shared
.ok
); 
3890     rdbSave(server
.dbfilename
); 
3894 static redisSortOperation 
*createSortOperation(int type
, robj 
*pattern
) { 
3895     redisSortOperation 
*so 
= zmalloc(sizeof(*so
)); 
3897     so
->pattern 
= pattern
; 
3901 /* Return the value associated to the key with a name obtained 
3902  * substituting the first occurence of '*' in 'pattern' with 'subst' */ 
3903 static robj 
*lookupKeyByPattern(redisDb 
*db
, robj 
*pattern
, robj 
*subst
) { 
3907     int prefixlen
, sublen
, postfixlen
; 
3908     /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */ 
3912         char buf
[REDIS_SORTKEY_MAX
+1]; 
3915     if (subst
->encoding 
== REDIS_ENCODING_RAW
) 
3916         incrRefCount(subst
); 
3918         subst 
= getDecodedObject(subst
); 
3921     spat 
= pattern
->ptr
; 
3923     if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
; 
3924     p 
= strchr(spat
,'*'); 
3925     if (!p
) return NULL
; 
3928     sublen 
= sdslen(ssub
); 
3929     postfixlen 
= sdslen(spat
)-(prefixlen
+1); 
3930     memcpy(keyname
.buf
,spat
,prefixlen
); 
3931     memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
); 
3932     memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
); 
3933     keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0'; 
3934     keyname
.len 
= prefixlen
+sublen
+postfixlen
; 
3936     keyobj
.refcount 
= 1; 
3937     keyobj
.type 
= REDIS_STRING
; 
3938     keyobj
.ptr 
= ((char*)&keyname
)+(sizeof(long)*2); 
3940     decrRefCount(subst
); 
3942     /* printf("lookup '%s' => %p\n", keyname.buf,de); */ 
3943     return lookupKeyRead(db
,&keyobj
); 
3946 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with 
3947  * the additional parameter is not standard but a BSD-specific we have to 
3948  * pass sorting parameters via the global 'server' structure */ 
3949 static int sortCompare(const void *s1
, const void *s2
) { 
3950     const redisSortObject 
*so1 
= s1
, *so2 
= s2
; 
3953     if (!server
.sort_alpha
) { 
3954         /* Numeric sorting. Here it's trivial as we precomputed scores */ 
3955         if (so1
->u
.score 
> so2
->u
.score
) { 
3957         } else if (so1
->u
.score 
< so2
->u
.score
) { 
3963         /* Alphanumeric sorting */ 
3964         if (server
.sort_bypattern
) { 
3965             if (!so1
->u
.cmpobj 
|| !so2
->u
.cmpobj
) { 
3966                 /* At least one compare object is NULL */ 
3967                 if (so1
->u
.cmpobj 
== so2
->u
.cmpobj
) 
3969                 else if (so1
->u
.cmpobj 
== NULL
) 
3974                 /* We have both the objects, use strcoll */ 
3975                 cmp 
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
); 
3978             /* Compare elements directly */ 
3979             if (so1
->obj
->encoding 
== REDIS_ENCODING_RAW 
&& 
3980                 so2
->obj
->encoding 
== REDIS_ENCODING_RAW
) { 
3981                 cmp 
= strcoll(so1
->obj
->ptr
,so2
->obj
->ptr
); 
3985                 dec1 
= so1
->obj
->encoding 
== REDIS_ENCODING_RAW 
? 
3986                     so1
->obj 
: getDecodedObject(so1
->obj
); 
3987                 dec2 
= so2
->obj
->encoding 
== REDIS_ENCODING_RAW 
? 
3988                     so2
->obj 
: getDecodedObject(so2
->obj
); 
3989                 cmp 
= strcoll(dec1
->ptr
,dec2
->ptr
); 
3990                 if (dec1 
!= so1
->obj
) decrRefCount(dec1
); 
3991                 if (dec2 
!= so2
->obj
) decrRefCount(dec2
); 
3995     return server
.sort_desc 
? -cmp 
: cmp
; 
3998 /* The SORT command is the most complex command in Redis. Warning: this code 
3999  * is optimized for speed and a bit less for readability */ 
4000 static void sortCommand(redisClient 
*c
) { 
4003     int desc 
= 0, alpha 
= 0; 
4004     int limit_start 
= 0, limit_count 
= -1, start
, end
; 
4005     int j
, dontsort 
= 0, vectorlen
; 
4006     int getop 
= 0; /* GET operation counter */ 
4007     robj 
*sortval
, *sortby 
= NULL
; 
4008     redisSortObject 
*vector
; /* Resulting vector to sort */ 
4010     /* Lookup the key to sort. It must be of the right types */ 
4011     sortval 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
4012     if (sortval 
== NULL
) { 
4013         addReply(c
,shared
.nokeyerr
); 
4016     if (sortval
->type 
!= REDIS_SET 
&& sortval
->type 
!= REDIS_LIST
) { 
4017         addReply(c
,shared
.wrongtypeerr
); 
4021     /* Create a list of operations to perform for every sorted element. 
4022      * Operations can be GET/DEL/INCR/DECR */ 
4023     operations 
= listCreate(); 
4024     listSetFreeMethod(operations
,zfree
); 
4027     /* Now we need to protect sortval incrementing its count, in the future 
4028      * SORT may have options able to overwrite/delete keys during the sorting 
4029      * and the sorted key itself may get destroied */ 
4030     incrRefCount(sortval
); 
4032     /* The SORT command has an SQL-alike syntax, parse it */ 
4033     while(j 
< c
->argc
) { 
4034         int leftargs 
= c
->argc
-j
-1; 
4035         if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) { 
4037         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) { 
4039         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) { 
4041         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs 
>= 2) { 
4042             limit_start 
= atoi(c
->argv
[j
+1]->ptr
); 
4043             limit_count 
= atoi(c
->argv
[j
+2]->ptr
); 
4045         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs 
>= 1) { 
4046             sortby 
= c
->argv
[j
+1]; 
4047             /* If the BY pattern does not contain '*', i.e. it is constant, 
4048              * we don't need to sort nor to lookup the weight keys. */ 
4049             if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort 
= 1; 
4051         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs 
>= 1) { 
4052             listAddNodeTail(operations
,createSortOperation( 
4053                 REDIS_SORT_GET
,c
->argv
[j
+1])); 
4056         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"del") && leftargs 
>= 1) { 
4057             listAddNodeTail(operations
,createSortOperation( 
4058                 REDIS_SORT_DEL
,c
->argv
[j
+1])); 
4060         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"incr") && leftargs 
>= 1) { 
4061             listAddNodeTail(operations
,createSortOperation( 
4062                 REDIS_SORT_INCR
,c
->argv
[j
+1])); 
4064         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs 
>= 1) { 
4065             listAddNodeTail(operations
,createSortOperation( 
4066                 REDIS_SORT_DECR
,c
->argv
[j
+1])); 
4069             decrRefCount(sortval
); 
4070             listRelease(operations
); 
4071             addReply(c
,shared
.syntaxerr
); 
4077     /* Load the sorting vector with all the objects to sort */ 
4078     vectorlen 
= (sortval
->type 
== REDIS_LIST
) ? 
4079         listLength((list
*)sortval
->ptr
) : 
4080         dictSize((dict
*)sortval
->ptr
); 
4081     vector 
= zmalloc(sizeof(redisSortObject
)*vectorlen
); 
4083     if (sortval
->type 
== REDIS_LIST
) { 
4084         list 
*list 
= sortval
->ptr
; 
4088         while((ln 
= listYield(list
))) { 
4089             robj 
*ele 
= ln
->value
; 
4090             vector
[j
].obj 
= ele
; 
4091             vector
[j
].u
.score 
= 0; 
4092             vector
[j
].u
.cmpobj 
= NULL
; 
4096         dict 
*set 
= sortval
->ptr
; 
4100         di 
= dictGetIterator(set
); 
4101         while((setele 
= dictNext(di
)) != NULL
) { 
4102             vector
[j
].obj 
= dictGetEntryKey(setele
); 
4103             vector
[j
].u
.score 
= 0; 
4104             vector
[j
].u
.cmpobj 
= NULL
; 
4107         dictReleaseIterator(di
); 
4109     assert(j 
== vectorlen
); 
4111     /* Now it's time to load the right scores in the sorting vector */ 
4112     if (dontsort 
== 0) { 
4113         for (j 
= 0; j 
< vectorlen
; j
++) { 
4117                 byval 
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
); 
4118                 if (!byval 
|| byval
->type 
!= REDIS_STRING
) continue; 
4120                     if (byval
->encoding 
== REDIS_ENCODING_RAW
) { 
4121                         vector
[j
].u
.cmpobj 
= byval
; 
4122                         incrRefCount(byval
); 
4124                         vector
[j
].u
.cmpobj 
= getDecodedObject(byval
); 
4127                     if (byval
->encoding 
== REDIS_ENCODING_RAW
) { 
4128                         vector
[j
].u
.score 
= strtod(byval
->ptr
,NULL
); 
4130                         if (byval
->encoding 
== REDIS_ENCODING_INT
) { 
4131                             vector
[j
].u
.score 
= (long)byval
->ptr
; 
4138                     if (vector
[j
].obj
->encoding 
== REDIS_ENCODING_RAW
) 
4139                         vector
[j
].u
.score 
= strtod(vector
[j
].obj
->ptr
,NULL
); 
4141                         if (vector
[j
].obj
->encoding 
== REDIS_ENCODING_INT
) 
4142                             vector
[j
].u
.score 
= (long) vector
[j
].obj
->ptr
; 
4151     /* We are ready to sort the vector... perform a bit of sanity check 
4152      * on the LIMIT option too. We'll use a partial version of quicksort. */ 
4153     start 
= (limit_start 
< 0) ? 0 : limit_start
; 
4154     end 
= (limit_count 
< 0) ? vectorlen
-1 : start
+limit_count
-1; 
4155     if (start 
>= vectorlen
) { 
4156         start 
= vectorlen
-1; 
4159     if (end 
>= vectorlen
) end 
= vectorlen
-1; 
4161     if (dontsort 
== 0) { 
4162         server
.sort_desc 
= desc
; 
4163         server
.sort_alpha 
= alpha
; 
4164         server
.sort_bypattern 
= sortby 
? 1 : 0; 
4165         if (sortby 
&& (start 
!= 0 || end 
!= vectorlen
-1)) 
4166             pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
); 
4168             qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
); 
4171     /* Send command output to the output buffer, performing the specified 
4172      * GET/DEL/INCR/DECR operations if any. */ 
4173     outputlen 
= getop 
? getop
*(end
-start
+1) : end
-start
+1; 
4174     addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
)); 
4175     for (j 
= start
; j 
<= end
; j
++) { 
4178             addReplyBulkLen(c
,vector
[j
].obj
); 
4179             addReply(c
,vector
[j
].obj
); 
4180             addReply(c
,shared
.crlf
); 
4182         listRewind(operations
); 
4183         while((ln 
= listYield(operations
))) { 
4184             redisSortOperation 
*sop 
= ln
->value
; 
4185             robj 
*val 
= lookupKeyByPattern(c
->db
,sop
->pattern
, 
4188             if (sop
->type 
== REDIS_SORT_GET
) { 
4189                 if (!val 
|| val
->type 
!= REDIS_STRING
) { 
4190                     addReply(c
,shared
.nullbulk
); 
4192                     addReplyBulkLen(c
,val
); 
4194                     addReply(c
,shared
.crlf
); 
4196             } else if (sop
->type 
== REDIS_SORT_DEL
) { 
4203     decrRefCount(sortval
); 
4204     listRelease(operations
); 
4205     for (j 
= 0; j 
< vectorlen
; j
++) { 
4206         if (sortby 
&& alpha 
&& vector
[j
].u
.cmpobj
) 
4207             decrRefCount(vector
[j
].u
.cmpobj
); 
4212 static void infoCommand(redisClient 
*c
) { 
4214     time_t uptime 
= time(NULL
)-server
.stat_starttime
; 
4217     info 
= sdscatprintf(sdsempty(), 
4218         "redis_version:%s\r\n" 
4220         "uptime_in_seconds:%d\r\n" 
4221         "uptime_in_days:%d\r\n" 
4222         "connected_clients:%d\r\n" 
4223         "connected_slaves:%d\r\n" 
4224         "used_memory:%zu\r\n" 
4225         "changes_since_last_save:%lld\r\n" 
4226         "bgsave_in_progress:%d\r\n" 
4227         "last_save_time:%d\r\n" 
4228         "total_connections_received:%lld\r\n" 
4229         "total_commands_processed:%lld\r\n" 
4232         (sizeof(long) == 8) ? "64" : "32", 
4235         listLength(server
.clients
)-listLength(server
.slaves
), 
4236         listLength(server
.slaves
), 
4239         server
.bgsaveinprogress
, 
4241         server
.stat_numconnections
, 
4242         server
.stat_numcommands
, 
4243         server
.masterhost 
== NULL 
? "master" : "slave" 
4245     if (server
.masterhost
) { 
4246         info 
= sdscatprintf(info
, 
4247             "master_host:%s\r\n" 
4248             "master_port:%d\r\n" 
4249             "master_link_status:%s\r\n" 
4250             "master_last_io_seconds_ago:%d\r\n" 
4253             (server
.replstate 
== REDIS_REPL_CONNECTED
) ? 
4255             (int)(time(NULL
)-server
.master
->lastinteraction
) 
4258     for (j 
= 0; j 
< server
.dbnum
; j
++) { 
4259         long long keys
, vkeys
; 
4261         keys 
= dictSize(server
.db
[j
].dict
); 
4262         vkeys 
= dictSize(server
.db
[j
].expires
); 
4263         if (keys 
|| vkeys
) { 
4264             info 
= sdscatprintf(info
, "db%d: keys=%lld,expires=%lld\r\n", 
4268     addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info
))); 
4269     addReplySds(c
,info
); 
4270     addReply(c
,shared
.crlf
); 
4273 static void monitorCommand(redisClient 
*c
) { 
4274     /* ignore MONITOR if aleady slave or in monitor mode */ 
4275     if (c
->flags 
& REDIS_SLAVE
) return; 
4277     c
->flags 
|= (REDIS_SLAVE
|REDIS_MONITOR
); 
4279     listAddNodeTail(server
.monitors
,c
); 
4280     addReply(c
,shared
.ok
); 
4283 /* ================================= Expire ================================= */ 
4284 static int removeExpire(redisDb 
*db
, robj 
*key
) { 
4285     if (dictDelete(db
->expires
,key
) == DICT_OK
) { 
4292 static int setExpire(redisDb 
*db
, robj 
*key
, time_t when
) { 
4293     if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) { 
4301 /* Return the expire time of the specified key, or -1 if no expire 
4302  * is associated with this key (i.e. the key is non volatile) */ 
4303 static time_t getExpire(redisDb 
*db
, robj 
*key
) { 
4306     /* No expire? return ASAP */ 
4307     if (dictSize(db
->expires
) == 0 || 
4308        (de 
= dictFind(db
->expires
,key
)) == NULL
) return -1; 
4310     return (time_t) dictGetEntryVal(de
); 
4313 static int expireIfNeeded(redisDb 
*db
, robj 
*key
) { 
4317     /* No expire? return ASAP */ 
4318     if (dictSize(db
->expires
) == 0 || 
4319        (de 
= dictFind(db
->expires
,key
)) == NULL
) return 0; 
4321     /* Lookup the expire */ 
4322     when 
= (time_t) dictGetEntryVal(de
); 
4323     if (time(NULL
) <= when
) return 0; 
4325     /* Delete the key */ 
4326     dictDelete(db
->expires
,key
); 
4327     return dictDelete(db
->dict
,key
) == DICT_OK
; 
4330 static int deleteIfVolatile(redisDb 
*db
, robj 
*key
) { 
4333     /* No expire? return ASAP */ 
4334     if (dictSize(db
->expires
) == 0 || 
4335        (de 
= dictFind(db
->expires
,key
)) == NULL
) return 0; 
4337     /* Delete the key */ 
4339     dictDelete(db
->expires
,key
); 
4340     return dictDelete(db
->dict
,key
) == DICT_OK
; 
4343 static void expireCommand(redisClient 
*c
) { 
4345     int seconds 
= atoi(c
->argv
[2]->ptr
); 
4347     de 
= dictFind(c
->db
->dict
,c
->argv
[1]); 
4349         addReply(c
,shared
.czero
); 
4353         addReply(c
, shared
.czero
); 
4356         time_t when 
= time(NULL
)+seconds
; 
4357         if (setExpire(c
->db
,c
->argv
[1],when
)) { 
4358             addReply(c
,shared
.cone
); 
4361             addReply(c
,shared
.czero
); 
4367 static void ttlCommand(redisClient 
*c
) { 
4371     expire 
= getExpire(c
->db
,c
->argv
[1]); 
4373         ttl 
= (int) (expire
-time(NULL
)); 
4374         if (ttl 
< 0) ttl 
= -1; 
4376     addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
)); 
4379 static void msetGenericCommand(redisClient 
*c
, int nx
) { 
4382     if ((c
->argc 
% 2) == 0) { 
4383         addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n")); 
4386     /* Handle the NX flag. The MSETNX semantic is to return zero and don't 
4387      * set nothing at all if at least one already key exists. */ 
4389         for (j 
= 1; j 
< c
->argc
; j 
+= 2) { 
4390             if (dictFind(c
->db
->dict
,c
->argv
[j
]) != NULL
) { 
4391                 addReply(c
, shared
.czero
); 
4397     for (j 
= 1; j 
< c
->argc
; j 
+= 2) { 
4400         retval 
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]); 
4401         if (retval 
== DICT_ERR
) { 
4402             dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]); 
4403             incrRefCount(c
->argv
[j
+1]); 
4405             incrRefCount(c
->argv
[j
]); 
4406             incrRefCount(c
->argv
[j
+1]); 
4408         removeExpire(c
->db
,c
->argv
[j
]); 
4410     server
.dirty 
+= (c
->argc
-1)/2; 
4411     addReply(c
, nx 
? shared
.cone 
: shared
.ok
); 
4414 static void msetCommand(redisClient 
*c
) { 
4415     msetGenericCommand(c
,0); 
4418 static void msetnxCommand(redisClient 
*c
) { 
4419     msetGenericCommand(c
,1); 
4422 /* =============================== Replication  ============================= */ 
4424 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) { 
4425     ssize_t nwritten
, ret 
= size
; 
4426     time_t start 
= time(NULL
); 
4430         if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) { 
4431             nwritten 
= write(fd
,ptr
,size
); 
4432             if (nwritten 
== -1) return -1; 
4436         if ((time(NULL
)-start
) > timeout
) { 
4444 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) { 
4445     ssize_t nread
, totread 
= 0; 
4446     time_t start 
= time(NULL
); 
4450         if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) { 
4451             nread 
= read(fd
,ptr
,size
); 
4452             if (nread 
== -1) return -1; 
4457         if ((time(NULL
)-start
) > timeout
) { 
4465 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) { 
4472         if (syncRead(fd
,&c
,1,timeout
) == -1) return -1; 
4475             if (nread 
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0'; 
4486 static void syncCommand(redisClient 
*c
) { 
4487     /* ignore SYNC if aleady slave or in monitor mode */ 
4488     if (c
->flags 
& REDIS_SLAVE
) return; 
4490     /* SYNC can't be issued when the server has pending data to send to 
4491      * the client about already issued commands. We need a fresh reply 
4492      * buffer registering the differences between the BGSAVE and the current 
4493      * dataset, so that we can copy to other slaves if needed. */ 
4494     if (listLength(c
->reply
) != 0) { 
4495         addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n")); 
4499     redisLog(REDIS_NOTICE
,"Slave ask for synchronization"); 
4500     /* Here we need to check if there is a background saving operation 
4501      * in progress, or if it is required to start one */ 
4502     if (server
.bgsaveinprogress
) { 
4503         /* Ok a background save is in progress. Let's check if it is a good 
4504          * one for replication, i.e. if there is another slave that is 
4505          * registering differences since the server forked to save */ 
4509         listRewind(server
.slaves
); 
4510         while((ln 
= listYield(server
.slaves
))) { 
4512             if (slave
->replstate 
== REDIS_REPL_WAIT_BGSAVE_END
) break; 
4515             /* Perfect, the server is already registering differences for 
4516              * another slave. Set the right state, and copy the buffer. */ 
4517             listRelease(c
->reply
); 
4518             c
->reply 
= listDup(slave
->reply
); 
4519             c
->replstate 
= REDIS_REPL_WAIT_BGSAVE_END
; 
4520             redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC"); 
4522             /* No way, we need to wait for the next BGSAVE in order to 
4523              * register differences */ 
4524             c
->replstate 
= REDIS_REPL_WAIT_BGSAVE_START
; 
4525             redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC"); 
4528         /* Ok we don't have a BGSAVE in progress, let's start one */ 
4529         redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC"); 
4530         if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) { 
4531             redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE"); 
4532             addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n")); 
4535         c
->replstate 
= REDIS_REPL_WAIT_BGSAVE_END
; 
4538     c
->flags 
|= REDIS_SLAVE
; 
4540     listAddNodeTail(server
.slaves
,c
); 
4544 static void sendBulkToSlave(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
4545     redisClient 
*slave 
= privdata
; 
4547     REDIS_NOTUSED(mask
); 
4548     char buf
[REDIS_IOBUF_LEN
]; 
4549     ssize_t nwritten
, buflen
; 
4551     if (slave
->repldboff 
== 0) { 
4552         /* Write the bulk write count before to transfer the DB. In theory here 
4553          * we don't know how much room there is in the output buffer of the 
4554          * socket, but in pratice SO_SNDLOWAT (the minimum count for output 
4555          * operations) will never be smaller than the few bytes we need. */ 
4558         bulkcount 
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long) 
4560         if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
)) 
4568     lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
); 
4569     buflen 
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
); 
4571         redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s", 
4572             (buflen 
== 0) ? "premature EOF" : strerror(errno
)); 
4576     if ((nwritten 
= write(fd
,buf
,buflen
)) == -1) { 
4577         redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s", 
4582     slave
->repldboff 
+= nwritten
; 
4583     if (slave
->repldboff 
== slave
->repldbsize
) { 
4584         close(slave
->repldbfd
); 
4585         slave
->repldbfd 
= -1; 
4586         aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
); 
4587         slave
->replstate 
= REDIS_REPL_ONLINE
; 
4588         if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, 
4589             sendReplyToClient
, slave
, NULL
) == AE_ERR
) { 
4593         addReplySds(slave
,sdsempty()); 
4594         redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded"); 
4598 /* This function is called at the end of every backgrond saving. 
4599  * The argument bgsaveerr is REDIS_OK if the background saving succeeded 
4600  * otherwise REDIS_ERR is passed to the function. 
4602  * The goal of this function is to handle slaves waiting for a successful 
4603  * background saving in order to perform non-blocking synchronization. */ 
4604 static void updateSlavesWaitingBgsave(int bgsaveerr
) { 
4606     int startbgsave 
= 0; 
4608     listRewind(server
.slaves
); 
4609     while((ln 
= listYield(server
.slaves
))) { 
4610         redisClient 
*slave 
= ln
->value
; 
4612         if (slave
->replstate 
== REDIS_REPL_WAIT_BGSAVE_START
) { 
4614             slave
->replstate 
= REDIS_REPL_WAIT_BGSAVE_END
; 
4615         } else if (slave
->replstate 
== REDIS_REPL_WAIT_BGSAVE_END
) { 
4616             struct redis_stat buf
; 
4618             if (bgsaveerr 
!= REDIS_OK
) { 
4620                 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error"); 
4623             if ((slave
->repldbfd 
= open(server
.dbfilename
,O_RDONLY
)) == -1 || 
4624                 redis_fstat(slave
->repldbfd
,&buf
) == -1) { 
4626                 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
)); 
4629             slave
->repldboff 
= 0; 
4630             slave
->repldbsize 
= buf
.st_size
; 
4631             slave
->replstate 
= REDIS_REPL_SEND_BULK
; 
4632             aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
); 
4633             if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
, NULL
) == AE_ERR
) { 
4640         if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) { 
4641             listRewind(server
.slaves
); 
4642             redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed"); 
4643             while((ln 
= listYield(server
.slaves
))) { 
4644                 redisClient 
*slave 
= ln
->value
; 
4646                 if (slave
->replstate 
== REDIS_REPL_WAIT_BGSAVE_START
) 
4653 static int syncWithMaster(void) { 
4654     char buf
[1024], tmpfile
[256]; 
4656     int fd 
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
); 
4660         redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s", 
4664     /* Issue the SYNC command */ 
4665     if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) { 
4667         redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s", 
4671     /* Read the bulk write count */ 
4672     if (syncReadLine(fd
,buf
,1024,3600) == -1) { 
4674         redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s", 
4678     dumpsize 
= atoi(buf
+1); 
4679     redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
); 
4680     /* Read the bulk write data on a temp file */ 
4681     snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random()); 
4682     dfd 
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644); 
4685         redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
)); 
4689         int nread
, nwritten
; 
4691         nread 
= read(fd
,buf
,(dumpsize 
< 1024)?dumpsize
:1024); 
4693             redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s", 
4699         nwritten 
= write(dfd
,buf
,nread
); 
4700         if (nwritten 
== -1) { 
4701             redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
)); 
4709     if (rename(tmpfile
,server
.dbfilename
) == -1) { 
4710         redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
)); 
4716     if (rdbLoad(server
.dbfilename
) != REDIS_OK
) { 
4717         redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk"); 
4721     server
.master 
= createClient(fd
); 
4722     server
.master
->flags 
|= REDIS_MASTER
; 
4723     server
.replstate 
= REDIS_REPL_CONNECTED
; 
4727 static void slaveofCommand(redisClient 
*c
) { 
4728     if (!strcasecmp(c
->argv
[1]->ptr
,"no") && 
4729         !strcasecmp(c
->argv
[2]->ptr
,"one")) { 
4730         if (server
.masterhost
) { 
4731             sdsfree(server
.masterhost
); 
4732             server
.masterhost 
= NULL
; 
4733             if (server
.master
) freeClient(server
.master
); 
4734             server
.replstate 
= REDIS_REPL_NONE
; 
4735             redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)"); 
4738         sdsfree(server
.masterhost
); 
4739         server
.masterhost 
= sdsdup(c
->argv
[1]->ptr
); 
4740         server
.masterport 
= atoi(c
->argv
[2]->ptr
); 
4741         if (server
.master
) freeClient(server
.master
); 
4742         server
.replstate 
= REDIS_REPL_CONNECT
; 
4743         redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)", 
4744             server
.masterhost
, server
.masterport
); 
4746     addReply(c
,shared
.ok
); 
4749 /* ============================ Maxmemory directive  ======================== */ 
4751 /* This function gets called when 'maxmemory' is set on the config file to limit 
4752  * the max memory used by the server, and we are out of memory. 
4753  * This function will try to, in order: 
4755  * - Free objects from the free list 
4756  * - Try to remove keys with an EXPIRE set 
4758  * It is not possible to free enough memory to reach used-memory < maxmemory 
4759  * the server will start refusing commands that will enlarge even more the 
4762 static void freeMemoryIfNeeded(void) { 
4763     while (server
.maxmemory 
&& zmalloc_used_memory() > server
.maxmemory
) { 
4764         if (listLength(server
.objfreelist
)) { 
4767             listNode 
*head 
= listFirst(server
.objfreelist
); 
4768             o 
= listNodeValue(head
); 
4769             listDelNode(server
.objfreelist
,head
); 
4772             int j
, k
, freed 
= 0; 
4774             for (j 
= 0; j 
< server
.dbnum
; j
++) { 
4776                 robj 
*minkey 
= NULL
; 
4777                 struct dictEntry 
*de
; 
4779                 if (dictSize(server
.db
[j
].expires
)) { 
4781                     /* From a sample of three keys drop the one nearest to 
4782                      * the natural expire */ 
4783                     for (k 
= 0; k 
< 3; k
++) { 
4786                         de 
= dictGetRandomKey(server
.db
[j
].expires
); 
4787                         t 
= (time_t) dictGetEntryVal(de
); 
4788                         if (minttl 
== -1 || t 
< minttl
) { 
4789                             minkey 
= dictGetEntryKey(de
); 
4793                     deleteKey(server
.db
+j
,minkey
); 
4796             if (!freed
) return; /* nothing to free... */ 
4801 /* ================================= Debugging ============================== */ 
4803 static void debugCommand(redisClient 
*c
) { 
4804     if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) { 
4806     } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc 
== 3) { 
4807         dictEntry 
*de 
= dictFind(c
->db
->dict
,c
->argv
[2]); 
4811             addReply(c
,shared
.nokeyerr
); 
4814         key 
= dictGetEntryKey(de
); 
4815         val 
= dictGetEntryVal(de
); 
4816         addReplySds(c
,sdscatprintf(sdsempty(), 
4817             "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n", 
4818                 key
, key
->refcount
, val
, val
->refcount
, val
->encoding
)); 
4820         addReplySds(c
,sdsnew( 
4821             "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n")); 
4825 #ifdef HAVE_BACKTRACE 
4826 static struct redisFunctionSym symsTable
[] = { 
4827 {"compareStringObjects", (unsigned long)compareStringObjects
}, 
4828 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong
}, 
4829 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare
}, 
4830 {"dictEncObjHash", (unsigned long)dictEncObjHash
}, 
4831 {"incrDecrCommand", (unsigned long)incrDecrCommand
}, 
4832 {"freeStringObject", (unsigned long)freeStringObject
}, 
4833 {"freeListObject", (unsigned long)freeListObject
}, 
4834 {"freeSetObject", (unsigned long)freeSetObject
}, 
4835 {"decrRefCount", (unsigned long)decrRefCount
}, 
4836 {"createObject", (unsigned long)createObject
}, 
4837 {"freeClient", (unsigned long)freeClient
}, 
4838 {"rdbLoad", (unsigned long)rdbLoad
}, 
4839 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject
}, 
4840 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw
}, 
4841 {"addReply", (unsigned long)addReply
}, 
4842 {"addReplySds", (unsigned long)addReplySds
}, 
4843 {"incrRefCount", (unsigned long)incrRefCount
}, 
4844 {"rdbSaveBackground", (unsigned long)rdbSaveBackground
}, 
4845 {"createStringObject", (unsigned long)createStringObject
}, 
4846 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves
}, 
4847 {"syncWithMaster", (unsigned long)syncWithMaster
}, 
4848 {"tryObjectSharing", (unsigned long)tryObjectSharing
}, 
4849 {"tryObjectEncoding", (unsigned long)tryObjectEncoding
}, 
4850 {"getDecodedObject", (unsigned long)getDecodedObject
}, 
4851 {"removeExpire", (unsigned long)removeExpire
}, 
4852 {"expireIfNeeded", (unsigned long)expireIfNeeded
}, 
4853 {"deleteIfVolatile", (unsigned long)deleteIfVolatile
}, 
4854 {"deleteKey", (unsigned long)deleteKey
}, 
4855 {"getExpire", (unsigned long)getExpire
}, 
4856 {"setExpire", (unsigned long)setExpire
}, 
4857 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave
}, 
4858 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded
}, 
4859 {"authCommand", (unsigned long)authCommand
}, 
4860 {"pingCommand", (unsigned long)pingCommand
}, 
4861 {"echoCommand", (unsigned long)echoCommand
}, 
4862 {"setCommand", (unsigned long)setCommand
}, 
4863 {"setnxCommand", (unsigned long)setnxCommand
}, 
4864 {"getCommand", (unsigned long)getCommand
}, 
4865 {"delCommand", (unsigned long)delCommand
}, 
4866 {"existsCommand", (unsigned long)existsCommand
}, 
4867 {"incrCommand", (unsigned long)incrCommand
}, 
4868 {"decrCommand", (unsigned long)decrCommand
}, 
4869 {"incrbyCommand", (unsigned long)incrbyCommand
}, 
4870 {"decrbyCommand", (unsigned long)decrbyCommand
}, 
4871 {"selectCommand", (unsigned long)selectCommand
}, 
4872 {"randomkeyCommand", (unsigned long)randomkeyCommand
}, 
4873 {"keysCommand", (unsigned long)keysCommand
}, 
4874 {"dbsizeCommand", (unsigned long)dbsizeCommand
}, 
4875 {"lastsaveCommand", (unsigned long)lastsaveCommand
}, 
4876 {"saveCommand", (unsigned long)saveCommand
}, 
4877 {"bgsaveCommand", (unsigned long)bgsaveCommand
}, 
4878 {"shutdownCommand", (unsigned long)shutdownCommand
}, 
4879 {"moveCommand", (unsigned long)moveCommand
}, 
4880 {"renameCommand", (unsigned long)renameCommand
}, 
4881 {"renamenxCommand", (unsigned long)renamenxCommand
}, 
4882 {"lpushCommand", (unsigned long)lpushCommand
}, 
4883 {"rpushCommand", (unsigned long)rpushCommand
}, 
4884 {"lpopCommand", (unsigned long)lpopCommand
}, 
4885 {"rpopCommand", (unsigned long)rpopCommand
}, 
4886 {"llenCommand", (unsigned long)llenCommand
}, 
4887 {"lindexCommand", (unsigned long)lindexCommand
}, 
4888 {"lrangeCommand", (unsigned long)lrangeCommand
}, 
4889 {"ltrimCommand", (unsigned long)ltrimCommand
}, 
4890 {"typeCommand", (unsigned long)typeCommand
}, 
4891 {"lsetCommand", (unsigned long)lsetCommand
}, 
4892 {"saddCommand", (unsigned long)saddCommand
}, 
4893 {"sremCommand", (unsigned long)sremCommand
}, 
4894 {"smoveCommand", (unsigned long)smoveCommand
}, 
4895 {"sismemberCommand", (unsigned long)sismemberCommand
}, 
4896 {"scardCommand", (unsigned long)scardCommand
}, 
4897 {"spopCommand", (unsigned long)spopCommand
}, 
4898 {"srandmemberCommand", (unsigned long)srandmemberCommand
}, 
4899 {"sinterCommand", (unsigned long)sinterCommand
}, 
4900 {"sinterstoreCommand", (unsigned long)sinterstoreCommand
}, 
4901 {"sunionCommand", (unsigned long)sunionCommand
}, 
4902 {"sunionstoreCommand", (unsigned long)sunionstoreCommand
}, 
4903 {"sdiffCommand", (unsigned long)sdiffCommand
}, 
4904 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand
}, 
4905 {"syncCommand", (unsigned long)syncCommand
}, 
4906 {"flushdbCommand", (unsigned long)flushdbCommand
}, 
4907 {"flushallCommand", (unsigned long)flushallCommand
}, 
4908 {"sortCommand", (unsigned long)sortCommand
}, 
4909 {"lremCommand", (unsigned long)lremCommand
}, 
4910 {"infoCommand", (unsigned long)infoCommand
}, 
4911 {"mgetCommand", (unsigned long)mgetCommand
}, 
4912 {"monitorCommand", (unsigned long)monitorCommand
}, 
4913 {"expireCommand", (unsigned long)expireCommand
}, 
4914 {"getsetCommand", (unsigned long)getsetCommand
}, 
4915 {"ttlCommand", (unsigned long)ttlCommand
}, 
4916 {"slaveofCommand", (unsigned long)slaveofCommand
}, 
4917 {"debugCommand", (unsigned long)debugCommand
}, 
4918 {"processCommand", (unsigned long)processCommand
}, 
4919 {"setupSigSegvAction", (unsigned long)setupSigSegvAction
}, 
4920 {"readQueryFromClient", (unsigned long)readQueryFromClient
}, 
4921 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile
}, 
4922 {"msetGenericCommand", (unsigned long)msetGenericCommand
}, 
4923 {"msetCommand", (unsigned long)msetCommand
}, 
4924 {"msetnxCommand", (unsigned long)msetnxCommand
}, 
4925 {"zslCreateNode", (unsigned long)zslCreateNode
}, 
4926 {"zslCreate", (unsigned long)zslCreate
}, 
4927 {"zslFreeNode",(unsigned long)zslFreeNode
}, 
4928 {"zslFree",(unsigned long)zslFree
}, 
4929 {"zslRandomLevel",(unsigned long)zslRandomLevel
}, 
4930 {"zslInsert",(unsigned long)zslInsert
}, 
4931 {"zslDelete",(unsigned long)zslDelete
}, 
4932 {"createZsetObject",(unsigned long)createZsetObject
}, 
4933 {"zaddCommand",(unsigned long)zaddCommand
}, 
4934 {"zrangeCommand",(unsigned long)zrangeCommand
}, 
4938 /* This function try to convert a pointer into a function name. It's used in 
4939  * oreder to provide a backtrace under segmentation fault that's able to 
4940  * display functions declared as static (otherwise the backtrace is useless). */ 
4941 static char *findFuncName(void *pointer
, unsigned long *offset
){ 
4943     unsigned long off
, minoff 
= 0; 
4945     /* Try to match against the Symbol with the smallest offset */ 
4946     for (i
=0; symsTable
[i
].pointer
; i
++) { 
4947         unsigned long lp 
= (unsigned long) pointer
; 
4949         if (lp 
!= (unsigned long)-1 && lp 
>= symsTable
[i
].pointer
) { 
4950             off
=lp
-symsTable
[i
].pointer
; 
4951             if (ret 
< 0 || off 
< minoff
) { 
4957     if (ret 
== -1) return NULL
; 
4959     return symsTable
[ret
].name
; 
4962 static void *getMcontextEip(ucontext_t 
*uc
) { 
4963 #if defined(__FreeBSD__) 
4964     return (void*) uc
->uc_mcontext
.mc_eip
; 
4965 #elif defined(__dietlibc__) 
4966     return (void*) uc
->uc_mcontext
.eip
; 
4967 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6) 
4968     return (void*) uc
->uc_mcontext
->__ss
.__eip
; 
4969 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6) 
4970   #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__) 
4971     return (void*) uc
->uc_mcontext
->__ss
.__rip
; 
4973     return (void*) uc
->uc_mcontext
->__ss
.__eip
; 
4975 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */ 
4976     return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
]; 
4977 #elif defined(__ia64__) /* Linux IA64 */ 
4978     return (void*) uc
->uc_mcontext
.sc_ip
; 
4984 static void segvHandler(int sig
, siginfo_t 
*info
, void *secret
) { 
4986     char **messages 
= NULL
; 
4987     int i
, trace_size 
= 0; 
4988     unsigned long offset
=0; 
4989     time_t uptime 
= time(NULL
)-server
.stat_starttime
; 
4990     ucontext_t 
*uc 
= (ucontext_t
*) secret
; 
4991     REDIS_NOTUSED(info
); 
4993     redisLog(REDIS_WARNING
, 
4994         "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
); 
4995     redisLog(REDIS_WARNING
, "%s", sdscatprintf(sdsempty(), 
4996         "redis_version:%s; " 
4997         "uptime_in_seconds:%d; " 
4998         "connected_clients:%d; " 
4999         "connected_slaves:%d; " 
5001         "changes_since_last_save:%lld; " 
5002         "bgsave_in_progress:%d; " 
5003         "last_save_time:%d; " 
5004         "total_connections_received:%lld; " 
5005         "total_commands_processed:%lld; " 
5009         listLength(server
.clients
)-listLength(server
.slaves
), 
5010         listLength(server
.slaves
), 
5013         server
.bgsaveinprogress
, 
5015         server
.stat_numconnections
, 
5016         server
.stat_numcommands
, 
5017         server
.masterhost 
== NULL 
? "master" : "slave" 
5020     trace_size 
= backtrace(trace
, 100); 
5021     /* overwrite sigaction with caller's address */ 
5022     if (getMcontextEip(uc
) != NULL
) { 
5023         trace
[1] = getMcontextEip(uc
); 
5025     messages 
= backtrace_symbols(trace
, trace_size
); 
5027     for (i
=1; i
<trace_size
; ++i
) { 
5028         char *fn 
= findFuncName(trace
[i
], &offset
), *p
; 
5030         p 
= strchr(messages
[i
],'+'); 
5031         if (!fn 
|| (p 
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) { 
5032             redisLog(REDIS_WARNING
,"%s", messages
[i
]); 
5034             redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
); 
5041 static void setupSigSegvAction(void) { 
5042     struct sigaction act
; 
5044     sigemptyset (&act
.sa_mask
); 
5045     /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction 
5046      * is used. Otherwise, sa_handler is used */ 
5047     act
.sa_flags 
= SA_NODEFER 
| SA_ONSTACK 
| SA_RESETHAND 
| SA_SIGINFO
; 
5048     act
.sa_sigaction 
= segvHandler
; 
5049     sigaction (SIGSEGV
, &act
, NULL
); 
5050     sigaction (SIGBUS
, &act
, NULL
); 
5051     sigaction (SIGFPE
, &act
, NULL
); 
5052     sigaction (SIGILL
, &act
, NULL
); 
5053     sigaction (SIGBUS
, &act
, NULL
); 
5056 #else /* HAVE_BACKTRACE */ 
5057 static void setupSigSegvAction(void) { 
5059 #endif /* HAVE_BACKTRACE */ 
5061 /* =================================== Main! ================================ */ 
5064 int linuxOvercommitMemoryValue(void) { 
5065     FILE *fp 
= fopen("/proc/sys/vm/overcommit_memory","r"); 
5069     if (fgets(buf
,64,fp
) == NULL
) { 
5078 void linuxOvercommitMemoryWarning(void) { 
5079     if (linuxOvercommitMemoryValue() == 0) { 
5080         redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect."); 
5083 #endif /* __linux__ */ 
5085 static void daemonize(void) { 
5089     if (fork() != 0) exit(0); /* parent exits */ 
5090     setsid(); /* create a new session */ 
5092     /* Every output goes to /dev/null. If Redis is daemonized but 
5093      * the 'logfile' is set to 'stdout' in the configuration file 
5094      * it will not log at all. */ 
5095     if ((fd 
= open("/dev/null", O_RDWR
, 0)) != -1) { 
5096         dup2(fd
, STDIN_FILENO
); 
5097         dup2(fd
, STDOUT_FILENO
); 
5098         dup2(fd
, STDERR_FILENO
); 
5099         if (fd 
> STDERR_FILENO
) close(fd
); 
5101     /* Try to write the pid file */ 
5102     fp 
= fopen(server
.pidfile
,"w"); 
5104         fprintf(fp
,"%d\n",getpid()); 
5109 int main(int argc
, char **argv
) { 
5112         ResetServerSaveParams(); 
5113         loadServerConfig(argv
[1]); 
5114     } else if (argc 
> 2) { 
5115         fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n"); 
5118         redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'"); 
5121     if (server
.daemonize
) daemonize(); 
5122     redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
); 
5124     linuxOvercommitMemoryWarning(); 
5126     if (rdbLoad(server
.dbfilename
) == REDIS_OK
) 
5127         redisLog(REDIS_NOTICE
,"DB loaded from disk"); 
5128     if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
, 
5129         acceptHandler
, NULL
, NULL
) == AE_ERR
) oom("creating file event"); 
5130     redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
); 
5132     aeDeleteEventLoop(server
.el
);