2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.1.90"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
64 #include "solarisfixes.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
95 /* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
107 this flags will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short this commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
113 #define REDIS_STRING 0
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
128 /* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpreter the length:
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
139 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining two bits specify a special encoding for the object
149 * accordingly to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
156 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157 #define REDIS_SLAVE 2 /* This client is a slave server */
158 #define REDIS_MASTER 4 /* This client is a master server */
159 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
161 /* Slave replication state - slave side */
162 #define REDIS_REPL_NONE 0 /* No active replication */
163 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
164 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
166 /* Slave replication state - from the point of view of master
167 * Note that in SEND_BULK and ONLINE state the slave receives new updates
168 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
169 * to start the next background saving in order to send updates to it. */
170 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
171 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
172 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
173 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
175 /* List related stuff */
179 /* Sort operations */
180 #define REDIS_SORT_GET 0
181 #define REDIS_SORT_ASC 1
182 #define REDIS_SORT_DESC 2
183 #define REDIS_SORTKEY_MAX 1024
186 #define REDIS_DEBUG 0
187 #define REDIS_NOTICE 1
188 #define REDIS_WARNING 2
190 /* Anti-warning macro... */
191 #define REDIS_NOTUSED(V) ((void) V)
193 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
194 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
196 /* Append only defines */
197 #define APPENDFSYNC_NO 0
198 #define APPENDFSYNC_ALWAYS 1
199 #define APPENDFSYNC_EVERYSEC 2
201 /* We can print the stacktrace, so our assert is defined this way: */
202 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
203 static void _redisAssert(char *estr
);
205 /*================================= Data types ============================== */
207 /* A redis object, that is a type able to hold a string / list / set */
208 typedef struct redisObject
{
211 unsigned char encoding
;
212 unsigned char notused
[2];
216 /* Macro used to initalize a Redis object allocated on the stack.
217 * Note that this macro is taken near the structure definition to make sure
218 * we'll update it when the structure is changed, to avoid bugs like
219 * bug #85 introduced exactly in this way. */
220 #define initStaticStringObject(_var,_ptr) do { \
222 _var.type = REDIS_STRING; \
223 _var.encoding = REDIS_ENCODING_RAW; \
227 typedef struct redisDb
{
233 /* With multiplexing we need to take per-clinet state.
234 * Clients are taken in a liked list. */
235 typedef struct redisClient
{
240 robj
**argv
, **mbargv
;
242 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
243 int multibulk
; /* multi bulk command format active */
246 time_t lastinteraction
; /* time of the last interaction, used for timeout */
247 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
248 int slaveseldb
; /* slave selected db, if this client is a slave */
249 int authenticated
; /* when requirepass is non-NULL */
250 int replstate
; /* replication state if this is a slave */
251 int repldbfd
; /* replication DB file descriptor */
252 long repldboff
; /* replication DB file offset */
253 off_t repldbsize
; /* replication DB file size */
261 /* Global server state structure */
267 unsigned int sharingpoolsize
;
268 long long dirty
; /* changes to DB from the last save */
270 list
*slaves
, *monitors
;
271 char neterr
[ANET_ERR_LEN
];
273 int cronloops
; /* number of times the cron function run */
274 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
275 time_t lastsave
; /* Unix time of last save succeeede */
276 size_t usedmemory
; /* Used memory in megabytes */
277 /* Fields used only for stats */
278 time_t stat_starttime
; /* server start time */
279 long long stat_numcommands
; /* number of processed commands */
280 long long stat_numconnections
; /* number of connections received */
293 pid_t bgsavechildpid
;
294 pid_t bgrewritechildpid
;
295 sds bgrewritebuf
; /* buffer taken by parent during oppend only rewrite */
296 struct saveparam
*saveparams
;
301 char *appendfilename
;
304 /* Replication related */
309 redisClient
*master
; /* client that is master for this slave */
311 unsigned int maxclients
;
312 unsigned long maxmemory
;
313 /* Sort parameters - qsort_r() is only available under BSD so we
314 * have to take this state global, in order to pass it to sortCompare() */
320 typedef void redisCommandProc(redisClient
*c
);
321 struct redisCommand
{
323 redisCommandProc
*proc
;
328 struct redisFunctionSym
{
330 unsigned long pointer
;
333 typedef struct _redisSortObject
{
341 typedef struct _redisSortOperation
{
344 } redisSortOperation
;
346 /* ZSETs use a specialized version of Skiplists */
348 typedef struct zskiplistNode
{
349 struct zskiplistNode
**forward
;
350 struct zskiplistNode
*backward
;
355 typedef struct zskiplist
{
356 struct zskiplistNode
*header
, *tail
;
357 unsigned long length
;
361 typedef struct zset
{
366 /* Our shared "common" objects */
368 struct sharedObjectsStruct
{
369 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
370 *colon
, *nullbulk
, *nullmultibulk
,
371 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
372 *outofrangeerr
, *plus
,
373 *select0
, *select1
, *select2
, *select3
, *select4
,
374 *select5
, *select6
, *select7
, *select8
, *select9
;
377 /* Global vars that are actally used as constants. The following double
378 * values are used for double on-disk serialization, and are initialized
379 * at runtime to avoid strange compiler optimizations. */
381 static double R_Zero
, R_PosInf
, R_NegInf
, R_Nan
;
383 /*================================ Prototypes =============================== */
385 static void freeStringObject(robj
*o
);
386 static void freeListObject(robj
*o
);
387 static void freeSetObject(robj
*o
);
388 static void decrRefCount(void *o
);
389 static robj
*createObject(int type
, void *ptr
);
390 static void freeClient(redisClient
*c
);
391 static int rdbLoad(char *filename
);
392 static void addReply(redisClient
*c
, robj
*obj
);
393 static void addReplySds(redisClient
*c
, sds s
);
394 static void incrRefCount(robj
*o
);
395 static int rdbSaveBackground(char *filename
);
396 static robj
*createStringObject(char *ptr
, size_t len
);
397 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
398 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
399 static int syncWithMaster(void);
400 static robj
*tryObjectSharing(robj
*o
);
401 static int tryObjectEncoding(robj
*o
);
402 static robj
*getDecodedObject(robj
*o
);
403 static int removeExpire(redisDb
*db
, robj
*key
);
404 static int expireIfNeeded(redisDb
*db
, robj
*key
);
405 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
406 static int deleteKey(redisDb
*db
, robj
*key
);
407 static time_t getExpire(redisDb
*db
, robj
*key
);
408 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
409 static void updateSlavesWaitingBgsave(int bgsaveerr
);
410 static void freeMemoryIfNeeded(void);
411 static int processCommand(redisClient
*c
);
412 static void setupSigSegvAction(void);
413 static void rdbRemoveTempFile(pid_t childpid
);
414 static void aofRemoveTempFile(pid_t childpid
);
415 static size_t stringObjectLen(robj
*o
);
416 static void processInputBuffer(redisClient
*c
);
417 static zskiplist
*zslCreate(void);
418 static void zslFree(zskiplist
*zsl
);
419 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
420 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
422 static void authCommand(redisClient
*c
);
423 static void pingCommand(redisClient
*c
);
424 static void echoCommand(redisClient
*c
);
425 static void setCommand(redisClient
*c
);
426 static void setnxCommand(redisClient
*c
);
427 static void getCommand(redisClient
*c
);
428 static void delCommand(redisClient
*c
);
429 static void existsCommand(redisClient
*c
);
430 static void incrCommand(redisClient
*c
);
431 static void decrCommand(redisClient
*c
);
432 static void incrbyCommand(redisClient
*c
);
433 static void decrbyCommand(redisClient
*c
);
434 static void selectCommand(redisClient
*c
);
435 static void randomkeyCommand(redisClient
*c
);
436 static void keysCommand(redisClient
*c
);
437 static void dbsizeCommand(redisClient
*c
);
438 static void lastsaveCommand(redisClient
*c
);
439 static void saveCommand(redisClient
*c
);
440 static void bgsaveCommand(redisClient
*c
);
441 static void bgrewriteaofCommand(redisClient
*c
);
442 static void shutdownCommand(redisClient
*c
);
443 static void moveCommand(redisClient
*c
);
444 static void renameCommand(redisClient
*c
);
445 static void renamenxCommand(redisClient
*c
);
446 static void lpushCommand(redisClient
*c
);
447 static void rpushCommand(redisClient
*c
);
448 static void lpopCommand(redisClient
*c
);
449 static void rpopCommand(redisClient
*c
);
450 static void llenCommand(redisClient
*c
);
451 static void lindexCommand(redisClient
*c
);
452 static void lrangeCommand(redisClient
*c
);
453 static void ltrimCommand(redisClient
*c
);
454 static void typeCommand(redisClient
*c
);
455 static void lsetCommand(redisClient
*c
);
456 static void saddCommand(redisClient
*c
);
457 static void sremCommand(redisClient
*c
);
458 static void smoveCommand(redisClient
*c
);
459 static void sismemberCommand(redisClient
*c
);
460 static void scardCommand(redisClient
*c
);
461 static void spopCommand(redisClient
*c
);
462 static void srandmemberCommand(redisClient
*c
);
463 static void sinterCommand(redisClient
*c
);
464 static void sinterstoreCommand(redisClient
*c
);
465 static void sunionCommand(redisClient
*c
);
466 static void sunionstoreCommand(redisClient
*c
);
467 static void sdiffCommand(redisClient
*c
);
468 static void sdiffstoreCommand(redisClient
*c
);
469 static void syncCommand(redisClient
*c
);
470 static void flushdbCommand(redisClient
*c
);
471 static void flushallCommand(redisClient
*c
);
472 static void sortCommand(redisClient
*c
);
473 static void lremCommand(redisClient
*c
);
474 static void rpoplpushcommand(redisClient
*c
);
475 static void infoCommand(redisClient
*c
);
476 static void mgetCommand(redisClient
*c
);
477 static void monitorCommand(redisClient
*c
);
478 static void expireCommand(redisClient
*c
);
479 static void expireatCommand(redisClient
*c
);
480 static void getsetCommand(redisClient
*c
);
481 static void ttlCommand(redisClient
*c
);
482 static void slaveofCommand(redisClient
*c
);
483 static void debugCommand(redisClient
*c
);
484 static void msetCommand(redisClient
*c
);
485 static void msetnxCommand(redisClient
*c
);
486 static void zaddCommand(redisClient
*c
);
487 static void zincrbyCommand(redisClient
*c
);
488 static void zrangeCommand(redisClient
*c
);
489 static void zrangebyscoreCommand(redisClient
*c
);
490 static void zrevrangeCommand(redisClient
*c
);
491 static void zcardCommand(redisClient
*c
);
492 static void zremCommand(redisClient
*c
);
493 static void zscoreCommand(redisClient
*c
);
494 static void zremrangebyscoreCommand(redisClient
*c
);
496 /*================================= Globals ================================= */
499 static struct redisServer server
; /* server global state */
500 static struct redisCommand cmdTable
[] = {
501 {"get",getCommand
,2,REDIS_CMD_INLINE
},
502 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
503 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
504 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
505 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
506 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
507 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
508 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
509 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
510 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
511 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
512 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
513 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
514 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
515 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
516 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
517 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
518 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
519 {"rpoplpush",rpoplpushcommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
520 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
521 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
522 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
523 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
524 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
525 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
526 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
527 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
528 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
529 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
530 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
531 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
532 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
533 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
534 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
535 {"zincrby",zincrbyCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
536 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
537 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
538 {"zrange",zrangeCommand
,4,REDIS_CMD_INLINE
},
539 {"zrangebyscore",zrangebyscoreCommand
,-4,REDIS_CMD_INLINE
},
540 {"zrevrange",zrevrangeCommand
,4,REDIS_CMD_INLINE
},
541 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
542 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
543 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
544 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
545 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
546 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
547 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
548 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
549 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
550 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
551 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
552 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
553 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
554 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
555 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
556 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
557 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
558 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
559 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
560 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
561 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
562 {"bgrewriteaof",bgrewriteaofCommand
,1,REDIS_CMD_INLINE
},
563 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
564 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
565 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
566 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
567 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
568 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
569 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
570 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
571 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
572 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
573 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
574 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
578 /*============================ Utility functions ============================ */
580 /* Glob-style pattern matching. */
581 int stringmatchlen(const char *pattern
, int patternLen
,
582 const char *string
, int stringLen
, int nocase
)
587 while (pattern
[1] == '*') {
592 return 1; /* match */
594 if (stringmatchlen(pattern
+1, patternLen
-1,
595 string
, stringLen
, nocase
))
596 return 1; /* match */
600 return 0; /* no match */
604 return 0; /* no match */
614 not = pattern
[0] == '^';
621 if (pattern
[0] == '\\') {
624 if (pattern
[0] == string
[0])
626 } else if (pattern
[0] == ']') {
628 } else if (patternLen
== 0) {
632 } else if (pattern
[1] == '-' && patternLen
>= 3) {
633 int start
= pattern
[0];
634 int end
= pattern
[2];
642 start
= tolower(start
);
648 if (c
>= start
&& c
<= end
)
652 if (pattern
[0] == string
[0])
655 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
665 return 0; /* no match */
671 if (patternLen
>= 2) {
678 if (pattern
[0] != string
[0])
679 return 0; /* no match */
681 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
682 return 0; /* no match */
690 if (stringLen
== 0) {
691 while(*pattern
== '*') {
698 if (patternLen
== 0 && stringLen
== 0)
703 static void redisLog(int level
, const char *fmt
, ...) {
707 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
711 if (level
>= server
.verbosity
) {
717 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
718 fprintf(fp
,"%s %c ",buf
,c
[level
]);
719 vfprintf(fp
, fmt
, ap
);
725 if (server
.logfile
) fclose(fp
);
728 /*====================== Hash table type implementation ==================== */
730 /* This is an hash table type that uses the SDS dynamic strings libary as
731 * keys and radis objects as values (objects can hold SDS strings,
734 static void dictVanillaFree(void *privdata
, void *val
)
736 DICT_NOTUSED(privdata
);
740 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
744 DICT_NOTUSED(privdata
);
746 l1
= sdslen((sds
)key1
);
747 l2
= sdslen((sds
)key2
);
748 if (l1
!= l2
) return 0;
749 return memcmp(key1
, key2
, l1
) == 0;
752 static void dictRedisObjectDestructor(void *privdata
, void *val
)
754 DICT_NOTUSED(privdata
);
759 static int dictObjKeyCompare(void *privdata
, const void *key1
,
762 const robj
*o1
= key1
, *o2
= key2
;
763 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
766 static unsigned int dictObjHash(const void *key
) {
768 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
771 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
774 robj
*o1
= (robj
*) key1
, *o2
= (robj
*) key2
;
777 o1
= getDecodedObject(o1
);
778 o2
= getDecodedObject(o2
);
779 cmp
= sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
785 static unsigned int dictEncObjHash(const void *key
) {
786 robj
*o
= (robj
*) key
;
788 o
= getDecodedObject(o
);
789 unsigned int hash
= dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
794 static dictType setDictType
= {
795 dictEncObjHash
, /* hash function */
798 dictEncObjKeyCompare
, /* key compare */
799 dictRedisObjectDestructor
, /* key destructor */
800 NULL
/* val destructor */
803 static dictType zsetDictType
= {
804 dictEncObjHash
, /* hash function */
807 dictEncObjKeyCompare
, /* key compare */
808 dictRedisObjectDestructor
, /* key destructor */
809 dictVanillaFree
/* val destructor of malloc(sizeof(double)) */
812 static dictType hashDictType
= {
813 dictObjHash
, /* hash function */
816 dictObjKeyCompare
, /* key compare */
817 dictRedisObjectDestructor
, /* key destructor */
818 dictRedisObjectDestructor
/* val destructor */
821 /* ========================= Random utility functions ======================= */
823 /* Redis generally does not try to recover from out of memory conditions
824 * when allocating objects or strings, it is not clear if it will be possible
825 * to report this condition to the client since the networking layer itself
826 * is based on heap allocation for send buffers, so we simply abort.
827 * At least the code will be simpler to read... */
828 static void oom(const char *msg
) {
829 redisLog(REDIS_WARNING
, "%s: Out of memory\n",msg
);
834 /* ====================== Redis server networking stuff ===================== */
835 static void closeTimedoutClients(void) {
838 time_t now
= time(NULL
);
840 listRewind(server
.clients
);
841 while ((ln
= listYield(server
.clients
)) != NULL
) {
842 c
= listNodeValue(ln
);
843 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
844 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
845 (now
- c
->lastinteraction
> server
.maxidletime
)) {
846 redisLog(REDIS_DEBUG
,"Closing idle client");
852 static int htNeedsResize(dict
*dict
) {
853 long long size
, used
;
855 size
= dictSlots(dict
);
856 used
= dictSize(dict
);
857 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
858 (used
*100/size
< REDIS_HT_MINFILL
));
861 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
862 * we resize the hash table to save memory */
863 static void tryResizeHashTables(void) {
866 for (j
= 0; j
< server
.dbnum
; j
++) {
867 if (htNeedsResize(server
.db
[j
].dict
)) {
868 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
869 dictResize(server
.db
[j
].dict
);
870 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
872 if (htNeedsResize(server
.db
[j
].expires
))
873 dictResize(server
.db
[j
].expires
);
877 /* A background saving child (BGSAVE) terminated its work. Handle this. */
878 void backgroundSaveDoneHandler(int statloc
) {
879 int exitcode
= WEXITSTATUS(statloc
);
880 int bysignal
= WIFSIGNALED(statloc
);
882 if (!bysignal
&& exitcode
== 0) {
883 redisLog(REDIS_NOTICE
,
884 "Background saving terminated with success");
886 server
.lastsave
= time(NULL
);
887 } else if (!bysignal
&& exitcode
!= 0) {
888 redisLog(REDIS_WARNING
, "Background saving error");
890 redisLog(REDIS_WARNING
,
891 "Background saving terminated by signal");
892 rdbRemoveTempFile(server
.bgsavechildpid
);
894 server
.bgsavechildpid
= -1;
895 /* Possibly there are slaves waiting for a BGSAVE in order to be served
896 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
897 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
900 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
902 void backgroundRewriteDoneHandler(int statloc
) {
903 int exitcode
= WEXITSTATUS(statloc
);
904 int bysignal
= WIFSIGNALED(statloc
);
906 if (!bysignal
&& exitcode
== 0) {
910 redisLog(REDIS_NOTICE
,
911 "Background append only file rewriting terminated with success");
912 /* Now it's time to flush the differences accumulated by the parent */
913 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) server
.bgrewritechildpid
);
914 fd
= open(tmpfile
,O_WRONLY
|O_APPEND
);
916 redisLog(REDIS_WARNING
, "Not able to open the temp append only file produced by the child: %s", strerror(errno
));
919 /* Flush our data... */
920 if (write(fd
,server
.bgrewritebuf
,sdslen(server
.bgrewritebuf
)) !=
921 (signed) sdslen(server
.bgrewritebuf
)) {
922 redisLog(REDIS_WARNING
, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno
));
926 redisLog(REDIS_WARNING
,"Parent diff flushed into the new append log file with success");
927 /* Now our work is to rename the temp file into the stable file. And
928 * switch the file descriptor used by the server for append only. */
929 if (rename(tmpfile
,server
.appendfilename
) == -1) {
930 redisLog(REDIS_WARNING
,"Can't rename the temp append only file into the stable one: %s", strerror(errno
));
934 /* Mission completed... almost */
935 redisLog(REDIS_NOTICE
,"Append only file successfully rewritten.");
936 if (server
.appendfd
!= -1) {
937 /* If append only is actually enabled... */
938 close(server
.appendfd
);
939 server
.appendfd
= fd
;
941 server
.appendseldb
= -1; /* Make sure it will issue SELECT */
942 redisLog(REDIS_NOTICE
,"The new append only file was selected for future appends.");
944 /* If append only is disabled we just generate a dump in this
945 * format. Why not? */
948 } else if (!bysignal
&& exitcode
!= 0) {
949 redisLog(REDIS_WARNING
, "Background append only file rewriting error");
951 redisLog(REDIS_WARNING
,
952 "Background append only file rewriting terminated by signal");
955 sdsfree(server
.bgrewritebuf
);
956 server
.bgrewritebuf
= sdsempty();
957 aofRemoveTempFile(server
.bgrewritechildpid
);
958 server
.bgrewritechildpid
= -1;
961 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
962 int j
, loops
= server
.cronloops
++;
963 REDIS_NOTUSED(eventLoop
);
965 REDIS_NOTUSED(clientData
);
967 /* Update the global state with the amount of used memory */
968 server
.usedmemory
= zmalloc_used_memory();
970 /* Show some info about non-empty databases */
971 for (j
= 0; j
< server
.dbnum
; j
++) {
972 long long size
, used
, vkeys
;
974 size
= dictSlots(server
.db
[j
].dict
);
975 used
= dictSize(server
.db
[j
].dict
);
976 vkeys
= dictSize(server
.db
[j
].expires
);
977 if (!(loops
% 5) && (used
|| vkeys
)) {
978 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
979 /* dictPrintStats(server.dict); */
983 /* We don't want to resize the hash tables while a bacground saving
984 * is in progress: the saving child is created using fork() that is
985 * implemented with a copy-on-write semantic in most modern systems, so
986 * if we resize the HT while there is the saving child at work actually
987 * a lot of memory movements in the parent will cause a lot of pages
989 if (server
.bgsavechildpid
== -1) tryResizeHashTables();
991 /* Show information about connected clients */
993 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
994 listLength(server
.clients
)-listLength(server
.slaves
),
995 listLength(server
.slaves
),
997 dictSize(server
.sharingpool
));
1000 /* Close connections of timedout clients */
1001 if (server
.maxidletime
&& !(loops
% 10))
1002 closeTimedoutClients();
1004 /* Check if a background saving or AOF rewrite in progress terminated */
1005 if (server
.bgsavechildpid
!= -1 || server
.bgrewritechildpid
!= -1) {
1009 if ((pid
= wait3(&statloc
,WNOHANG
,NULL
)) != 0) {
1010 if (pid
== server
.bgsavechildpid
) {
1011 backgroundSaveDoneHandler(statloc
);
1013 backgroundRewriteDoneHandler(statloc
);
1017 /* If there is not a background saving in progress check if
1018 * we have to save now */
1019 time_t now
= time(NULL
);
1020 for (j
= 0; j
< server
.saveparamslen
; j
++) {
1021 struct saveparam
*sp
= server
.saveparams
+j
;
1023 if (server
.dirty
>= sp
->changes
&&
1024 now
-server
.lastsave
> sp
->seconds
) {
1025 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
1026 sp
->changes
, sp
->seconds
);
1027 rdbSaveBackground(server
.dbfilename
);
1033 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1034 * will use few CPU cycles if there are few expiring keys, otherwise
1035 * it will get more aggressive to avoid that too much memory is used by
1036 * keys that can be removed from the keyspace. */
1037 for (j
= 0; j
< server
.dbnum
; j
++) {
1039 redisDb
*db
= server
.db
+j
;
1041 /* Continue to expire if at the end of the cycle more than 25%
1042 * of the keys were expired. */
1044 int num
= dictSize(db
->expires
);
1045 time_t now
= time(NULL
);
1048 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
1049 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
1054 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
1055 t
= (time_t) dictGetEntryVal(de
);
1057 deleteKey(db
,dictGetEntryKey(de
));
1061 } while (expired
> REDIS_EXPIRELOOKUPS_PER_CRON
/4);
1064 /* Check if we should connect to a MASTER */
1065 if (server
.replstate
== REDIS_REPL_CONNECT
) {
1066 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
1067 if (syncWithMaster() == REDIS_OK
) {
1068 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
1074 static void createSharedObjects(void) {
1075 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
1076 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
1077 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
1078 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
1079 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
1080 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
1081 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
1082 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
1083 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
1085 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
1086 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
1087 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1088 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
1089 "-ERR no such key\r\n"));
1090 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
1091 "-ERR syntax error\r\n"));
1092 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
1093 "-ERR source and destination objects are the same\r\n"));
1094 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
1095 "-ERR index out of range\r\n"));
1096 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
1097 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
1098 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
1099 shared
.select0
= createStringObject("select 0\r\n",10);
1100 shared
.select1
= createStringObject("select 1\r\n",10);
1101 shared
.select2
= createStringObject("select 2\r\n",10);
1102 shared
.select3
= createStringObject("select 3\r\n",10);
1103 shared
.select4
= createStringObject("select 4\r\n",10);
1104 shared
.select5
= createStringObject("select 5\r\n",10);
1105 shared
.select6
= createStringObject("select 6\r\n",10);
1106 shared
.select7
= createStringObject("select 7\r\n",10);
1107 shared
.select8
= createStringObject("select 8\r\n",10);
1108 shared
.select9
= createStringObject("select 9\r\n",10);
1111 static void appendServerSaveParams(time_t seconds
, int changes
) {
1112 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1113 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1114 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1115 server
.saveparamslen
++;
1118 static void resetServerSaveParams() {
1119 zfree(server
.saveparams
);
1120 server
.saveparams
= NULL
;
1121 server
.saveparamslen
= 0;
1124 static void initServerConfig() {
1125 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1126 server
.port
= REDIS_SERVERPORT
;
1127 server
.verbosity
= REDIS_DEBUG
;
1128 server
.maxidletime
= REDIS_MAXIDLETIME
;
1129 server
.saveparams
= NULL
;
1130 server
.logfile
= NULL
; /* NULL = log on standard output */
1131 server
.bindaddr
= NULL
;
1132 server
.glueoutputbuf
= 1;
1133 server
.daemonize
= 0;
1134 server
.appendonly
= 0;
1135 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1136 server
.lastfsync
= time(NULL
);
1137 server
.appendfd
= -1;
1138 server
.appendseldb
= -1; /* Make sure the first time will not match */
1139 server
.pidfile
= "/var/run/redis.pid";
1140 server
.dbfilename
= "dump.rdb";
1141 server
.appendfilename
= "appendonly.aof";
1142 server
.requirepass
= NULL
;
1143 server
.shareobjects
= 0;
1144 server
.sharingpoolsize
= 1024;
1145 server
.maxclients
= 0;
1146 server
.maxmemory
= 0;
1147 resetServerSaveParams();
1149 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1150 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1151 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1152 /* Replication related */
1154 server
.masterauth
= NULL
;
1155 server
.masterhost
= NULL
;
1156 server
.masterport
= 6379;
1157 server
.master
= NULL
;
1158 server
.replstate
= REDIS_REPL_NONE
;
1160 /* Double constants initialization */
1162 R_PosInf
= 1.0/R_Zero
;
1163 R_NegInf
= -1.0/R_Zero
;
1164 R_Nan
= R_Zero
/R_Zero
;
1167 static void initServer() {
1170 signal(SIGHUP
, SIG_IGN
);
1171 signal(SIGPIPE
, SIG_IGN
);
1172 setupSigSegvAction();
1174 server
.clients
= listCreate();
1175 server
.slaves
= listCreate();
1176 server
.monitors
= listCreate();
1177 server
.objfreelist
= listCreate();
1178 createSharedObjects();
1179 server
.el
= aeCreateEventLoop();
1180 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1181 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
1182 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1183 if (server
.fd
== -1) {
1184 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
1187 for (j
= 0; j
< server
.dbnum
; j
++) {
1188 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1189 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1190 server
.db
[j
].id
= j
;
1192 server
.cronloops
= 0;
1193 server
.bgsavechildpid
= -1;
1194 server
.bgrewritechildpid
= -1;
1195 server
.bgrewritebuf
= sdsempty();
1196 server
.lastsave
= time(NULL
);
1198 server
.usedmemory
= 0;
1199 server
.stat_numcommands
= 0;
1200 server
.stat_numconnections
= 0;
1201 server
.stat_starttime
= time(NULL
);
1202 aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
);
1204 if (server
.appendonly
) {
1205 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644);
1206 if (server
.appendfd
== -1) {
1207 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1214 /* Empty the whole database */
1215 static long long emptyDb() {
1217 long long removed
= 0;
1219 for (j
= 0; j
< server
.dbnum
; j
++) {
1220 removed
+= dictSize(server
.db
[j
].dict
);
1221 dictEmpty(server
.db
[j
].dict
);
1222 dictEmpty(server
.db
[j
].expires
);
1227 static int yesnotoi(char *s
) {
1228 if (!strcasecmp(s
,"yes")) return 1;
1229 else if (!strcasecmp(s
,"no")) return 0;
1233 /* I agree, this is a very rudimental way to load a configuration...
1234 will improve later if the config gets more complex */
1235 static void loadServerConfig(char *filename
) {
1237 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1241 if (filename
[0] == '-' && filename
[1] == '\0')
1244 if ((fp
= fopen(filename
,"r")) == NULL
) {
1245 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1250 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1256 line
= sdstrim(line
," \t\r\n");
1258 /* Skip comments and blank lines*/
1259 if (line
[0] == '#' || line
[0] == '\0') {
1264 /* Split into arguments */
1265 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1266 sdstolower(argv
[0]);
1268 /* Execute config directives */
1269 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1270 server
.maxidletime
= atoi(argv
[1]);
1271 if (server
.maxidletime
< 0) {
1272 err
= "Invalid timeout value"; goto loaderr
;
1274 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1275 server
.port
= atoi(argv
[1]);
1276 if (server
.port
< 1 || server
.port
> 65535) {
1277 err
= "Invalid port"; goto loaderr
;
1279 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1280 server
.bindaddr
= zstrdup(argv
[1]);
1281 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1282 int seconds
= atoi(argv
[1]);
1283 int changes
= atoi(argv
[2]);
1284 if (seconds
< 1 || changes
< 0) {
1285 err
= "Invalid save parameters"; goto loaderr
;
1287 appendServerSaveParams(seconds
,changes
);
1288 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1289 if (chdir(argv
[1]) == -1) {
1290 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1291 argv
[1], strerror(errno
));
1294 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1295 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1296 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1297 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1299 err
= "Invalid log level. Must be one of debug, notice, warning";
1302 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1305 server
.logfile
= zstrdup(argv
[1]);
1306 if (!strcasecmp(server
.logfile
,"stdout")) {
1307 zfree(server
.logfile
);
1308 server
.logfile
= NULL
;
1310 if (server
.logfile
) {
1311 /* Test if we are able to open the file. The server will not
1312 * be able to abort just for this problem later... */
1313 logfp
= fopen(server
.logfile
,"a");
1314 if (logfp
== NULL
) {
1315 err
= sdscatprintf(sdsempty(),
1316 "Can't open the log file: %s", strerror(errno
));
1321 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1322 server
.dbnum
= atoi(argv
[1]);
1323 if (server
.dbnum
< 1) {
1324 err
= "Invalid number of databases"; goto loaderr
;
1326 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1327 server
.maxclients
= atoi(argv
[1]);
1328 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1329 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1330 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1331 server
.masterhost
= sdsnew(argv
[1]);
1332 server
.masterport
= atoi(argv
[2]);
1333 server
.replstate
= REDIS_REPL_CONNECT
;
1334 } else if (!strcasecmp(argv
[0],"masterauth") && argc
== 2) {
1335 server
.masterauth
= zstrdup(argv
[1]);
1336 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1337 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1338 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1340 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1341 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1342 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1344 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1345 server
.sharingpoolsize
= atoi(argv
[1]);
1346 if (server
.sharingpoolsize
< 1) {
1347 err
= "invalid object sharing pool size"; goto loaderr
;
1349 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1350 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1351 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1353 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1354 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1355 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1357 } else if (!strcasecmp(argv
[0],"appendfsync") && argc
== 2) {
1358 if (!strcasecmp(argv
[1],"no")) {
1359 server
.appendfsync
= APPENDFSYNC_NO
;
1360 } else if (!strcasecmp(argv
[1],"always")) {
1361 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1362 } else if (!strcasecmp(argv
[1],"everysec")) {
1363 server
.appendfsync
= APPENDFSYNC_EVERYSEC
;
1365 err
= "argument must be 'no', 'always' or 'everysec'";
1368 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1369 server
.requirepass
= zstrdup(argv
[1]);
1370 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1371 server
.pidfile
= zstrdup(argv
[1]);
1372 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1373 server
.dbfilename
= zstrdup(argv
[1]);
1375 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1377 for (j
= 0; j
< argc
; j
++)
1382 if (fp
!= stdin
) fclose(fp
);
1386 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1387 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1388 fprintf(stderr
, ">>> '%s'\n", line
);
1389 fprintf(stderr
, "%s\n", err
);
1393 static void freeClientArgv(redisClient
*c
) {
1396 for (j
= 0; j
< c
->argc
; j
++)
1397 decrRefCount(c
->argv
[j
]);
1398 for (j
= 0; j
< c
->mbargc
; j
++)
1399 decrRefCount(c
->mbargv
[j
]);
1404 static void freeClient(redisClient
*c
) {
1407 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1408 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1409 sdsfree(c
->querybuf
);
1410 listRelease(c
->reply
);
1413 ln
= listSearchKey(server
.clients
,c
);
1414 redisAssert(ln
!= NULL
);
1415 listDelNode(server
.clients
,ln
);
1416 if (c
->flags
& REDIS_SLAVE
) {
1417 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1419 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1420 ln
= listSearchKey(l
,c
);
1421 redisAssert(ln
!= NULL
);
1424 if (c
->flags
& REDIS_MASTER
) {
1425 server
.master
= NULL
;
1426 server
.replstate
= REDIS_REPL_CONNECT
;
1433 #define GLUEREPLY_UP_TO (1024)
1434 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1436 char buf
[GLUEREPLY_UP_TO
];
1440 listRewind(c
->reply
);
1441 while((ln
= listYield(c
->reply
))) {
1445 objlen
= sdslen(o
->ptr
);
1446 if (copylen
+ objlen
<= GLUEREPLY_UP_TO
) {
1447 memcpy(buf
+copylen
,o
->ptr
,objlen
);
1449 listDelNode(c
->reply
,ln
);
1451 if (copylen
== 0) return;
1455 /* Now the output buffer is empty, add the new single element */
1456 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,copylen
));
1457 listAddNodeHead(c
->reply
,o
);
1460 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1461 redisClient
*c
= privdata
;
1462 int nwritten
= 0, totwritten
= 0, objlen
;
1465 REDIS_NOTUSED(mask
);
1467 /* Use writev() if we have enough buffers to send */
1468 if (!server
.glueoutputbuf
&&
1469 listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD
&&
1470 !(c
->flags
& REDIS_MASTER
))
1472 sendReplyToClientWritev(el
, fd
, privdata
, mask
);
1476 while(listLength(c
->reply
)) {
1477 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1478 glueReplyBuffersIfNeeded(c
);
1480 o
= listNodeValue(listFirst(c
->reply
));
1481 objlen
= sdslen(o
->ptr
);
1484 listDelNode(c
->reply
,listFirst(c
->reply
));
1488 if (c
->flags
& REDIS_MASTER
) {
1489 /* Don't reply to a master */
1490 nwritten
= objlen
- c
->sentlen
;
1492 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1493 if (nwritten
<= 0) break;
1495 c
->sentlen
+= nwritten
;
1496 totwritten
+= nwritten
;
1497 /* If we fully sent the object on head go to the next one */
1498 if (c
->sentlen
== objlen
) {
1499 listDelNode(c
->reply
,listFirst(c
->reply
));
1502 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1503 * bytes, in a single threaded server it's a good idea to serve
1504 * other clients as well, even if a very large request comes from
1505 * super fast link that is always able to accept data (in real world
1506 * scenario think about 'KEYS *' against the loopback interfae) */
1507 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1509 if (nwritten
== -1) {
1510 if (errno
== EAGAIN
) {
1513 redisLog(REDIS_DEBUG
,
1514 "Error writing to client: %s", strerror(errno
));
1519 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1520 if (listLength(c
->reply
) == 0) {
1522 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1526 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
)
1528 redisClient
*c
= privdata
;
1529 int nwritten
= 0, totwritten
= 0, objlen
, willwrite
;
1531 struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
];
1532 int offset
, ion
= 0;
1534 REDIS_NOTUSED(mask
);
1537 while (listLength(c
->reply
)) {
1538 offset
= c
->sentlen
;
1542 /* fill-in the iov[] array */
1543 for(node
= listFirst(c
->reply
); node
; node
= listNextNode(node
)) {
1544 o
= listNodeValue(node
);
1545 objlen
= sdslen(o
->ptr
);
1547 if (totwritten
+ objlen
- offset
> REDIS_MAX_WRITE_PER_EVENT
)
1550 if(ion
== REDIS_WRITEV_IOVEC_COUNT
)
1551 break; /* no more iovecs */
1553 iov
[ion
].iov_base
= ((char*)o
->ptr
) + offset
;
1554 iov
[ion
].iov_len
= objlen
- offset
;
1555 willwrite
+= objlen
- offset
;
1556 offset
= 0; /* just for the first item */
1563 /* write all collected blocks at once */
1564 if((nwritten
= writev(fd
, iov
, ion
)) < 0) {
1565 if (errno
!= EAGAIN
) {
1566 redisLog(REDIS_DEBUG
,
1567 "Error writing to client: %s", strerror(errno
));
1574 totwritten
+= nwritten
;
1575 offset
= c
->sentlen
;
1577 /* remove written robjs from c->reply */
1578 while (nwritten
&& listLength(c
->reply
)) {
1579 o
= listNodeValue(listFirst(c
->reply
));
1580 objlen
= sdslen(o
->ptr
);
1582 if(nwritten
>= objlen
- offset
) {
1583 listDelNode(c
->reply
, listFirst(c
->reply
));
1584 nwritten
-= objlen
- offset
;
1588 c
->sentlen
+= nwritten
;
1596 c
->lastinteraction
= time(NULL
);
1598 if (listLength(c
->reply
) == 0) {
1600 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1604 static struct redisCommand
*lookupCommand(char *name
) {
1606 while(cmdTable
[j
].name
!= NULL
) {
1607 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1613 /* resetClient prepare the client to process the next command */
1614 static void resetClient(redisClient
*c
) {
1620 /* If this function gets called we already read a whole
1621 * command, argments are in the client argv/argc fields.
1622 * processCommand() execute the command or prepare the
1623 * server for a bulk read from the client.
1625 * If 1 is returned the client is still alive and valid and
1626 * and other operations can be performed by the caller. Otherwise
1627 * if 0 is returned the client was destroied (i.e. after QUIT). */
1628 static int processCommand(redisClient
*c
) {
1629 struct redisCommand
*cmd
;
1632 /* Free some memory if needed (maxmemory setting) */
1633 if (server
.maxmemory
) freeMemoryIfNeeded();
1635 /* Handle the multi bulk command type. This is an alternative protocol
1636 * supported by Redis in order to receive commands that are composed of
1637 * multiple binary-safe "bulk" arguments. The latency of processing is
1638 * a bit higher but this allows things like multi-sets, so if this
1639 * protocol is used only for MSET and similar commands this is a big win. */
1640 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1641 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1642 if (c
->multibulk
<= 0) {
1646 decrRefCount(c
->argv
[c
->argc
-1]);
1650 } else if (c
->multibulk
) {
1651 if (c
->bulklen
== -1) {
1652 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1653 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1657 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1658 decrRefCount(c
->argv
[0]);
1659 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1661 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1666 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1670 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1671 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1675 if (c
->multibulk
== 0) {
1679 /* Here we need to swap the multi-bulk argc/argv with the
1680 * normal argc/argv of the client structure. */
1682 c
->argv
= c
->mbargv
;
1683 c
->mbargv
= auxargv
;
1686 c
->argc
= c
->mbargc
;
1687 c
->mbargc
= auxargc
;
1689 /* We need to set bulklen to something different than -1
1690 * in order for the code below to process the command without
1691 * to try to read the last argument of a bulk command as
1692 * a special argument. */
1694 /* continue below and process the command */
1701 /* -- end of multi bulk commands processing -- */
1703 /* The QUIT command is handled as a special case. Normal command
1704 * procs are unable to close the client connection safely */
1705 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1709 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1711 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1714 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1715 (c
->argc
< -cmd
->arity
)) {
1716 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
1719 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1720 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1723 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1724 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1726 decrRefCount(c
->argv
[c
->argc
-1]);
1727 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1729 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1734 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1735 /* It is possible that the bulk read is already in the
1736 * buffer. Check this condition and handle it accordingly.
1737 * This is just a fast path, alternative to call processInputBuffer().
1738 * It's a good idea since the code is small and this condition
1739 * happens most of the times. */
1740 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1741 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1743 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1748 /* Let's try to share objects on the command arguments vector */
1749 if (server
.shareobjects
) {
1751 for(j
= 1; j
< c
->argc
; j
++)
1752 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1754 /* Let's try to encode the bulk object to save space. */
1755 if (cmd
->flags
& REDIS_CMD_BULK
)
1756 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1758 /* Check if the user is authenticated */
1759 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1760 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1765 /* Exec the command */
1766 dirty
= server
.dirty
;
1768 if (server
.appendonly
&& server
.dirty
-dirty
)
1769 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1770 if (server
.dirty
-dirty
&& listLength(server
.slaves
))
1771 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1772 if (listLength(server
.monitors
))
1773 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1774 server
.stat_numcommands
++;
1776 /* Prepare the client for the next command */
1777 if (c
->flags
& REDIS_CLOSE
) {
1785 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1789 /* (args*2)+1 is enough room for args, spaces, newlines */
1790 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1792 if (argc
<= REDIS_STATIC_ARGS
) {
1795 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1798 for (j
= 0; j
< argc
; j
++) {
1799 if (j
!= 0) outv
[outc
++] = shared
.space
;
1800 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1803 lenobj
= createObject(REDIS_STRING
,
1804 sdscatprintf(sdsempty(),"%lu\r\n",
1805 stringObjectLen(argv
[j
])));
1806 lenobj
->refcount
= 0;
1807 outv
[outc
++] = lenobj
;
1809 outv
[outc
++] = argv
[j
];
1811 outv
[outc
++] = shared
.crlf
;
1813 /* Increment all the refcounts at start and decrement at end in order to
1814 * be sure to free objects if there is no slave in a replication state
1815 * able to be feed with commands */
1816 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1818 while((ln
= listYield(slaves
))) {
1819 redisClient
*slave
= ln
->value
;
1821 /* Don't feed slaves that are still waiting for BGSAVE to start */
1822 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1824 /* Feed all the other slaves, MONITORs and so on */
1825 if (slave
->slaveseldb
!= dictid
) {
1829 case 0: selectcmd
= shared
.select0
; break;
1830 case 1: selectcmd
= shared
.select1
; break;
1831 case 2: selectcmd
= shared
.select2
; break;
1832 case 3: selectcmd
= shared
.select3
; break;
1833 case 4: selectcmd
= shared
.select4
; break;
1834 case 5: selectcmd
= shared
.select5
; break;
1835 case 6: selectcmd
= shared
.select6
; break;
1836 case 7: selectcmd
= shared
.select7
; break;
1837 case 8: selectcmd
= shared
.select8
; break;
1838 case 9: selectcmd
= shared
.select9
; break;
1840 selectcmd
= createObject(REDIS_STRING
,
1841 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1842 selectcmd
->refcount
= 0;
1845 addReply(slave
,selectcmd
);
1846 slave
->slaveseldb
= dictid
;
1848 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1850 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1851 if (outv
!= static_outv
) zfree(outv
);
1854 static void processInputBuffer(redisClient
*c
) {
1856 if (c
->bulklen
== -1) {
1857 /* Read the first line of the query */
1858 char *p
= strchr(c
->querybuf
,'\n');
1865 query
= c
->querybuf
;
1866 c
->querybuf
= sdsempty();
1867 querylen
= 1+(p
-(query
));
1868 if (sdslen(query
) > querylen
) {
1869 /* leave data after the first line of the query in the buffer */
1870 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1872 *p
= '\0'; /* remove "\n" */
1873 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1874 sdsupdatelen(query
);
1876 /* Now we can split the query in arguments */
1877 if (sdslen(query
) == 0) {
1878 /* Ignore empty query */
1882 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1885 if (c
->argv
) zfree(c
->argv
);
1886 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1888 for (j
= 0; j
< argc
; j
++) {
1889 if (sdslen(argv
[j
])) {
1890 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1897 /* Execute the command. If the client is still valid
1898 * after processCommand() return and there is something
1899 * on the query buffer try to process the next command. */
1900 if (c
->argc
&& processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1902 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
1903 redisLog(REDIS_DEBUG
, "Client protocol error");
1908 /* Bulk read handling. Note that if we are at this point
1909 the client already sent a command terminated with a newline,
1910 we are reading the bulk data that is actually the last
1911 argument of the command. */
1912 int qbl
= sdslen(c
->querybuf
);
1914 if (c
->bulklen
<= qbl
) {
1915 /* Copy everything but the final CRLF as final argument */
1916 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1918 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1919 /* Process the command. If the client is still valid after
1920 * the processing and there is more data in the buffer
1921 * try to parse it. */
1922 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1928 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1929 redisClient
*c
= (redisClient
*) privdata
;
1930 char buf
[REDIS_IOBUF_LEN
];
1933 REDIS_NOTUSED(mask
);
1935 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1937 if (errno
== EAGAIN
) {
1940 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1944 } else if (nread
== 0) {
1945 redisLog(REDIS_DEBUG
, "Client closed connection");
1950 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1951 c
->lastinteraction
= time(NULL
);
1955 processInputBuffer(c
);
1958 static int selectDb(redisClient
*c
, int id
) {
1959 if (id
< 0 || id
>= server
.dbnum
)
1961 c
->db
= &server
.db
[id
];
1965 static void *dupClientReplyValue(void *o
) {
1966 incrRefCount((robj
*)o
);
1970 static redisClient
*createClient(int fd
) {
1971 redisClient
*c
= zmalloc(sizeof(*c
));
1973 anetNonBlock(NULL
,fd
);
1974 anetTcpNoDelay(NULL
,fd
);
1975 if (!c
) return NULL
;
1978 c
->querybuf
= sdsempty();
1987 c
->lastinteraction
= time(NULL
);
1988 c
->authenticated
= 0;
1989 c
->replstate
= REDIS_REPL_NONE
;
1990 c
->reply
= listCreate();
1991 listSetFreeMethod(c
->reply
,decrRefCount
);
1992 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1993 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1994 readQueryFromClient
, c
) == AE_ERR
) {
1998 listAddNodeTail(server
.clients
,c
);
2002 static void addReply(redisClient
*c
, robj
*obj
) {
2003 if (listLength(c
->reply
) == 0 &&
2004 (c
->replstate
== REDIS_REPL_NONE
||
2005 c
->replstate
== REDIS_REPL_ONLINE
) &&
2006 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
2007 sendReplyToClient
, c
) == AE_ERR
) return;
2008 listAddNodeTail(c
->reply
,getDecodedObject(obj
));
2011 static void addReplySds(redisClient
*c
, sds s
) {
2012 robj
*o
= createObject(REDIS_STRING
,s
);
2017 static void addReplyDouble(redisClient
*c
, double d
) {
2020 snprintf(buf
,sizeof(buf
),"%.17g",d
);
2021 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2025 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
2028 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
2029 len
= sdslen(obj
->ptr
);
2031 long n
= (long)obj
->ptr
;
2038 while((n
= n
/10) != 0) {
2042 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",len
));
2045 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2050 REDIS_NOTUSED(mask
);
2051 REDIS_NOTUSED(privdata
);
2053 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
2054 if (cfd
== AE_ERR
) {
2055 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
2058 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
2059 if ((c
= createClient(cfd
)) == NULL
) {
2060 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
2061 close(cfd
); /* May be already closed, just ingore errors */
2064 /* If maxclient directive is set and this is one client more... close the
2065 * connection. Note that we create the client instead to check before
2066 * for this condition, since now the socket is already set in nonblocking
2067 * mode and we can send an error for free using the Kernel I/O */
2068 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
2069 char *err
= "-ERR max number of clients reached\r\n";
2071 /* That's a best effort error message, don't check write errors */
2072 if (write(c
->fd
,err
,strlen(err
)) == -1) {
2073 /* Nothing to do, Just to avoid the warning... */
2078 server
.stat_numconnections
++;
2081 /* ======================= Redis objects implementation ===================== */
2083 static robj
*createObject(int type
, void *ptr
) {
2086 if (listLength(server
.objfreelist
)) {
2087 listNode
*head
= listFirst(server
.objfreelist
);
2088 o
= listNodeValue(head
);
2089 listDelNode(server
.objfreelist
,head
);
2091 o
= zmalloc(sizeof(*o
));
2094 o
->encoding
= REDIS_ENCODING_RAW
;
2100 static robj
*createStringObject(char *ptr
, size_t len
) {
2101 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
2104 static robj
*createListObject(void) {
2105 list
*l
= listCreate();
2107 listSetFreeMethod(l
,decrRefCount
);
2108 return createObject(REDIS_LIST
,l
);
2111 static robj
*createSetObject(void) {
2112 dict
*d
= dictCreate(&setDictType
,NULL
);
2113 return createObject(REDIS_SET
,d
);
2116 static robj
*createZsetObject(void) {
2117 zset
*zs
= zmalloc(sizeof(*zs
));
2119 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
2120 zs
->zsl
= zslCreate();
2121 return createObject(REDIS_ZSET
,zs
);
2124 static void freeStringObject(robj
*o
) {
2125 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2130 static void freeListObject(robj
*o
) {
2131 listRelease((list
*) o
->ptr
);
2134 static void freeSetObject(robj
*o
) {
2135 dictRelease((dict
*) o
->ptr
);
2138 static void freeZsetObject(robj
*o
) {
2141 dictRelease(zs
->dict
);
2146 static void freeHashObject(robj
*o
) {
2147 dictRelease((dict
*) o
->ptr
);
2150 static void incrRefCount(robj
*o
) {
2152 #ifdef DEBUG_REFCOUNT
2153 if (o
->type
== REDIS_STRING
)
2154 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
2158 static void decrRefCount(void *obj
) {
2161 #ifdef DEBUG_REFCOUNT
2162 if (o
->type
== REDIS_STRING
)
2163 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
2165 if (--(o
->refcount
) == 0) {
2167 case REDIS_STRING
: freeStringObject(o
); break;
2168 case REDIS_LIST
: freeListObject(o
); break;
2169 case REDIS_SET
: freeSetObject(o
); break;
2170 case REDIS_ZSET
: freeZsetObject(o
); break;
2171 case REDIS_HASH
: freeHashObject(o
); break;
2172 default: redisAssert(0 != 0); break;
2174 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2175 !listAddNodeHead(server
.objfreelist
,o
))
2180 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2181 dictEntry
*de
= dictFind(db
->dict
,key
);
2182 return de
? dictGetEntryVal(de
) : NULL
;
2185 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2186 expireIfNeeded(db
,key
);
2187 return lookupKey(db
,key
);
2190 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2191 deleteIfVolatile(db
,key
);
2192 return lookupKey(db
,key
);
2195 static int deleteKey(redisDb
*db
, robj
*key
) {
2198 /* We need to protect key from destruction: after the first dictDelete()
2199 * it may happen that 'key' is no longer valid if we don't increment
2200 * it's count. This may happen when we get the object reference directly
2201 * from the hash table with dictRandomKey() or dict iterators */
2203 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2204 retval
= dictDelete(db
->dict
,key
);
2207 return retval
== DICT_OK
;
2210 /* Try to share an object against the shared objects pool */
2211 static robj
*tryObjectSharing(robj
*o
) {
2212 struct dictEntry
*de
;
2215 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2217 redisAssert(o
->type
== REDIS_STRING
);
2218 de
= dictFind(server
.sharingpool
,o
);
2220 robj
*shared
= dictGetEntryKey(de
);
2222 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2223 dictGetEntryVal(de
) = (void*) c
;
2224 incrRefCount(shared
);
2228 /* Here we are using a stream algorihtm: Every time an object is
2229 * shared we increment its count, everytime there is a miss we
2230 * recrement the counter of a random object. If this object reaches
2231 * zero we remove the object and put the current object instead. */
2232 if (dictSize(server
.sharingpool
) >=
2233 server
.sharingpoolsize
) {
2234 de
= dictGetRandomKey(server
.sharingpool
);
2235 redisAssert(de
!= NULL
);
2236 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2237 dictGetEntryVal(de
) = (void*) c
;
2239 dictDelete(server
.sharingpool
,de
->key
);
2242 c
= 0; /* If the pool is empty we want to add this object */
2247 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2248 redisAssert(retval
== DICT_OK
);
2255 /* Check if the nul-terminated string 's' can be represented by a long
2256 * (that is, is a number that fits into long without any other space or
2257 * character before or after the digits).
2259 * If so, the function returns REDIS_OK and *longval is set to the value
2260 * of the number. Otherwise REDIS_ERR is returned */
2261 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2262 char buf
[32], *endptr
;
2266 value
= strtol(s
, &endptr
, 10);
2267 if (endptr
[0] != '\0') return REDIS_ERR
;
2268 slen
= snprintf(buf
,32,"%ld",value
);
2270 /* If the number converted back into a string is not identical
2271 * then it's not possible to encode the string as integer */
2272 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2273 if (longval
) *longval
= value
;
2277 /* Try to encode a string object in order to save space */
2278 static int tryObjectEncoding(robj
*o
) {
2282 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2283 return REDIS_ERR
; /* Already encoded */
2285 /* It's not save to encode shared objects: shared objects can be shared
2286 * everywhere in the "object space" of Redis. Encoded objects can only
2287 * appear as "values" (and not, for instance, as keys) */
2288 if (o
->refcount
> 1) return REDIS_ERR
;
2290 /* Currently we try to encode only strings */
2291 redisAssert(o
->type
== REDIS_STRING
);
2293 /* Check if we can represent this string as a long integer */
2294 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2296 /* Ok, this object can be encoded */
2297 o
->encoding
= REDIS_ENCODING_INT
;
2299 o
->ptr
= (void*) value
;
2303 /* Get a decoded version of an encoded object (returned as a new object).
2304 * If the object is already raw-encoded just increment the ref count. */
2305 static robj
*getDecodedObject(robj
*o
) {
2308 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2312 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2315 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2316 dec
= createStringObject(buf
,strlen(buf
));
2319 redisAssert(1 != 1);
2323 /* Compare two string objects via strcmp() or alike.
2324 * Note that the objects may be integer-encoded. In such a case we
2325 * use snprintf() to get a string representation of the numbers on the stack
2326 * and compare the strings, it's much faster than calling getDecodedObject().
2328 * Important note: if objects are not integer encoded, but binary-safe strings,
2329 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2331 static int compareStringObjects(robj
*a
, robj
*b
) {
2332 redisAssert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2333 char bufa
[128], bufb
[128], *astr
, *bstr
;
2336 if (a
== b
) return 0;
2337 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2338 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2344 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2345 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2351 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2354 static size_t stringObjectLen(robj
*o
) {
2355 redisAssert(o
->type
== REDIS_STRING
);
2356 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2357 return sdslen(o
->ptr
);
2361 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2365 /*============================ DB saving/loading ============================ */
2367 static int rdbSaveType(FILE *fp
, unsigned char type
) {
2368 if (fwrite(&type
,1,1,fp
) == 0) return -1;
2372 static int rdbSaveTime(FILE *fp
, time_t t
) {
2373 int32_t t32
= (int32_t) t
;
2374 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
2378 /* check rdbLoadLen() comments for more info */
2379 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2380 unsigned char buf
[2];
2383 /* Save a 6 bit len */
2384 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2385 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2386 } else if (len
< (1<<14)) {
2387 /* Save a 14 bit len */
2388 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2390 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2392 /* Save a 32 bit len */
2393 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2394 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2396 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2401 /* String objects in the form "2391" "-100" without any space and with a
2402 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2403 * encoded as integers to save space */
2404 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2406 char *endptr
, buf
[32];
2408 /* Check if it's possible to encode this value as a number */
2409 value
= strtoll(s
, &endptr
, 10);
2410 if (endptr
[0] != '\0') return 0;
2411 snprintf(buf
,32,"%lld",value
);
2413 /* If the number converted back into a string is not identical
2414 * then it's not possible to encode the string as integer */
2415 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2417 /* Finally check if it fits in our ranges */
2418 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2419 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2420 enc
[1] = value
&0xFF;
2422 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2423 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2424 enc
[1] = value
&0xFF;
2425 enc
[2] = (value
>>8)&0xFF;
2427 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2428 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2429 enc
[1] = value
&0xFF;
2430 enc
[2] = (value
>>8)&0xFF;
2431 enc
[3] = (value
>>16)&0xFF;
2432 enc
[4] = (value
>>24)&0xFF;
2439 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2440 unsigned int comprlen
, outlen
;
2444 /* We require at least four bytes compression for this to be worth it */
2445 outlen
= sdslen(obj
->ptr
)-4;
2446 if (outlen
<= 0) return 0;
2447 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2448 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2449 if (comprlen
== 0) {
2453 /* Data compressed! Let's save it on disk */
2454 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2455 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2456 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2457 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2458 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2467 /* Save a string objet as [len][data] on disk. If the object is a string
2468 * representation of an integer value we try to safe it in a special form */
2469 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2473 len
= sdslen(obj
->ptr
);
2475 /* Try integer encoding */
2477 unsigned char buf
[5];
2478 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2479 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2484 /* Try LZF compression - under 20 bytes it's unable to compress even
2485 * aaaaaaaaaaaaaaaaaa so skip it */
2489 retval
= rdbSaveLzfStringObject(fp
,obj
);
2490 if (retval
== -1) return -1;
2491 if (retval
> 0) return 0;
2492 /* retval == 0 means data can't be compressed, save the old way */
2495 /* Store verbatim */
2496 if (rdbSaveLen(fp
,len
) == -1) return -1;
2497 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2501 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2502 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2505 obj
= getDecodedObject(obj
);
2506 retval
= rdbSaveStringObjectRaw(fp
,obj
);
2511 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
2512 * 8 bit integer specifing the length of the representation.
2513 * This 8 bit integer has special values in order to specify the following
2519 static int rdbSaveDoubleValue(FILE *fp
, double val
) {
2520 unsigned char buf
[128];
2526 } else if (!isfinite(val
)) {
2528 buf
[0] = (val
< 0) ? 255 : 254;
2530 snprintf((char*)buf
+1,sizeof(buf
)-1,"%.17g",val
);
2531 buf
[0] = strlen((char*)buf
+1);
2534 if (fwrite(buf
,len
,1,fp
) == 0) return -1;
2538 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2539 static int rdbSave(char *filename
) {
2540 dictIterator
*di
= NULL
;
2545 time_t now
= time(NULL
);
2547 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2548 fp
= fopen(tmpfile
,"w");
2550 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2553 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2554 for (j
= 0; j
< server
.dbnum
; j
++) {
2555 redisDb
*db
= server
.db
+j
;
2557 if (dictSize(d
) == 0) continue;
2558 di
= dictGetIterator(d
);
2564 /* Write the SELECT DB opcode */
2565 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2566 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2568 /* Iterate this DB writing every entry */
2569 while((de
= dictNext(di
)) != NULL
) {
2570 robj
*key
= dictGetEntryKey(de
);
2571 robj
*o
= dictGetEntryVal(de
);
2572 time_t expiretime
= getExpire(db
,key
);
2574 /* Save the expire time */
2575 if (expiretime
!= -1) {
2576 /* If this key is already expired skip it */
2577 if (expiretime
< now
) continue;
2578 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2579 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2581 /* Save the key and associated value */
2582 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2583 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2584 if (o
->type
== REDIS_STRING
) {
2585 /* Save a string value */
2586 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
2587 } else if (o
->type
== REDIS_LIST
) {
2588 /* Save a list value */
2589 list
*list
= o
->ptr
;
2593 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
2594 while((ln
= listYield(list
))) {
2595 robj
*eleobj
= listNodeValue(ln
);
2597 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2599 } else if (o
->type
== REDIS_SET
) {
2600 /* Save a set value */
2602 dictIterator
*di
= dictGetIterator(set
);
2605 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
2606 while((de
= dictNext(di
)) != NULL
) {
2607 robj
*eleobj
= dictGetEntryKey(de
);
2609 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2611 dictReleaseIterator(di
);
2612 } else if (o
->type
== REDIS_ZSET
) {
2613 /* Save a set value */
2615 dictIterator
*di
= dictGetIterator(zs
->dict
);
2618 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) goto werr
;
2619 while((de
= dictNext(di
)) != NULL
) {
2620 robj
*eleobj
= dictGetEntryKey(de
);
2621 double *score
= dictGetEntryVal(de
);
2623 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2624 if (rdbSaveDoubleValue(fp
,*score
) == -1) goto werr
;
2626 dictReleaseIterator(di
);
2628 redisAssert(0 != 0);
2631 dictReleaseIterator(di
);
2634 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2636 /* Make sure data will not remain on the OS's output buffers */
2641 /* Use RENAME to make sure the DB file is changed atomically only
2642 * if the generate DB file is ok. */
2643 if (rename(tmpfile
,filename
) == -1) {
2644 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
2648 redisLog(REDIS_NOTICE
,"DB saved on disk");
2650 server
.lastsave
= time(NULL
);
2656 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2657 if (di
) dictReleaseIterator(di
);
2661 static int rdbSaveBackground(char *filename
) {
2664 if (server
.bgsavechildpid
!= -1) return REDIS_ERR
;
2665 if ((childpid
= fork()) == 0) {
2668 if (rdbSave(filename
) == REDIS_OK
) {
2675 if (childpid
== -1) {
2676 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2680 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2681 server
.bgsavechildpid
= childpid
;
2684 return REDIS_OK
; /* unreached */
2687 static void rdbRemoveTempFile(pid_t childpid
) {
2690 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
2694 static int rdbLoadType(FILE *fp
) {
2696 if (fread(&type
,1,1,fp
) == 0) return -1;
2700 static time_t rdbLoadTime(FILE *fp
) {
2702 if (fread(&t32
,4,1,fp
) == 0) return -1;
2703 return (time_t) t32
;
2706 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2707 * of this file for a description of how this are stored on disk.
2709 * isencoded is set to 1 if the readed length is not actually a length but
2710 * an "encoding type", check the above comments for more info */
2711 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2712 unsigned char buf
[2];
2715 if (isencoded
) *isencoded
= 0;
2717 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2722 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2723 type
= (buf
[0]&0xC0)>>6;
2724 if (type
== REDIS_RDB_6BITLEN
) {
2725 /* Read a 6 bit len */
2727 } else if (type
== REDIS_RDB_ENCVAL
) {
2728 /* Read a 6 bit len encoding type */
2729 if (isencoded
) *isencoded
= 1;
2731 } else if (type
== REDIS_RDB_14BITLEN
) {
2732 /* Read a 14 bit len */
2733 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2734 return ((buf
[0]&0x3F)<<8)|buf
[1];
2736 /* Read a 32 bit len */
2737 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2743 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2744 unsigned char enc
[4];
2747 if (enctype
== REDIS_RDB_ENC_INT8
) {
2748 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2749 val
= (signed char)enc
[0];
2750 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2752 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2753 v
= enc
[0]|(enc
[1]<<8);
2755 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2757 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2758 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2761 val
= 0; /* anti-warning */
2764 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2767 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2768 unsigned int len
, clen
;
2769 unsigned char *c
= NULL
;
2772 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2773 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2774 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2775 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2776 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2777 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2779 return createObject(REDIS_STRING
,val
);
2786 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2791 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2794 case REDIS_RDB_ENC_INT8
:
2795 case REDIS_RDB_ENC_INT16
:
2796 case REDIS_RDB_ENC_INT32
:
2797 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2798 case REDIS_RDB_ENC_LZF
:
2799 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2805 if (len
== REDIS_RDB_LENERR
) return NULL
;
2806 val
= sdsnewlen(NULL
,len
);
2807 if (len
&& fread(val
,len
,1,fp
) == 0) {
2811 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2814 /* For information about double serialization check rdbSaveDoubleValue() */
2815 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
2819 if (fread(&len
,1,1,fp
) == 0) return -1;
2821 case 255: *val
= R_NegInf
; return 0;
2822 case 254: *val
= R_PosInf
; return 0;
2823 case 253: *val
= R_Nan
; return 0;
2825 if (fread(buf
,len
,1,fp
) == 0) return -1;
2826 sscanf(buf
, "%lg", val
);
2831 static int rdbLoad(char *filename
) {
2833 robj
*keyobj
= NULL
;
2835 int type
, retval
, rdbver
;
2836 dict
*d
= server
.db
[0].dict
;
2837 redisDb
*db
= server
.db
+0;
2839 time_t expiretime
= -1, now
= time(NULL
);
2841 fp
= fopen(filename
,"r");
2842 if (!fp
) return REDIS_ERR
;
2843 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2845 if (memcmp(buf
,"REDIS",5) != 0) {
2847 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2850 rdbver
= atoi(buf
+5);
2853 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2860 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2861 if (type
== REDIS_EXPIRETIME
) {
2862 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2863 /* We read the time so we need to read the object type again */
2864 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2866 if (type
== REDIS_EOF
) break;
2867 /* Handle SELECT DB opcode as a special case */
2868 if (type
== REDIS_SELECTDB
) {
2869 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2871 if (dbid
>= (unsigned)server
.dbnum
) {
2872 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2875 db
= server
.db
+dbid
;
2880 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2882 if (type
== REDIS_STRING
) {
2883 /* Read string value */
2884 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2885 tryObjectEncoding(o
);
2886 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2887 /* Read list/set value */
2890 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2892 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2893 /* Load every single element of the list/set */
2897 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2898 tryObjectEncoding(ele
);
2899 if (type
== REDIS_LIST
) {
2900 listAddNodeTail((list
*)o
->ptr
,ele
);
2902 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
2905 } else if (type
== REDIS_ZSET
) {
2906 /* Read list/set value */
2910 if ((zsetlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2912 o
= createZsetObject();
2914 /* Load every single element of the list/set */
2917 double *score
= zmalloc(sizeof(double));
2919 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2920 tryObjectEncoding(ele
);
2921 if (rdbLoadDoubleValue(fp
,score
) == -1) goto eoferr
;
2922 dictAdd(zs
->dict
,ele
,score
);
2923 zslInsert(zs
->zsl
,*score
,ele
);
2924 incrRefCount(ele
); /* added to skiplist */
2927 redisAssert(0 != 0);
2929 /* Add the new object in the hash table */
2930 retval
= dictAdd(d
,keyobj
,o
);
2931 if (retval
== DICT_ERR
) {
2932 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2935 /* Set the expire time if needed */
2936 if (expiretime
!= -1) {
2937 setExpire(db
,keyobj
,expiretime
);
2938 /* Delete this key if already expired */
2939 if (expiretime
< now
) deleteKey(db
,keyobj
);
2947 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2948 if (keyobj
) decrRefCount(keyobj
);
2949 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2951 return REDIS_ERR
; /* Just to avoid warning */
2954 /*================================== Commands =============================== */
2956 static void authCommand(redisClient
*c
) {
2957 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2958 c
->authenticated
= 1;
2959 addReply(c
,shared
.ok
);
2961 c
->authenticated
= 0;
2962 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2966 static void pingCommand(redisClient
*c
) {
2967 addReply(c
,shared
.pong
);
2970 static void echoCommand(redisClient
*c
) {
2971 addReplyBulkLen(c
,c
->argv
[1]);
2972 addReply(c
,c
->argv
[1]);
2973 addReply(c
,shared
.crlf
);
2976 /*=================================== Strings =============================== */
2978 static void setGenericCommand(redisClient
*c
, int nx
) {
2981 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2982 if (retval
== DICT_ERR
) {
2984 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2985 incrRefCount(c
->argv
[2]);
2987 addReply(c
,shared
.czero
);
2991 incrRefCount(c
->argv
[1]);
2992 incrRefCount(c
->argv
[2]);
2995 removeExpire(c
->db
,c
->argv
[1]);
2996 addReply(c
, nx
? shared
.cone
: shared
.ok
);
2999 static void setCommand(redisClient
*c
) {
3000 setGenericCommand(c
,0);
3003 static void setnxCommand(redisClient
*c
) {
3004 setGenericCommand(c
,1);
3007 static void getCommand(redisClient
*c
) {
3008 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3011 addReply(c
,shared
.nullbulk
);
3013 if (o
->type
!= REDIS_STRING
) {
3014 addReply(c
,shared
.wrongtypeerr
);
3016 addReplyBulkLen(c
,o
);
3018 addReply(c
,shared
.crlf
);
3023 static void getsetCommand(redisClient
*c
) {
3025 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
3026 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3028 incrRefCount(c
->argv
[1]);
3030 incrRefCount(c
->argv
[2]);
3032 removeExpire(c
->db
,c
->argv
[1]);
3035 static void mgetCommand(redisClient
*c
) {
3038 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
3039 for (j
= 1; j
< c
->argc
; j
++) {
3040 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
3042 addReply(c
,shared
.nullbulk
);
3044 if (o
->type
!= REDIS_STRING
) {
3045 addReply(c
,shared
.nullbulk
);
3047 addReplyBulkLen(c
,o
);
3049 addReply(c
,shared
.crlf
);
3055 static void msetGenericCommand(redisClient
*c
, int nx
) {
3058 if ((c
->argc
% 2) == 0) {
3059 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
3062 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3063 * set nothing at all if at least one already key exists. */
3065 for (j
= 1; j
< c
->argc
; j
+= 2) {
3066 if (dictFind(c
->db
->dict
,c
->argv
[j
]) != NULL
) {
3067 addReply(c
, shared
.czero
);
3073 for (j
= 1; j
< c
->argc
; j
+= 2) {
3076 tryObjectEncoding(c
->argv
[j
+1]);
3077 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3078 if (retval
== DICT_ERR
) {
3079 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3080 incrRefCount(c
->argv
[j
+1]);
3082 incrRefCount(c
->argv
[j
]);
3083 incrRefCount(c
->argv
[j
+1]);
3085 removeExpire(c
->db
,c
->argv
[j
]);
3087 server
.dirty
+= (c
->argc
-1)/2;
3088 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3091 static void msetCommand(redisClient
*c
) {
3092 msetGenericCommand(c
,0);
3095 static void msetnxCommand(redisClient
*c
) {
3096 msetGenericCommand(c
,1);
3099 static void incrDecrCommand(redisClient
*c
, long long incr
) {
3104 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3108 if (o
->type
!= REDIS_STRING
) {
3113 if (o
->encoding
== REDIS_ENCODING_RAW
)
3114 value
= strtoll(o
->ptr
, &eptr
, 10);
3115 else if (o
->encoding
== REDIS_ENCODING_INT
)
3116 value
= (long)o
->ptr
;
3118 redisAssert(1 != 1);
3123 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
3124 tryObjectEncoding(o
);
3125 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
3126 if (retval
== DICT_ERR
) {
3127 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
3128 removeExpire(c
->db
,c
->argv
[1]);
3130 incrRefCount(c
->argv
[1]);
3133 addReply(c
,shared
.colon
);
3135 addReply(c
,shared
.crlf
);
3138 static void incrCommand(redisClient
*c
) {
3139 incrDecrCommand(c
,1);
3142 static void decrCommand(redisClient
*c
) {
3143 incrDecrCommand(c
,-1);
3146 static void incrbyCommand(redisClient
*c
) {
3147 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3148 incrDecrCommand(c
,incr
);
3151 static void decrbyCommand(redisClient
*c
) {
3152 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3153 incrDecrCommand(c
,-incr
);
3156 /* ========================= Type agnostic commands ========================= */
3158 static void delCommand(redisClient
*c
) {
3161 for (j
= 1; j
< c
->argc
; j
++) {
3162 if (deleteKey(c
->db
,c
->argv
[j
])) {
3169 addReply(c
,shared
.czero
);
3172 addReply(c
,shared
.cone
);
3175 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
3180 static void existsCommand(redisClient
*c
) {
3181 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
3184 static void selectCommand(redisClient
*c
) {
3185 int id
= atoi(c
->argv
[1]->ptr
);
3187 if (selectDb(c
,id
) == REDIS_ERR
) {
3188 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
3190 addReply(c
,shared
.ok
);
3194 static void randomkeyCommand(redisClient
*c
) {
3198 de
= dictGetRandomKey(c
->db
->dict
);
3199 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
3202 addReply(c
,shared
.plus
);
3203 addReply(c
,shared
.crlf
);
3205 addReply(c
,shared
.plus
);
3206 addReply(c
,dictGetEntryKey(de
));
3207 addReply(c
,shared
.crlf
);
3211 static void keysCommand(redisClient
*c
) {
3214 sds pattern
= c
->argv
[1]->ptr
;
3215 int plen
= sdslen(pattern
);
3216 unsigned long numkeys
= 0, keyslen
= 0;
3217 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3219 di
= dictGetIterator(c
->db
->dict
);
3221 decrRefCount(lenobj
);
3222 while((de
= dictNext(di
)) != NULL
) {
3223 robj
*keyobj
= dictGetEntryKey(de
);
3225 sds key
= keyobj
->ptr
;
3226 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3227 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3228 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3230 addReply(c
,shared
.space
);
3233 keyslen
+= sdslen(key
);
3237 dictReleaseIterator(di
);
3238 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3239 addReply(c
,shared
.crlf
);
3242 static void dbsizeCommand(redisClient
*c
) {
3244 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
3247 static void lastsaveCommand(redisClient
*c
) {
3249 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
3252 static void typeCommand(redisClient
*c
) {
3256 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3261 case REDIS_STRING
: type
= "+string"; break;
3262 case REDIS_LIST
: type
= "+list"; break;
3263 case REDIS_SET
: type
= "+set"; break;
3264 case REDIS_ZSET
: type
= "+zset"; break;
3265 default: type
= "unknown"; break;
3268 addReplySds(c
,sdsnew(type
));
3269 addReply(c
,shared
.crlf
);
3272 static void saveCommand(redisClient
*c
) {
3273 if (server
.bgsavechildpid
!= -1) {
3274 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3277 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3278 addReply(c
,shared
.ok
);
3280 addReply(c
,shared
.err
);
3284 static void bgsaveCommand(redisClient
*c
) {
3285 if (server
.bgsavechildpid
!= -1) {
3286 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3289 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3290 addReply(c
,shared
.ok
);
3292 addReply(c
,shared
.err
);
3296 static void shutdownCommand(redisClient
*c
) {
3297 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
3298 /* Kill the saving child if there is a background saving in progress.
3299 We want to avoid race conditions, for instance our saving child may
3300 overwrite the synchronous saving did by SHUTDOWN. */
3301 if (server
.bgsavechildpid
!= -1) {
3302 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3303 kill(server
.bgsavechildpid
,SIGKILL
);
3304 rdbRemoveTempFile(server
.bgsavechildpid
);
3307 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3308 if (server
.daemonize
)
3309 unlink(server
.pidfile
);
3310 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3311 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3314 /* Ooops.. error saving! The best we can do is to continue operating.
3315 * Note that if there was a background saving process, in the next
3316 * cron() Redis will be notified that the background saving aborted,
3317 * handling special stuff like slaves pending for synchronization... */
3318 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3319 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3323 static void renameGenericCommand(redisClient
*c
, int nx
) {
3326 /* To use the same key as src and dst is probably an error */
3327 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3328 addReply(c
,shared
.sameobjecterr
);
3332 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3334 addReply(c
,shared
.nokeyerr
);
3338 deleteIfVolatile(c
->db
,c
->argv
[2]);
3339 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3342 addReply(c
,shared
.czero
);
3345 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3347 incrRefCount(c
->argv
[2]);
3349 deleteKey(c
->db
,c
->argv
[1]);
3351 addReply(c
,nx
? shared
.cone
: shared
.ok
);
3354 static void renameCommand(redisClient
*c
) {
3355 renameGenericCommand(c
,0);
3358 static void renamenxCommand(redisClient
*c
) {
3359 renameGenericCommand(c
,1);
3362 static void moveCommand(redisClient
*c
) {
3367 /* Obtain source and target DB pointers */
3370 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3371 addReply(c
,shared
.outofrangeerr
);
3375 selectDb(c
,srcid
); /* Back to the source DB */
3377 /* If the user is moving using as target the same
3378 * DB as the source DB it is probably an error. */
3380 addReply(c
,shared
.sameobjecterr
);
3384 /* Check if the element exists and get a reference */
3385 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3387 addReply(c
,shared
.czero
);
3391 /* Try to add the element to the target DB */
3392 deleteIfVolatile(dst
,c
->argv
[1]);
3393 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3394 addReply(c
,shared
.czero
);
3397 incrRefCount(c
->argv
[1]);
3400 /* OK! key moved, free the entry in the source DB */
3401 deleteKey(src
,c
->argv
[1]);
3403 addReply(c
,shared
.cone
);
3406 /* =================================== Lists ================================ */
3407 static void pushGenericCommand(redisClient
*c
, int where
) {
3411 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3413 lobj
= createListObject();
3415 if (where
== REDIS_HEAD
) {
3416 listAddNodeHead(list
,c
->argv
[2]);
3418 listAddNodeTail(list
,c
->argv
[2]);
3420 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
3421 incrRefCount(c
->argv
[1]);
3422 incrRefCount(c
->argv
[2]);
3424 if (lobj
->type
!= REDIS_LIST
) {
3425 addReply(c
,shared
.wrongtypeerr
);
3429 if (where
== REDIS_HEAD
) {
3430 listAddNodeHead(list
,c
->argv
[2]);
3432 listAddNodeTail(list
,c
->argv
[2]);
3434 incrRefCount(c
->argv
[2]);
3437 addReply(c
,shared
.ok
);
3440 static void lpushCommand(redisClient
*c
) {
3441 pushGenericCommand(c
,REDIS_HEAD
);
3444 static void rpushCommand(redisClient
*c
) {
3445 pushGenericCommand(c
,REDIS_TAIL
);
3448 static void llenCommand(redisClient
*c
) {
3452 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3454 addReply(c
,shared
.czero
);
3457 if (o
->type
!= REDIS_LIST
) {
3458 addReply(c
,shared
.wrongtypeerr
);
3461 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3466 static void lindexCommand(redisClient
*c
) {
3468 int index
= atoi(c
->argv
[2]->ptr
);
3470 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3472 addReply(c
,shared
.nullbulk
);
3474 if (o
->type
!= REDIS_LIST
) {
3475 addReply(c
,shared
.wrongtypeerr
);
3477 list
*list
= o
->ptr
;
3480 ln
= listIndex(list
, index
);
3482 addReply(c
,shared
.nullbulk
);
3484 robj
*ele
= listNodeValue(ln
);
3485 addReplyBulkLen(c
,ele
);
3487 addReply(c
,shared
.crlf
);
3493 static void lsetCommand(redisClient
*c
) {
3495 int index
= atoi(c
->argv
[2]->ptr
);
3497 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3499 addReply(c
,shared
.nokeyerr
);
3501 if (o
->type
!= REDIS_LIST
) {
3502 addReply(c
,shared
.wrongtypeerr
);
3504 list
*list
= o
->ptr
;
3507 ln
= listIndex(list
, index
);
3509 addReply(c
,shared
.outofrangeerr
);
3511 robj
*ele
= listNodeValue(ln
);
3514 listNodeValue(ln
) = c
->argv
[3];
3515 incrRefCount(c
->argv
[3]);
3516 addReply(c
,shared
.ok
);
3523 static void popGenericCommand(redisClient
*c
, int where
) {
3526 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3528 addReply(c
,shared
.nullbulk
);
3530 if (o
->type
!= REDIS_LIST
) {
3531 addReply(c
,shared
.wrongtypeerr
);
3533 list
*list
= o
->ptr
;
3536 if (where
== REDIS_HEAD
)
3537 ln
= listFirst(list
);
3539 ln
= listLast(list
);
3542 addReply(c
,shared
.nullbulk
);
3544 robj
*ele
= listNodeValue(ln
);
3545 addReplyBulkLen(c
,ele
);
3547 addReply(c
,shared
.crlf
);
3548 listDelNode(list
,ln
);
3555 static void lpopCommand(redisClient
*c
) {
3556 popGenericCommand(c
,REDIS_HEAD
);
3559 static void rpopCommand(redisClient
*c
) {
3560 popGenericCommand(c
,REDIS_TAIL
);
3563 static void lrangeCommand(redisClient
*c
) {
3565 int start
= atoi(c
->argv
[2]->ptr
);
3566 int end
= atoi(c
->argv
[3]->ptr
);
3568 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3570 addReply(c
,shared
.nullmultibulk
);
3572 if (o
->type
!= REDIS_LIST
) {
3573 addReply(c
,shared
.wrongtypeerr
);
3575 list
*list
= o
->ptr
;
3577 int llen
= listLength(list
);
3581 /* convert negative indexes */
3582 if (start
< 0) start
= llen
+start
;
3583 if (end
< 0) end
= llen
+end
;
3584 if (start
< 0) start
= 0;
3585 if (end
< 0) end
= 0;
3587 /* indexes sanity checks */
3588 if (start
> end
|| start
>= llen
) {
3589 /* Out of range start or start > end result in empty list */
3590 addReply(c
,shared
.emptymultibulk
);
3593 if (end
>= llen
) end
= llen
-1;
3594 rangelen
= (end
-start
)+1;
3596 /* Return the result in form of a multi-bulk reply */
3597 ln
= listIndex(list
, start
);
3598 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3599 for (j
= 0; j
< rangelen
; j
++) {
3600 ele
= listNodeValue(ln
);
3601 addReplyBulkLen(c
,ele
);
3603 addReply(c
,shared
.crlf
);
3610 static void ltrimCommand(redisClient
*c
) {
3612 int start
= atoi(c
->argv
[2]->ptr
);
3613 int end
= atoi(c
->argv
[3]->ptr
);
3615 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3617 addReply(c
,shared
.nokeyerr
);
3619 if (o
->type
!= REDIS_LIST
) {
3620 addReply(c
,shared
.wrongtypeerr
);
3622 list
*list
= o
->ptr
;
3624 int llen
= listLength(list
);
3625 int j
, ltrim
, rtrim
;
3627 /* convert negative indexes */
3628 if (start
< 0) start
= llen
+start
;
3629 if (end
< 0) end
= llen
+end
;
3630 if (start
< 0) start
= 0;
3631 if (end
< 0) end
= 0;
3633 /* indexes sanity checks */
3634 if (start
> end
|| start
>= llen
) {
3635 /* Out of range start or start > end result in empty list */
3639 if (end
>= llen
) end
= llen
-1;
3644 /* Remove list elements to perform the trim */
3645 for (j
= 0; j
< ltrim
; j
++) {
3646 ln
= listFirst(list
);
3647 listDelNode(list
,ln
);
3649 for (j
= 0; j
< rtrim
; j
++) {
3650 ln
= listLast(list
);
3651 listDelNode(list
,ln
);
3654 addReply(c
,shared
.ok
);
3659 static void lremCommand(redisClient
*c
) {
3662 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3664 addReply(c
,shared
.czero
);
3666 if (o
->type
!= REDIS_LIST
) {
3667 addReply(c
,shared
.wrongtypeerr
);
3669 list
*list
= o
->ptr
;
3670 listNode
*ln
, *next
;
3671 int toremove
= atoi(c
->argv
[2]->ptr
);
3676 toremove
= -toremove
;
3679 ln
= fromtail
? list
->tail
: list
->head
;
3681 robj
*ele
= listNodeValue(ln
);
3683 next
= fromtail
? ln
->prev
: ln
->next
;
3684 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3685 listDelNode(list
,ln
);
3688 if (toremove
&& removed
== toremove
) break;
3692 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3697 /* This is the semantic of this command:
3698 * RPOPLPUSH srclist dstlist:
3699 * IF LLEN(srclist) > 0
3700 * element = RPOP srclist
3701 * LPUSH dstlist element
3708 * The idea is to be able to get an element from a list in a reliable way
3709 * since the element is not just returned but pushed against another list
3710 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3712 static void rpoplpushcommand(redisClient
*c
) {
3715 sobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3717 addReply(c
,shared
.nullbulk
);
3719 if (sobj
->type
!= REDIS_LIST
) {
3720 addReply(c
,shared
.wrongtypeerr
);
3722 list
*srclist
= sobj
->ptr
;
3723 listNode
*ln
= listLast(srclist
);
3726 addReply(c
,shared
.nullbulk
);
3728 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3729 robj
*ele
= listNodeValue(ln
);
3734 /* Create the list if the key does not exist */
3735 dobj
= createListObject();
3736 dictAdd(c
->db
->dict
,c
->argv
[2],dobj
);
3737 incrRefCount(c
->argv
[2]);
3738 } else if (dobj
->type
!= REDIS_LIST
) {
3739 addReply(c
,shared
.wrongtypeerr
);
3742 /* Add the element to the target list */
3743 dstlist
= dobj
->ptr
;
3744 listAddNodeHead(dstlist
,ele
);
3747 /* Send the element to the client as reply as well */
3748 addReplyBulkLen(c
,ele
);
3750 addReply(c
,shared
.crlf
);
3752 /* Finally remove the element from the source list */
3753 listDelNode(srclist
,ln
);
3761 /* ==================================== Sets ================================ */
3763 static void saddCommand(redisClient
*c
) {
3766 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3768 set
= createSetObject();
3769 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3770 incrRefCount(c
->argv
[1]);
3772 if (set
->type
!= REDIS_SET
) {
3773 addReply(c
,shared
.wrongtypeerr
);
3777 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3778 incrRefCount(c
->argv
[2]);
3780 addReply(c
,shared
.cone
);
3782 addReply(c
,shared
.czero
);
3786 static void sremCommand(redisClient
*c
) {
3789 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3791 addReply(c
,shared
.czero
);
3793 if (set
->type
!= REDIS_SET
) {
3794 addReply(c
,shared
.wrongtypeerr
);
3797 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
3799 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3800 addReply(c
,shared
.cone
);
3802 addReply(c
,shared
.czero
);
3807 static void smoveCommand(redisClient
*c
) {
3808 robj
*srcset
, *dstset
;
3810 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3811 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3813 /* If the source key does not exist return 0, if it's of the wrong type
3815 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
3816 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
3819 /* Error if the destination key is not a set as well */
3820 if (dstset
&& dstset
->type
!= REDIS_SET
) {
3821 addReply(c
,shared
.wrongtypeerr
);
3824 /* Remove the element from the source set */
3825 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
3826 /* Key not found in the src set! return zero */
3827 addReply(c
,shared
.czero
);
3831 /* Add the element to the destination set */
3833 dstset
= createSetObject();
3834 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
3835 incrRefCount(c
->argv
[2]);
3837 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
3838 incrRefCount(c
->argv
[3]);
3839 addReply(c
,shared
.cone
);
3842 static void sismemberCommand(redisClient
*c
) {
3845 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3847 addReply(c
,shared
.czero
);
3849 if (set
->type
!= REDIS_SET
) {
3850 addReply(c
,shared
.wrongtypeerr
);
3853 if (dictFind(set
->ptr
,c
->argv
[2]))
3854 addReply(c
,shared
.cone
);
3856 addReply(c
,shared
.czero
);
3860 static void scardCommand(redisClient
*c
) {
3864 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3866 addReply(c
,shared
.czero
);
3869 if (o
->type
!= REDIS_SET
) {
3870 addReply(c
,shared
.wrongtypeerr
);
3873 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
3879 static void spopCommand(redisClient
*c
) {
3883 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3885 addReply(c
,shared
.nullbulk
);
3887 if (set
->type
!= REDIS_SET
) {
3888 addReply(c
,shared
.wrongtypeerr
);
3891 de
= dictGetRandomKey(set
->ptr
);
3893 addReply(c
,shared
.nullbulk
);
3895 robj
*ele
= dictGetEntryKey(de
);
3897 addReplyBulkLen(c
,ele
);
3899 addReply(c
,shared
.crlf
);
3900 dictDelete(set
->ptr
,ele
);
3901 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3907 static void srandmemberCommand(redisClient
*c
) {
3911 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3913 addReply(c
,shared
.nullbulk
);
3915 if (set
->type
!= REDIS_SET
) {
3916 addReply(c
,shared
.wrongtypeerr
);
3919 de
= dictGetRandomKey(set
->ptr
);
3921 addReply(c
,shared
.nullbulk
);
3923 robj
*ele
= dictGetEntryKey(de
);
3925 addReplyBulkLen(c
,ele
);
3927 addReply(c
,shared
.crlf
);
3932 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3933 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3935 return dictSize(*d1
)-dictSize(*d2
);
3938 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, unsigned long setsnum
, robj
*dstkey
) {
3939 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3942 robj
*lenobj
= NULL
, *dstset
= NULL
;
3943 unsigned long j
, cardinality
= 0;
3945 for (j
= 0; j
< setsnum
; j
++) {
3949 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3950 lookupKeyRead(c
->db
,setskeys
[j
]);
3954 deleteKey(c
->db
,dstkey
);
3955 addReply(c
,shared
.ok
);
3957 addReply(c
,shared
.nullmultibulk
);
3961 if (setobj
->type
!= REDIS_SET
) {
3963 addReply(c
,shared
.wrongtypeerr
);
3966 dv
[j
] = setobj
->ptr
;
3968 /* Sort sets from the smallest to largest, this will improve our
3969 * algorithm's performace */
3970 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3972 /* The first thing we should output is the total number of elements...
3973 * since this is a multi-bulk write, but at this stage we don't know
3974 * the intersection set size, so we use a trick, append an empty object
3975 * to the output list and save the pointer to later modify it with the
3978 lenobj
= createObject(REDIS_STRING
,NULL
);
3980 decrRefCount(lenobj
);
3982 /* If we have a target key where to store the resulting set
3983 * create this key with an empty set inside */
3984 dstset
= createSetObject();
3987 /* Iterate all the elements of the first (smallest) set, and test
3988 * the element against all the other sets, if at least one set does
3989 * not include the element it is discarded */
3990 di
= dictGetIterator(dv
[0]);
3992 while((de
= dictNext(di
)) != NULL
) {
3995 for (j
= 1; j
< setsnum
; j
++)
3996 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
3998 continue; /* at least one set does not contain the member */
3999 ele
= dictGetEntryKey(de
);
4001 addReplyBulkLen(c
,ele
);
4003 addReply(c
,shared
.crlf
);
4006 dictAdd(dstset
->ptr
,ele
,NULL
);
4010 dictReleaseIterator(di
);
4013 /* Store the resulting set into the target */
4014 deleteKey(c
->db
,dstkey
);
4015 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4016 incrRefCount(dstkey
);
4020 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%lu\r\n",cardinality
);
4022 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4023 dictSize((dict
*)dstset
->ptr
)));
4029 static void sinterCommand(redisClient
*c
) {
4030 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
4033 static void sinterstoreCommand(redisClient
*c
) {
4034 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
4037 #define REDIS_OP_UNION 0
4038 #define REDIS_OP_DIFF 1
4040 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
4041 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4044 robj
*dstset
= NULL
;
4045 int j
, cardinality
= 0;
4047 for (j
= 0; j
< setsnum
; j
++) {
4051 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4052 lookupKeyRead(c
->db
,setskeys
[j
]);
4057 if (setobj
->type
!= REDIS_SET
) {
4059 addReply(c
,shared
.wrongtypeerr
);
4062 dv
[j
] = setobj
->ptr
;
4065 /* We need a temp set object to store our union. If the dstkey
4066 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4067 * this set object will be the resulting object to set into the target key*/
4068 dstset
= createSetObject();
4070 /* Iterate all the elements of all the sets, add every element a single
4071 * time to the result set */
4072 for (j
= 0; j
< setsnum
; j
++) {
4073 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
4074 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
4076 di
= dictGetIterator(dv
[j
]);
4078 while((de
= dictNext(di
)) != NULL
) {
4081 /* dictAdd will not add the same element multiple times */
4082 ele
= dictGetEntryKey(de
);
4083 if (op
== REDIS_OP_UNION
|| j
== 0) {
4084 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
4088 } else if (op
== REDIS_OP_DIFF
) {
4089 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
4094 dictReleaseIterator(di
);
4096 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
4099 /* Output the content of the resulting set, if not in STORE mode */
4101 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
4102 di
= dictGetIterator(dstset
->ptr
);
4103 while((de
= dictNext(di
)) != NULL
) {
4106 ele
= dictGetEntryKey(de
);
4107 addReplyBulkLen(c
,ele
);
4109 addReply(c
,shared
.crlf
);
4111 dictReleaseIterator(di
);
4113 /* If we have a target key where to store the resulting set
4114 * create this key with the result set inside */
4115 deleteKey(c
->db
,dstkey
);
4116 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4117 incrRefCount(dstkey
);
4122 decrRefCount(dstset
);
4124 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4125 dictSize((dict
*)dstset
->ptr
)));
4131 static void sunionCommand(redisClient
*c
) {
4132 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
4135 static void sunionstoreCommand(redisClient
*c
) {
4136 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
4139 static void sdiffCommand(redisClient
*c
) {
4140 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
4143 static void sdiffstoreCommand(redisClient
*c
) {
4144 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
4147 /* ==================================== ZSets =============================== */
4149 /* ZSETs are ordered sets using two data structures to hold the same elements
4150 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4153 * The elements are added to an hash table mapping Redis objects to scores.
4154 * At the same time the elements are added to a skip list mapping scores
4155 * to Redis objects (so objects are sorted by scores in this "view"). */
4157 /* This skiplist implementation is almost a C translation of the original
4158 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4159 * Alternative to Balanced Trees", modified in three ways:
4160 * a) this implementation allows for repeated values.
4161 * b) the comparison is not just by key (our 'score') but by satellite data.
4162 * c) there is a back pointer, so it's a doubly linked list with the back
4163 * pointers being only at "level 1". This allows to traverse the list
4164 * from tail to head, useful for ZREVRANGE. */
4166 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
4167 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
4169 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
4175 static zskiplist
*zslCreate(void) {
4179 zsl
= zmalloc(sizeof(*zsl
));
4182 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
4183 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
4184 zsl
->header
->forward
[j
] = NULL
;
4185 zsl
->header
->backward
= NULL
;
4190 static void zslFreeNode(zskiplistNode
*node
) {
4191 decrRefCount(node
->obj
);
4192 zfree(node
->forward
);
4196 static void zslFree(zskiplist
*zsl
) {
4197 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
4199 zfree(zsl
->header
->forward
);
4202 next
= node
->forward
[0];
4209 static int zslRandomLevel(void) {
4211 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
4216 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
4217 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4221 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4222 while (x
->forward
[i
] &&
4223 (x
->forward
[i
]->score
< score
||
4224 (x
->forward
[i
]->score
== score
&&
4225 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4229 /* we assume the key is not already inside, since we allow duplicated
4230 * scores, and the re-insertion of score and redis object should never
4231 * happpen since the caller of zslInsert() should test in the hash table
4232 * if the element is already inside or not. */
4233 level
= zslRandomLevel();
4234 if (level
> zsl
->level
) {
4235 for (i
= zsl
->level
; i
< level
; i
++)
4236 update
[i
] = zsl
->header
;
4239 x
= zslCreateNode(level
,score
,obj
);
4240 for (i
= 0; i
< level
; i
++) {
4241 x
->forward
[i
] = update
[i
]->forward
[i
];
4242 update
[i
]->forward
[i
] = x
;
4244 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
4246 x
->forward
[0]->backward
= x
;
4252 /* Delete an element with matching score/object from the skiplist. */
4253 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
4254 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4258 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4259 while (x
->forward
[i
] &&
4260 (x
->forward
[i
]->score
< score
||
4261 (x
->forward
[i
]->score
== score
&&
4262 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4266 /* We may have multiple elements with the same score, what we need
4267 * is to find the element with both the right score and object. */
4269 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
4270 for (i
= 0; i
< zsl
->level
; i
++) {
4271 if (update
[i
]->forward
[i
] != x
) break;
4272 update
[i
]->forward
[i
] = x
->forward
[i
];
4274 if (x
->forward
[0]) {
4275 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4278 zsl
->tail
= x
->backward
;
4281 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4286 return 0; /* not found */
4288 return 0; /* not found */
4291 /* Delete all the elements with score between min and max from the skiplist.
4292 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4293 * Note that this function takes the reference to the hash table view of the
4294 * sorted set, in order to remove the elements from the hash table too. */
4295 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4296 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4297 unsigned long removed
= 0;
4301 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4302 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
4306 /* We may have multiple elements with the same score, what we need
4307 * is to find the element with both the right score and object. */
4309 while (x
&& x
->score
<= max
) {
4310 zskiplistNode
*next
;
4312 for (i
= 0; i
< zsl
->level
; i
++) {
4313 if (update
[i
]->forward
[i
] != x
) break;
4314 update
[i
]->forward
[i
] = x
->forward
[i
];
4316 if (x
->forward
[0]) {
4317 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4320 zsl
->tail
= x
->backward
;
4322 next
= x
->forward
[0];
4323 dictDelete(dict
,x
->obj
);
4325 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4331 return removed
; /* not found */
4334 /* Find the first node having a score equal or greater than the specified one.
4335 * Returns NULL if there is no match. */
4336 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4341 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4342 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
4345 /* We may have multiple elements with the same score, what we need
4346 * is to find the element with both the right score and object. */
4347 return x
->forward
[0];
4350 /* The actual Z-commands implementations */
4352 /* This generic command implements both ZADD and ZINCRBY.
4353 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4354 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4355 static void zaddGenericCommand(redisClient
*c
, robj
*key
, robj
*ele
, double scoreval
, int doincrement
) {
4360 zsetobj
= lookupKeyWrite(c
->db
,key
);
4361 if (zsetobj
== NULL
) {
4362 zsetobj
= createZsetObject();
4363 dictAdd(c
->db
->dict
,key
,zsetobj
);
4366 if (zsetobj
->type
!= REDIS_ZSET
) {
4367 addReply(c
,shared
.wrongtypeerr
);
4373 /* Ok now since we implement both ZADD and ZINCRBY here the code
4374 * needs to handle the two different conditions. It's all about setting
4375 * '*score', that is, the new score to set, to the right value. */
4376 score
= zmalloc(sizeof(double));
4380 /* Read the old score. If the element was not present starts from 0 */
4381 de
= dictFind(zs
->dict
,ele
);
4383 double *oldscore
= dictGetEntryVal(de
);
4384 *score
= *oldscore
+ scoreval
;
4392 /* What follows is a simple remove and re-insert operation that is common
4393 * to both ZADD and ZINCRBY... */
4394 if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) {
4395 /* case 1: New element */
4396 incrRefCount(ele
); /* added to hash */
4397 zslInsert(zs
->zsl
,*score
,ele
);
4398 incrRefCount(ele
); /* added to skiplist */
4401 addReplyDouble(c
,*score
);
4403 addReply(c
,shared
.cone
);
4408 /* case 2: Score update operation */
4409 de
= dictFind(zs
->dict
,ele
);
4410 redisAssert(de
!= NULL
);
4411 oldscore
= dictGetEntryVal(de
);
4412 if (*score
!= *oldscore
) {
4415 /* Remove and insert the element in the skip list with new score */
4416 deleted
= zslDelete(zs
->zsl
,*oldscore
,ele
);
4417 redisAssert(deleted
!= 0);
4418 zslInsert(zs
->zsl
,*score
,ele
);
4420 /* Update the score in the hash table */
4421 dictReplace(zs
->dict
,ele
,score
);
4427 addReplyDouble(c
,*score
);
4429 addReply(c
,shared
.czero
);
4433 static void zaddCommand(redisClient
*c
) {
4436 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4437 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0);
4440 static void zincrbyCommand(redisClient
*c
) {
4443 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4444 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1);
4447 static void zremCommand(redisClient
*c
) {
4451 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4452 if (zsetobj
== NULL
) {
4453 addReply(c
,shared
.czero
);
4459 if (zsetobj
->type
!= REDIS_ZSET
) {
4460 addReply(c
,shared
.wrongtypeerr
);
4464 de
= dictFind(zs
->dict
,c
->argv
[2]);
4466 addReply(c
,shared
.czero
);
4469 /* Delete from the skiplist */
4470 oldscore
= dictGetEntryVal(de
);
4471 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
4472 redisAssert(deleted
!= 0);
4474 /* Delete from the hash table */
4475 dictDelete(zs
->dict
,c
->argv
[2]);
4476 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4478 addReply(c
,shared
.cone
);
4482 static void zremrangebyscoreCommand(redisClient
*c
) {
4483 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4484 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4488 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4489 if (zsetobj
== NULL
) {
4490 addReply(c
,shared
.czero
);
4494 if (zsetobj
->type
!= REDIS_ZSET
) {
4495 addReply(c
,shared
.wrongtypeerr
);
4499 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
4500 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4501 server
.dirty
+= deleted
;
4502 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
4506 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
4508 int start
= atoi(c
->argv
[2]->ptr
);
4509 int end
= atoi(c
->argv
[3]->ptr
);
4511 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4513 addReply(c
,shared
.nullmultibulk
);
4515 if (o
->type
!= REDIS_ZSET
) {
4516 addReply(c
,shared
.wrongtypeerr
);
4518 zset
*zsetobj
= o
->ptr
;
4519 zskiplist
*zsl
= zsetobj
->zsl
;
4522 int llen
= zsl
->length
;
4526 /* convert negative indexes */
4527 if (start
< 0) start
= llen
+start
;
4528 if (end
< 0) end
= llen
+end
;
4529 if (start
< 0) start
= 0;
4530 if (end
< 0) end
= 0;
4532 /* indexes sanity checks */
4533 if (start
> end
|| start
>= llen
) {
4534 /* Out of range start or start > end result in empty list */
4535 addReply(c
,shared
.emptymultibulk
);
4538 if (end
>= llen
) end
= llen
-1;
4539 rangelen
= (end
-start
)+1;
4541 /* Return the result in form of a multi-bulk reply */
4547 ln
= zsl
->header
->forward
[0];
4549 ln
= ln
->forward
[0];
4552 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
4553 for (j
= 0; j
< rangelen
; j
++) {
4555 addReplyBulkLen(c
,ele
);
4557 addReply(c
,shared
.crlf
);
4558 ln
= reverse
? ln
->backward
: ln
->forward
[0];
4564 static void zrangeCommand(redisClient
*c
) {
4565 zrangeGenericCommand(c
,0);
4568 static void zrevrangeCommand(redisClient
*c
) {
4569 zrangeGenericCommand(c
,1);
4572 static void zrangebyscoreCommand(redisClient
*c
) {
4574 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4575 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4576 int offset
= 0, limit
= -1;
4578 if (c
->argc
!= 4 && c
->argc
!= 7) {
4579 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
4581 } else if (c
->argc
== 7 && strcasecmp(c
->argv
[4]->ptr
,"limit")) {
4582 addReply(c
,shared
.syntaxerr
);
4584 } else if (c
->argc
== 7) {
4585 offset
= atoi(c
->argv
[5]->ptr
);
4586 limit
= atoi(c
->argv
[6]->ptr
);
4587 if (offset
< 0) offset
= 0;
4590 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4592 addReply(c
,shared
.nullmultibulk
);
4594 if (o
->type
!= REDIS_ZSET
) {
4595 addReply(c
,shared
.wrongtypeerr
);
4597 zset
*zsetobj
= o
->ptr
;
4598 zskiplist
*zsl
= zsetobj
->zsl
;
4601 unsigned int rangelen
= 0;
4603 /* Get the first node with the score >= min */
4604 ln
= zslFirstWithScore(zsl
,min
);
4606 /* No element matching the speciifed interval */
4607 addReply(c
,shared
.emptymultibulk
);
4611 /* We don't know in advance how many matching elements there
4612 * are in the list, so we push this object that will represent
4613 * the multi-bulk length in the output buffer, and will "fix"
4615 lenobj
= createObject(REDIS_STRING
,NULL
);
4617 decrRefCount(lenobj
);
4619 while(ln
&& ln
->score
<= max
) {
4622 ln
= ln
->forward
[0];
4625 if (limit
== 0) break;
4627 addReplyBulkLen(c
,ele
);
4629 addReply(c
,shared
.crlf
);
4630 ln
= ln
->forward
[0];
4632 if (limit
> 0) limit
--;
4634 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
4639 static void zcardCommand(redisClient
*c
) {
4643 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4645 addReply(c
,shared
.czero
);
4648 if (o
->type
!= REDIS_ZSET
) {
4649 addReply(c
,shared
.wrongtypeerr
);
4652 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",zs
->zsl
->length
));
4657 static void zscoreCommand(redisClient
*c
) {
4661 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4663 addReply(c
,shared
.nullbulk
);
4666 if (o
->type
!= REDIS_ZSET
) {
4667 addReply(c
,shared
.wrongtypeerr
);
4672 de
= dictFind(zs
->dict
,c
->argv
[2]);
4674 addReply(c
,shared
.nullbulk
);
4676 double *score
= dictGetEntryVal(de
);
4678 addReplyDouble(c
,*score
);
4684 /* ========================= Non type-specific commands ==================== */
4686 static void flushdbCommand(redisClient
*c
) {
4687 server
.dirty
+= dictSize(c
->db
->dict
);
4688 dictEmpty(c
->db
->dict
);
4689 dictEmpty(c
->db
->expires
);
4690 addReply(c
,shared
.ok
);
4693 static void flushallCommand(redisClient
*c
) {
4694 server
.dirty
+= emptyDb();
4695 addReply(c
,shared
.ok
);
4696 rdbSave(server
.dbfilename
);
4700 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
4701 redisSortOperation
*so
= zmalloc(sizeof(*so
));
4703 so
->pattern
= pattern
;
4707 /* Return the value associated to the key with a name obtained
4708 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4709 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
4713 int prefixlen
, sublen
, postfixlen
;
4714 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4718 char buf
[REDIS_SORTKEY_MAX
+1];
4721 /* If the pattern is "#" return the substitution object itself in order
4722 * to implement the "SORT ... GET #" feature. */
4723 spat
= pattern
->ptr
;
4724 if (spat
[0] == '#' && spat
[1] == '\0') {
4728 /* The substitution object may be specially encoded. If so we create
4729 * a decoded object on the fly. Otherwise getDecodedObject will just
4730 * increment the ref count, that we'll decrement later. */
4731 subst
= getDecodedObject(subst
);
4734 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
4735 p
= strchr(spat
,'*');
4737 decrRefCount(subst
);
4742 sublen
= sdslen(ssub
);
4743 postfixlen
= sdslen(spat
)-(prefixlen
+1);
4744 memcpy(keyname
.buf
,spat
,prefixlen
);
4745 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
4746 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
4747 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
4748 keyname
.len
= prefixlen
+sublen
+postfixlen
;
4750 initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(long)*2))
4751 decrRefCount(subst
);
4753 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4754 return lookupKeyRead(db
,&keyobj
);
4757 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4758 * the additional parameter is not standard but a BSD-specific we have to
4759 * pass sorting parameters via the global 'server' structure */
4760 static int sortCompare(const void *s1
, const void *s2
) {
4761 const redisSortObject
*so1
= s1
, *so2
= s2
;
4764 if (!server
.sort_alpha
) {
4765 /* Numeric sorting. Here it's trivial as we precomputed scores */
4766 if (so1
->u
.score
> so2
->u
.score
) {
4768 } else if (so1
->u
.score
< so2
->u
.score
) {
4774 /* Alphanumeric sorting */
4775 if (server
.sort_bypattern
) {
4776 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
4777 /* At least one compare object is NULL */
4778 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
4780 else if (so1
->u
.cmpobj
== NULL
)
4785 /* We have both the objects, use strcoll */
4786 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
4789 /* Compare elements directly */
4792 dec1
= getDecodedObject(so1
->obj
);
4793 dec2
= getDecodedObject(so2
->obj
);
4794 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
4799 return server
.sort_desc
? -cmp
: cmp
;
4802 /* The SORT command is the most complex command in Redis. Warning: this code
4803 * is optimized for speed and a bit less for readability */
4804 static void sortCommand(redisClient
*c
) {
4807 int desc
= 0, alpha
= 0;
4808 int limit_start
= 0, limit_count
= -1, start
, end
;
4809 int j
, dontsort
= 0, vectorlen
;
4810 int getop
= 0; /* GET operation counter */
4811 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
4812 redisSortObject
*vector
; /* Resulting vector to sort */
4814 /* Lookup the key to sort. It must be of the right types */
4815 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
4816 if (sortval
== NULL
) {
4817 addReply(c
,shared
.nokeyerr
);
4820 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
4821 sortval
->type
!= REDIS_ZSET
)
4823 addReply(c
,shared
.wrongtypeerr
);
4827 /* Create a list of operations to perform for every sorted element.
4828 * Operations can be GET/DEL/INCR/DECR */
4829 operations
= listCreate();
4830 listSetFreeMethod(operations
,zfree
);
4833 /* Now we need to protect sortval incrementing its count, in the future
4834 * SORT may have options able to overwrite/delete keys during the sorting
4835 * and the sorted key itself may get destroied */
4836 incrRefCount(sortval
);
4838 /* The SORT command has an SQL-alike syntax, parse it */
4839 while(j
< c
->argc
) {
4840 int leftargs
= c
->argc
-j
-1;
4841 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
4843 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
4845 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
4847 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
4848 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
4849 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
4851 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
4852 storekey
= c
->argv
[j
+1];
4854 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
4855 sortby
= c
->argv
[j
+1];
4856 /* If the BY pattern does not contain '*', i.e. it is constant,
4857 * we don't need to sort nor to lookup the weight keys. */
4858 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
4860 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
4861 listAddNodeTail(operations
,createSortOperation(
4862 REDIS_SORT_GET
,c
->argv
[j
+1]));
4866 decrRefCount(sortval
);
4867 listRelease(operations
);
4868 addReply(c
,shared
.syntaxerr
);
4874 /* Load the sorting vector with all the objects to sort */
4875 switch(sortval
->type
) {
4876 case REDIS_LIST
: vectorlen
= listLength((list
*)sortval
->ptr
); break;
4877 case REDIS_SET
: vectorlen
= dictSize((dict
*)sortval
->ptr
); break;
4878 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
4879 default: vectorlen
= 0; redisAssert(0); /* Avoid GCC warning */
4881 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
4884 if (sortval
->type
== REDIS_LIST
) {
4885 list
*list
= sortval
->ptr
;
4889 while((ln
= listYield(list
))) {
4890 robj
*ele
= ln
->value
;
4891 vector
[j
].obj
= ele
;
4892 vector
[j
].u
.score
= 0;
4893 vector
[j
].u
.cmpobj
= NULL
;
4901 if (sortval
->type
== REDIS_SET
) {
4904 zset
*zs
= sortval
->ptr
;
4908 di
= dictGetIterator(set
);
4909 while((setele
= dictNext(di
)) != NULL
) {
4910 vector
[j
].obj
= dictGetEntryKey(setele
);
4911 vector
[j
].u
.score
= 0;
4912 vector
[j
].u
.cmpobj
= NULL
;
4915 dictReleaseIterator(di
);
4917 redisAssert(j
== vectorlen
);
4919 /* Now it's time to load the right scores in the sorting vector */
4920 if (dontsort
== 0) {
4921 for (j
= 0; j
< vectorlen
; j
++) {
4925 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
4926 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
4928 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
4930 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
4931 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
4933 /* Don't need to decode the object if it's
4934 * integer-encoded (the only encoding supported) so
4935 * far. We can just cast it */
4936 if (byval
->encoding
== REDIS_ENCODING_INT
) {
4937 vector
[j
].u
.score
= (long)byval
->ptr
;
4939 redisAssert(1 != 1);
4944 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
4945 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
4947 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
4948 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
4950 redisAssert(1 != 1);
4957 /* We are ready to sort the vector... perform a bit of sanity check
4958 * on the LIMIT option too. We'll use a partial version of quicksort. */
4959 start
= (limit_start
< 0) ? 0 : limit_start
;
4960 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
4961 if (start
>= vectorlen
) {
4962 start
= vectorlen
-1;
4965 if (end
>= vectorlen
) end
= vectorlen
-1;
4967 if (dontsort
== 0) {
4968 server
.sort_desc
= desc
;
4969 server
.sort_alpha
= alpha
;
4970 server
.sort_bypattern
= sortby
? 1 : 0;
4971 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
4972 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
4974 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
4977 /* Send command output to the output buffer, performing the specified
4978 * GET/DEL/INCR/DECR operations if any. */
4979 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
4980 if (storekey
== NULL
) {
4981 /* STORE option not specified, sent the sorting result to client */
4982 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
4983 for (j
= start
; j
<= end
; j
++) {
4986 addReplyBulkLen(c
,vector
[j
].obj
);
4987 addReply(c
,vector
[j
].obj
);
4988 addReply(c
,shared
.crlf
);
4990 listRewind(operations
);
4991 while((ln
= listYield(operations
))) {
4992 redisSortOperation
*sop
= ln
->value
;
4993 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
4996 if (sop
->type
== REDIS_SORT_GET
) {
4997 if (!val
|| val
->type
!= REDIS_STRING
) {
4998 addReply(c
,shared
.nullbulk
);
5000 addReplyBulkLen(c
,val
);
5002 addReply(c
,shared
.crlf
);
5005 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5010 robj
*listObject
= createListObject();
5011 list
*listPtr
= (list
*) listObject
->ptr
;
5013 /* STORE option specified, set the sorting result as a List object */
5014 for (j
= start
; j
<= end
; j
++) {
5017 listAddNodeTail(listPtr
,vector
[j
].obj
);
5018 incrRefCount(vector
[j
].obj
);
5020 listRewind(operations
);
5021 while((ln
= listYield(operations
))) {
5022 redisSortOperation
*sop
= ln
->value
;
5023 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5026 if (sop
->type
== REDIS_SORT_GET
) {
5027 if (!val
|| val
->type
!= REDIS_STRING
) {
5028 listAddNodeTail(listPtr
,createStringObject("",0));
5030 listAddNodeTail(listPtr
,val
);
5034 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5038 if (dictReplace(c
->db
->dict
,storekey
,listObject
)) {
5039 incrRefCount(storekey
);
5041 /* Note: we add 1 because the DB is dirty anyway since even if the
5042 * SORT result is empty a new key is set and maybe the old content
5044 server
.dirty
+= 1+outputlen
;
5045 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",outputlen
));
5049 decrRefCount(sortval
);
5050 listRelease(operations
);
5051 for (j
= 0; j
< vectorlen
; j
++) {
5052 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
5053 decrRefCount(vector
[j
].u
.cmpobj
);
5058 /* Create the string returned by the INFO command. This is decoupled
5059 * by the INFO command itself as we need to report the same information
5060 * on memory corruption problems. */
5061 static sds
genRedisInfoString(void) {
5063 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5066 info
= sdscatprintf(sdsempty(),
5067 "redis_version:%s\r\n"
5069 "multiplexing_api:%s\r\n"
5070 "uptime_in_seconds:%ld\r\n"
5071 "uptime_in_days:%ld\r\n"
5072 "connected_clients:%d\r\n"
5073 "connected_slaves:%d\r\n"
5074 "used_memory:%zu\r\n"
5075 "changes_since_last_save:%lld\r\n"
5076 "bgsave_in_progress:%d\r\n"
5077 "last_save_time:%ld\r\n"
5078 "total_connections_received:%lld\r\n"
5079 "total_commands_processed:%lld\r\n"
5082 (sizeof(long) == 8) ? "64" : "32",
5086 listLength(server
.clients
)-listLength(server
.slaves
),
5087 listLength(server
.slaves
),
5090 server
.bgsavechildpid
!= -1,
5092 server
.stat_numconnections
,
5093 server
.stat_numcommands
,
5094 server
.masterhost
== NULL
? "master" : "slave"
5096 if (server
.masterhost
) {
5097 info
= sdscatprintf(info
,
5098 "master_host:%s\r\n"
5099 "master_port:%d\r\n"
5100 "master_link_status:%s\r\n"
5101 "master_last_io_seconds_ago:%d\r\n"
5104 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
5106 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
5109 for (j
= 0; j
< server
.dbnum
; j
++) {
5110 long long keys
, vkeys
;
5112 keys
= dictSize(server
.db
[j
].dict
);
5113 vkeys
= dictSize(server
.db
[j
].expires
);
5114 if (keys
|| vkeys
) {
5115 info
= sdscatprintf(info
, "db%d:keys=%lld,expires=%lld\r\n",
5122 static void infoCommand(redisClient
*c
) {
5123 sds info
= genRedisInfoString();
5124 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",sdslen(info
)));
5125 addReplySds(c
,info
);
5126 addReply(c
,shared
.crlf
);
5129 static void monitorCommand(redisClient
*c
) {
5130 /* ignore MONITOR if aleady slave or in monitor mode */
5131 if (c
->flags
& REDIS_SLAVE
) return;
5133 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
5135 listAddNodeTail(server
.monitors
,c
);
5136 addReply(c
,shared
.ok
);
5139 /* ================================= Expire ================================= */
5140 static int removeExpire(redisDb
*db
, robj
*key
) {
5141 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
5148 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
5149 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
5157 /* Return the expire time of the specified key, or -1 if no expire
5158 * is associated with this key (i.e. the key is non volatile) */
5159 static time_t getExpire(redisDb
*db
, robj
*key
) {
5162 /* No expire? return ASAP */
5163 if (dictSize(db
->expires
) == 0 ||
5164 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
5166 return (time_t) dictGetEntryVal(de
);
5169 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
5173 /* No expire? return ASAP */
5174 if (dictSize(db
->expires
) == 0 ||
5175 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5177 /* Lookup the expire */
5178 when
= (time_t) dictGetEntryVal(de
);
5179 if (time(NULL
) <= when
) return 0;
5181 /* Delete the key */
5182 dictDelete(db
->expires
,key
);
5183 return dictDelete(db
->dict
,key
) == DICT_OK
;
5186 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
5189 /* No expire? return ASAP */
5190 if (dictSize(db
->expires
) == 0 ||
5191 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5193 /* Delete the key */
5195 dictDelete(db
->expires
,key
);
5196 return dictDelete(db
->dict
,key
) == DICT_OK
;
5199 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
5202 de
= dictFind(c
->db
->dict
,key
);
5204 addReply(c
,shared
.czero
);
5208 if (deleteKey(c
->db
,key
)) server
.dirty
++;
5209 addReply(c
, shared
.cone
);
5212 time_t when
= time(NULL
)+seconds
;
5213 if (setExpire(c
->db
,key
,when
)) {
5214 addReply(c
,shared
.cone
);
5217 addReply(c
,shared
.czero
);
5223 static void expireCommand(redisClient
*c
) {
5224 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
5227 static void expireatCommand(redisClient
*c
) {
5228 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
5231 static void ttlCommand(redisClient
*c
) {
5235 expire
= getExpire(c
->db
,c
->argv
[1]);
5237 ttl
= (int) (expire
-time(NULL
));
5238 if (ttl
< 0) ttl
= -1;
5240 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
5243 /* =============================== Replication ============================= */
5245 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5246 ssize_t nwritten
, ret
= size
;
5247 time_t start
= time(NULL
);
5251 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
5252 nwritten
= write(fd
,ptr
,size
);
5253 if (nwritten
== -1) return -1;
5257 if ((time(NULL
)-start
) > timeout
) {
5265 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5266 ssize_t nread
, totread
= 0;
5267 time_t start
= time(NULL
);
5271 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
5272 nread
= read(fd
,ptr
,size
);
5273 if (nread
== -1) return -1;
5278 if ((time(NULL
)-start
) > timeout
) {
5286 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5293 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
5296 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
5307 static void syncCommand(redisClient
*c
) {
5308 /* ignore SYNC if aleady slave or in monitor mode */
5309 if (c
->flags
& REDIS_SLAVE
) return;
5311 /* SYNC can't be issued when the server has pending data to send to
5312 * the client about already issued commands. We need a fresh reply
5313 * buffer registering the differences between the BGSAVE and the current
5314 * dataset, so that we can copy to other slaves if needed. */
5315 if (listLength(c
->reply
) != 0) {
5316 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5320 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
5321 /* Here we need to check if there is a background saving operation
5322 * in progress, or if it is required to start one */
5323 if (server
.bgsavechildpid
!= -1) {
5324 /* Ok a background save is in progress. Let's check if it is a good
5325 * one for replication, i.e. if there is another slave that is
5326 * registering differences since the server forked to save */
5330 listRewind(server
.slaves
);
5331 while((ln
= listYield(server
.slaves
))) {
5333 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
5336 /* Perfect, the server is already registering differences for
5337 * another slave. Set the right state, and copy the buffer. */
5338 listRelease(c
->reply
);
5339 c
->reply
= listDup(slave
->reply
);
5340 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5341 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
5343 /* No way, we need to wait for the next BGSAVE in order to
5344 * register differences */
5345 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5346 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
5349 /* Ok we don't have a BGSAVE in progress, let's start one */
5350 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
5351 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5352 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
5353 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
5356 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5359 c
->flags
|= REDIS_SLAVE
;
5361 listAddNodeTail(server
.slaves
,c
);
5365 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
5366 redisClient
*slave
= privdata
;
5368 REDIS_NOTUSED(mask
);
5369 char buf
[REDIS_IOBUF_LEN
];
5370 ssize_t nwritten
, buflen
;
5372 if (slave
->repldboff
== 0) {
5373 /* Write the bulk write count before to transfer the DB. In theory here
5374 * we don't know how much room there is in the output buffer of the
5375 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5376 * operations) will never be smaller than the few bytes we need. */
5379 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5381 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
5389 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
5390 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
5392 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
5393 (buflen
== 0) ? "premature EOF" : strerror(errno
));
5397 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
5398 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
5403 slave
->repldboff
+= nwritten
;
5404 if (slave
->repldboff
== slave
->repldbsize
) {
5405 close(slave
->repldbfd
);
5406 slave
->repldbfd
= -1;
5407 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5408 slave
->replstate
= REDIS_REPL_ONLINE
;
5409 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
5410 sendReplyToClient
, slave
) == AE_ERR
) {
5414 addReplySds(slave
,sdsempty());
5415 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
5419 /* This function is called at the end of every backgrond saving.
5420 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5421 * otherwise REDIS_ERR is passed to the function.
5423 * The goal of this function is to handle slaves waiting for a successful
5424 * background saving in order to perform non-blocking synchronization. */
5425 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
5427 int startbgsave
= 0;
5429 listRewind(server
.slaves
);
5430 while((ln
= listYield(server
.slaves
))) {
5431 redisClient
*slave
= ln
->value
;
5433 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
5435 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5436 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
5437 struct redis_stat buf
;
5439 if (bgsaveerr
!= REDIS_OK
) {
5441 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
5444 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
5445 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
5447 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
5450 slave
->repldboff
= 0;
5451 slave
->repldbsize
= buf
.st_size
;
5452 slave
->replstate
= REDIS_REPL_SEND_BULK
;
5453 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5454 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
5461 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5462 listRewind(server
.slaves
);
5463 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
5464 while((ln
= listYield(server
.slaves
))) {
5465 redisClient
*slave
= ln
->value
;
5467 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
5474 static int syncWithMaster(void) {
5475 char buf
[1024], tmpfile
[256], authcmd
[1024];
5477 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
5481 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
5486 /* AUTH with the master if required. */
5487 if(server
.masterauth
) {
5488 snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
);
5489 if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) {
5491 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",
5495 /* Read the AUTH result. */
5496 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5498 redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s",
5502 if (buf
[0] != '+') {
5504 redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?");
5509 /* Issue the SYNC command */
5510 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
5512 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
5516 /* Read the bulk write count */
5517 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5519 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
5523 if (buf
[0] != '$') {
5525 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5528 dumpsize
= atoi(buf
+1);
5529 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
5530 /* Read the bulk write data on a temp file */
5531 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
5532 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
5535 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
5539 int nread
, nwritten
;
5541 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
5543 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
5549 nwritten
= write(dfd
,buf
,nread
);
5550 if (nwritten
== -1) {
5551 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
5559 if (rename(tmpfile
,server
.dbfilename
) == -1) {
5560 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
5566 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
5567 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
5571 server
.master
= createClient(fd
);
5572 server
.master
->flags
|= REDIS_MASTER
;
5573 server
.replstate
= REDIS_REPL_CONNECTED
;
5577 static void slaveofCommand(redisClient
*c
) {
5578 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
5579 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
5580 if (server
.masterhost
) {
5581 sdsfree(server
.masterhost
);
5582 server
.masterhost
= NULL
;
5583 if (server
.master
) freeClient(server
.master
);
5584 server
.replstate
= REDIS_REPL_NONE
;
5585 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
5588 sdsfree(server
.masterhost
);
5589 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
5590 server
.masterport
= atoi(c
->argv
[2]->ptr
);
5591 if (server
.master
) freeClient(server
.master
);
5592 server
.replstate
= REDIS_REPL_CONNECT
;
5593 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
5594 server
.masterhost
, server
.masterport
);
5596 addReply(c
,shared
.ok
);
5599 /* ============================ Maxmemory directive ======================== */
5601 /* This function gets called when 'maxmemory' is set on the config file to limit
5602 * the max memory used by the server, and we are out of memory.
5603 * This function will try to, in order:
5605 * - Free objects from the free list
5606 * - Try to remove keys with an EXPIRE set
5608 * It is not possible to free enough memory to reach used-memory < maxmemory
5609 * the server will start refusing commands that will enlarge even more the
5612 static void freeMemoryIfNeeded(void) {
5613 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
5614 if (listLength(server
.objfreelist
)) {
5617 listNode
*head
= listFirst(server
.objfreelist
);
5618 o
= listNodeValue(head
);
5619 listDelNode(server
.objfreelist
,head
);
5622 int j
, k
, freed
= 0;
5624 for (j
= 0; j
< server
.dbnum
; j
++) {
5626 robj
*minkey
= NULL
;
5627 struct dictEntry
*de
;
5629 if (dictSize(server
.db
[j
].expires
)) {
5631 /* From a sample of three keys drop the one nearest to
5632 * the natural expire */
5633 for (k
= 0; k
< 3; k
++) {
5636 de
= dictGetRandomKey(server
.db
[j
].expires
);
5637 t
= (time_t) dictGetEntryVal(de
);
5638 if (minttl
== -1 || t
< minttl
) {
5639 minkey
= dictGetEntryKey(de
);
5643 deleteKey(server
.db
+j
,minkey
);
5646 if (!freed
) return; /* nothing to free... */
5651 /* ============================== Append Only file ========================== */
5653 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
5654 sds buf
= sdsempty();
5660 /* The DB this command was targetting is not the same as the last command
5661 * we appendend. To issue a SELECT command is needed. */
5662 if (dictid
!= server
.appendseldb
) {
5665 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
5666 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
5667 strlen(seldb
),seldb
);
5668 server
.appendseldb
= dictid
;
5671 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5672 * EXPIREs into EXPIREATs calls */
5673 if (cmd
->proc
== expireCommand
) {
5676 tmpargv
[0] = createStringObject("EXPIREAT",8);
5677 tmpargv
[1] = argv
[1];
5678 incrRefCount(argv
[1]);
5679 when
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10);
5680 tmpargv
[2] = createObject(REDIS_STRING
,
5681 sdscatprintf(sdsempty(),"%ld",when
));
5685 /* Append the actual command */
5686 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
5687 for (j
= 0; j
< argc
; j
++) {
5690 o
= getDecodedObject(o
);
5691 buf
= sdscatprintf(buf
,"$%lu\r\n",sdslen(o
->ptr
));
5692 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
5693 buf
= sdscatlen(buf
,"\r\n",2);
5697 /* Free the objects from the modified argv for EXPIREAT */
5698 if (cmd
->proc
== expireCommand
) {
5699 for (j
= 0; j
< 3; j
++)
5700 decrRefCount(argv
[j
]);
5703 /* We want to perform a single write. This should be guaranteed atomic
5704 * at least if the filesystem we are writing is a real physical one.
5705 * While this will save us against the server being killed I don't think
5706 * there is much to do about the whole server stopping for power problems
5708 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
5709 if (nwritten
!= (signed)sdslen(buf
)) {
5710 /* Ooops, we are in troubles. The best thing to do for now is
5711 * to simply exit instead to give the illusion that everything is
5712 * working as expected. */
5713 if (nwritten
== -1) {
5714 redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
));
5716 redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
));
5720 /* If a background append only file rewriting is in progress we want to
5721 * accumulate the differences between the child DB and the current one
5722 * in a buffer, so that when the child process will do its work we
5723 * can append the differences to the new append only file. */
5724 if (server
.bgrewritechildpid
!= -1)
5725 server
.bgrewritebuf
= sdscatlen(server
.bgrewritebuf
,buf
,sdslen(buf
));
5729 if (server
.appendfsync
== APPENDFSYNC_ALWAYS
||
5730 (server
.appendfsync
== APPENDFSYNC_EVERYSEC
&&
5731 now
-server
.lastfsync
> 1))
5733 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
5734 server
.lastfsync
= now
;
5738 /* In Redis commands are always executed in the context of a client, so in
5739 * order to load the append only file we need to create a fake client. */
5740 static struct redisClient
*createFakeClient(void) {
5741 struct redisClient
*c
= zmalloc(sizeof(*c
));
5745 c
->querybuf
= sdsempty();
5749 /* We set the fake client as a slave waiting for the synchronization
5750 * so that Redis will not try to send replies to this client. */
5751 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5752 c
->reply
= listCreate();
5753 listSetFreeMethod(c
->reply
,decrRefCount
);
5754 listSetDupMethod(c
->reply
,dupClientReplyValue
);
5758 static void freeFakeClient(struct redisClient
*c
) {
5759 sdsfree(c
->querybuf
);
5760 listRelease(c
->reply
);
5764 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5765 * error (the append only file is zero-length) REDIS_ERR is returned. On
5766 * fatal error an error message is logged and the program exists. */
5767 int loadAppendOnlyFile(char *filename
) {
5768 struct redisClient
*fakeClient
;
5769 FILE *fp
= fopen(filename
,"r");
5770 struct redis_stat sb
;
5772 if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size
== 0)
5776 redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
));
5780 fakeClient
= createFakeClient();
5787 struct redisCommand
*cmd
;
5789 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) {
5795 if (buf
[0] != '*') goto fmterr
;
5797 argv
= zmalloc(sizeof(robj
*)*argc
);
5798 for (j
= 0; j
< argc
; j
++) {
5799 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
;
5800 if (buf
[0] != '$') goto fmterr
;
5801 len
= strtol(buf
+1,NULL
,10);
5802 argsds
= sdsnewlen(NULL
,len
);
5803 if (len
&& fread(argsds
,len
,1,fp
) == 0) goto fmterr
;
5804 argv
[j
] = createObject(REDIS_STRING
,argsds
);
5805 if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */
5808 /* Command lookup */
5809 cmd
= lookupCommand(argv
[0]->ptr
);
5811 redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
);
5814 /* Try object sharing and encoding */
5815 if (server
.shareobjects
) {
5817 for(j
= 1; j
< argc
; j
++)
5818 argv
[j
] = tryObjectSharing(argv
[j
]);
5820 if (cmd
->flags
& REDIS_CMD_BULK
)
5821 tryObjectEncoding(argv
[argc
-1]);
5822 /* Run the command in the context of a fake client */
5823 fakeClient
->argc
= argc
;
5824 fakeClient
->argv
= argv
;
5825 cmd
->proc(fakeClient
);
5826 /* Discard the reply objects list from the fake client */
5827 while(listLength(fakeClient
->reply
))
5828 listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
));
5829 /* Clean up, ready for the next command */
5830 for (j
= 0; j
< argc
; j
++) decrRefCount(argv
[j
]);
5834 freeFakeClient(fakeClient
);
5839 redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file");
5841 redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
));
5845 redisLog(REDIS_WARNING
,"Bad file format reading the append only file");
5849 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
5850 static int fwriteBulk(FILE *fp
, robj
*obj
) {
5852 obj
= getDecodedObject(obj
);
5853 snprintf(buf
,sizeof(buf
),"$%ld\r\n",(long)sdslen(obj
->ptr
));
5854 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) goto err
;
5855 if (fwrite(obj
->ptr
,sdslen(obj
->ptr
),1,fp
) == 0) goto err
;
5856 if (fwrite("\r\n",2,1,fp
) == 0) goto err
;
5864 /* Write a double value in bulk format $<count>\r\n<payload>\r\n */
5865 static int fwriteBulkDouble(FILE *fp
, double d
) {
5866 char buf
[128], dbuf
[128];
5868 snprintf(dbuf
,sizeof(dbuf
),"%.17g\r\n",d
);
5869 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(dbuf
)-2);
5870 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
5871 if (fwrite(dbuf
,strlen(dbuf
),1,fp
) == 0) return 0;
5875 /* Write a long value in bulk format $<count>\r\n<payload>\r\n */
5876 static int fwriteBulkLong(FILE *fp
, long l
) {
5877 char buf
[128], lbuf
[128];
5879 snprintf(lbuf
,sizeof(lbuf
),"%ld\r\n",l
);
5880 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(lbuf
)-2);
5881 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
5882 if (fwrite(lbuf
,strlen(lbuf
),1,fp
) == 0) return 0;
5886 /* Write a sequence of commands able to fully rebuild the dataset into
5887 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
5888 static int rewriteAppendOnlyFile(char *filename
) {
5889 dictIterator
*di
= NULL
;
5894 time_t now
= time(NULL
);
5896 /* Note that we have to use a different temp name here compared to the
5897 * one used by rewriteAppendOnlyFileBackground() function. */
5898 snprintf(tmpfile
,256,"temp-rewriteaof-%d.aof", (int) getpid());
5899 fp
= fopen(tmpfile
,"w");
5901 redisLog(REDIS_WARNING
, "Failed rewriting the append only file: %s", strerror(errno
));
5904 for (j
= 0; j
< server
.dbnum
; j
++) {
5905 char selectcmd
[] = "*2\r\n$6\r\nSELECT\r\n";
5906 redisDb
*db
= server
.db
+j
;
5908 if (dictSize(d
) == 0) continue;
5909 di
= dictGetIterator(d
);
5915 /* SELECT the new DB */
5916 if (fwrite(selectcmd
,sizeof(selectcmd
)-1,1,fp
) == 0) goto werr
;
5917 if (fwriteBulkLong(fp
,j
) == 0) goto werr
;
5919 /* Iterate this DB writing every entry */
5920 while((de
= dictNext(di
)) != NULL
) {
5921 robj
*key
= dictGetEntryKey(de
);
5922 robj
*o
= dictGetEntryVal(de
);
5923 time_t expiretime
= getExpire(db
,key
);
5925 /* Save the key and associated value */
5926 if (o
->type
== REDIS_STRING
) {
5927 /* Emit a SET command */
5928 char cmd
[]="*3\r\n$3\r\nSET\r\n";
5929 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5931 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5932 if (fwriteBulk(fp
,o
) == 0) goto werr
;
5933 } else if (o
->type
== REDIS_LIST
) {
5934 /* Emit the RPUSHes needed to rebuild the list */
5935 list
*list
= o
->ptr
;
5939 while((ln
= listYield(list
))) {
5940 char cmd
[]="*3\r\n$5\r\nRPUSH\r\n";
5941 robj
*eleobj
= listNodeValue(ln
);
5943 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5944 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5945 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
5947 } else if (o
->type
== REDIS_SET
) {
5948 /* Emit the SADDs needed to rebuild the set */
5950 dictIterator
*di
= dictGetIterator(set
);
5953 while((de
= dictNext(di
)) != NULL
) {
5954 char cmd
[]="*3\r\n$4\r\nSADD\r\n";
5955 robj
*eleobj
= dictGetEntryKey(de
);
5957 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5958 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5959 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
5961 dictReleaseIterator(di
);
5962 } else if (o
->type
== REDIS_ZSET
) {
5963 /* Emit the ZADDs needed to rebuild the sorted set */
5965 dictIterator
*di
= dictGetIterator(zs
->dict
);
5968 while((de
= dictNext(di
)) != NULL
) {
5969 char cmd
[]="*4\r\n$4\r\nZADD\r\n";
5970 robj
*eleobj
= dictGetEntryKey(de
);
5971 double *score
= dictGetEntryVal(de
);
5973 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5974 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5975 if (fwriteBulkDouble(fp
,*score
) == 0) goto werr
;
5976 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
5978 dictReleaseIterator(di
);
5980 redisAssert(0 != 0);
5982 /* Save the expire time */
5983 if (expiretime
!= -1) {
5984 char cmd
[]="*3\r\n$6\r\nEXPIRE\r\n";
5985 /* If this key is already expired skip it */
5986 if (expiretime
< now
) continue;
5987 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5988 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5989 if (fwriteBulkLong(fp
,expiretime
) == 0) goto werr
;
5992 dictReleaseIterator(di
);
5995 /* Make sure data will not remain on the OS's output buffers */
6000 /* Use RENAME to make sure the DB file is changed atomically only
6001 * if the generate DB file is ok. */
6002 if (rename(tmpfile
,filename
) == -1) {
6003 redisLog(REDIS_WARNING
,"Error moving temp append only file on the final destination: %s", strerror(errno
));
6007 redisLog(REDIS_NOTICE
,"SYNC append only file rewrite performed");
6013 redisLog(REDIS_WARNING
,"Write error writing append only fileon disk: %s", strerror(errno
));
6014 if (di
) dictReleaseIterator(di
);
6018 /* This is how rewriting of the append only file in background works:
6020 * 1) The user calls BGREWRITEAOF
6021 * 2) Redis calls this function, that forks():
6022 * 2a) the child rewrite the append only file in a temp file.
6023 * 2b) the parent accumulates differences in server.bgrewritebuf.
6024 * 3) When the child finished '2a' exists.
6025 * 4) The parent will trap the exit code, if it's OK, will append the
6026 * data accumulated into server.bgrewritebuf into the temp file, and
6027 * finally will rename(2) the temp file in the actual file name.
6028 * The the new file is reopened as the new append only file. Profit!
6030 static int rewriteAppendOnlyFileBackground(void) {
6033 if (server
.bgrewritechildpid
!= -1) return REDIS_ERR
;
6034 if ((childpid
= fork()) == 0) {
6039 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6040 if (rewriteAppendOnlyFile(tmpfile
) == REDIS_OK
) {
6047 if (childpid
== -1) {
6048 redisLog(REDIS_WARNING
,
6049 "Can't rewrite append only file in background: fork: %s",
6053 redisLog(REDIS_NOTICE
,
6054 "Background append only file rewriting started by pid %d",childpid
);
6055 server
.bgrewritechildpid
= childpid
;
6056 /* We set appendseldb to -1 in order to force the next call to the
6057 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6058 * accumulated by the parent into server.bgrewritebuf will start
6059 * with a SELECT statement and it will be safe to merge. */
6060 server
.appendseldb
= -1;
6063 return REDIS_OK
; /* unreached */
6066 static void bgrewriteaofCommand(redisClient
*c
) {
6067 if (server
.bgrewritechildpid
!= -1) {
6068 addReplySds(c
,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6071 if (rewriteAppendOnlyFileBackground() == REDIS_OK
) {
6072 addReply(c
,shared
.ok
);
6074 addReply(c
,shared
.err
);
6078 static void aofRemoveTempFile(pid_t childpid
) {
6081 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) childpid
);
6085 /* ================================= Debugging ============================== */
6087 static void debugCommand(redisClient
*c
) {
6088 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
6090 } else if (!strcasecmp(c
->argv
[1]->ptr
,"reload")) {
6091 if (rdbSave(server
.dbfilename
) != REDIS_OK
) {
6092 addReply(c
,shared
.err
);
6096 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6097 addReply(c
,shared
.err
);
6100 redisLog(REDIS_WARNING
,"DB reloaded by DEBUG RELOAD");
6101 addReply(c
,shared
.ok
);
6102 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
6103 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
6107 addReply(c
,shared
.nokeyerr
);
6110 key
= dictGetEntryKey(de
);
6111 val
= dictGetEntryVal(de
);
6112 addReplySds(c
,sdscatprintf(sdsempty(),
6113 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6114 (void*)key
, key
->refcount
, (void*)val
, val
->refcount
,
6117 addReplySds(c
,sdsnew(
6118 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6122 static void _redisAssert(char *estr
) {
6123 redisLog(REDIS_WARNING
,"=== ASSERTION FAILED ===");
6124 redisLog(REDIS_WARNING
,"==> %s\n",estr
);
6125 #ifdef HAVE_BACKTRACE
6126 redisLog(REDIS_WARNING
,"(forcing SIGSEGV in order to print the stack trace)");
6131 /* =================================== Main! ================================ */
6134 int linuxOvercommitMemoryValue(void) {
6135 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
6139 if (fgets(buf
,64,fp
) == NULL
) {
6148 void linuxOvercommitMemoryWarning(void) {
6149 if (linuxOvercommitMemoryValue() == 0) {
6150 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6153 #endif /* __linux__ */
6155 static void daemonize(void) {
6159 if (fork() != 0) exit(0); /* parent exits */
6160 printf("New pid: %d\n", getpid());
6161 setsid(); /* create a new session */
6163 /* Every output goes to /dev/null. If Redis is daemonized but
6164 * the 'logfile' is set to 'stdout' in the configuration file
6165 * it will not log at all. */
6166 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
6167 dup2(fd
, STDIN_FILENO
);
6168 dup2(fd
, STDOUT_FILENO
);
6169 dup2(fd
, STDERR_FILENO
);
6170 if (fd
> STDERR_FILENO
) close(fd
);
6172 /* Try to write the pid file */
6173 fp
= fopen(server
.pidfile
,"w");
6175 fprintf(fp
,"%d\n",getpid());
6180 int main(int argc
, char **argv
) {
6183 resetServerSaveParams();
6184 loadServerConfig(argv
[1]);
6185 } else if (argc
> 2) {
6186 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
6189 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6191 if (server
.daemonize
) daemonize();
6193 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
6195 linuxOvercommitMemoryWarning();
6197 if (server
.appendonly
) {
6198 if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
)
6199 redisLog(REDIS_NOTICE
,"DB loaded from append only file");
6201 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
6202 redisLog(REDIS_NOTICE
,"DB loaded from disk");
6204 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
6205 acceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
6206 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
6208 aeDeleteEventLoop(server
.el
);
6212 /* ============================= Backtrace support ========================= */
6214 #ifdef HAVE_BACKTRACE
6215 static char *findFuncName(void *pointer
, unsigned long *offset
);
6217 static void *getMcontextEip(ucontext_t
*uc
) {
6218 #if defined(__FreeBSD__)
6219 return (void*) uc
->uc_mcontext
.mc_eip
;
6220 #elif defined(__dietlibc__)
6221 return (void*) uc
->uc_mcontext
.eip
;
6222 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6224 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6226 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6228 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6229 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6230 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6232 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6234 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
6235 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
];
6236 #elif defined(__ia64__) /* Linux IA64 */
6237 return (void*) uc
->uc_mcontext
.sc_ip
;
6243 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
6245 char **messages
= NULL
;
6246 int i
, trace_size
= 0;
6247 unsigned long offset
=0;
6248 ucontext_t
*uc
= (ucontext_t
*) secret
;
6250 REDIS_NOTUSED(info
);
6252 redisLog(REDIS_WARNING
,
6253 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
6254 infostring
= genRedisInfoString();
6255 redisLog(REDIS_WARNING
, "%s",infostring
);
6256 /* It's not safe to sdsfree() the returned string under memory
6257 * corruption conditions. Let it leak as we are going to abort */
6259 trace_size
= backtrace(trace
, 100);
6260 /* overwrite sigaction with caller's address */
6261 if (getMcontextEip(uc
) != NULL
) {
6262 trace
[1] = getMcontextEip(uc
);
6264 messages
= backtrace_symbols(trace
, trace_size
);
6266 for (i
=1; i
<trace_size
; ++i
) {
6267 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
6269 p
= strchr(messages
[i
],'+');
6270 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
6271 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
6273 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
6276 // free(messages); Don't call free() with possibly corrupted memory.
6280 static void setupSigSegvAction(void) {
6281 struct sigaction act
;
6283 sigemptyset (&act
.sa_mask
);
6284 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6285 * is used. Otherwise, sa_handler is used */
6286 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
6287 act
.sa_sigaction
= segvHandler
;
6288 sigaction (SIGSEGV
, &act
, NULL
);
6289 sigaction (SIGBUS
, &act
, NULL
);
6290 sigaction (SIGFPE
, &act
, NULL
);
6291 sigaction (SIGILL
, &act
, NULL
);
6292 sigaction (SIGBUS
, &act
, NULL
);
6296 #include "staticsymbols.h"
6297 /* This function try to convert a pointer into a function name. It's used in
6298 * oreder to provide a backtrace under segmentation fault that's able to
6299 * display functions declared as static (otherwise the backtrace is useless). */
6300 static char *findFuncName(void *pointer
, unsigned long *offset
){
6302 unsigned long off
, minoff
= 0;
6304 /* Try to match against the Symbol with the smallest offset */
6305 for (i
=0; symsTable
[i
].pointer
; i
++) {
6306 unsigned long lp
= (unsigned long) pointer
;
6308 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
6309 off
=lp
-symsTable
[i
].pointer
;
6310 if (ret
< 0 || off
< minoff
) {
6316 if (ret
== -1) return NULL
;
6318 return symsTable
[ret
].name
;
6320 #else /* HAVE_BACKTRACE */
6321 static void setupSigSegvAction(void) {
6323 #endif /* HAVE_BACKTRACE */