2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.1.91"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
64 #include "solarisfixes.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
95 /* If more than REDIS_WRITEV_THRESHOLD write packets are pending, use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
107 this flag will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short, these commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
113 #define REDIS_STRING 0
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
128 /* Defines related to the dump file format. Storing 32-bit lengths for short
129 * keys would waste a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpret the length:
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The remaining
136 * six bits specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
139 * Lengths up to 63 are stored using a single byte; most DB keys, and many
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining six bits specify a special encoding for the object
149 * according to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
156 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157 #define REDIS_SLAVE 2 /* This client is a slave server */
158 #define REDIS_MASTER 4 /* This client is a master server */
159 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
161 /* Slave replication state - slave side */
162 #define REDIS_REPL_NONE 0 /* No active replication */
163 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
164 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
166 /* Slave replication state - from the point of view of master
167 * Note that in SEND_BULK and ONLINE state the slave receives new updates
168 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
169 * to start the next background saving in order to send updates to it. */
170 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
171 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
172 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
173 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
175 /* List related stuff */
179 /* Sort operations */
180 #define REDIS_SORT_GET 0
181 #define REDIS_SORT_ASC 1
182 #define REDIS_SORT_DESC 2
183 #define REDIS_SORTKEY_MAX 1024
186 #define REDIS_DEBUG 0
187 #define REDIS_NOTICE 1
188 #define REDIS_WARNING 2
190 /* Anti-warning macro... */
191 #define REDIS_NOTUSED(V) ((void) V)
193 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
194 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
196 /* Append only defines */
197 #define APPENDFSYNC_NO 0
198 #define APPENDFSYNC_ALWAYS 1
199 #define APPENDFSYNC_EVERYSEC 2
201 /* We can print the stacktrace, so our assert is defined this way: */
202 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
203 static void _redisAssert(char *estr
);
205 /*================================= Data types ============================== */
207 /* A redis object, that is a type able to hold a string / list / set */
208 typedef struct redisObject
{
211 unsigned char encoding
;
212 unsigned char notused
[2];
216 /* Macro used to initialize a Redis object allocated on the stack.
217 * Note that this macro is kept near the structure definition to make sure
218 * we'll update it when the structure is changed, to avoid bugs like
219 * bug #85 introduced exactly in this way. */
220 #define initStaticStringObject(_var,_ptr) do { \
222 _var.type = REDIS_STRING; \
223 _var.encoding = REDIS_ENCODING_RAW; \
227 typedef struct redisDb
{
233 /* With multiplexing we need to keep per-client state.
234 * Clients are kept in a linked list. */
235 typedef struct redisClient
{
240 robj
**argv
, **mbargv
;
242 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
243 int multibulk
; /* multi bulk command format active */
246 time_t lastinteraction
; /* time of the last interaction, used for timeout */
247 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
248 int slaveseldb
; /* slave selected db, if this client is a slave */
249 int authenticated
; /* when requirepass is non-NULL */
250 int replstate
; /* replication state if this is a slave */
251 int repldbfd
; /* replication DB file descriptor */
252 long repldboff
; /* replication DB file offset */
253 off_t repldbsize
; /* replication DB file size */
261 /* Global server state structure */
267 unsigned int sharingpoolsize
;
268 long long dirty
; /* changes to DB from the last save */
270 list
*slaves
, *monitors
;
271 char neterr
[ANET_ERR_LEN
];
273 int cronloops
; /* number of times the cron function run */
274 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
275 time_t lastsave
; /* Unix time of last save succeeede */
276 size_t usedmemory
; /* Used memory in megabytes */
277 /* Fields used only for stats */
278 time_t stat_starttime
; /* server start time */
279 long long stat_numcommands
; /* number of processed commands */
280 long long stat_numconnections
; /* number of connections received */
293 pid_t bgsavechildpid
;
294 pid_t bgrewritechildpid
;
295 sds bgrewritebuf
; /* buffer taken by parent during oppend only rewrite */
296 struct saveparam
*saveparams
;
301 char *appendfilename
;
304 /* Replication related */
309 redisClient
*master
; /* client that is master for this slave */
311 unsigned int maxclients
;
312 unsigned long maxmemory
;
313 /* Sort parameters - qsort_r() is only available under BSD so we
314 * have to take this state global, in order to pass it to sortCompare() */
320 typedef void redisCommandProc(redisClient
*c
);
321 struct redisCommand
{
323 redisCommandProc
*proc
;
328 struct redisFunctionSym
{
330 unsigned long pointer
;
333 typedef struct _redisSortObject
{
341 typedef struct _redisSortOperation
{
344 } redisSortOperation
;
346 /* ZSETs use a specialized version of Skiplists */
348 typedef struct zskiplistNode
{
349 struct zskiplistNode
**forward
;
350 struct zskiplistNode
*backward
;
355 typedef struct zskiplist
{
356 struct zskiplistNode
*header
, *tail
;
357 unsigned long length
;
361 typedef struct zset
{
366 /* Our shared "common" objects */
368 struct sharedObjectsStruct
{
369 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
370 *colon
, *nullbulk
, *nullmultibulk
,
371 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
372 *outofrangeerr
, *plus
,
373 *select0
, *select1
, *select2
, *select3
, *select4
,
374 *select5
, *select6
, *select7
, *select8
, *select9
;
377 /* Global vars that are actually used as constants. The following double
378 * values are used for double on-disk serialization, and are initialized
379 * at runtime to avoid strange compiler optimizations. */
381 static double R_Zero
, R_PosInf
, R_NegInf
, R_Nan
;
383 /*================================ Prototypes =============================== */
385 static void freeStringObject(robj
*o
);
386 static void freeListObject(robj
*o
);
387 static void freeSetObject(robj
*o
);
388 static void decrRefCount(void *o
);
389 static robj
*createObject(int type
, void *ptr
);
390 static void freeClient(redisClient
*c
);
391 static int rdbLoad(char *filename
);
392 static void addReply(redisClient
*c
, robj
*obj
);
393 static void addReplySds(redisClient
*c
, sds s
);
394 static void incrRefCount(robj
*o
);
395 static int rdbSaveBackground(char *filename
);
396 static robj
*createStringObject(char *ptr
, size_t len
);
397 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
398 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
399 static int syncWithMaster(void);
400 static robj
*tryObjectSharing(robj
*o
);
401 static int tryObjectEncoding(robj
*o
);
402 static robj
*getDecodedObject(robj
*o
);
403 static int removeExpire(redisDb
*db
, robj
*key
);
404 static int expireIfNeeded(redisDb
*db
, robj
*key
);
405 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
406 static int deleteKey(redisDb
*db
, robj
*key
);
407 static time_t getExpire(redisDb
*db
, robj
*key
);
408 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
409 static void updateSlavesWaitingBgsave(int bgsaveerr
);
410 static void freeMemoryIfNeeded(void);
411 static int processCommand(redisClient
*c
);
412 static void setupSigSegvAction(void);
413 static void rdbRemoveTempFile(pid_t childpid
);
414 static void aofRemoveTempFile(pid_t childpid
);
415 static size_t stringObjectLen(robj
*o
);
416 static void processInputBuffer(redisClient
*c
);
417 static zskiplist
*zslCreate(void);
418 static void zslFree(zskiplist
*zsl
);
419 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
420 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
422 static void authCommand(redisClient
*c
);
423 static void pingCommand(redisClient
*c
);
424 static void echoCommand(redisClient
*c
);
425 static void setCommand(redisClient
*c
);
426 static void setnxCommand(redisClient
*c
);
427 static void getCommand(redisClient
*c
);
428 static void delCommand(redisClient
*c
);
429 static void existsCommand(redisClient
*c
);
430 static void incrCommand(redisClient
*c
);
431 static void decrCommand(redisClient
*c
);
432 static void incrbyCommand(redisClient
*c
);
433 static void decrbyCommand(redisClient
*c
);
434 static void selectCommand(redisClient
*c
);
435 static void randomkeyCommand(redisClient
*c
);
436 static void keysCommand(redisClient
*c
);
437 static void dbsizeCommand(redisClient
*c
);
438 static void lastsaveCommand(redisClient
*c
);
439 static void saveCommand(redisClient
*c
);
440 static void bgsaveCommand(redisClient
*c
);
441 static void bgrewriteaofCommand(redisClient
*c
);
442 static void shutdownCommand(redisClient
*c
);
443 static void moveCommand(redisClient
*c
);
444 static void renameCommand(redisClient
*c
);
445 static void renamenxCommand(redisClient
*c
);
446 static void lpushCommand(redisClient
*c
);
447 static void rpushCommand(redisClient
*c
);
448 static void lpopCommand(redisClient
*c
);
449 static void rpopCommand(redisClient
*c
);
450 static void llenCommand(redisClient
*c
);
451 static void lindexCommand(redisClient
*c
);
452 static void lrangeCommand(redisClient
*c
);
453 static void ltrimCommand(redisClient
*c
);
454 static void typeCommand(redisClient
*c
);
455 static void lsetCommand(redisClient
*c
);
456 static void saddCommand(redisClient
*c
);
457 static void sremCommand(redisClient
*c
);
458 static void smoveCommand(redisClient
*c
);
459 static void sismemberCommand(redisClient
*c
);
460 static void scardCommand(redisClient
*c
);
461 static void spopCommand(redisClient
*c
);
462 static void srandmemberCommand(redisClient
*c
);
463 static void sinterCommand(redisClient
*c
);
464 static void sinterstoreCommand(redisClient
*c
);
465 static void sunionCommand(redisClient
*c
);
466 static void sunionstoreCommand(redisClient
*c
);
467 static void sdiffCommand(redisClient
*c
);
468 static void sdiffstoreCommand(redisClient
*c
);
469 static void syncCommand(redisClient
*c
);
470 static void flushdbCommand(redisClient
*c
);
471 static void flushallCommand(redisClient
*c
);
472 static void sortCommand(redisClient
*c
);
473 static void lremCommand(redisClient
*c
);
474 static void rpoplpushcommand(redisClient
*c
);
475 static void infoCommand(redisClient
*c
);
476 static void mgetCommand(redisClient
*c
);
477 static void monitorCommand(redisClient
*c
);
478 static void expireCommand(redisClient
*c
);
479 static void expireatCommand(redisClient
*c
);
480 static void getsetCommand(redisClient
*c
);
481 static void ttlCommand(redisClient
*c
);
482 static void slaveofCommand(redisClient
*c
);
483 static void debugCommand(redisClient
*c
);
484 static void msetCommand(redisClient
*c
);
485 static void msetnxCommand(redisClient
*c
);
486 static void zaddCommand(redisClient
*c
);
487 static void zincrbyCommand(redisClient
*c
);
488 static void zrangeCommand(redisClient
*c
);
489 static void zrangebyscoreCommand(redisClient
*c
);
490 static void zrevrangeCommand(redisClient
*c
);
491 static void zcardCommand(redisClient
*c
);
492 static void zremCommand(redisClient
*c
);
493 static void zscoreCommand(redisClient
*c
);
494 static void zremrangebyscoreCommand(redisClient
*c
);
496 /*================================= Globals ================================= */
499 static struct redisServer server
; /* server global state */
500 static struct redisCommand cmdTable
[] = {
501 {"get",getCommand
,2,REDIS_CMD_INLINE
},
502 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
503 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
504 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
505 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
506 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
507 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
508 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
509 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
510 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
511 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
512 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
513 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
514 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
515 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
516 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
517 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
518 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
519 {"rpoplpush",rpoplpushcommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
520 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
521 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
522 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
523 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
524 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
525 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
526 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
527 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
528 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
529 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
530 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
531 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
532 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
533 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
534 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
535 {"zincrby",zincrbyCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
536 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
537 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
538 {"zrange",zrangeCommand
,4,REDIS_CMD_INLINE
},
539 {"zrangebyscore",zrangebyscoreCommand
,-4,REDIS_CMD_INLINE
},
540 {"zrevrange",zrevrangeCommand
,4,REDIS_CMD_INLINE
},
541 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
542 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
543 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
544 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
545 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
546 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
547 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
548 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
549 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
550 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
551 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
552 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
553 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
554 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
555 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
556 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
557 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
558 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
559 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
560 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
561 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
562 {"bgrewriteaof",bgrewriteaofCommand
,1,REDIS_CMD_INLINE
},
563 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
564 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
565 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
566 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
567 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
568 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
569 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
570 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
571 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
572 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
573 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
574 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
578 /*============================ Utility functions ============================ */
580 /* Glob-style pattern matching. */
581 int stringmatchlen(const char *pattern
, int patternLen
,
582 const char *string
, int stringLen
, int nocase
)
587 while (pattern
[1] == '*') {
592 return 1; /* match */
594 if (stringmatchlen(pattern
+1, patternLen
-1,
595 string
, stringLen
, nocase
))
596 return 1; /* match */
600 return 0; /* no match */
604 return 0; /* no match */
614 not = pattern
[0] == '^';
621 if (pattern
[0] == '\\') {
624 if (pattern
[0] == string
[0])
626 } else if (pattern
[0] == ']') {
628 } else if (patternLen
== 0) {
632 } else if (pattern
[1] == '-' && patternLen
>= 3) {
633 int start
= pattern
[0];
634 int end
= pattern
[2];
642 start
= tolower(start
);
648 if (c
>= start
&& c
<= end
)
652 if (pattern
[0] == string
[0])
655 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
665 return 0; /* no match */
671 if (patternLen
>= 2) {
678 if (pattern
[0] != string
[0])
679 return 0; /* no match */
681 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
682 return 0; /* no match */
690 if (stringLen
== 0) {
691 while(*pattern
== '*') {
698 if (patternLen
== 0 && stringLen
== 0)
703 static void redisLog(int level
, const char *fmt
, ...) {
707 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
711 if (level
>= server
.verbosity
) {
717 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
718 fprintf(fp
,"%s %c ",buf
,c
[level
]);
719 vfprintf(fp
, fmt
, ap
);
725 if (server
.logfile
) fclose(fp
);
728 /*====================== Hash table type implementation ==================== */
730 /* This is a hash table type that uses the SDS dynamic strings library as
731 * keys and redis objects as values (objects can hold SDS strings,
734 static void dictVanillaFree(void *privdata
, void *val
)
736 DICT_NOTUSED(privdata
);
740 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
744 DICT_NOTUSED(privdata
);
746 l1
= sdslen((sds
)key1
);
747 l2
= sdslen((sds
)key2
);
748 if (l1
!= l2
) return 0;
749 return memcmp(key1
, key2
, l1
) == 0;
752 static void dictRedisObjectDestructor(void *privdata
, void *val
)
754 DICT_NOTUSED(privdata
);
759 static int dictObjKeyCompare(void *privdata
, const void *key1
,
762 const robj
*o1
= key1
, *o2
= key2
;
763 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
766 static unsigned int dictObjHash(const void *key
) {
768 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
771 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
774 robj
*o1
= (robj
*) key1
, *o2
= (robj
*) key2
;
777 o1
= getDecodedObject(o1
);
778 o2
= getDecodedObject(o2
);
779 cmp
= sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
785 static unsigned int dictEncObjHash(const void *key
) {
786 robj
*o
= (robj
*) key
;
788 o
= getDecodedObject(o
);
789 unsigned int hash
= dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
794 static dictType setDictType
= {
795 dictEncObjHash
, /* hash function */
798 dictEncObjKeyCompare
, /* key compare */
799 dictRedisObjectDestructor
, /* key destructor */
800 NULL
/* val destructor */
803 static dictType zsetDictType
= {
804 dictEncObjHash
, /* hash function */
807 dictEncObjKeyCompare
, /* key compare */
808 dictRedisObjectDestructor
, /* key destructor */
809 dictVanillaFree
/* val destructor of malloc(sizeof(double)) */
812 static dictType hashDictType
= {
813 dictObjHash
, /* hash function */
816 dictObjKeyCompare
, /* key compare */
817 dictRedisObjectDestructor
, /* key destructor */
818 dictRedisObjectDestructor
/* val destructor */
821 /* ========================= Random utility functions ======================= */
823 /* Redis generally does not try to recover from out of memory conditions
824 * when allocating objects or strings, it is not clear if it will be possible
825 * to report this condition to the client since the networking layer itself
826 * is based on heap allocation for send buffers, so we simply abort.
827 * At least the code will be simpler to read... */
828 static void oom(const char *msg
) {
829 redisLog(REDIS_WARNING
, "%s: Out of memory\n",msg
);
834 /* ====================== Redis server networking stuff ===================== */
835 static void closeTimedoutClients(void) {
838 time_t now
= time(NULL
);
840 listRewind(server
.clients
);
841 while ((ln
= listYield(server
.clients
)) != NULL
) {
842 c
= listNodeValue(ln
);
843 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
844 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
845 (now
- c
->lastinteraction
> server
.maxidletime
)) {
846 redisLog(REDIS_DEBUG
,"Closing idle client");
852 static int htNeedsResize(dict
*dict
) {
853 long long size
, used
;
855 size
= dictSlots(dict
);
856 used
= dictSize(dict
);
857 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
858 (used
*100/size
< REDIS_HT_MINFILL
));
861 /* If the percentage of used slots in the HT falls below REDIS_HT_MINFILL
862 * we resize the hash table to save memory */
863 static void tryResizeHashTables(void) {
866 for (j
= 0; j
< server
.dbnum
; j
++) {
867 if (htNeedsResize(server
.db
[j
].dict
)) {
868 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
869 dictResize(server
.db
[j
].dict
);
870 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
872 if (htNeedsResize(server
.db
[j
].expires
))
873 dictResize(server
.db
[j
].expires
);
877 /* A background saving child (BGSAVE) terminated its work. Handle this. */
878 void backgroundSaveDoneHandler(int statloc
) {
879 int exitcode
= WEXITSTATUS(statloc
);
880 int bysignal
= WIFSIGNALED(statloc
);
882 if (!bysignal
&& exitcode
== 0) {
883 redisLog(REDIS_NOTICE
,
884 "Background saving terminated with success");
886 server
.lastsave
= time(NULL
);
887 } else if (!bysignal
&& exitcode
!= 0) {
888 redisLog(REDIS_WARNING
, "Background saving error");
890 redisLog(REDIS_WARNING
,
891 "Background saving terminated by signal");
892 rdbRemoveTempFile(server
.bgsavechildpid
);
894 server
.bgsavechildpid
= -1;
895 /* Possibly there are slaves waiting for a BGSAVE in order to be served
896 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
897 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
900 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
902 void backgroundRewriteDoneHandler(int statloc
) {
903 int exitcode
= WEXITSTATUS(statloc
);
904 int bysignal
= WIFSIGNALED(statloc
);
906 if (!bysignal
&& exitcode
== 0) {
910 redisLog(REDIS_NOTICE
,
911 "Background append only file rewriting terminated with success");
912 /* Now it's time to flush the differences accumulated by the parent */
913 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) server
.bgrewritechildpid
);
914 fd
= open(tmpfile
,O_WRONLY
|O_APPEND
);
916 redisLog(REDIS_WARNING
, "Not able to open the temp append only file produced by the child: %s", strerror(errno
));
919 /* Flush our data... */
920 if (write(fd
,server
.bgrewritebuf
,sdslen(server
.bgrewritebuf
)) !=
921 (signed) sdslen(server
.bgrewritebuf
)) {
922 redisLog(REDIS_WARNING
, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno
));
926 redisLog(REDIS_NOTICE
,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server
.bgrewritebuf
));
927 /* Now our work is to rename the temp file into the stable file. And
928 * switch the file descriptor used by the server for append only. */
929 if (rename(tmpfile
,server
.appendfilename
) == -1) {
930 redisLog(REDIS_WARNING
,"Can't rename the temp append only file into the stable one: %s", strerror(errno
));
934 /* Mission completed... almost */
935 redisLog(REDIS_NOTICE
,"Append only file successfully rewritten.");
936 if (server
.appendfd
!= -1) {
937 /* If append only is actually enabled... */
938 close(server
.appendfd
);
939 server
.appendfd
= fd
;
941 server
.appendseldb
= -1; /* Make sure it will issue SELECT */
942 redisLog(REDIS_NOTICE
,"The new append only file was selected for future appends.");
944 /* If append only is disabled we just generate a dump in this
945 * format. Why not? */
948 } else if (!bysignal
&& exitcode
!= 0) {
949 redisLog(REDIS_WARNING
, "Background append only file rewriting error");
951 redisLog(REDIS_WARNING
,
952 "Background append only file rewriting terminated by signal");
955 sdsfree(server
.bgrewritebuf
);
956 server
.bgrewritebuf
= sdsempty();
957 aofRemoveTempFile(server
.bgrewritechildpid
);
958 server
.bgrewritechildpid
= -1;
961 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
962 int j
, loops
= server
.cronloops
++;
963 REDIS_NOTUSED(eventLoop
);
965 REDIS_NOTUSED(clientData
);
967 /* Update the global state with the amount of used memory */
968 server
.usedmemory
= zmalloc_used_memory();
970 /* Show some info about non-empty databases */
971 for (j
= 0; j
< server
.dbnum
; j
++) {
972 long long size
, used
, vkeys
;
974 size
= dictSlots(server
.db
[j
].dict
);
975 used
= dictSize(server
.db
[j
].dict
);
976 vkeys
= dictSize(server
.db
[j
].expires
);
977 if (!(loops
% 5) && (used
|| vkeys
)) {
978 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
979 /* dictPrintStats(server.dict); */
983 /* We don't want to resize the hash tables while a bacground saving
984 * is in progress: the saving child is created using fork() that is
985 * implemented with a copy-on-write semantic in most modern systems, so
986 * if we resize the HT while there is the saving child at work actually
987 * a lot of memory movements in the parent will cause a lot of pages
989 if (server
.bgsavechildpid
== -1) tryResizeHashTables();
991 /* Show information about connected clients */
993 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
994 listLength(server
.clients
)-listLength(server
.slaves
),
995 listLength(server
.slaves
),
997 dictSize(server
.sharingpool
));
1000 /* Close connections of timedout clients */
1001 if (server
.maxidletime
&& !(loops
% 10))
1002 closeTimedoutClients();
1004 /* Check if a background saving or AOF rewrite in progress terminated */
1005 if (server
.bgsavechildpid
!= -1 || server
.bgrewritechildpid
!= -1) {
1009 if ((pid
= wait3(&statloc
,WNOHANG
,NULL
)) != 0) {
1010 if (pid
== server
.bgsavechildpid
) {
1011 backgroundSaveDoneHandler(statloc
);
1013 backgroundRewriteDoneHandler(statloc
);
1017 /* If there is not a background saving in progress check if
1018 * we have to save now */
1019 time_t now
= time(NULL
);
1020 for (j
= 0; j
< server
.saveparamslen
; j
++) {
1021 struct saveparam
*sp
= server
.saveparams
+j
;
1023 if (server
.dirty
>= sp
->changes
&&
1024 now
-server
.lastsave
> sp
->seconds
) {
1025 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
1026 sp
->changes
, sp
->seconds
);
1027 rdbSaveBackground(server
.dbfilename
);
1033 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1034 * will use few CPU cycles if there are few expiring keys, otherwise
1035 * it will get more aggressive to avoid that too much memory is used by
1036 * keys that can be removed from the keyspace. */
1037 for (j
= 0; j
< server
.dbnum
; j
++) {
1039 redisDb
*db
= server
.db
+j
;
1041 /* Continue to expire if at the end of the cycle more than 25%
1042 * of the keys were expired. */
1044 int num
= dictSize(db
->expires
);
1045 time_t now
= time(NULL
);
1048 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
1049 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
1054 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
1055 t
= (time_t) dictGetEntryVal(de
);
1057 deleteKey(db
,dictGetEntryKey(de
));
1061 } while (expired
> REDIS_EXPIRELOOKUPS_PER_CRON
/4);
1064 /* Check if we should connect to a MASTER */
1065 if (server
.replstate
== REDIS_REPL_CONNECT
) {
1066 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
1067 if (syncWithMaster() == REDIS_OK
) {
1068 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
1074 static void createSharedObjects(void) {
1075 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
1076 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
1077 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
1078 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
1079 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
1080 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
1081 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
1082 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
1083 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
1085 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
1086 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
1087 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1088 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
1089 "-ERR no such key\r\n"));
1090 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
1091 "-ERR syntax error\r\n"));
1092 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
1093 "-ERR source and destination objects are the same\r\n"));
1094 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
1095 "-ERR index out of range\r\n"));
1096 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
1097 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
1098 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
1099 shared
.select0
= createStringObject("select 0\r\n",10);
1100 shared
.select1
= createStringObject("select 1\r\n",10);
1101 shared
.select2
= createStringObject("select 2\r\n",10);
1102 shared
.select3
= createStringObject("select 3\r\n",10);
1103 shared
.select4
= createStringObject("select 4\r\n",10);
1104 shared
.select5
= createStringObject("select 5\r\n",10);
1105 shared
.select6
= createStringObject("select 6\r\n",10);
1106 shared
.select7
= createStringObject("select 7\r\n",10);
1107 shared
.select8
= createStringObject("select 8\r\n",10);
1108 shared
.select9
= createStringObject("select 9\r\n",10);
1111 static void appendServerSaveParams(time_t seconds
, int changes
) {
1112 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1113 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1114 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1115 server
.saveparamslen
++;
1118 static void resetServerSaveParams() {
1119 zfree(server
.saveparams
);
1120 server
.saveparams
= NULL
;
1121 server
.saveparamslen
= 0;
1124 static void initServerConfig() {
1125 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1126 server
.port
= REDIS_SERVERPORT
;
1127 server
.verbosity
= REDIS_DEBUG
;
1128 server
.maxidletime
= REDIS_MAXIDLETIME
;
1129 server
.saveparams
= NULL
;
1130 server
.logfile
= NULL
; /* NULL = log on standard output */
1131 server
.bindaddr
= NULL
;
1132 server
.glueoutputbuf
= 1;
1133 server
.daemonize
= 0;
1134 server
.appendonly
= 0;
1135 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1136 server
.lastfsync
= time(NULL
);
1137 server
.appendfd
= -1;
1138 server
.appendseldb
= -1; /* Make sure the first time will not match */
1139 server
.pidfile
= "/var/run/redis.pid";
1140 server
.dbfilename
= "dump.rdb";
1141 server
.appendfilename
= "appendonly.aof";
1142 server
.requirepass
= NULL
;
1143 server
.shareobjects
= 0;
1144 server
.sharingpoolsize
= 1024;
1145 server
.maxclients
= 0;
1146 server
.maxmemory
= 0;
1147 resetServerSaveParams();
1149 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1150 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1151 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1152 /* Replication related */
1154 server
.masterauth
= NULL
;
1155 server
.masterhost
= NULL
;
1156 server
.masterport
= 6379;
1157 server
.master
= NULL
;
1158 server
.replstate
= REDIS_REPL_NONE
;
1160 /* Double constants initialization */
1162 R_PosInf
= 1.0/R_Zero
;
1163 R_NegInf
= -1.0/R_Zero
;
1164 R_Nan
= R_Zero
/R_Zero
;
1167 static void initServer() {
1170 signal(SIGHUP
, SIG_IGN
);
1171 signal(SIGPIPE
, SIG_IGN
);
1172 setupSigSegvAction();
1174 server
.clients
= listCreate();
1175 server
.slaves
= listCreate();
1176 server
.monitors
= listCreate();
1177 server
.objfreelist
= listCreate();
1178 createSharedObjects();
1179 server
.el
= aeCreateEventLoop();
1180 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1181 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
1182 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1183 if (server
.fd
== -1) {
1184 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
1187 for (j
= 0; j
< server
.dbnum
; j
++) {
1188 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1189 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1190 server
.db
[j
].id
= j
;
1192 server
.cronloops
= 0;
1193 server
.bgsavechildpid
= -1;
1194 server
.bgrewritechildpid
= -1;
1195 server
.bgrewritebuf
= sdsempty();
1196 server
.lastsave
= time(NULL
);
1198 server
.usedmemory
= 0;
1199 server
.stat_numcommands
= 0;
1200 server
.stat_numconnections
= 0;
1201 server
.stat_starttime
= time(NULL
);
1202 aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
);
1204 if (server
.appendonly
) {
1205 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644);
1206 if (server
.appendfd
== -1) {
1207 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1214 /* Empty the whole database */
1215 static long long emptyDb() {
1217 long long removed
= 0;
1219 for (j
= 0; j
< server
.dbnum
; j
++) {
1220 removed
+= dictSize(server
.db
[j
].dict
);
1221 dictEmpty(server
.db
[j
].dict
);
1222 dictEmpty(server
.db
[j
].expires
);
1227 static int yesnotoi(char *s
) {
1228 if (!strcasecmp(s
,"yes")) return 1;
1229 else if (!strcasecmp(s
,"no")) return 0;
1233 /* I agree, this is a very rudimental way to load a configuration...
1234 will improve later if the config gets more complex */
1235 static void loadServerConfig(char *filename
) {
1237 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1241 if (filename
[0] == '-' && filename
[1] == '\0')
1244 if ((fp
= fopen(filename
,"r")) == NULL
) {
1245 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1250 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1256 line
= sdstrim(line
," \t\r\n");
1258 /* Skip comments and blank lines*/
1259 if (line
[0] == '#' || line
[0] == '\0') {
1264 /* Split into arguments */
1265 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1266 sdstolower(argv
[0]);
1268 /* Execute config directives */
1269 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1270 server
.maxidletime
= atoi(argv
[1]);
1271 if (server
.maxidletime
< 0) {
1272 err
= "Invalid timeout value"; goto loaderr
;
1274 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1275 server
.port
= atoi(argv
[1]);
1276 if (server
.port
< 1 || server
.port
> 65535) {
1277 err
= "Invalid port"; goto loaderr
;
1279 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1280 server
.bindaddr
= zstrdup(argv
[1]);
1281 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1282 int seconds
= atoi(argv
[1]);
1283 int changes
= atoi(argv
[2]);
1284 if (seconds
< 1 || changes
< 0) {
1285 err
= "Invalid save parameters"; goto loaderr
;
1287 appendServerSaveParams(seconds
,changes
);
1288 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1289 if (chdir(argv
[1]) == -1) {
1290 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1291 argv
[1], strerror(errno
));
1294 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1295 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1296 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1297 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1299 err
= "Invalid log level. Must be one of debug, notice, warning";
1302 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1305 server
.logfile
= zstrdup(argv
[1]);
1306 if (!strcasecmp(server
.logfile
,"stdout")) {
1307 zfree(server
.logfile
);
1308 server
.logfile
= NULL
;
1310 if (server
.logfile
) {
1311 /* Test if we are able to open the file. The server will not
1312 * be able to abort just for this problem later... */
1313 logfp
= fopen(server
.logfile
,"a");
1314 if (logfp
== NULL
) {
1315 err
= sdscatprintf(sdsempty(),
1316 "Can't open the log file: %s", strerror(errno
));
1321 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1322 server
.dbnum
= atoi(argv
[1]);
1323 if (server
.dbnum
< 1) {
1324 err
= "Invalid number of databases"; goto loaderr
;
1326 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1327 server
.maxclients
= atoi(argv
[1]);
1328 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1329 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1330 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1331 server
.masterhost
= sdsnew(argv
[1]);
1332 server
.masterport
= atoi(argv
[2]);
1333 server
.replstate
= REDIS_REPL_CONNECT
;
1334 } else if (!strcasecmp(argv
[0],"masterauth") && argc
== 2) {
1335 server
.masterauth
= zstrdup(argv
[1]);
1336 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1337 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1338 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1340 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1341 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1342 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1344 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1345 server
.sharingpoolsize
= atoi(argv
[1]);
1346 if (server
.sharingpoolsize
< 1) {
1347 err
= "invalid object sharing pool size"; goto loaderr
;
1349 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1350 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1351 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1353 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1354 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1355 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1357 } else if (!strcasecmp(argv
[0],"appendfsync") && argc
== 2) {
1358 if (!strcasecmp(argv
[1],"no")) {
1359 server
.appendfsync
= APPENDFSYNC_NO
;
1360 } else if (!strcasecmp(argv
[1],"always")) {
1361 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1362 } else if (!strcasecmp(argv
[1],"everysec")) {
1363 server
.appendfsync
= APPENDFSYNC_EVERYSEC
;
1365 err
= "argument must be 'no', 'always' or 'everysec'";
1368 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1369 server
.requirepass
= zstrdup(argv
[1]);
1370 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1371 server
.pidfile
= zstrdup(argv
[1]);
1372 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1373 server
.dbfilename
= zstrdup(argv
[1]);
1375 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1377 for (j
= 0; j
< argc
; j
++)
1382 if (fp
!= stdin
) fclose(fp
);
1386 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1387 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1388 fprintf(stderr
, ">>> '%s'\n", line
);
1389 fprintf(stderr
, "%s\n", err
);
1393 static void freeClientArgv(redisClient
*c
) {
1396 for (j
= 0; j
< c
->argc
; j
++)
1397 decrRefCount(c
->argv
[j
]);
1398 for (j
= 0; j
< c
->mbargc
; j
++)
1399 decrRefCount(c
->mbargv
[j
]);
1404 static void freeClient(redisClient
*c
) {
1407 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1408 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1409 sdsfree(c
->querybuf
);
1410 listRelease(c
->reply
);
1413 ln
= listSearchKey(server
.clients
,c
);
1414 redisAssert(ln
!= NULL
);
1415 listDelNode(server
.clients
,ln
);
1416 if (c
->flags
& REDIS_SLAVE
) {
1417 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1419 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1420 ln
= listSearchKey(l
,c
);
1421 redisAssert(ln
!= NULL
);
1424 if (c
->flags
& REDIS_MASTER
) {
1425 server
.master
= NULL
;
1426 server
.replstate
= REDIS_REPL_CONNECT
;
1433 #define GLUEREPLY_UP_TO (1024)
1434 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1436 char buf
[GLUEREPLY_UP_TO
];
1440 listRewind(c
->reply
);
1441 while((ln
= listYield(c
->reply
))) {
1445 objlen
= sdslen(o
->ptr
);
1446 if (copylen
+ objlen
<= GLUEREPLY_UP_TO
) {
1447 memcpy(buf
+copylen
,o
->ptr
,objlen
);
1449 listDelNode(c
->reply
,ln
);
1451 if (copylen
== 0) return;
1455 /* Now the output buffer is empty, add the new single element */
1456 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,copylen
));
1457 listAddNodeHead(c
->reply
,o
);
1460 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1461 redisClient
*c
= privdata
;
1462 int nwritten
= 0, totwritten
= 0, objlen
;
1465 REDIS_NOTUSED(mask
);
1467 /* Use writev() if we have enough buffers to send */
1468 if (!server
.glueoutputbuf
&&
1469 listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD
&&
1470 !(c
->flags
& REDIS_MASTER
))
1472 sendReplyToClientWritev(el
, fd
, privdata
, mask
);
1476 while(listLength(c
->reply
)) {
1477 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1478 glueReplyBuffersIfNeeded(c
);
1480 o
= listNodeValue(listFirst(c
->reply
));
1481 objlen
= sdslen(o
->ptr
);
1484 listDelNode(c
->reply
,listFirst(c
->reply
));
1488 if (c
->flags
& REDIS_MASTER
) {
1489 /* Don't reply to a master */
1490 nwritten
= objlen
- c
->sentlen
;
1492 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1493 if (nwritten
<= 0) break;
1495 c
->sentlen
+= nwritten
;
1496 totwritten
+= nwritten
;
1497 /* If we fully sent the object on head go to the next one */
1498 if (c
->sentlen
== objlen
) {
1499 listDelNode(c
->reply
,listFirst(c
->reply
));
1502 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1503 * bytes, in a single threaded server it's a good idea to serve
1504 * other clients as well, even if a very large request comes from
1505 * super fast link that is always able to accept data (in real world
1506 * scenario think about 'KEYS *' against the loopback interfae) */
1507 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1509 if (nwritten
== -1) {
1510 if (errno
== EAGAIN
) {
1513 redisLog(REDIS_DEBUG
,
1514 "Error writing to client: %s", strerror(errno
));
1519 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1520 if (listLength(c
->reply
) == 0) {
1522 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1526 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
)
1528 redisClient
*c
= privdata
;
1529 int nwritten
= 0, totwritten
= 0, objlen
, willwrite
;
1531 struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
];
1532 int offset
, ion
= 0;
1534 REDIS_NOTUSED(mask
);
1537 while (listLength(c
->reply
)) {
1538 offset
= c
->sentlen
;
1542 /* fill-in the iov[] array */
1543 for(node
= listFirst(c
->reply
); node
; node
= listNextNode(node
)) {
1544 o
= listNodeValue(node
);
1545 objlen
= sdslen(o
->ptr
);
1547 if (totwritten
+ objlen
- offset
> REDIS_MAX_WRITE_PER_EVENT
)
1550 if(ion
== REDIS_WRITEV_IOVEC_COUNT
)
1551 break; /* no more iovecs */
1553 iov
[ion
].iov_base
= ((char*)o
->ptr
) + offset
;
1554 iov
[ion
].iov_len
= objlen
- offset
;
1555 willwrite
+= objlen
- offset
;
1556 offset
= 0; /* just for the first item */
1563 /* write all collected blocks at once */
1564 if((nwritten
= writev(fd
, iov
, ion
)) < 0) {
1565 if (errno
!= EAGAIN
) {
1566 redisLog(REDIS_DEBUG
,
1567 "Error writing to client: %s", strerror(errno
));
1574 totwritten
+= nwritten
;
1575 offset
= c
->sentlen
;
1577 /* remove written robjs from c->reply */
1578 while (nwritten
&& listLength(c
->reply
)) {
1579 o
= listNodeValue(listFirst(c
->reply
));
1580 objlen
= sdslen(o
->ptr
);
1582 if(nwritten
>= objlen
- offset
) {
1583 listDelNode(c
->reply
, listFirst(c
->reply
));
1584 nwritten
-= objlen
- offset
;
1588 c
->sentlen
+= nwritten
;
1596 c
->lastinteraction
= time(NULL
);
1598 if (listLength(c
->reply
) == 0) {
1600 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1604 static struct redisCommand
*lookupCommand(char *name
) {
1606 while(cmdTable
[j
].name
!= NULL
) {
1607 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1613 /* resetClient prepare the client to process the next command */
1614 static void resetClient(redisClient
*c
) {
1620 /* If this function gets called we already read a whole
1621 * command, argments are in the client argv/argc fields.
1622 * processCommand() execute the command or prepare the
1623 * server for a bulk read from the client.
1625 * If 1 is returned the client is still alive and valid and
1626 * and other operations can be performed by the caller. Otherwise
1627 * if 0 is returned the client was destroied (i.e. after QUIT). */
1628 static int processCommand(redisClient
*c
) {
1629 struct redisCommand
*cmd
;
1632 /* Free some memory if needed (maxmemory setting) */
1633 if (server
.maxmemory
) freeMemoryIfNeeded();
1635 /* Handle the multi bulk command type. This is an alternative protocol
1636 * supported by Redis in order to receive commands that are composed of
1637 * multiple binary-safe "bulk" arguments. The latency of processing is
1638 * a bit higher but this allows things like multi-sets, so if this
1639 * protocol is used only for MSET and similar commands this is a big win. */
1640 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1641 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1642 if (c
->multibulk
<= 0) {
1646 decrRefCount(c
->argv
[c
->argc
-1]);
1650 } else if (c
->multibulk
) {
1651 if (c
->bulklen
== -1) {
1652 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1653 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1657 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1658 decrRefCount(c
->argv
[0]);
1659 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1661 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1666 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1670 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1671 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1675 if (c
->multibulk
== 0) {
1679 /* Here we need to swap the multi-bulk argc/argv with the
1680 * normal argc/argv of the client structure. */
1682 c
->argv
= c
->mbargv
;
1683 c
->mbargv
= auxargv
;
1686 c
->argc
= c
->mbargc
;
1687 c
->mbargc
= auxargc
;
1689 /* We need to set bulklen to something different than -1
1690 * in order for the code below to process the command without
1691 * to try to read the last argument of a bulk command as
1692 * a special argument. */
1694 /* continue below and process the command */
1701 /* -- end of multi bulk commands processing -- */
1703 /* The QUIT command is handled as a special case. Normal command
1704 * procs are unable to close the client connection safely */
1705 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1709 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1711 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1714 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1715 (c
->argc
< -cmd
->arity
)) {
1717 sdscatprintf(sdsempty(),
1718 "-ERR wrong number of arguments for '%s' command\r\n",
1722 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1723 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1726 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1727 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1729 decrRefCount(c
->argv
[c
->argc
-1]);
1730 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1732 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1737 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1738 /* It is possible that the bulk read is already in the
1739 * buffer. Check this condition and handle it accordingly.
1740 * This is just a fast path, alternative to call processInputBuffer().
1741 * It's a good idea since the code is small and this condition
1742 * happens most of the times. */
1743 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1744 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1746 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1751 /* Let's try to share objects on the command arguments vector */
1752 if (server
.shareobjects
) {
1754 for(j
= 1; j
< c
->argc
; j
++)
1755 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1757 /* Let's try to encode the bulk object to save space. */
1758 if (cmd
->flags
& REDIS_CMD_BULK
)
1759 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1761 /* Check if the user is authenticated */
1762 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1763 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1768 /* Exec the command */
1769 dirty
= server
.dirty
;
1771 if (server
.appendonly
&& server
.dirty
-dirty
)
1772 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1773 if (server
.dirty
-dirty
&& listLength(server
.slaves
))
1774 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1775 if (listLength(server
.monitors
))
1776 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1777 server
.stat_numcommands
++;
1779 /* Prepare the client for the next command */
1780 if (c
->flags
& REDIS_CLOSE
) {
1788 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1792 /* (args*2)+1 is enough room for args, spaces, newlines */
1793 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1795 if (argc
<= REDIS_STATIC_ARGS
) {
1798 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1801 for (j
= 0; j
< argc
; j
++) {
1802 if (j
!= 0) outv
[outc
++] = shared
.space
;
1803 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1806 lenobj
= createObject(REDIS_STRING
,
1807 sdscatprintf(sdsempty(),"%lu\r\n",
1808 (unsigned long) stringObjectLen(argv
[j
])));
1809 lenobj
->refcount
= 0;
1810 outv
[outc
++] = lenobj
;
1812 outv
[outc
++] = argv
[j
];
1814 outv
[outc
++] = shared
.crlf
;
1816 /* Increment all the refcounts at start and decrement at end in order to
1817 * be sure to free objects if there is no slave in a replication state
1818 * able to be feed with commands */
1819 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1821 while((ln
= listYield(slaves
))) {
1822 redisClient
*slave
= ln
->value
;
1824 /* Don't feed slaves that are still waiting for BGSAVE to start */
1825 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1827 /* Feed all the other slaves, MONITORs and so on */
1828 if (slave
->slaveseldb
!= dictid
) {
1832 case 0: selectcmd
= shared
.select0
; break;
1833 case 1: selectcmd
= shared
.select1
; break;
1834 case 2: selectcmd
= shared
.select2
; break;
1835 case 3: selectcmd
= shared
.select3
; break;
1836 case 4: selectcmd
= shared
.select4
; break;
1837 case 5: selectcmd
= shared
.select5
; break;
1838 case 6: selectcmd
= shared
.select6
; break;
1839 case 7: selectcmd
= shared
.select7
; break;
1840 case 8: selectcmd
= shared
.select8
; break;
1841 case 9: selectcmd
= shared
.select9
; break;
1843 selectcmd
= createObject(REDIS_STRING
,
1844 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1845 selectcmd
->refcount
= 0;
1848 addReply(slave
,selectcmd
);
1849 slave
->slaveseldb
= dictid
;
1851 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1853 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1854 if (outv
!= static_outv
) zfree(outv
);
1857 static void processInputBuffer(redisClient
*c
) {
1859 if (c
->bulklen
== -1) {
1860 /* Read the first line of the query */
1861 char *p
= strchr(c
->querybuf
,'\n');
1868 query
= c
->querybuf
;
1869 c
->querybuf
= sdsempty();
1870 querylen
= 1+(p
-(query
));
1871 if (sdslen(query
) > querylen
) {
1872 /* leave data after the first line of the query in the buffer */
1873 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1875 *p
= '\0'; /* remove "\n" */
1876 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1877 sdsupdatelen(query
);
1879 /* Now we can split the query in arguments */
1880 if (sdslen(query
) == 0) {
1881 /* Ignore empty query */
1883 if (sdslen(c
->querybuf
)) goto again
;
1886 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1889 if (c
->argv
) zfree(c
->argv
);
1890 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1892 for (j
= 0; j
< argc
; j
++) {
1893 if (sdslen(argv
[j
])) {
1894 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1902 /* Execute the command. If the client is still valid
1903 * after processCommand() return and there is something
1904 * on the query buffer try to process the next command. */
1905 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1907 /* Nothing to process, argc == 0. Just process the query
1908 * buffer if it's not empty or return to the caller */
1909 if (sdslen(c
->querybuf
)) goto again
;
1912 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
1913 redisLog(REDIS_DEBUG
, "Client protocol error");
1918 /* Bulk read handling. Note that if we are at this point
1919 the client already sent a command terminated with a newline,
1920 we are reading the bulk data that is actually the last
1921 argument of the command. */
1922 int qbl
= sdslen(c
->querybuf
);
1924 if (c
->bulklen
<= qbl
) {
1925 /* Copy everything but the final CRLF as final argument */
1926 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1928 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1929 /* Process the command. If the client is still valid after
1930 * the processing and there is more data in the buffer
1931 * try to parse it. */
1932 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1938 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1939 redisClient
*c
= (redisClient
*) privdata
;
1940 char buf
[REDIS_IOBUF_LEN
];
1943 REDIS_NOTUSED(mask
);
1945 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1947 if (errno
== EAGAIN
) {
1950 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1954 } else if (nread
== 0) {
1955 redisLog(REDIS_DEBUG
, "Client closed connection");
1960 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1961 c
->lastinteraction
= time(NULL
);
1965 processInputBuffer(c
);
1968 static int selectDb(redisClient
*c
, int id
) {
1969 if (id
< 0 || id
>= server
.dbnum
)
1971 c
->db
= &server
.db
[id
];
1975 static void *dupClientReplyValue(void *o
) {
1976 incrRefCount((robj
*)o
);
1980 static redisClient
*createClient(int fd
) {
1981 redisClient
*c
= zmalloc(sizeof(*c
));
1983 anetNonBlock(NULL
,fd
);
1984 anetTcpNoDelay(NULL
,fd
);
1985 if (!c
) return NULL
;
1988 c
->querybuf
= sdsempty();
1997 c
->lastinteraction
= time(NULL
);
1998 c
->authenticated
= 0;
1999 c
->replstate
= REDIS_REPL_NONE
;
2000 c
->reply
= listCreate();
2001 listSetFreeMethod(c
->reply
,decrRefCount
);
2002 listSetDupMethod(c
->reply
,dupClientReplyValue
);
2003 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
2004 readQueryFromClient
, c
) == AE_ERR
) {
2008 listAddNodeTail(server
.clients
,c
);
2012 static void addReply(redisClient
*c
, robj
*obj
) {
2013 if (listLength(c
->reply
) == 0 &&
2014 (c
->replstate
== REDIS_REPL_NONE
||
2015 c
->replstate
== REDIS_REPL_ONLINE
) &&
2016 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
2017 sendReplyToClient
, c
) == AE_ERR
) return;
2018 listAddNodeTail(c
->reply
,getDecodedObject(obj
));
2021 static void addReplySds(redisClient
*c
, sds s
) {
2022 robj
*o
= createObject(REDIS_STRING
,s
);
2027 static void addReplyDouble(redisClient
*c
, double d
) {
2030 snprintf(buf
,sizeof(buf
),"%.17g",d
);
2031 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2032 (unsigned long) strlen(buf
),buf
));
2035 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
2038 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
2039 len
= sdslen(obj
->ptr
);
2041 long n
= (long)obj
->ptr
;
2048 while((n
= n
/10) != 0) {
2052 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len
));
2055 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2060 REDIS_NOTUSED(mask
);
2061 REDIS_NOTUSED(privdata
);
2063 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
2064 if (cfd
== AE_ERR
) {
2065 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
2068 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
2069 if ((c
= createClient(cfd
)) == NULL
) {
2070 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
2071 close(cfd
); /* May be already closed, just ingore errors */
2074 /* If maxclient directive is set and this is one client more... close the
2075 * connection. Note that we create the client instead to check before
2076 * for this condition, since now the socket is already set in nonblocking
2077 * mode and we can send an error for free using the Kernel I/O */
2078 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
2079 char *err
= "-ERR max number of clients reached\r\n";
2081 /* That's a best effort error message, don't check write errors */
2082 if (write(c
->fd
,err
,strlen(err
)) == -1) {
2083 /* Nothing to do, Just to avoid the warning... */
2088 server
.stat_numconnections
++;
2091 /* ======================= Redis objects implementation ===================== */
2093 static robj
*createObject(int type
, void *ptr
) {
2096 if (listLength(server
.objfreelist
)) {
2097 listNode
*head
= listFirst(server
.objfreelist
);
2098 o
= listNodeValue(head
);
2099 listDelNode(server
.objfreelist
,head
);
2101 o
= zmalloc(sizeof(*o
));
2104 o
->encoding
= REDIS_ENCODING_RAW
;
2110 static robj
*createStringObject(char *ptr
, size_t len
) {
2111 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
2114 static robj
*createListObject(void) {
2115 list
*l
= listCreate();
2117 listSetFreeMethod(l
,decrRefCount
);
2118 return createObject(REDIS_LIST
,l
);
2121 static robj
*createSetObject(void) {
2122 dict
*d
= dictCreate(&setDictType
,NULL
);
2123 return createObject(REDIS_SET
,d
);
2126 static robj
*createZsetObject(void) {
2127 zset
*zs
= zmalloc(sizeof(*zs
));
2129 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
2130 zs
->zsl
= zslCreate();
2131 return createObject(REDIS_ZSET
,zs
);
2134 static void freeStringObject(robj
*o
) {
2135 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2140 static void freeListObject(robj
*o
) {
2141 listRelease((list
*) o
->ptr
);
2144 static void freeSetObject(robj
*o
) {
2145 dictRelease((dict
*) o
->ptr
);
2148 static void freeZsetObject(robj
*o
) {
2151 dictRelease(zs
->dict
);
2156 static void freeHashObject(robj
*o
) {
2157 dictRelease((dict
*) o
->ptr
);
2160 static void incrRefCount(robj
*o
) {
2162 #ifdef DEBUG_REFCOUNT
2163 if (o
->type
== REDIS_STRING
)
2164 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
2168 static void decrRefCount(void *obj
) {
2171 #ifdef DEBUG_REFCOUNT
2172 if (o
->type
== REDIS_STRING
)
2173 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
2175 if (--(o
->refcount
) == 0) {
2177 case REDIS_STRING
: freeStringObject(o
); break;
2178 case REDIS_LIST
: freeListObject(o
); break;
2179 case REDIS_SET
: freeSetObject(o
); break;
2180 case REDIS_ZSET
: freeZsetObject(o
); break;
2181 case REDIS_HASH
: freeHashObject(o
); break;
2182 default: redisAssert(0 != 0); break;
2184 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2185 !listAddNodeHead(server
.objfreelist
,o
))
2190 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2191 dictEntry
*de
= dictFind(db
->dict
,key
);
2192 return de
? dictGetEntryVal(de
) : NULL
;
2195 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2196 expireIfNeeded(db
,key
);
2197 return lookupKey(db
,key
);
2200 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2201 deleteIfVolatile(db
,key
);
2202 return lookupKey(db
,key
);
2205 static int deleteKey(redisDb
*db
, robj
*key
) {
2208 /* We need to protect key from destruction: after the first dictDelete()
2209 * it may happen that 'key' is no longer valid if we don't increment
2210 * it's count. This may happen when we get the object reference directly
2211 * from the hash table with dictRandomKey() or dict iterators */
2213 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2214 retval
= dictDelete(db
->dict
,key
);
2217 return retval
== DICT_OK
;
2220 /* Try to share an object against the shared objects pool */
2221 static robj
*tryObjectSharing(robj
*o
) {
2222 struct dictEntry
*de
;
2225 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2227 redisAssert(o
->type
== REDIS_STRING
);
2228 de
= dictFind(server
.sharingpool
,o
);
2230 robj
*shared
= dictGetEntryKey(de
);
2232 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2233 dictGetEntryVal(de
) = (void*) c
;
2234 incrRefCount(shared
);
2238 /* Here we are using a stream algorihtm: Every time an object is
2239 * shared we increment its count, everytime there is a miss we
2240 * recrement the counter of a random object. If this object reaches
2241 * zero we remove the object and put the current object instead. */
2242 if (dictSize(server
.sharingpool
) >=
2243 server
.sharingpoolsize
) {
2244 de
= dictGetRandomKey(server
.sharingpool
);
2245 redisAssert(de
!= NULL
);
2246 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2247 dictGetEntryVal(de
) = (void*) c
;
2249 dictDelete(server
.sharingpool
,de
->key
);
2252 c
= 0; /* If the pool is empty we want to add this object */
2257 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2258 redisAssert(retval
== DICT_OK
);
2265 /* Check if the nul-terminated string 's' can be represented by a long
2266 * (that is, is a number that fits into long without any other space or
2267 * character before or after the digits).
2269 * If so, the function returns REDIS_OK and *longval is set to the value
2270 * of the number. Otherwise REDIS_ERR is returned */
2271 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2272 char buf
[32], *endptr
;
2276 value
= strtol(s
, &endptr
, 10);
2277 if (endptr
[0] != '\0') return REDIS_ERR
;
2278 slen
= snprintf(buf
,32,"%ld",value
);
2280 /* If the number converted back into a string is not identical
2281 * then it's not possible to encode the string as integer */
2282 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2283 if (longval
) *longval
= value
;
2287 /* Try to encode a string object in order to save space */
2288 static int tryObjectEncoding(robj
*o
) {
2292 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2293 return REDIS_ERR
; /* Already encoded */
2295 /* It's not save to encode shared objects: shared objects can be shared
2296 * everywhere in the "object space" of Redis. Encoded objects can only
2297 * appear as "values" (and not, for instance, as keys) */
2298 if (o
->refcount
> 1) return REDIS_ERR
;
2300 /* Currently we try to encode only strings */
2301 redisAssert(o
->type
== REDIS_STRING
);
2303 /* Check if we can represent this string as a long integer */
2304 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2306 /* Ok, this object can be encoded */
2307 o
->encoding
= REDIS_ENCODING_INT
;
2309 o
->ptr
= (void*) value
;
2313 /* Get a decoded version of an encoded object (returned as a new object).
2314 * If the object is already raw-encoded just increment the ref count. */
2315 static robj
*getDecodedObject(robj
*o
) {
2318 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2322 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2325 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2326 dec
= createStringObject(buf
,strlen(buf
));
2329 redisAssert(1 != 1);
2333 /* Compare two string objects via strcmp() or alike.
2334 * Note that the objects may be integer-encoded. In such a case we
2335 * use snprintf() to get a string representation of the numbers on the stack
2336 * and compare the strings, it's much faster than calling getDecodedObject().
2338 * Important note: if objects are not integer encoded, but binary-safe strings,
2339 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2341 static int compareStringObjects(robj
*a
, robj
*b
) {
2342 redisAssert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2343 char bufa
[128], bufb
[128], *astr
, *bstr
;
2346 if (a
== b
) return 0;
2347 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2348 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2354 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2355 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2361 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2364 static size_t stringObjectLen(robj
*o
) {
2365 redisAssert(o
->type
== REDIS_STRING
);
2366 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2367 return sdslen(o
->ptr
);
2371 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2375 /*============================ DB saving/loading ============================ */
2377 static int rdbSaveType(FILE *fp
, unsigned char type
) {
2378 if (fwrite(&type
,1,1,fp
) == 0) return -1;
2382 static int rdbSaveTime(FILE *fp
, time_t t
) {
2383 int32_t t32
= (int32_t) t
;
2384 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
2388 /* check rdbLoadLen() comments for more info */
2389 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2390 unsigned char buf
[2];
2393 /* Save a 6 bit len */
2394 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2395 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2396 } else if (len
< (1<<14)) {
2397 /* Save a 14 bit len */
2398 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2400 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2402 /* Save a 32 bit len */
2403 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2404 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2406 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2411 /* String objects in the form "2391" "-100" without any space and with a
2412 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2413 * encoded as integers to save space */
2414 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2416 char *endptr
, buf
[32];
2418 /* Check if it's possible to encode this value as a number */
2419 value
= strtoll(s
, &endptr
, 10);
2420 if (endptr
[0] != '\0') return 0;
2421 snprintf(buf
,32,"%lld",value
);
2423 /* If the number converted back into a string is not identical
2424 * then it's not possible to encode the string as integer */
2425 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2427 /* Finally check if it fits in our ranges */
2428 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2429 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2430 enc
[1] = value
&0xFF;
2432 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2433 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2434 enc
[1] = value
&0xFF;
2435 enc
[2] = (value
>>8)&0xFF;
2437 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2438 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2439 enc
[1] = value
&0xFF;
2440 enc
[2] = (value
>>8)&0xFF;
2441 enc
[3] = (value
>>16)&0xFF;
2442 enc
[4] = (value
>>24)&0xFF;
2449 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2450 unsigned int comprlen
, outlen
;
2454 /* We require at least four bytes compression for this to be worth it */
2455 outlen
= sdslen(obj
->ptr
)-4;
2456 if (outlen
<= 0) return 0;
2457 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2458 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2459 if (comprlen
== 0) {
2463 /* Data compressed! Let's save it on disk */
2464 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2465 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2466 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2467 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2468 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2477 /* Save a string objet as [len][data] on disk. If the object is a string
2478 * representation of an integer value we try to safe it in a special form */
2479 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2483 len
= sdslen(obj
->ptr
);
2485 /* Try integer encoding */
2487 unsigned char buf
[5];
2488 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2489 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2494 /* Try LZF compression - under 20 bytes it's unable to compress even
2495 * aaaaaaaaaaaaaaaaaa so skip it */
2499 retval
= rdbSaveLzfStringObject(fp
,obj
);
2500 if (retval
== -1) return -1;
2501 if (retval
> 0) return 0;
2502 /* retval == 0 means data can't be compressed, save the old way */
2505 /* Store verbatim */
2506 if (rdbSaveLen(fp
,len
) == -1) return -1;
2507 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2511 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2512 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2515 obj
= getDecodedObject(obj
);
2516 retval
= rdbSaveStringObjectRaw(fp
,obj
);
2521 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
2522 * 8 bit integer specifing the length of the representation.
2523 * This 8 bit integer has special values in order to specify the following
2529 static int rdbSaveDoubleValue(FILE *fp
, double val
) {
2530 unsigned char buf
[128];
2536 } else if (!isfinite(val
)) {
2538 buf
[0] = (val
< 0) ? 255 : 254;
2540 snprintf((char*)buf
+1,sizeof(buf
)-1,"%.17g",val
);
2541 buf
[0] = strlen((char*)buf
+1);
2544 if (fwrite(buf
,len
,1,fp
) == 0) return -1;
2548 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2549 static int rdbSave(char *filename
) {
2550 dictIterator
*di
= NULL
;
2555 time_t now
= time(NULL
);
2557 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2558 fp
= fopen(tmpfile
,"w");
2560 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2563 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2564 for (j
= 0; j
< server
.dbnum
; j
++) {
2565 redisDb
*db
= server
.db
+j
;
2567 if (dictSize(d
) == 0) continue;
2568 di
= dictGetIterator(d
);
2574 /* Write the SELECT DB opcode */
2575 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2576 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2578 /* Iterate this DB writing every entry */
2579 while((de
= dictNext(di
)) != NULL
) {
2580 robj
*key
= dictGetEntryKey(de
);
2581 robj
*o
= dictGetEntryVal(de
);
2582 time_t expiretime
= getExpire(db
,key
);
2584 /* Save the expire time */
2585 if (expiretime
!= -1) {
2586 /* If this key is already expired skip it */
2587 if (expiretime
< now
) continue;
2588 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2589 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2591 /* Save the key and associated value */
2592 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2593 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2594 if (o
->type
== REDIS_STRING
) {
2595 /* Save a string value */
2596 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
2597 } else if (o
->type
== REDIS_LIST
) {
2598 /* Save a list value */
2599 list
*list
= o
->ptr
;
2603 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
2604 while((ln
= listYield(list
))) {
2605 robj
*eleobj
= listNodeValue(ln
);
2607 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2609 } else if (o
->type
== REDIS_SET
) {
2610 /* Save a set value */
2612 dictIterator
*di
= dictGetIterator(set
);
2615 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
2616 while((de
= dictNext(di
)) != NULL
) {
2617 robj
*eleobj
= dictGetEntryKey(de
);
2619 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2621 dictReleaseIterator(di
);
2622 } else if (o
->type
== REDIS_ZSET
) {
2623 /* Save a set value */
2625 dictIterator
*di
= dictGetIterator(zs
->dict
);
2628 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) goto werr
;
2629 while((de
= dictNext(di
)) != NULL
) {
2630 robj
*eleobj
= dictGetEntryKey(de
);
2631 double *score
= dictGetEntryVal(de
);
2633 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2634 if (rdbSaveDoubleValue(fp
,*score
) == -1) goto werr
;
2636 dictReleaseIterator(di
);
2638 redisAssert(0 != 0);
2641 dictReleaseIterator(di
);
2644 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2646 /* Make sure data will not remain on the OS's output buffers */
2651 /* Use RENAME to make sure the DB file is changed atomically only
2652 * if the generate DB file is ok. */
2653 if (rename(tmpfile
,filename
) == -1) {
2654 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
2658 redisLog(REDIS_NOTICE
,"DB saved on disk");
2660 server
.lastsave
= time(NULL
);
2666 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2667 if (di
) dictReleaseIterator(di
);
2671 static int rdbSaveBackground(char *filename
) {
2674 if (server
.bgsavechildpid
!= -1) return REDIS_ERR
;
2675 if ((childpid
= fork()) == 0) {
2678 if (rdbSave(filename
) == REDIS_OK
) {
2685 if (childpid
== -1) {
2686 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2690 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2691 server
.bgsavechildpid
= childpid
;
2694 return REDIS_OK
; /* unreached */
2697 static void rdbRemoveTempFile(pid_t childpid
) {
2700 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
2704 static int rdbLoadType(FILE *fp
) {
2706 if (fread(&type
,1,1,fp
) == 0) return -1;
2710 static time_t rdbLoadTime(FILE *fp
) {
2712 if (fread(&t32
,4,1,fp
) == 0) return -1;
2713 return (time_t) t32
;
2716 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2717 * of this file for a description of how this are stored on disk.
2719 * isencoded is set to 1 if the readed length is not actually a length but
2720 * an "encoding type", check the above comments for more info */
2721 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2722 unsigned char buf
[2];
2725 if (isencoded
) *isencoded
= 0;
2727 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2732 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2733 type
= (buf
[0]&0xC0)>>6;
2734 if (type
== REDIS_RDB_6BITLEN
) {
2735 /* Read a 6 bit len */
2737 } else if (type
== REDIS_RDB_ENCVAL
) {
2738 /* Read a 6 bit len encoding type */
2739 if (isencoded
) *isencoded
= 1;
2741 } else if (type
== REDIS_RDB_14BITLEN
) {
2742 /* Read a 14 bit len */
2743 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2744 return ((buf
[0]&0x3F)<<8)|buf
[1];
2746 /* Read a 32 bit len */
2747 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2753 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2754 unsigned char enc
[4];
2757 if (enctype
== REDIS_RDB_ENC_INT8
) {
2758 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2759 val
= (signed char)enc
[0];
2760 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2762 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2763 v
= enc
[0]|(enc
[1]<<8);
2765 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2767 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2768 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2771 val
= 0; /* anti-warning */
2774 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2777 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2778 unsigned int len
, clen
;
2779 unsigned char *c
= NULL
;
2782 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2783 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2784 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2785 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2786 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2787 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2789 return createObject(REDIS_STRING
,val
);
2796 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2801 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2804 case REDIS_RDB_ENC_INT8
:
2805 case REDIS_RDB_ENC_INT16
:
2806 case REDIS_RDB_ENC_INT32
:
2807 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2808 case REDIS_RDB_ENC_LZF
:
2809 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2815 if (len
== REDIS_RDB_LENERR
) return NULL
;
2816 val
= sdsnewlen(NULL
,len
);
2817 if (len
&& fread(val
,len
,1,fp
) == 0) {
2821 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2824 /* For information about double serialization check rdbSaveDoubleValue() */
2825 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
2829 if (fread(&len
,1,1,fp
) == 0) return -1;
2831 case 255: *val
= R_NegInf
; return 0;
2832 case 254: *val
= R_PosInf
; return 0;
2833 case 253: *val
= R_Nan
; return 0;
2835 if (fread(buf
,len
,1,fp
) == 0) return -1;
2837 sscanf(buf
, "%lg", val
);
2842 static int rdbLoad(char *filename
) {
2844 robj
*keyobj
= NULL
;
2846 int type
, retval
, rdbver
;
2847 dict
*d
= server
.db
[0].dict
;
2848 redisDb
*db
= server
.db
+0;
2850 time_t expiretime
= -1, now
= time(NULL
);
2852 fp
= fopen(filename
,"r");
2853 if (!fp
) return REDIS_ERR
;
2854 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2856 if (memcmp(buf
,"REDIS",5) != 0) {
2858 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2861 rdbver
= atoi(buf
+5);
2864 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2871 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2872 if (type
== REDIS_EXPIRETIME
) {
2873 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2874 /* We read the time so we need to read the object type again */
2875 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2877 if (type
== REDIS_EOF
) break;
2878 /* Handle SELECT DB opcode as a special case */
2879 if (type
== REDIS_SELECTDB
) {
2880 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2882 if (dbid
>= (unsigned)server
.dbnum
) {
2883 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2886 db
= server
.db
+dbid
;
2891 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2893 if (type
== REDIS_STRING
) {
2894 /* Read string value */
2895 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2896 tryObjectEncoding(o
);
2897 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2898 /* Read list/set value */
2901 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2903 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2904 /* Load every single element of the list/set */
2908 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2909 tryObjectEncoding(ele
);
2910 if (type
== REDIS_LIST
) {
2911 listAddNodeTail((list
*)o
->ptr
,ele
);
2913 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
2916 } else if (type
== REDIS_ZSET
) {
2917 /* Read list/set value */
2921 if ((zsetlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2923 o
= createZsetObject();
2925 /* Load every single element of the list/set */
2928 double *score
= zmalloc(sizeof(double));
2930 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2931 tryObjectEncoding(ele
);
2932 if (rdbLoadDoubleValue(fp
,score
) == -1) goto eoferr
;
2933 dictAdd(zs
->dict
,ele
,score
);
2934 zslInsert(zs
->zsl
,*score
,ele
);
2935 incrRefCount(ele
); /* added to skiplist */
2938 redisAssert(0 != 0);
2940 /* Add the new object in the hash table */
2941 retval
= dictAdd(d
,keyobj
,o
);
2942 if (retval
== DICT_ERR
) {
2943 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2946 /* Set the expire time if needed */
2947 if (expiretime
!= -1) {
2948 setExpire(db
,keyobj
,expiretime
);
2949 /* Delete this key if already expired */
2950 if (expiretime
< now
) deleteKey(db
,keyobj
);
2958 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2959 if (keyobj
) decrRefCount(keyobj
);
2960 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2962 return REDIS_ERR
; /* Just to avoid warning */
2965 /*================================== Commands =============================== */
2967 static void authCommand(redisClient
*c
) {
2968 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2969 c
->authenticated
= 1;
2970 addReply(c
,shared
.ok
);
2972 c
->authenticated
= 0;
2973 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2977 static void pingCommand(redisClient
*c
) {
2978 addReply(c
,shared
.pong
);
2981 static void echoCommand(redisClient
*c
) {
2982 addReplyBulkLen(c
,c
->argv
[1]);
2983 addReply(c
,c
->argv
[1]);
2984 addReply(c
,shared
.crlf
);
2987 /*=================================== Strings =============================== */
2989 static void setGenericCommand(redisClient
*c
, int nx
) {
2992 if (nx
) deleteIfVolatile(c
->db
,c
->argv
[1]);
2993 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2994 if (retval
== DICT_ERR
) {
2996 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2997 incrRefCount(c
->argv
[2]);
2999 addReply(c
,shared
.czero
);
3003 incrRefCount(c
->argv
[1]);
3004 incrRefCount(c
->argv
[2]);
3007 removeExpire(c
->db
,c
->argv
[1]);
3008 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3011 static void setCommand(redisClient
*c
) {
3012 setGenericCommand(c
,0);
3015 static void setnxCommand(redisClient
*c
) {
3016 setGenericCommand(c
,1);
3019 static void getCommand(redisClient
*c
) {
3020 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3023 addReply(c
,shared
.nullbulk
);
3025 if (o
->type
!= REDIS_STRING
) {
3026 addReply(c
,shared
.wrongtypeerr
);
3028 addReplyBulkLen(c
,o
);
3030 addReply(c
,shared
.crlf
);
3035 static void getsetCommand(redisClient
*c
) {
3037 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
3038 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3040 incrRefCount(c
->argv
[1]);
3042 incrRefCount(c
->argv
[2]);
3044 removeExpire(c
->db
,c
->argv
[1]);
3047 static void mgetCommand(redisClient
*c
) {
3050 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
3051 for (j
= 1; j
< c
->argc
; j
++) {
3052 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
3054 addReply(c
,shared
.nullbulk
);
3056 if (o
->type
!= REDIS_STRING
) {
3057 addReply(c
,shared
.nullbulk
);
3059 addReplyBulkLen(c
,o
);
3061 addReply(c
,shared
.crlf
);
3067 static void msetGenericCommand(redisClient
*c
, int nx
) {
3068 int j
, busykeys
= 0;
3070 if ((c
->argc
% 2) == 0) {
3071 addReplySds(c
,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3074 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3075 * set nothing at all if at least one already key exists. */
3077 for (j
= 1; j
< c
->argc
; j
+= 2) {
3078 if (lookupKeyWrite(c
->db
,c
->argv
[j
]) != NULL
) {
3084 addReply(c
, shared
.czero
);
3088 for (j
= 1; j
< c
->argc
; j
+= 2) {
3091 tryObjectEncoding(c
->argv
[j
+1]);
3092 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3093 if (retval
== DICT_ERR
) {
3094 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3095 incrRefCount(c
->argv
[j
+1]);
3097 incrRefCount(c
->argv
[j
]);
3098 incrRefCount(c
->argv
[j
+1]);
3100 removeExpire(c
->db
,c
->argv
[j
]);
3102 server
.dirty
+= (c
->argc
-1)/2;
3103 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3106 static void msetCommand(redisClient
*c
) {
3107 msetGenericCommand(c
,0);
3110 static void msetnxCommand(redisClient
*c
) {
3111 msetGenericCommand(c
,1);
3114 static void incrDecrCommand(redisClient
*c
, long long incr
) {
3119 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3123 if (o
->type
!= REDIS_STRING
) {
3128 if (o
->encoding
== REDIS_ENCODING_RAW
)
3129 value
= strtoll(o
->ptr
, &eptr
, 10);
3130 else if (o
->encoding
== REDIS_ENCODING_INT
)
3131 value
= (long)o
->ptr
;
3133 redisAssert(1 != 1);
3138 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
3139 tryObjectEncoding(o
);
3140 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
3141 if (retval
== DICT_ERR
) {
3142 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
3143 removeExpire(c
->db
,c
->argv
[1]);
3145 incrRefCount(c
->argv
[1]);
3148 addReply(c
,shared
.colon
);
3150 addReply(c
,shared
.crlf
);
3153 static void incrCommand(redisClient
*c
) {
3154 incrDecrCommand(c
,1);
3157 static void decrCommand(redisClient
*c
) {
3158 incrDecrCommand(c
,-1);
3161 static void incrbyCommand(redisClient
*c
) {
3162 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3163 incrDecrCommand(c
,incr
);
3166 static void decrbyCommand(redisClient
*c
) {
3167 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3168 incrDecrCommand(c
,-incr
);
3171 /* ========================= Type agnostic commands ========================= */
3173 static void delCommand(redisClient
*c
) {
3176 for (j
= 1; j
< c
->argc
; j
++) {
3177 if (deleteKey(c
->db
,c
->argv
[j
])) {
3184 addReply(c
,shared
.czero
);
3187 addReply(c
,shared
.cone
);
3190 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
3195 static void existsCommand(redisClient
*c
) {
3196 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
3199 static void selectCommand(redisClient
*c
) {
3200 int id
= atoi(c
->argv
[1]->ptr
);
3202 if (selectDb(c
,id
) == REDIS_ERR
) {
3203 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
3205 addReply(c
,shared
.ok
);
3209 static void randomkeyCommand(redisClient
*c
) {
3213 de
= dictGetRandomKey(c
->db
->dict
);
3214 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
3217 addReply(c
,shared
.plus
);
3218 addReply(c
,shared
.crlf
);
3220 addReply(c
,shared
.plus
);
3221 addReply(c
,dictGetEntryKey(de
));
3222 addReply(c
,shared
.crlf
);
3226 static void keysCommand(redisClient
*c
) {
3229 sds pattern
= c
->argv
[1]->ptr
;
3230 int plen
= sdslen(pattern
);
3231 unsigned long numkeys
= 0, keyslen
= 0;
3232 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3234 di
= dictGetIterator(c
->db
->dict
);
3236 decrRefCount(lenobj
);
3237 while((de
= dictNext(di
)) != NULL
) {
3238 robj
*keyobj
= dictGetEntryKey(de
);
3240 sds key
= keyobj
->ptr
;
3241 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3242 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3243 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3245 addReply(c
,shared
.space
);
3248 keyslen
+= sdslen(key
);
3252 dictReleaseIterator(di
);
3253 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3254 addReply(c
,shared
.crlf
);
3257 static void dbsizeCommand(redisClient
*c
) {
3259 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
3262 static void lastsaveCommand(redisClient
*c
) {
3264 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
3267 static void typeCommand(redisClient
*c
) {
3271 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3276 case REDIS_STRING
: type
= "+string"; break;
3277 case REDIS_LIST
: type
= "+list"; break;
3278 case REDIS_SET
: type
= "+set"; break;
3279 case REDIS_ZSET
: type
= "+zset"; break;
3280 default: type
= "unknown"; break;
3283 addReplySds(c
,sdsnew(type
));
3284 addReply(c
,shared
.crlf
);
3287 static void saveCommand(redisClient
*c
) {
3288 if (server
.bgsavechildpid
!= -1) {
3289 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3292 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3293 addReply(c
,shared
.ok
);
3295 addReply(c
,shared
.err
);
3299 static void bgsaveCommand(redisClient
*c
) {
3300 if (server
.bgsavechildpid
!= -1) {
3301 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3304 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3305 char *status
= "+Background saving started\r\n";
3306 addReplySds(c
,sdsnew(status
));
3308 addReply(c
,shared
.err
);
3312 static void shutdownCommand(redisClient
*c
) {
3313 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
3314 /* Kill the saving child if there is a background saving in progress.
3315 We want to avoid race conditions, for instance our saving child may
3316 overwrite the synchronous saving did by SHUTDOWN. */
3317 if (server
.bgsavechildpid
!= -1) {
3318 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3319 kill(server
.bgsavechildpid
,SIGKILL
);
3320 rdbRemoveTempFile(server
.bgsavechildpid
);
3323 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3324 if (server
.daemonize
)
3325 unlink(server
.pidfile
);
3326 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3327 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3330 /* Ooops.. error saving! The best we can do is to continue operating.
3331 * Note that if there was a background saving process, in the next
3332 * cron() Redis will be notified that the background saving aborted,
3333 * handling special stuff like slaves pending for synchronization... */
3334 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3335 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3339 static void renameGenericCommand(redisClient
*c
, int nx
) {
3342 /* To use the same key as src and dst is probably an error */
3343 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3344 addReply(c
,shared
.sameobjecterr
);
3348 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3350 addReply(c
,shared
.nokeyerr
);
3354 deleteIfVolatile(c
->db
,c
->argv
[2]);
3355 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3358 addReply(c
,shared
.czero
);
3361 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3363 incrRefCount(c
->argv
[2]);
3365 deleteKey(c
->db
,c
->argv
[1]);
3367 addReply(c
,nx
? shared
.cone
: shared
.ok
);
3370 static void renameCommand(redisClient
*c
) {
3371 renameGenericCommand(c
,0);
3374 static void renamenxCommand(redisClient
*c
) {
3375 renameGenericCommand(c
,1);
3378 static void moveCommand(redisClient
*c
) {
3383 /* Obtain source and target DB pointers */
3386 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3387 addReply(c
,shared
.outofrangeerr
);
3391 selectDb(c
,srcid
); /* Back to the source DB */
3393 /* If the user is moving using as target the same
3394 * DB as the source DB it is probably an error. */
3396 addReply(c
,shared
.sameobjecterr
);
3400 /* Check if the element exists and get a reference */
3401 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3403 addReply(c
,shared
.czero
);
3407 /* Try to add the element to the target DB */
3408 deleteIfVolatile(dst
,c
->argv
[1]);
3409 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3410 addReply(c
,shared
.czero
);
3413 incrRefCount(c
->argv
[1]);
3416 /* OK! key moved, free the entry in the source DB */
3417 deleteKey(src
,c
->argv
[1]);
3419 addReply(c
,shared
.cone
);
3422 /* =================================== Lists ================================ */
3423 static void pushGenericCommand(redisClient
*c
, int where
) {
3427 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3429 lobj
= createListObject();
3431 if (where
== REDIS_HEAD
) {
3432 listAddNodeHead(list
,c
->argv
[2]);
3434 listAddNodeTail(list
,c
->argv
[2]);
3436 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
3437 incrRefCount(c
->argv
[1]);
3438 incrRefCount(c
->argv
[2]);
3440 if (lobj
->type
!= REDIS_LIST
) {
3441 addReply(c
,shared
.wrongtypeerr
);
3445 if (where
== REDIS_HEAD
) {
3446 listAddNodeHead(list
,c
->argv
[2]);
3448 listAddNodeTail(list
,c
->argv
[2]);
3450 incrRefCount(c
->argv
[2]);
3453 addReply(c
,shared
.ok
);
3456 static void lpushCommand(redisClient
*c
) {
3457 pushGenericCommand(c
,REDIS_HEAD
);
3460 static void rpushCommand(redisClient
*c
) {
3461 pushGenericCommand(c
,REDIS_TAIL
);
3464 static void llenCommand(redisClient
*c
) {
3468 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3470 addReply(c
,shared
.czero
);
3473 if (o
->type
!= REDIS_LIST
) {
3474 addReply(c
,shared
.wrongtypeerr
);
3477 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3482 static void lindexCommand(redisClient
*c
) {
3484 int index
= atoi(c
->argv
[2]->ptr
);
3486 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3488 addReply(c
,shared
.nullbulk
);
3490 if (o
->type
!= REDIS_LIST
) {
3491 addReply(c
,shared
.wrongtypeerr
);
3493 list
*list
= o
->ptr
;
3496 ln
= listIndex(list
, index
);
3498 addReply(c
,shared
.nullbulk
);
3500 robj
*ele
= listNodeValue(ln
);
3501 addReplyBulkLen(c
,ele
);
3503 addReply(c
,shared
.crlf
);
3509 static void lsetCommand(redisClient
*c
) {
3511 int index
= atoi(c
->argv
[2]->ptr
);
3513 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3515 addReply(c
,shared
.nokeyerr
);
3517 if (o
->type
!= REDIS_LIST
) {
3518 addReply(c
,shared
.wrongtypeerr
);
3520 list
*list
= o
->ptr
;
3523 ln
= listIndex(list
, index
);
3525 addReply(c
,shared
.outofrangeerr
);
3527 robj
*ele
= listNodeValue(ln
);
3530 listNodeValue(ln
) = c
->argv
[3];
3531 incrRefCount(c
->argv
[3]);
3532 addReply(c
,shared
.ok
);
3539 static void popGenericCommand(redisClient
*c
, int where
) {
3542 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3544 addReply(c
,shared
.nullbulk
);
3546 if (o
->type
!= REDIS_LIST
) {
3547 addReply(c
,shared
.wrongtypeerr
);
3549 list
*list
= o
->ptr
;
3552 if (where
== REDIS_HEAD
)
3553 ln
= listFirst(list
);
3555 ln
= listLast(list
);
3558 addReply(c
,shared
.nullbulk
);
3560 robj
*ele
= listNodeValue(ln
);
3561 addReplyBulkLen(c
,ele
);
3563 addReply(c
,shared
.crlf
);
3564 listDelNode(list
,ln
);
3571 static void lpopCommand(redisClient
*c
) {
3572 popGenericCommand(c
,REDIS_HEAD
);
3575 static void rpopCommand(redisClient
*c
) {
3576 popGenericCommand(c
,REDIS_TAIL
);
3579 static void lrangeCommand(redisClient
*c
) {
3581 int start
= atoi(c
->argv
[2]->ptr
);
3582 int end
= atoi(c
->argv
[3]->ptr
);
3584 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3586 addReply(c
,shared
.nullmultibulk
);
3588 if (o
->type
!= REDIS_LIST
) {
3589 addReply(c
,shared
.wrongtypeerr
);
3591 list
*list
= o
->ptr
;
3593 int llen
= listLength(list
);
3597 /* convert negative indexes */
3598 if (start
< 0) start
= llen
+start
;
3599 if (end
< 0) end
= llen
+end
;
3600 if (start
< 0) start
= 0;
3601 if (end
< 0) end
= 0;
3603 /* indexes sanity checks */
3604 if (start
> end
|| start
>= llen
) {
3605 /* Out of range start or start > end result in empty list */
3606 addReply(c
,shared
.emptymultibulk
);
3609 if (end
>= llen
) end
= llen
-1;
3610 rangelen
= (end
-start
)+1;
3612 /* Return the result in form of a multi-bulk reply */
3613 ln
= listIndex(list
, start
);
3614 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3615 for (j
= 0; j
< rangelen
; j
++) {
3616 ele
= listNodeValue(ln
);
3617 addReplyBulkLen(c
,ele
);
3619 addReply(c
,shared
.crlf
);
3626 static void ltrimCommand(redisClient
*c
) {
3628 int start
= atoi(c
->argv
[2]->ptr
);
3629 int end
= atoi(c
->argv
[3]->ptr
);
3631 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3633 addReply(c
,shared
.nokeyerr
);
3635 if (o
->type
!= REDIS_LIST
) {
3636 addReply(c
,shared
.wrongtypeerr
);
3638 list
*list
= o
->ptr
;
3640 int llen
= listLength(list
);
3641 int j
, ltrim
, rtrim
;
3643 /* convert negative indexes */
3644 if (start
< 0) start
= llen
+start
;
3645 if (end
< 0) end
= llen
+end
;
3646 if (start
< 0) start
= 0;
3647 if (end
< 0) end
= 0;
3649 /* indexes sanity checks */
3650 if (start
> end
|| start
>= llen
) {
3651 /* Out of range start or start > end result in empty list */
3655 if (end
>= llen
) end
= llen
-1;
3660 /* Remove list elements to perform the trim */
3661 for (j
= 0; j
< ltrim
; j
++) {
3662 ln
= listFirst(list
);
3663 listDelNode(list
,ln
);
3665 for (j
= 0; j
< rtrim
; j
++) {
3666 ln
= listLast(list
);
3667 listDelNode(list
,ln
);
3670 addReply(c
,shared
.ok
);
3675 static void lremCommand(redisClient
*c
) {
3678 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3680 addReply(c
,shared
.czero
);
3682 if (o
->type
!= REDIS_LIST
) {
3683 addReply(c
,shared
.wrongtypeerr
);
3685 list
*list
= o
->ptr
;
3686 listNode
*ln
, *next
;
3687 int toremove
= atoi(c
->argv
[2]->ptr
);
3692 toremove
= -toremove
;
3695 ln
= fromtail
? list
->tail
: list
->head
;
3697 robj
*ele
= listNodeValue(ln
);
3699 next
= fromtail
? ln
->prev
: ln
->next
;
3700 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3701 listDelNode(list
,ln
);
3704 if (toremove
&& removed
== toremove
) break;
3708 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3713 /* This is the semantic of this command:
3714 * RPOPLPUSH srclist dstlist:
3715 * IF LLEN(srclist) > 0
3716 * element = RPOP srclist
3717 * LPUSH dstlist element
3724 * The idea is to be able to get an element from a list in a reliable way
3725 * since the element is not just returned but pushed against another list
3726 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3728 static void rpoplpushcommand(redisClient
*c
) {
3731 sobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3733 addReply(c
,shared
.nullbulk
);
3735 if (sobj
->type
!= REDIS_LIST
) {
3736 addReply(c
,shared
.wrongtypeerr
);
3738 list
*srclist
= sobj
->ptr
;
3739 listNode
*ln
= listLast(srclist
);
3742 addReply(c
,shared
.nullbulk
);
3744 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3745 robj
*ele
= listNodeValue(ln
);
3750 /* Create the list if the key does not exist */
3751 dobj
= createListObject();
3752 dictAdd(c
->db
->dict
,c
->argv
[2],dobj
);
3753 incrRefCount(c
->argv
[2]);
3754 } else if (dobj
->type
!= REDIS_LIST
) {
3755 addReply(c
,shared
.wrongtypeerr
);
3758 /* Add the element to the target list */
3759 dstlist
= dobj
->ptr
;
3760 listAddNodeHead(dstlist
,ele
);
3763 /* Send the element to the client as reply as well */
3764 addReplyBulkLen(c
,ele
);
3766 addReply(c
,shared
.crlf
);
3768 /* Finally remove the element from the source list */
3769 listDelNode(srclist
,ln
);
3777 /* ==================================== Sets ================================ */
3779 static void saddCommand(redisClient
*c
) {
3782 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3784 set
= createSetObject();
3785 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3786 incrRefCount(c
->argv
[1]);
3788 if (set
->type
!= REDIS_SET
) {
3789 addReply(c
,shared
.wrongtypeerr
);
3793 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3794 incrRefCount(c
->argv
[2]);
3796 addReply(c
,shared
.cone
);
3798 addReply(c
,shared
.czero
);
3802 static void sremCommand(redisClient
*c
) {
3805 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3807 addReply(c
,shared
.czero
);
3809 if (set
->type
!= REDIS_SET
) {
3810 addReply(c
,shared
.wrongtypeerr
);
3813 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
3815 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3816 addReply(c
,shared
.cone
);
3818 addReply(c
,shared
.czero
);
3823 static void smoveCommand(redisClient
*c
) {
3824 robj
*srcset
, *dstset
;
3826 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3827 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3829 /* If the source key does not exist return 0, if it's of the wrong type
3831 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
3832 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
3835 /* Error if the destination key is not a set as well */
3836 if (dstset
&& dstset
->type
!= REDIS_SET
) {
3837 addReply(c
,shared
.wrongtypeerr
);
3840 /* Remove the element from the source set */
3841 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
3842 /* Key not found in the src set! return zero */
3843 addReply(c
,shared
.czero
);
3847 /* Add the element to the destination set */
3849 dstset
= createSetObject();
3850 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
3851 incrRefCount(c
->argv
[2]);
3853 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
3854 incrRefCount(c
->argv
[3]);
3855 addReply(c
,shared
.cone
);
3858 static void sismemberCommand(redisClient
*c
) {
3861 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3863 addReply(c
,shared
.czero
);
3865 if (set
->type
!= REDIS_SET
) {
3866 addReply(c
,shared
.wrongtypeerr
);
3869 if (dictFind(set
->ptr
,c
->argv
[2]))
3870 addReply(c
,shared
.cone
);
3872 addReply(c
,shared
.czero
);
3876 static void scardCommand(redisClient
*c
) {
3880 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3882 addReply(c
,shared
.czero
);
3885 if (o
->type
!= REDIS_SET
) {
3886 addReply(c
,shared
.wrongtypeerr
);
3889 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
3895 static void spopCommand(redisClient
*c
) {
3899 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3901 addReply(c
,shared
.nullbulk
);
3903 if (set
->type
!= REDIS_SET
) {
3904 addReply(c
,shared
.wrongtypeerr
);
3907 de
= dictGetRandomKey(set
->ptr
);
3909 addReply(c
,shared
.nullbulk
);
3911 robj
*ele
= dictGetEntryKey(de
);
3913 addReplyBulkLen(c
,ele
);
3915 addReply(c
,shared
.crlf
);
3916 dictDelete(set
->ptr
,ele
);
3917 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3923 static void srandmemberCommand(redisClient
*c
) {
3927 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3929 addReply(c
,shared
.nullbulk
);
3931 if (set
->type
!= REDIS_SET
) {
3932 addReply(c
,shared
.wrongtypeerr
);
3935 de
= dictGetRandomKey(set
->ptr
);
3937 addReply(c
,shared
.nullbulk
);
3939 robj
*ele
= dictGetEntryKey(de
);
3941 addReplyBulkLen(c
,ele
);
3943 addReply(c
,shared
.crlf
);
3948 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3949 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3951 return dictSize(*d1
)-dictSize(*d2
);
3954 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, unsigned long setsnum
, robj
*dstkey
) {
3955 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3958 robj
*lenobj
= NULL
, *dstset
= NULL
;
3959 unsigned long j
, cardinality
= 0;
3961 for (j
= 0; j
< setsnum
; j
++) {
3965 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3966 lookupKeyRead(c
->db
,setskeys
[j
]);
3970 deleteKey(c
->db
,dstkey
);
3971 addReply(c
,shared
.czero
);
3973 addReply(c
,shared
.nullmultibulk
);
3977 if (setobj
->type
!= REDIS_SET
) {
3979 addReply(c
,shared
.wrongtypeerr
);
3982 dv
[j
] = setobj
->ptr
;
3984 /* Sort sets from the smallest to largest, this will improve our
3985 * algorithm's performace */
3986 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3988 /* The first thing we should output is the total number of elements...
3989 * since this is a multi-bulk write, but at this stage we don't know
3990 * the intersection set size, so we use a trick, append an empty object
3991 * to the output list and save the pointer to later modify it with the
3994 lenobj
= createObject(REDIS_STRING
,NULL
);
3996 decrRefCount(lenobj
);
3998 /* If we have a target key where to store the resulting set
3999 * create this key with an empty set inside */
4000 dstset
= createSetObject();
4003 /* Iterate all the elements of the first (smallest) set, and test
4004 * the element against all the other sets, if at least one set does
4005 * not include the element it is discarded */
4006 di
= dictGetIterator(dv
[0]);
4008 while((de
= dictNext(di
)) != NULL
) {
4011 for (j
= 1; j
< setsnum
; j
++)
4012 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
4014 continue; /* at least one set does not contain the member */
4015 ele
= dictGetEntryKey(de
);
4017 addReplyBulkLen(c
,ele
);
4019 addReply(c
,shared
.crlf
);
4022 dictAdd(dstset
->ptr
,ele
,NULL
);
4026 dictReleaseIterator(di
);
4029 /* Store the resulting set into the target */
4030 deleteKey(c
->db
,dstkey
);
4031 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4032 incrRefCount(dstkey
);
4036 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%lu\r\n",cardinality
);
4038 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4039 dictSize((dict
*)dstset
->ptr
)));
4045 static void sinterCommand(redisClient
*c
) {
4046 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
4049 static void sinterstoreCommand(redisClient
*c
) {
4050 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
4053 #define REDIS_OP_UNION 0
4054 #define REDIS_OP_DIFF 1
4056 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
4057 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4060 robj
*dstset
= NULL
;
4061 int j
, cardinality
= 0;
4063 for (j
= 0; j
< setsnum
; j
++) {
4067 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4068 lookupKeyRead(c
->db
,setskeys
[j
]);
4073 if (setobj
->type
!= REDIS_SET
) {
4075 addReply(c
,shared
.wrongtypeerr
);
4078 dv
[j
] = setobj
->ptr
;
4081 /* We need a temp set object to store our union. If the dstkey
4082 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4083 * this set object will be the resulting object to set into the target key*/
4084 dstset
= createSetObject();
4086 /* Iterate all the elements of all the sets, add every element a single
4087 * time to the result set */
4088 for (j
= 0; j
< setsnum
; j
++) {
4089 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
4090 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
4092 di
= dictGetIterator(dv
[j
]);
4094 while((de
= dictNext(di
)) != NULL
) {
4097 /* dictAdd will not add the same element multiple times */
4098 ele
= dictGetEntryKey(de
);
4099 if (op
== REDIS_OP_UNION
|| j
== 0) {
4100 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
4104 } else if (op
== REDIS_OP_DIFF
) {
4105 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
4110 dictReleaseIterator(di
);
4112 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
4115 /* Output the content of the resulting set, if not in STORE mode */
4117 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
4118 di
= dictGetIterator(dstset
->ptr
);
4119 while((de
= dictNext(di
)) != NULL
) {
4122 ele
= dictGetEntryKey(de
);
4123 addReplyBulkLen(c
,ele
);
4125 addReply(c
,shared
.crlf
);
4127 dictReleaseIterator(di
);
4129 /* If we have a target key where to store the resulting set
4130 * create this key with the result set inside */
4131 deleteKey(c
->db
,dstkey
);
4132 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4133 incrRefCount(dstkey
);
4138 decrRefCount(dstset
);
4140 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4141 dictSize((dict
*)dstset
->ptr
)));
4147 static void sunionCommand(redisClient
*c
) {
4148 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
4151 static void sunionstoreCommand(redisClient
*c
) {
4152 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
4155 static void sdiffCommand(redisClient
*c
) {
4156 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
4159 static void sdiffstoreCommand(redisClient
*c
) {
4160 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
4163 /* ==================================== ZSets =============================== */
4165 /* ZSETs are ordered sets using two data structures to hold the same elements
4166 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4169 * The elements are added to an hash table mapping Redis objects to scores.
4170 * At the same time the elements are added to a skip list mapping scores
4171 * to Redis objects (so objects are sorted by scores in this "view"). */
4173 /* This skiplist implementation is almost a C translation of the original
4174 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4175 * Alternative to Balanced Trees", modified in three ways:
4176 * a) this implementation allows for repeated values.
4177 * b) the comparison is not just by key (our 'score') but by satellite data.
4178 * c) there is a back pointer, so it's a doubly linked list with the back
4179 * pointers being only at "level 1". This allows to traverse the list
4180 * from tail to head, useful for ZREVRANGE. */
4182 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
4183 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
4185 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
4191 static zskiplist
*zslCreate(void) {
4195 zsl
= zmalloc(sizeof(*zsl
));
4198 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
4199 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
4200 zsl
->header
->forward
[j
] = NULL
;
4201 zsl
->header
->backward
= NULL
;
4206 static void zslFreeNode(zskiplistNode
*node
) {
4207 decrRefCount(node
->obj
);
4208 zfree(node
->forward
);
4212 static void zslFree(zskiplist
*zsl
) {
4213 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
4215 zfree(zsl
->header
->forward
);
4218 next
= node
->forward
[0];
4225 static int zslRandomLevel(void) {
4227 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
4232 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
4233 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4237 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4238 while (x
->forward
[i
] &&
4239 (x
->forward
[i
]->score
< score
||
4240 (x
->forward
[i
]->score
== score
&&
4241 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4245 /* we assume the key is not already inside, since we allow duplicated
4246 * scores, and the re-insertion of score and redis object should never
4247 * happpen since the caller of zslInsert() should test in the hash table
4248 * if the element is already inside or not. */
4249 level
= zslRandomLevel();
4250 if (level
> zsl
->level
) {
4251 for (i
= zsl
->level
; i
< level
; i
++)
4252 update
[i
] = zsl
->header
;
4255 x
= zslCreateNode(level
,score
,obj
);
4256 for (i
= 0; i
< level
; i
++) {
4257 x
->forward
[i
] = update
[i
]->forward
[i
];
4258 update
[i
]->forward
[i
] = x
;
4260 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
4262 x
->forward
[0]->backward
= x
;
4268 /* Delete an element with matching score/object from the skiplist. */
4269 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
4270 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4274 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4275 while (x
->forward
[i
] &&
4276 (x
->forward
[i
]->score
< score
||
4277 (x
->forward
[i
]->score
== score
&&
4278 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4282 /* We may have multiple elements with the same score, what we need
4283 * is to find the element with both the right score and object. */
4285 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
4286 for (i
= 0; i
< zsl
->level
; i
++) {
4287 if (update
[i
]->forward
[i
] != x
) break;
4288 update
[i
]->forward
[i
] = x
->forward
[i
];
4290 if (x
->forward
[0]) {
4291 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4294 zsl
->tail
= x
->backward
;
4297 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4302 return 0; /* not found */
4304 return 0; /* not found */
4307 /* Delete all the elements with score between min and max from the skiplist.
4308 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4309 * Note that this function takes the reference to the hash table view of the
4310 * sorted set, in order to remove the elements from the hash table too. */
4311 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4312 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4313 unsigned long removed
= 0;
4317 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4318 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
4322 /* We may have multiple elements with the same score, what we need
4323 * is to find the element with both the right score and object. */
4325 while (x
&& x
->score
<= max
) {
4326 zskiplistNode
*next
;
4328 for (i
= 0; i
< zsl
->level
; i
++) {
4329 if (update
[i
]->forward
[i
] != x
) break;
4330 update
[i
]->forward
[i
] = x
->forward
[i
];
4332 if (x
->forward
[0]) {
4333 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4336 zsl
->tail
= x
->backward
;
4338 next
= x
->forward
[0];
4339 dictDelete(dict
,x
->obj
);
4341 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4347 return removed
; /* not found */
4350 /* Find the first node having a score equal or greater than the specified one.
4351 * Returns NULL if there is no match. */
4352 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4357 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4358 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
4361 /* We may have multiple elements with the same score, what we need
4362 * is to find the element with both the right score and object. */
4363 return x
->forward
[0];
4366 /* The actual Z-commands implementations */
4368 /* This generic command implements both ZADD and ZINCRBY.
4369 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4370 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4371 static void zaddGenericCommand(redisClient
*c
, robj
*key
, robj
*ele
, double scoreval
, int doincrement
) {
4376 zsetobj
= lookupKeyWrite(c
->db
,key
);
4377 if (zsetobj
== NULL
) {
4378 zsetobj
= createZsetObject();
4379 dictAdd(c
->db
->dict
,key
,zsetobj
);
4382 if (zsetobj
->type
!= REDIS_ZSET
) {
4383 addReply(c
,shared
.wrongtypeerr
);
4389 /* Ok now since we implement both ZADD and ZINCRBY here the code
4390 * needs to handle the two different conditions. It's all about setting
4391 * '*score', that is, the new score to set, to the right value. */
4392 score
= zmalloc(sizeof(double));
4396 /* Read the old score. If the element was not present starts from 0 */
4397 de
= dictFind(zs
->dict
,ele
);
4399 double *oldscore
= dictGetEntryVal(de
);
4400 *score
= *oldscore
+ scoreval
;
4408 /* What follows is a simple remove and re-insert operation that is common
4409 * to both ZADD and ZINCRBY... */
4410 if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) {
4411 /* case 1: New element */
4412 incrRefCount(ele
); /* added to hash */
4413 zslInsert(zs
->zsl
,*score
,ele
);
4414 incrRefCount(ele
); /* added to skiplist */
4417 addReplyDouble(c
,*score
);
4419 addReply(c
,shared
.cone
);
4424 /* case 2: Score update operation */
4425 de
= dictFind(zs
->dict
,ele
);
4426 redisAssert(de
!= NULL
);
4427 oldscore
= dictGetEntryVal(de
);
4428 if (*score
!= *oldscore
) {
4431 /* Remove and insert the element in the skip list with new score */
4432 deleted
= zslDelete(zs
->zsl
,*oldscore
,ele
);
4433 redisAssert(deleted
!= 0);
4434 zslInsert(zs
->zsl
,*score
,ele
);
4436 /* Update the score in the hash table */
4437 dictReplace(zs
->dict
,ele
,score
);
4443 addReplyDouble(c
,*score
);
4445 addReply(c
,shared
.czero
);
4449 static void zaddCommand(redisClient
*c
) {
4452 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4453 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0);
4456 static void zincrbyCommand(redisClient
*c
) {
4459 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4460 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1);
4463 static void zremCommand(redisClient
*c
) {
4467 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4468 if (zsetobj
== NULL
) {
4469 addReply(c
,shared
.czero
);
4475 if (zsetobj
->type
!= REDIS_ZSET
) {
4476 addReply(c
,shared
.wrongtypeerr
);
4480 de
= dictFind(zs
->dict
,c
->argv
[2]);
4482 addReply(c
,shared
.czero
);
4485 /* Delete from the skiplist */
4486 oldscore
= dictGetEntryVal(de
);
4487 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
4488 redisAssert(deleted
!= 0);
4490 /* Delete from the hash table */
4491 dictDelete(zs
->dict
,c
->argv
[2]);
4492 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4494 addReply(c
,shared
.cone
);
4498 static void zremrangebyscoreCommand(redisClient
*c
) {
4499 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4500 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4504 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4505 if (zsetobj
== NULL
) {
4506 addReply(c
,shared
.czero
);
4510 if (zsetobj
->type
!= REDIS_ZSET
) {
4511 addReply(c
,shared
.wrongtypeerr
);
4515 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
4516 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4517 server
.dirty
+= deleted
;
4518 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
4522 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
4524 int start
= atoi(c
->argv
[2]->ptr
);
4525 int end
= atoi(c
->argv
[3]->ptr
);
4527 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4529 addReply(c
,shared
.nullmultibulk
);
4531 if (o
->type
!= REDIS_ZSET
) {
4532 addReply(c
,shared
.wrongtypeerr
);
4534 zset
*zsetobj
= o
->ptr
;
4535 zskiplist
*zsl
= zsetobj
->zsl
;
4538 int llen
= zsl
->length
;
4542 /* convert negative indexes */
4543 if (start
< 0) start
= llen
+start
;
4544 if (end
< 0) end
= llen
+end
;
4545 if (start
< 0) start
= 0;
4546 if (end
< 0) end
= 0;
4548 /* indexes sanity checks */
4549 if (start
> end
|| start
>= llen
) {
4550 /* Out of range start or start > end result in empty list */
4551 addReply(c
,shared
.emptymultibulk
);
4554 if (end
>= llen
) end
= llen
-1;
4555 rangelen
= (end
-start
)+1;
4557 /* Return the result in form of a multi-bulk reply */
4563 ln
= zsl
->header
->forward
[0];
4565 ln
= ln
->forward
[0];
4568 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
4569 for (j
= 0; j
< rangelen
; j
++) {
4571 addReplyBulkLen(c
,ele
);
4573 addReply(c
,shared
.crlf
);
4574 ln
= reverse
? ln
->backward
: ln
->forward
[0];
4580 static void zrangeCommand(redisClient
*c
) {
4581 zrangeGenericCommand(c
,0);
4584 static void zrevrangeCommand(redisClient
*c
) {
4585 zrangeGenericCommand(c
,1);
4588 static void zrangebyscoreCommand(redisClient
*c
) {
4590 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4591 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4592 int offset
= 0, limit
= -1;
4594 if (c
->argc
!= 4 && c
->argc
!= 7) {
4596 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4598 } else if (c
->argc
== 7 && strcasecmp(c
->argv
[4]->ptr
,"limit")) {
4599 addReply(c
,shared
.syntaxerr
);
4601 } else if (c
->argc
== 7) {
4602 offset
= atoi(c
->argv
[5]->ptr
);
4603 limit
= atoi(c
->argv
[6]->ptr
);
4604 if (offset
< 0) offset
= 0;
4607 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4609 addReply(c
,shared
.nullmultibulk
);
4611 if (o
->type
!= REDIS_ZSET
) {
4612 addReply(c
,shared
.wrongtypeerr
);
4614 zset
*zsetobj
= o
->ptr
;
4615 zskiplist
*zsl
= zsetobj
->zsl
;
4618 unsigned int rangelen
= 0;
4620 /* Get the first node with the score >= min */
4621 ln
= zslFirstWithScore(zsl
,min
);
4623 /* No element matching the speciifed interval */
4624 addReply(c
,shared
.emptymultibulk
);
4628 /* We don't know in advance how many matching elements there
4629 * are in the list, so we push this object that will represent
4630 * the multi-bulk length in the output buffer, and will "fix"
4632 lenobj
= createObject(REDIS_STRING
,NULL
);
4634 decrRefCount(lenobj
);
4636 while(ln
&& ln
->score
<= max
) {
4639 ln
= ln
->forward
[0];
4642 if (limit
== 0) break;
4644 addReplyBulkLen(c
,ele
);
4646 addReply(c
,shared
.crlf
);
4647 ln
= ln
->forward
[0];
4649 if (limit
> 0) limit
--;
4651 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
4656 static void zcardCommand(redisClient
*c
) {
4660 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4662 addReply(c
,shared
.czero
);
4665 if (o
->type
!= REDIS_ZSET
) {
4666 addReply(c
,shared
.wrongtypeerr
);
4669 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",zs
->zsl
->length
));
4674 static void zscoreCommand(redisClient
*c
) {
4678 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4680 addReply(c
,shared
.nullbulk
);
4683 if (o
->type
!= REDIS_ZSET
) {
4684 addReply(c
,shared
.wrongtypeerr
);
4689 de
= dictFind(zs
->dict
,c
->argv
[2]);
4691 addReply(c
,shared
.nullbulk
);
4693 double *score
= dictGetEntryVal(de
);
4695 addReplyDouble(c
,*score
);
4701 /* ========================= Non type-specific commands ==================== */
4703 static void flushdbCommand(redisClient
*c
) {
4704 server
.dirty
+= dictSize(c
->db
->dict
);
4705 dictEmpty(c
->db
->dict
);
4706 dictEmpty(c
->db
->expires
);
4707 addReply(c
,shared
.ok
);
4710 static void flushallCommand(redisClient
*c
) {
4711 server
.dirty
+= emptyDb();
4712 addReply(c
,shared
.ok
);
4713 rdbSave(server
.dbfilename
);
4717 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
4718 redisSortOperation
*so
= zmalloc(sizeof(*so
));
4720 so
->pattern
= pattern
;
4724 /* Return the value associated to the key with a name obtained
4725 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4726 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
4730 int prefixlen
, sublen
, postfixlen
;
4731 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4735 char buf
[REDIS_SORTKEY_MAX
+1];
4738 /* If the pattern is "#" return the substitution object itself in order
4739 * to implement the "SORT ... GET #" feature. */
4740 spat
= pattern
->ptr
;
4741 if (spat
[0] == '#' && spat
[1] == '\0') {
4745 /* The substitution object may be specially encoded. If so we create
4746 * a decoded object on the fly. Otherwise getDecodedObject will just
4747 * increment the ref count, that we'll decrement later. */
4748 subst
= getDecodedObject(subst
);
4751 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
4752 p
= strchr(spat
,'*');
4754 decrRefCount(subst
);
4759 sublen
= sdslen(ssub
);
4760 postfixlen
= sdslen(spat
)-(prefixlen
+1);
4761 memcpy(keyname
.buf
,spat
,prefixlen
);
4762 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
4763 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
4764 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
4765 keyname
.len
= prefixlen
+sublen
+postfixlen
;
4767 initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(long)*2))
4768 decrRefCount(subst
);
4770 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4771 return lookupKeyRead(db
,&keyobj
);
4774 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4775 * the additional parameter is not standard but a BSD-specific we have to
4776 * pass sorting parameters via the global 'server' structure */
4777 static int sortCompare(const void *s1
, const void *s2
) {
4778 const redisSortObject
*so1
= s1
, *so2
= s2
;
4781 if (!server
.sort_alpha
) {
4782 /* Numeric sorting. Here it's trivial as we precomputed scores */
4783 if (so1
->u
.score
> so2
->u
.score
) {
4785 } else if (so1
->u
.score
< so2
->u
.score
) {
4791 /* Alphanumeric sorting */
4792 if (server
.sort_bypattern
) {
4793 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
4794 /* At least one compare object is NULL */
4795 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
4797 else if (so1
->u
.cmpobj
== NULL
)
4802 /* We have both the objects, use strcoll */
4803 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
4806 /* Compare elements directly */
4809 dec1
= getDecodedObject(so1
->obj
);
4810 dec2
= getDecodedObject(so2
->obj
);
4811 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
4816 return server
.sort_desc
? -cmp
: cmp
;
4819 /* The SORT command is the most complex command in Redis. Warning: this code
4820 * is optimized for speed and a bit less for readability */
4821 static void sortCommand(redisClient
*c
) {
4824 int desc
= 0, alpha
= 0;
4825 int limit_start
= 0, limit_count
= -1, start
, end
;
4826 int j
, dontsort
= 0, vectorlen
;
4827 int getop
= 0; /* GET operation counter */
4828 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
4829 redisSortObject
*vector
; /* Resulting vector to sort */
4831 /* Lookup the key to sort. It must be of the right types */
4832 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
4833 if (sortval
== NULL
) {
4834 addReply(c
,shared
.nokeyerr
);
4837 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
4838 sortval
->type
!= REDIS_ZSET
)
4840 addReply(c
,shared
.wrongtypeerr
);
4844 /* Create a list of operations to perform for every sorted element.
4845 * Operations can be GET/DEL/INCR/DECR */
4846 operations
= listCreate();
4847 listSetFreeMethod(operations
,zfree
);
4850 /* Now we need to protect sortval incrementing its count, in the future
4851 * SORT may have options able to overwrite/delete keys during the sorting
4852 * and the sorted key itself may get destroied */
4853 incrRefCount(sortval
);
4855 /* The SORT command has an SQL-alike syntax, parse it */
4856 while(j
< c
->argc
) {
4857 int leftargs
= c
->argc
-j
-1;
4858 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
4860 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
4862 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
4864 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
4865 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
4866 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
4868 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
4869 storekey
= c
->argv
[j
+1];
4871 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
4872 sortby
= c
->argv
[j
+1];
4873 /* If the BY pattern does not contain '*', i.e. it is constant,
4874 * we don't need to sort nor to lookup the weight keys. */
4875 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
4877 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
4878 listAddNodeTail(operations
,createSortOperation(
4879 REDIS_SORT_GET
,c
->argv
[j
+1]));
4883 decrRefCount(sortval
);
4884 listRelease(operations
);
4885 addReply(c
,shared
.syntaxerr
);
4891 /* Load the sorting vector with all the objects to sort */
4892 switch(sortval
->type
) {
4893 case REDIS_LIST
: vectorlen
= listLength((list
*)sortval
->ptr
); break;
4894 case REDIS_SET
: vectorlen
= dictSize((dict
*)sortval
->ptr
); break;
4895 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
4896 default: vectorlen
= 0; redisAssert(0); /* Avoid GCC warning */
4898 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
4901 if (sortval
->type
== REDIS_LIST
) {
4902 list
*list
= sortval
->ptr
;
4906 while((ln
= listYield(list
))) {
4907 robj
*ele
= ln
->value
;
4908 vector
[j
].obj
= ele
;
4909 vector
[j
].u
.score
= 0;
4910 vector
[j
].u
.cmpobj
= NULL
;
4918 if (sortval
->type
== REDIS_SET
) {
4921 zset
*zs
= sortval
->ptr
;
4925 di
= dictGetIterator(set
);
4926 while((setele
= dictNext(di
)) != NULL
) {
4927 vector
[j
].obj
= dictGetEntryKey(setele
);
4928 vector
[j
].u
.score
= 0;
4929 vector
[j
].u
.cmpobj
= NULL
;
4932 dictReleaseIterator(di
);
4934 redisAssert(j
== vectorlen
);
4936 /* Now it's time to load the right scores in the sorting vector */
4937 if (dontsort
== 0) {
4938 for (j
= 0; j
< vectorlen
; j
++) {
4942 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
4943 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
4945 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
4947 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
4948 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
4950 /* Don't need to decode the object if it's
4951 * integer-encoded (the only encoding supported) so
4952 * far. We can just cast it */
4953 if (byval
->encoding
== REDIS_ENCODING_INT
) {
4954 vector
[j
].u
.score
= (long)byval
->ptr
;
4956 redisAssert(1 != 1);
4961 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
4962 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
4964 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
4965 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
4967 redisAssert(1 != 1);
4974 /* We are ready to sort the vector... perform a bit of sanity check
4975 * on the LIMIT option too. We'll use a partial version of quicksort. */
4976 start
= (limit_start
< 0) ? 0 : limit_start
;
4977 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
4978 if (start
>= vectorlen
) {
4979 start
= vectorlen
-1;
4982 if (end
>= vectorlen
) end
= vectorlen
-1;
4984 if (dontsort
== 0) {
4985 server
.sort_desc
= desc
;
4986 server
.sort_alpha
= alpha
;
4987 server
.sort_bypattern
= sortby
? 1 : 0;
4988 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
4989 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
4991 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
4994 /* Send command output to the output buffer, performing the specified
4995 * GET/DEL/INCR/DECR operations if any. */
4996 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
4997 if (storekey
== NULL
) {
4998 /* STORE option not specified, sent the sorting result to client */
4999 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
5000 for (j
= start
; j
<= end
; j
++) {
5003 addReplyBulkLen(c
,vector
[j
].obj
);
5004 addReply(c
,vector
[j
].obj
);
5005 addReply(c
,shared
.crlf
);
5007 listRewind(operations
);
5008 while((ln
= listYield(operations
))) {
5009 redisSortOperation
*sop
= ln
->value
;
5010 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5013 if (sop
->type
== REDIS_SORT_GET
) {
5014 if (!val
|| val
->type
!= REDIS_STRING
) {
5015 addReply(c
,shared
.nullbulk
);
5017 addReplyBulkLen(c
,val
);
5019 addReply(c
,shared
.crlf
);
5022 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5027 robj
*listObject
= createListObject();
5028 list
*listPtr
= (list
*) listObject
->ptr
;
5030 /* STORE option specified, set the sorting result as a List object */
5031 for (j
= start
; j
<= end
; j
++) {
5034 listAddNodeTail(listPtr
,vector
[j
].obj
);
5035 incrRefCount(vector
[j
].obj
);
5037 listRewind(operations
);
5038 while((ln
= listYield(operations
))) {
5039 redisSortOperation
*sop
= ln
->value
;
5040 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5043 if (sop
->type
== REDIS_SORT_GET
) {
5044 if (!val
|| val
->type
!= REDIS_STRING
) {
5045 listAddNodeTail(listPtr
,createStringObject("",0));
5047 listAddNodeTail(listPtr
,val
);
5051 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5055 if (dictReplace(c
->db
->dict
,storekey
,listObject
)) {
5056 incrRefCount(storekey
);
5058 /* Note: we add 1 because the DB is dirty anyway since even if the
5059 * SORT result is empty a new key is set and maybe the old content
5061 server
.dirty
+= 1+outputlen
;
5062 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",outputlen
));
5066 decrRefCount(sortval
);
5067 listRelease(operations
);
5068 for (j
= 0; j
< vectorlen
; j
++) {
5069 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
5070 decrRefCount(vector
[j
].u
.cmpobj
);
5075 /* Create the string returned by the INFO command. This is decoupled
5076 * by the INFO command itself as we need to report the same information
5077 * on memory corruption problems. */
5078 static sds
genRedisInfoString(void) {
5080 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5083 info
= sdscatprintf(sdsempty(),
5084 "redis_version:%s\r\n"
5086 "multiplexing_api:%s\r\n"
5087 "uptime_in_seconds:%ld\r\n"
5088 "uptime_in_days:%ld\r\n"
5089 "connected_clients:%d\r\n"
5090 "connected_slaves:%d\r\n"
5091 "used_memory:%zu\r\n"
5092 "changes_since_last_save:%lld\r\n"
5093 "bgsave_in_progress:%d\r\n"
5094 "last_save_time:%ld\r\n"
5095 "bgrewriteaof_in_progress:%d\r\n"
5096 "total_connections_received:%lld\r\n"
5097 "total_commands_processed:%lld\r\n"
5100 (sizeof(long) == 8) ? "64" : "32",
5104 listLength(server
.clients
)-listLength(server
.slaves
),
5105 listLength(server
.slaves
),
5108 server
.bgsavechildpid
!= -1,
5110 server
.bgrewritechildpid
!= -1,
5111 server
.stat_numconnections
,
5112 server
.stat_numcommands
,
5113 server
.masterhost
== NULL
? "master" : "slave"
5115 if (server
.masterhost
) {
5116 info
= sdscatprintf(info
,
5117 "master_host:%s\r\n"
5118 "master_port:%d\r\n"
5119 "master_link_status:%s\r\n"
5120 "master_last_io_seconds_ago:%d\r\n"
5123 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
5125 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
5128 for (j
= 0; j
< server
.dbnum
; j
++) {
5129 long long keys
, vkeys
;
5131 keys
= dictSize(server
.db
[j
].dict
);
5132 vkeys
= dictSize(server
.db
[j
].expires
);
5133 if (keys
|| vkeys
) {
5134 info
= sdscatprintf(info
, "db%d:keys=%lld,expires=%lld\r\n",
5141 static void infoCommand(redisClient
*c
) {
5142 sds info
= genRedisInfoString();
5143 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
5144 (unsigned long)sdslen(info
)));
5145 addReplySds(c
,info
);
5146 addReply(c
,shared
.crlf
);
5149 static void monitorCommand(redisClient
*c
) {
5150 /* ignore MONITOR if aleady slave or in monitor mode */
5151 if (c
->flags
& REDIS_SLAVE
) return;
5153 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
5155 listAddNodeTail(server
.monitors
,c
);
5156 addReply(c
,shared
.ok
);
5159 /* ================================= Expire ================================= */
5160 static int removeExpire(redisDb
*db
, robj
*key
) {
5161 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
5168 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
5169 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
5177 /* Return the expire time of the specified key, or -1 if no expire
5178 * is associated with this key (i.e. the key is non volatile) */
5179 static time_t getExpire(redisDb
*db
, robj
*key
) {
5182 /* No expire? return ASAP */
5183 if (dictSize(db
->expires
) == 0 ||
5184 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
5186 return (time_t) dictGetEntryVal(de
);
5189 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
5193 /* No expire? return ASAP */
5194 if (dictSize(db
->expires
) == 0 ||
5195 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5197 /* Lookup the expire */
5198 when
= (time_t) dictGetEntryVal(de
);
5199 if (time(NULL
) <= when
) return 0;
5201 /* Delete the key */
5202 dictDelete(db
->expires
,key
);
5203 return dictDelete(db
->dict
,key
) == DICT_OK
;
5206 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
5209 /* No expire? return ASAP */
5210 if (dictSize(db
->expires
) == 0 ||
5211 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5213 /* Delete the key */
5215 dictDelete(db
->expires
,key
);
5216 return dictDelete(db
->dict
,key
) == DICT_OK
;
5219 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
5222 de
= dictFind(c
->db
->dict
,key
);
5224 addReply(c
,shared
.czero
);
5228 if (deleteKey(c
->db
,key
)) server
.dirty
++;
5229 addReply(c
, shared
.cone
);
5232 time_t when
= time(NULL
)+seconds
;
5233 if (setExpire(c
->db
,key
,when
)) {
5234 addReply(c
,shared
.cone
);
5237 addReply(c
,shared
.czero
);
5243 static void expireCommand(redisClient
*c
) {
5244 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
5247 static void expireatCommand(redisClient
*c
) {
5248 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
5251 static void ttlCommand(redisClient
*c
) {
5255 expire
= getExpire(c
->db
,c
->argv
[1]);
5257 ttl
= (int) (expire
-time(NULL
));
5258 if (ttl
< 0) ttl
= -1;
5260 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
5263 /* =============================== Replication ============================= */
5265 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5266 ssize_t nwritten
, ret
= size
;
5267 time_t start
= time(NULL
);
5271 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
5272 nwritten
= write(fd
,ptr
,size
);
5273 if (nwritten
== -1) return -1;
5277 if ((time(NULL
)-start
) > timeout
) {
5285 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5286 ssize_t nread
, totread
= 0;
5287 time_t start
= time(NULL
);
5291 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
5292 nread
= read(fd
,ptr
,size
);
5293 if (nread
== -1) return -1;
5298 if ((time(NULL
)-start
) > timeout
) {
5306 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5313 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
5316 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
5327 static void syncCommand(redisClient
*c
) {
5328 /* ignore SYNC if aleady slave or in monitor mode */
5329 if (c
->flags
& REDIS_SLAVE
) return;
5331 /* SYNC can't be issued when the server has pending data to send to
5332 * the client about already issued commands. We need a fresh reply
5333 * buffer registering the differences between the BGSAVE and the current
5334 * dataset, so that we can copy to other slaves if needed. */
5335 if (listLength(c
->reply
) != 0) {
5336 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5340 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
5341 /* Here we need to check if there is a background saving operation
5342 * in progress, or if it is required to start one */
5343 if (server
.bgsavechildpid
!= -1) {
5344 /* Ok a background save is in progress. Let's check if it is a good
5345 * one for replication, i.e. if there is another slave that is
5346 * registering differences since the server forked to save */
5350 listRewind(server
.slaves
);
5351 while((ln
= listYield(server
.slaves
))) {
5353 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
5356 /* Perfect, the server is already registering differences for
5357 * another slave. Set the right state, and copy the buffer. */
5358 listRelease(c
->reply
);
5359 c
->reply
= listDup(slave
->reply
);
5360 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5361 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
5363 /* No way, we need to wait for the next BGSAVE in order to
5364 * register differences */
5365 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5366 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
5369 /* Ok we don't have a BGSAVE in progress, let's start one */
5370 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
5371 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5372 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
5373 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
5376 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5379 c
->flags
|= REDIS_SLAVE
;
5381 listAddNodeTail(server
.slaves
,c
);
5385 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
5386 redisClient
*slave
= privdata
;
5388 REDIS_NOTUSED(mask
);
5389 char buf
[REDIS_IOBUF_LEN
];
5390 ssize_t nwritten
, buflen
;
5392 if (slave
->repldboff
== 0) {
5393 /* Write the bulk write count before to transfer the DB. In theory here
5394 * we don't know how much room there is in the output buffer of the
5395 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5396 * operations) will never be smaller than the few bytes we need. */
5399 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5401 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
5409 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
5410 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
5412 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
5413 (buflen
== 0) ? "premature EOF" : strerror(errno
));
5417 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
5418 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
5423 slave
->repldboff
+= nwritten
;
5424 if (slave
->repldboff
== slave
->repldbsize
) {
5425 close(slave
->repldbfd
);
5426 slave
->repldbfd
= -1;
5427 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5428 slave
->replstate
= REDIS_REPL_ONLINE
;
5429 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
5430 sendReplyToClient
, slave
) == AE_ERR
) {
5434 addReplySds(slave
,sdsempty());
5435 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
5439 /* This function is called at the end of every backgrond saving.
5440 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5441 * otherwise REDIS_ERR is passed to the function.
5443 * The goal of this function is to handle slaves waiting for a successful
5444 * background saving in order to perform non-blocking synchronization. */
5445 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
5447 int startbgsave
= 0;
5449 listRewind(server
.slaves
);
5450 while((ln
= listYield(server
.slaves
))) {
5451 redisClient
*slave
= ln
->value
;
5453 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
5455 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5456 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
5457 struct redis_stat buf
;
5459 if (bgsaveerr
!= REDIS_OK
) {
5461 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
5464 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
5465 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
5467 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
5470 slave
->repldboff
= 0;
5471 slave
->repldbsize
= buf
.st_size
;
5472 slave
->replstate
= REDIS_REPL_SEND_BULK
;
5473 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5474 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
5481 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5482 listRewind(server
.slaves
);
5483 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
5484 while((ln
= listYield(server
.slaves
))) {
5485 redisClient
*slave
= ln
->value
;
5487 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
5494 static int syncWithMaster(void) {
5495 char buf
[1024], tmpfile
[256], authcmd
[1024];
5497 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
5501 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
5506 /* AUTH with the master if required. */
5507 if(server
.masterauth
) {
5508 snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
);
5509 if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) {
5511 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",
5515 /* Read the AUTH result. */
5516 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5518 redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s",
5522 if (buf
[0] != '+') {
5524 redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?");
5529 /* Issue the SYNC command */
5530 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
5532 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
5536 /* Read the bulk write count */
5537 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5539 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
5543 if (buf
[0] != '$') {
5545 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5548 dumpsize
= atoi(buf
+1);
5549 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
5550 /* Read the bulk write data on a temp file */
5551 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
5552 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
5555 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
5559 int nread
, nwritten
;
5561 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
5563 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
5569 nwritten
= write(dfd
,buf
,nread
);
5570 if (nwritten
== -1) {
5571 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
5579 if (rename(tmpfile
,server
.dbfilename
) == -1) {
5580 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
5586 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
5587 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
5591 server
.master
= createClient(fd
);
5592 server
.master
->flags
|= REDIS_MASTER
;
5593 server
.master
->authenticated
= 1;
5594 server
.replstate
= REDIS_REPL_CONNECTED
;
5598 static void slaveofCommand(redisClient
*c
) {
5599 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
5600 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
5601 if (server
.masterhost
) {
5602 sdsfree(server
.masterhost
);
5603 server
.masterhost
= NULL
;
5604 if (server
.master
) freeClient(server
.master
);
5605 server
.replstate
= REDIS_REPL_NONE
;
5606 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
5609 sdsfree(server
.masterhost
);
5610 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
5611 server
.masterport
= atoi(c
->argv
[2]->ptr
);
5612 if (server
.master
) freeClient(server
.master
);
5613 server
.replstate
= REDIS_REPL_CONNECT
;
5614 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
5615 server
.masterhost
, server
.masterport
);
5617 addReply(c
,shared
.ok
);
5620 /* ============================ Maxmemory directive ======================== */
5622 /* This function gets called when 'maxmemory' is set on the config file to limit
5623 * the max memory used by the server, and we are out of memory.
5624 * This function will try to, in order:
5626 * - Free objects from the free list
5627 * - Try to remove keys with an EXPIRE set
5629 * It is not possible to free enough memory to reach used-memory < maxmemory
5630 * the server will start refusing commands that will enlarge even more the
5633 static void freeMemoryIfNeeded(void) {
5634 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
5635 if (listLength(server
.objfreelist
)) {
5638 listNode
*head
= listFirst(server
.objfreelist
);
5639 o
= listNodeValue(head
);
5640 listDelNode(server
.objfreelist
,head
);
5643 int j
, k
, freed
= 0;
5645 for (j
= 0; j
< server
.dbnum
; j
++) {
5647 robj
*minkey
= NULL
;
5648 struct dictEntry
*de
;
5650 if (dictSize(server
.db
[j
].expires
)) {
5652 /* From a sample of three keys drop the one nearest to
5653 * the natural expire */
5654 for (k
= 0; k
< 3; k
++) {
5657 de
= dictGetRandomKey(server
.db
[j
].expires
);
5658 t
= (time_t) dictGetEntryVal(de
);
5659 if (minttl
== -1 || t
< minttl
) {
5660 minkey
= dictGetEntryKey(de
);
5664 deleteKey(server
.db
+j
,minkey
);
5667 if (!freed
) return; /* nothing to free... */
5672 /* ============================== Append Only file ========================== */
5674 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
5675 sds buf
= sdsempty();
5681 /* The DB this command was targetting is not the same as the last command
5682 * we appendend. To issue a SELECT command is needed. */
5683 if (dictid
!= server
.appendseldb
) {
5686 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
5687 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
5688 (unsigned long)strlen(seldb
),seldb
);
5689 server
.appendseldb
= dictid
;
5692 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5693 * EXPIREs into EXPIREATs calls */
5694 if (cmd
->proc
== expireCommand
) {
5697 tmpargv
[0] = createStringObject("EXPIREAT",8);
5698 tmpargv
[1] = argv
[1];
5699 incrRefCount(argv
[1]);
5700 when
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10);
5701 tmpargv
[2] = createObject(REDIS_STRING
,
5702 sdscatprintf(sdsempty(),"%ld",when
));
5706 /* Append the actual command */
5707 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
5708 for (j
= 0; j
< argc
; j
++) {
5711 o
= getDecodedObject(o
);
5712 buf
= sdscatprintf(buf
,"$%lu\r\n",(unsigned long)sdslen(o
->ptr
));
5713 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
5714 buf
= sdscatlen(buf
,"\r\n",2);
5718 /* Free the objects from the modified argv for EXPIREAT */
5719 if (cmd
->proc
== expireCommand
) {
5720 for (j
= 0; j
< 3; j
++)
5721 decrRefCount(argv
[j
]);
5724 /* We want to perform a single write. This should be guaranteed atomic
5725 * at least if the filesystem we are writing is a real physical one.
5726 * While this will save us against the server being killed I don't think
5727 * there is much to do about the whole server stopping for power problems
5729 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
5730 if (nwritten
!= (signed)sdslen(buf
)) {
5731 /* Ooops, we are in troubles. The best thing to do for now is
5732 * to simply exit instead to give the illusion that everything is
5733 * working as expected. */
5734 if (nwritten
== -1) {
5735 redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
));
5737 redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
));
5741 /* If a background append only file rewriting is in progress we want to
5742 * accumulate the differences between the child DB and the current one
5743 * in a buffer, so that when the child process will do its work we
5744 * can append the differences to the new append only file. */
5745 if (server
.bgrewritechildpid
!= -1)
5746 server
.bgrewritebuf
= sdscatlen(server
.bgrewritebuf
,buf
,sdslen(buf
));
5750 if (server
.appendfsync
== APPENDFSYNC_ALWAYS
||
5751 (server
.appendfsync
== APPENDFSYNC_EVERYSEC
&&
5752 now
-server
.lastfsync
> 1))
5754 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
5755 server
.lastfsync
= now
;
5759 /* In Redis commands are always executed in the context of a client, so in
5760 * order to load the append only file we need to create a fake client. */
5761 static struct redisClient
*createFakeClient(void) {
5762 struct redisClient
*c
= zmalloc(sizeof(*c
));
5766 c
->querybuf
= sdsempty();
5770 /* We set the fake client as a slave waiting for the synchronization
5771 * so that Redis will not try to send replies to this client. */
5772 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5773 c
->reply
= listCreate();
5774 listSetFreeMethod(c
->reply
,decrRefCount
);
5775 listSetDupMethod(c
->reply
,dupClientReplyValue
);
5779 static void freeFakeClient(struct redisClient
*c
) {
5780 sdsfree(c
->querybuf
);
5781 listRelease(c
->reply
);
5785 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5786 * error (the append only file is zero-length) REDIS_ERR is returned. On
5787 * fatal error an error message is logged and the program exists. */
5788 int loadAppendOnlyFile(char *filename
) {
5789 struct redisClient
*fakeClient
;
5790 FILE *fp
= fopen(filename
,"r");
5791 struct redis_stat sb
;
5793 if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size
== 0)
5797 redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
));
5801 fakeClient
= createFakeClient();
5808 struct redisCommand
*cmd
;
5810 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) {
5816 if (buf
[0] != '*') goto fmterr
;
5818 argv
= zmalloc(sizeof(robj
*)*argc
);
5819 for (j
= 0; j
< argc
; j
++) {
5820 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
;
5821 if (buf
[0] != '$') goto fmterr
;
5822 len
= strtol(buf
+1,NULL
,10);
5823 argsds
= sdsnewlen(NULL
,len
);
5824 if (len
&& fread(argsds
,len
,1,fp
) == 0) goto fmterr
;
5825 argv
[j
] = createObject(REDIS_STRING
,argsds
);
5826 if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */
5829 /* Command lookup */
5830 cmd
= lookupCommand(argv
[0]->ptr
);
5832 redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
);
5835 /* Try object sharing and encoding */
5836 if (server
.shareobjects
) {
5838 for(j
= 1; j
< argc
; j
++)
5839 argv
[j
] = tryObjectSharing(argv
[j
]);
5841 if (cmd
->flags
& REDIS_CMD_BULK
)
5842 tryObjectEncoding(argv
[argc
-1]);
5843 /* Run the command in the context of a fake client */
5844 fakeClient
->argc
= argc
;
5845 fakeClient
->argv
= argv
;
5846 cmd
->proc(fakeClient
);
5847 /* Discard the reply objects list from the fake client */
5848 while(listLength(fakeClient
->reply
))
5849 listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
));
5850 /* Clean up, ready for the next command */
5851 for (j
= 0; j
< argc
; j
++) decrRefCount(argv
[j
]);
5855 freeFakeClient(fakeClient
);
5860 redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file");
5862 redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
));
5866 redisLog(REDIS_WARNING
,"Bad file format reading the append only file");
5870 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
5871 static int fwriteBulk(FILE *fp
, robj
*obj
) {
5873 obj
= getDecodedObject(obj
);
5874 snprintf(buf
,sizeof(buf
),"$%ld\r\n",(long)sdslen(obj
->ptr
));
5875 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) goto err
;
5876 if (fwrite(obj
->ptr
,sdslen(obj
->ptr
),1,fp
) == 0) goto err
;
5877 if (fwrite("\r\n",2,1,fp
) == 0) goto err
;
5885 /* Write a double value in bulk format $<count>\r\n<payload>\r\n */
5886 static int fwriteBulkDouble(FILE *fp
, double d
) {
5887 char buf
[128], dbuf
[128];
5889 snprintf(dbuf
,sizeof(dbuf
),"%.17g\r\n",d
);
5890 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(dbuf
)-2);
5891 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
5892 if (fwrite(dbuf
,strlen(dbuf
),1,fp
) == 0) return 0;
5896 /* Write a long value in bulk format $<count>\r\n<payload>\r\n */
5897 static int fwriteBulkLong(FILE *fp
, long l
) {
5898 char buf
[128], lbuf
[128];
5900 snprintf(lbuf
,sizeof(lbuf
),"%ld\r\n",l
);
5901 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(lbuf
)-2);
5902 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
5903 if (fwrite(lbuf
,strlen(lbuf
),1,fp
) == 0) return 0;
5907 /* Write a sequence of commands able to fully rebuild the dataset into
5908 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
5909 static int rewriteAppendOnlyFile(char *filename
) {
5910 dictIterator
*di
= NULL
;
5915 time_t now
= time(NULL
);
5917 /* Note that we have to use a different temp name here compared to the
5918 * one used by rewriteAppendOnlyFileBackground() function. */
5919 snprintf(tmpfile
,256,"temp-rewriteaof-%d.aof", (int) getpid());
5920 fp
= fopen(tmpfile
,"w");
5922 redisLog(REDIS_WARNING
, "Failed rewriting the append only file: %s", strerror(errno
));
5925 for (j
= 0; j
< server
.dbnum
; j
++) {
5926 char selectcmd
[] = "*2\r\n$6\r\nSELECT\r\n";
5927 redisDb
*db
= server
.db
+j
;
5929 if (dictSize(d
) == 0) continue;
5930 di
= dictGetIterator(d
);
5936 /* SELECT the new DB */
5937 if (fwrite(selectcmd
,sizeof(selectcmd
)-1,1,fp
) == 0) goto werr
;
5938 if (fwriteBulkLong(fp
,j
) == 0) goto werr
;
5940 /* Iterate this DB writing every entry */
5941 while((de
= dictNext(di
)) != NULL
) {
5942 robj
*key
= dictGetEntryKey(de
);
5943 robj
*o
= dictGetEntryVal(de
);
5944 time_t expiretime
= getExpire(db
,key
);
5946 /* Save the key and associated value */
5947 if (o
->type
== REDIS_STRING
) {
5948 /* Emit a SET command */
5949 char cmd
[]="*3\r\n$3\r\nSET\r\n";
5950 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5952 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5953 if (fwriteBulk(fp
,o
) == 0) goto werr
;
5954 } else if (o
->type
== REDIS_LIST
) {
5955 /* Emit the RPUSHes needed to rebuild the list */
5956 list
*list
= o
->ptr
;
5960 while((ln
= listYield(list
))) {
5961 char cmd
[]="*3\r\n$5\r\nRPUSH\r\n";
5962 robj
*eleobj
= listNodeValue(ln
);
5964 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5965 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5966 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
5968 } else if (o
->type
== REDIS_SET
) {
5969 /* Emit the SADDs needed to rebuild the set */
5971 dictIterator
*di
= dictGetIterator(set
);
5974 while((de
= dictNext(di
)) != NULL
) {
5975 char cmd
[]="*3\r\n$4\r\nSADD\r\n";
5976 robj
*eleobj
= dictGetEntryKey(de
);
5978 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5979 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5980 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
5982 dictReleaseIterator(di
);
5983 } else if (o
->type
== REDIS_ZSET
) {
5984 /* Emit the ZADDs needed to rebuild the sorted set */
5986 dictIterator
*di
= dictGetIterator(zs
->dict
);
5989 while((de
= dictNext(di
)) != NULL
) {
5990 char cmd
[]="*4\r\n$4\r\nZADD\r\n";
5991 robj
*eleobj
= dictGetEntryKey(de
);
5992 double *score
= dictGetEntryVal(de
);
5994 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5995 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5996 if (fwriteBulkDouble(fp
,*score
) == 0) goto werr
;
5997 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
5999 dictReleaseIterator(di
);
6001 redisAssert(0 != 0);
6003 /* Save the expire time */
6004 if (expiretime
!= -1) {
6005 char cmd
[]="*3\r\n$6\r\nEXPIRE\r\n";
6006 /* If this key is already expired skip it */
6007 if (expiretime
< now
) continue;
6008 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6009 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6010 if (fwriteBulkLong(fp
,expiretime
) == 0) goto werr
;
6013 dictReleaseIterator(di
);
6016 /* Make sure data will not remain on the OS's output buffers */
6021 /* Use RENAME to make sure the DB file is changed atomically only
6022 * if the generate DB file is ok. */
6023 if (rename(tmpfile
,filename
) == -1) {
6024 redisLog(REDIS_WARNING
,"Error moving temp append only file on the final destination: %s", strerror(errno
));
6028 redisLog(REDIS_NOTICE
,"SYNC append only file rewrite performed");
6034 redisLog(REDIS_WARNING
,"Write error writing append only fileon disk: %s", strerror(errno
));
6035 if (di
) dictReleaseIterator(di
);
6039 /* This is how rewriting of the append only file in background works:
6041 * 1) The user calls BGREWRITEAOF
6042 * 2) Redis calls this function, that forks():
6043 * 2a) the child rewrite the append only file in a temp file.
6044 * 2b) the parent accumulates differences in server.bgrewritebuf.
6045 * 3) When the child finished '2a' exists.
6046 * 4) The parent will trap the exit code, if it's OK, will append the
6047 * data accumulated into server.bgrewritebuf into the temp file, and
6048 * finally will rename(2) the temp file in the actual file name.
6049 * The the new file is reopened as the new append only file. Profit!
6051 static int rewriteAppendOnlyFileBackground(void) {
6054 if (server
.bgrewritechildpid
!= -1) return REDIS_ERR
;
6055 if ((childpid
= fork()) == 0) {
6060 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6061 if (rewriteAppendOnlyFile(tmpfile
) == REDIS_OK
) {
6068 if (childpid
== -1) {
6069 redisLog(REDIS_WARNING
,
6070 "Can't rewrite append only file in background: fork: %s",
6074 redisLog(REDIS_NOTICE
,
6075 "Background append only file rewriting started by pid %d",childpid
);
6076 server
.bgrewritechildpid
= childpid
;
6077 /* We set appendseldb to -1 in order to force the next call to the
6078 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6079 * accumulated by the parent into server.bgrewritebuf will start
6080 * with a SELECT statement and it will be safe to merge. */
6081 server
.appendseldb
= -1;
6084 return REDIS_OK
; /* unreached */
6087 static void bgrewriteaofCommand(redisClient
*c
) {
6088 if (server
.bgrewritechildpid
!= -1) {
6089 addReplySds(c
,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6092 if (rewriteAppendOnlyFileBackground() == REDIS_OK
) {
6093 char *status
= "+Background append only file rewriting started\r\n";
6094 addReplySds(c
,sdsnew(status
));
6096 addReply(c
,shared
.err
);
6100 static void aofRemoveTempFile(pid_t childpid
) {
6103 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) childpid
);
6107 /* ================================= Debugging ============================== */
6109 static void debugCommand(redisClient
*c
) {
6110 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
6112 } else if (!strcasecmp(c
->argv
[1]->ptr
,"reload")) {
6113 if (rdbSave(server
.dbfilename
) != REDIS_OK
) {
6114 addReply(c
,shared
.err
);
6118 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6119 addReply(c
,shared
.err
);
6122 redisLog(REDIS_WARNING
,"DB reloaded by DEBUG RELOAD");
6123 addReply(c
,shared
.ok
);
6124 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
6125 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
6129 addReply(c
,shared
.nokeyerr
);
6132 key
= dictGetEntryKey(de
);
6133 val
= dictGetEntryVal(de
);
6134 addReplySds(c
,sdscatprintf(sdsempty(),
6135 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6136 (void*)key
, key
->refcount
, (void*)val
, val
->refcount
,
6139 addReplySds(c
,sdsnew(
6140 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6144 static void _redisAssert(char *estr
) {
6145 redisLog(REDIS_WARNING
,"=== ASSERTION FAILED ===");
6146 redisLog(REDIS_WARNING
,"==> %s\n",estr
);
6147 #ifdef HAVE_BACKTRACE
6148 redisLog(REDIS_WARNING
,"(forcing SIGSEGV in order to print the stack trace)");
6153 /* =================================== Main! ================================ */
6156 int linuxOvercommitMemoryValue(void) {
6157 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
6161 if (fgets(buf
,64,fp
) == NULL
) {
6170 void linuxOvercommitMemoryWarning(void) {
6171 if (linuxOvercommitMemoryValue() == 0) {
6172 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6175 #endif /* __linux__ */
6177 static void daemonize(void) {
6181 if (fork() != 0) exit(0); /* parent exits */
6182 printf("New pid: %d\n", getpid());
6183 setsid(); /* create a new session */
6185 /* Every output goes to /dev/null. If Redis is daemonized but
6186 * the 'logfile' is set to 'stdout' in the configuration file
6187 * it will not log at all. */
6188 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
6189 dup2(fd
, STDIN_FILENO
);
6190 dup2(fd
, STDOUT_FILENO
);
6191 dup2(fd
, STDERR_FILENO
);
6192 if (fd
> STDERR_FILENO
) close(fd
);
6194 /* Try to write the pid file */
6195 fp
= fopen(server
.pidfile
,"w");
6197 fprintf(fp
,"%d\n",getpid());
6202 int main(int argc
, char **argv
) {
6205 resetServerSaveParams();
6206 loadServerConfig(argv
[1]);
6207 } else if (argc
> 2) {
6208 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
6211 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6213 if (server
.daemonize
) daemonize();
6215 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
6217 linuxOvercommitMemoryWarning();
6219 if (server
.appendonly
) {
6220 if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
)
6221 redisLog(REDIS_NOTICE
,"DB loaded from append only file");
6223 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
6224 redisLog(REDIS_NOTICE
,"DB loaded from disk");
6226 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
6227 acceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
6228 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
6230 aeDeleteEventLoop(server
.el
);
6234 /* ============================= Backtrace support ========================= */
6236 #ifdef HAVE_BACKTRACE
6237 static char *findFuncName(void *pointer
, unsigned long *offset
);
6239 static void *getMcontextEip(ucontext_t
*uc
) {
6240 #if defined(__FreeBSD__)
6241 return (void*) uc
->uc_mcontext
.mc_eip
;
6242 #elif defined(__dietlibc__)
6243 return (void*) uc
->uc_mcontext
.eip
;
6244 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6246 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6248 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6250 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6251 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6252 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6254 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6256 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
6257 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
];
6258 #elif defined(__ia64__) /* Linux IA64 */
6259 return (void*) uc
->uc_mcontext
.sc_ip
;
6265 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
6267 char **messages
= NULL
;
6268 int i
, trace_size
= 0;
6269 unsigned long offset
=0;
6270 ucontext_t
*uc
= (ucontext_t
*) secret
;
6272 REDIS_NOTUSED(info
);
6274 redisLog(REDIS_WARNING
,
6275 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
6276 infostring
= genRedisInfoString();
6277 redisLog(REDIS_WARNING
, "%s",infostring
);
6278 /* It's not safe to sdsfree() the returned string under memory
6279 * corruption conditions. Let it leak as we are going to abort */
6281 trace_size
= backtrace(trace
, 100);
6282 /* overwrite sigaction with caller's address */
6283 if (getMcontextEip(uc
) != NULL
) {
6284 trace
[1] = getMcontextEip(uc
);
6286 messages
= backtrace_symbols(trace
, trace_size
);
6288 for (i
=1; i
<trace_size
; ++i
) {
6289 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
6291 p
= strchr(messages
[i
],'+');
6292 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
6293 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
6295 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
6298 // free(messages); Don't call free() with possibly corrupted memory.
6302 static void setupSigSegvAction(void) {
6303 struct sigaction act
;
6305 sigemptyset (&act
.sa_mask
);
6306 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6307 * is used. Otherwise, sa_handler is used */
6308 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
6309 act
.sa_sigaction
= segvHandler
;
6310 sigaction (SIGSEGV
, &act
, NULL
);
6311 sigaction (SIGBUS
, &act
, NULL
);
6312 sigaction (SIGFPE
, &act
, NULL
);
6313 sigaction (SIGILL
, &act
, NULL
);
6314 sigaction (SIGBUS
, &act
, NULL
);
6318 #include "staticsymbols.h"
6319 /* This function try to convert a pointer into a function name. It's used in
6320 * oreder to provide a backtrace under segmentation fault that's able to
6321 * display functions declared as static (otherwise the backtrace is useless). */
6322 static char *findFuncName(void *pointer
, unsigned long *offset
){
6324 unsigned long off
, minoff
= 0;
6326 /* Try to match against the Symbol with the smallest offset */
6327 for (i
=0; symsTable
[i
].pointer
; i
++) {
6328 unsigned long lp
= (unsigned long) pointer
;
6330 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
6331 off
=lp
-symsTable
[i
].pointer
;
6332 if (ret
< 0 || off
< minoff
) {
6338 if (ret
== -1) return NULL
;
6340 return symsTable
[ret
].name
;
6342 #else /* HAVE_BACKTRACE */
6343 static void setupSigSegvAction(void) {
6345 #endif /* HAVE_BACKTRACE */