2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.1.91"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
64 #include "solarisfixes.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
95 /* If more than REDIS_WRITEV_THRESHOLD write packets are pending, use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
107 this flag will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short, these commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
113 #define REDIS_STRING 0
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
128 /* Defines related to the dump file format. Storing 32-bit lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpret the length:
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
139 * Lengths up to 63 are stored using a single byte, so most DB keys, and many
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining six bits specify a special encoding for the object
149 * according to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
156 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157 #define REDIS_SLAVE 2 /* This client is a slave server */
158 #define REDIS_MASTER 4 /* This client is a master server */
159 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
161 /* Slave replication state - slave side */
162 #define REDIS_REPL_NONE 0 /* No active replication */
163 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
164 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
166 /* Slave replication state - from the point of view of master
167 * Note that in SEND_BULK and ONLINE state the slave receives new updates
168 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
169 * to start the next background saving in order to send updates to it. */
170 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
171 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
172 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
173 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
175 /* List related stuff */
179 /* Sort operations */
180 #define REDIS_SORT_GET 0
181 #define REDIS_SORT_ASC 1
182 #define REDIS_SORT_DESC 2
183 #define REDIS_SORTKEY_MAX 1024
186 #define REDIS_DEBUG 0
187 #define REDIS_NOTICE 1
188 #define REDIS_WARNING 2
190 /* Anti-warning macro... */
191 #define REDIS_NOTUSED(V) ((void) V)
193 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
194 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
196 /* Append only defines */
197 #define APPENDFSYNC_NO 0
198 #define APPENDFSYNC_ALWAYS 1
199 #define APPENDFSYNC_EVERYSEC 2
201 /* We can print the stacktrace, so our assert is defined this way: */
202 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
203 static void _redisAssert(char *estr
);
205 /*================================= Data types ============================== */
207 /* A redis object, that is a type able to hold a string / list / set */
208 typedef struct redisObject
{
211 unsigned char encoding
;
212 unsigned char notused
[2];
216 /* Macro used to initialize a Redis object allocated on the stack.
217 * Note that this macro is kept near the structure definition to make sure
218 * we'll update it when the structure is changed, to avoid bugs like
219 * bug #85 introduced exactly in this way. */
220 #define initStaticStringObject(_var,_ptr) do { \
222 _var.type = REDIS_STRING; \
223 _var.encoding = REDIS_ENCODING_RAW; \
227 typedef struct redisDb
{
233 /* With multiplexing we need to keep per-client state.
234 * Clients are kept in a linked list. */
235 typedef struct redisClient
{
240 robj
**argv
, **mbargv
;
242 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
243 int multibulk
; /* multi bulk command format active */
246 time_t lastinteraction
; /* time of the last interaction, used for timeout */
247 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
248 int slaveseldb
; /* slave selected db, if this client is a slave */
249 int authenticated
; /* when requirepass is non-NULL */
250 int replstate
; /* replication state if this is a slave */
251 int repldbfd
; /* replication DB file descriptor */
252 long repldboff
; /* replication DB file offset */
253 off_t repldbsize
; /* replication DB file size */
261 /* Global server state structure */
267 unsigned int sharingpoolsize
;
268 long long dirty
; /* changes to DB from the last save */
270 list
*slaves
, *monitors
;
271 char neterr
[ANET_ERR_LEN
];
273 int cronloops
; /* number of times the cron function run */
274 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
275 time_t lastsave
; /* Unix time of last save succeeede */
276 size_t usedmemory
; /* Used memory in megabytes */
277 /* Fields used only for stats */
278 time_t stat_starttime
; /* server start time */
279 long long stat_numcommands
; /* number of processed commands */
280 long long stat_numconnections
; /* number of connections received */
293 pid_t bgsavechildpid
;
294 pid_t bgrewritechildpid
;
295 sds bgrewritebuf
; /* buffer taken by parent during oppend only rewrite */
296 struct saveparam
*saveparams
;
301 char *appendfilename
;
304 /* Replication related */
309 redisClient
*master
; /* client that is master for this slave */
311 unsigned int maxclients
;
312 unsigned long maxmemory
;
313 /* Sort parameters - qsort_r() is only available under BSD so we
314 * have to keep this state global, in order to pass it to sortCompare() */
320 typedef void redisCommandProc(redisClient
*c
);
321 struct redisCommand
{
323 redisCommandProc
*proc
;
328 struct redisFunctionSym
{
330 unsigned long pointer
;
333 typedef struct _redisSortObject
{
341 typedef struct _redisSortOperation
{
344 } redisSortOperation
;
346 /* ZSETs use a specialized version of Skiplists */
348 typedef struct zskiplistNode
{
349 struct zskiplistNode
**forward
;
350 struct zskiplistNode
*backward
;
355 typedef struct zskiplist
{
356 struct zskiplistNode
*header
, *tail
;
357 unsigned long length
;
361 typedef struct zset
{
366 /* Our shared "common" objects */
368 struct sharedObjectsStruct
{
369 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
370 *colon
, *nullbulk
, *nullmultibulk
,
371 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
372 *outofrangeerr
, *plus
,
373 *select0
, *select1
, *select2
, *select3
, *select4
,
374 *select5
, *select6
, *select7
, *select8
, *select9
;
377 /* Global vars that are actually used as constants. The following double
378 * values are used for double on-disk serialization, and are initialized
379 * at runtime to avoid strange compiler optimizations. */
381 static double R_Zero
, R_PosInf
, R_NegInf
, R_Nan
;
383 /*================================ Prototypes =============================== */
385 static void freeStringObject(robj
*o
);
386 static void freeListObject(robj
*o
);
387 static void freeSetObject(robj
*o
);
388 static void decrRefCount(void *o
);
389 static robj
*createObject(int type
, void *ptr
);
390 static void freeClient(redisClient
*c
);
391 static int rdbLoad(char *filename
);
392 static void addReply(redisClient
*c
, robj
*obj
);
393 static void addReplySds(redisClient
*c
, sds s
);
394 static void incrRefCount(robj
*o
);
395 static int rdbSaveBackground(char *filename
);
396 static robj
*createStringObject(char *ptr
, size_t len
);
397 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
398 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
399 static int syncWithMaster(void);
400 static robj
*tryObjectSharing(robj
*o
);
401 static int tryObjectEncoding(robj
*o
);
402 static robj
*getDecodedObject(robj
*o
);
403 static int removeExpire(redisDb
*db
, robj
*key
);
404 static int expireIfNeeded(redisDb
*db
, robj
*key
);
405 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
406 static int deleteKey(redisDb
*db
, robj
*key
);
407 static time_t getExpire(redisDb
*db
, robj
*key
);
408 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
409 static void updateSlavesWaitingBgsave(int bgsaveerr
);
410 static void freeMemoryIfNeeded(void);
411 static int processCommand(redisClient
*c
);
412 static void setupSigSegvAction(void);
413 static void rdbRemoveTempFile(pid_t childpid
);
414 static void aofRemoveTempFile(pid_t childpid
);
415 static size_t stringObjectLen(robj
*o
);
416 static void processInputBuffer(redisClient
*c
);
417 static zskiplist
*zslCreate(void);
418 static void zslFree(zskiplist
*zsl
);
419 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
420 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
422 static void authCommand(redisClient
*c
);
423 static void pingCommand(redisClient
*c
);
424 static void echoCommand(redisClient
*c
);
425 static void setCommand(redisClient
*c
);
426 static void setnxCommand(redisClient
*c
);
427 static void getCommand(redisClient
*c
);
428 static void delCommand(redisClient
*c
);
429 static void existsCommand(redisClient
*c
);
430 static void incrCommand(redisClient
*c
);
431 static void decrCommand(redisClient
*c
);
432 static void incrbyCommand(redisClient
*c
);
433 static void decrbyCommand(redisClient
*c
);
434 static void selectCommand(redisClient
*c
);
435 static void randomkeyCommand(redisClient
*c
);
436 static void keysCommand(redisClient
*c
);
437 static void dbsizeCommand(redisClient
*c
);
438 static void lastsaveCommand(redisClient
*c
);
439 static void saveCommand(redisClient
*c
);
440 static void bgsaveCommand(redisClient
*c
);
441 static void bgrewriteaofCommand(redisClient
*c
);
442 static void shutdownCommand(redisClient
*c
);
443 static void moveCommand(redisClient
*c
);
444 static void renameCommand(redisClient
*c
);
445 static void renamenxCommand(redisClient
*c
);
446 static void lpushCommand(redisClient
*c
);
447 static void rpushCommand(redisClient
*c
);
448 static void lpopCommand(redisClient
*c
);
449 static void rpopCommand(redisClient
*c
);
450 static void llenCommand(redisClient
*c
);
451 static void lindexCommand(redisClient
*c
);
452 static void lrangeCommand(redisClient
*c
);
453 static void ltrimCommand(redisClient
*c
);
454 static void typeCommand(redisClient
*c
);
455 static void lsetCommand(redisClient
*c
);
456 static void saddCommand(redisClient
*c
);
457 static void sremCommand(redisClient
*c
);
458 static void smoveCommand(redisClient
*c
);
459 static void sismemberCommand(redisClient
*c
);
460 static void scardCommand(redisClient
*c
);
461 static void spopCommand(redisClient
*c
);
462 static void srandmemberCommand(redisClient
*c
);
463 static void sinterCommand(redisClient
*c
);
464 static void sinterstoreCommand(redisClient
*c
);
465 static void sunionCommand(redisClient
*c
);
466 static void sunionstoreCommand(redisClient
*c
);
467 static void sdiffCommand(redisClient
*c
);
468 static void sdiffstoreCommand(redisClient
*c
);
469 static void syncCommand(redisClient
*c
);
470 static void flushdbCommand(redisClient
*c
);
471 static void flushallCommand(redisClient
*c
);
472 static void sortCommand(redisClient
*c
);
473 static void lremCommand(redisClient
*c
);
474 static void rpoplpushcommand(redisClient
*c
);
475 static void infoCommand(redisClient
*c
);
476 static void mgetCommand(redisClient
*c
);
477 static void monitorCommand(redisClient
*c
);
478 static void expireCommand(redisClient
*c
);
479 static void expireatCommand(redisClient
*c
);
480 static void getsetCommand(redisClient
*c
);
481 static void ttlCommand(redisClient
*c
);
482 static void slaveofCommand(redisClient
*c
);
483 static void debugCommand(redisClient
*c
);
484 static void msetCommand(redisClient
*c
);
485 static void msetnxCommand(redisClient
*c
);
486 static void zaddCommand(redisClient
*c
);
487 static void zincrbyCommand(redisClient
*c
);
488 static void zrangeCommand(redisClient
*c
);
489 static void zrangebyscoreCommand(redisClient
*c
);
490 static void zrevrangeCommand(redisClient
*c
);
491 static void zcardCommand(redisClient
*c
);
492 static void zremCommand(redisClient
*c
);
493 static void zscoreCommand(redisClient
*c
);
494 static void zremrangebyscoreCommand(redisClient
*c
);
496 /*================================= Globals ================================= */
499 static struct redisServer server
; /* server global state */
500 static struct redisCommand cmdTable
[] = {
501 {"get",getCommand
,2,REDIS_CMD_INLINE
},
502 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
503 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
504 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
505 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
506 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
507 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
508 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
509 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
510 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
511 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
512 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
513 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
514 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
515 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
516 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
517 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
518 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
519 {"rpoplpush",rpoplpushcommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
520 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
521 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
522 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
523 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
524 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
525 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
526 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
527 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
528 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
529 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
530 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
531 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
532 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
533 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
534 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
535 {"zincrby",zincrbyCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
536 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
537 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
538 {"zrange",zrangeCommand
,4,REDIS_CMD_INLINE
},
539 {"zrangebyscore",zrangebyscoreCommand
,-4,REDIS_CMD_INLINE
},
540 {"zrevrange",zrevrangeCommand
,4,REDIS_CMD_INLINE
},
541 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
542 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
543 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
544 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
545 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
546 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
547 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
548 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
549 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
550 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
551 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
552 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
553 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
554 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
555 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
556 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
557 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
558 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
559 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
560 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
561 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
562 {"bgrewriteaof",bgrewriteaofCommand
,1,REDIS_CMD_INLINE
},
563 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
564 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
565 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
566 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
567 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
568 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
569 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
570 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
571 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
572 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
573 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
574 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
578 /*============================ Utility functions ============================ */
580 /* Glob-style pattern matching. */
581 int stringmatchlen(const char *pattern
, int patternLen
,
582 const char *string
, int stringLen
, int nocase
)
587 while (pattern
[1] == '*') {
592 return 1; /* match */
594 if (stringmatchlen(pattern
+1, patternLen
-1,
595 string
, stringLen
, nocase
))
596 return 1; /* match */
600 return 0; /* no match */
604 return 0; /* no match */
614 not = pattern
[0] == '^';
621 if (pattern
[0] == '\\') {
624 if (pattern
[0] == string
[0])
626 } else if (pattern
[0] == ']') {
628 } else if (patternLen
== 0) {
632 } else if (pattern
[1] == '-' && patternLen
>= 3) {
633 int start
= pattern
[0];
634 int end
= pattern
[2];
642 start
= tolower(start
);
648 if (c
>= start
&& c
<= end
)
652 if (pattern
[0] == string
[0])
655 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
665 return 0; /* no match */
671 if (patternLen
>= 2) {
678 if (pattern
[0] != string
[0])
679 return 0; /* no match */
681 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
682 return 0; /* no match */
690 if (stringLen
== 0) {
691 while(*pattern
== '*') {
698 if (patternLen
== 0 && stringLen
== 0)
703 static void redisLog(int level
, const char *fmt
, ...) {
707 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
711 if (level
>= server
.verbosity
) {
717 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
718 fprintf(fp
,"%s %c ",buf
,c
[level
]);
719 vfprintf(fp
, fmt
, ap
);
725 if (server
.logfile
) fclose(fp
);
728 /*====================== Hash table type implementation ==================== */
730 /* This is a hash table type that uses the SDS dynamic strings library as
731 * keys and Redis objects as values (objects can hold SDS strings,
734 static void dictVanillaFree(void *privdata
, void *val
)
736 DICT_NOTUSED(privdata
);
740 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
744 DICT_NOTUSED(privdata
);
746 l1
= sdslen((sds
)key1
);
747 l2
= sdslen((sds
)key2
);
748 if (l1
!= l2
) return 0;
749 return memcmp(key1
, key2
, l1
) == 0;
752 static void dictRedisObjectDestructor(void *privdata
, void *val
)
754 DICT_NOTUSED(privdata
);
759 static int dictObjKeyCompare(void *privdata
, const void *key1
,
762 const robj
*o1
= key1
, *o2
= key2
;
763 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
766 static unsigned int dictObjHash(const void *key
) {
768 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
771 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
774 robj
*o1
= (robj
*) key1
, *o2
= (robj
*) key2
;
777 o1
= getDecodedObject(o1
);
778 o2
= getDecodedObject(o2
);
779 cmp
= sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
785 static unsigned int dictEncObjHash(const void *key
) {
786 robj
*o
= (robj
*) key
;
788 o
= getDecodedObject(o
);
789 unsigned int hash
= dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
794 static dictType setDictType
= {
795 dictEncObjHash
, /* hash function */
798 dictEncObjKeyCompare
, /* key compare */
799 dictRedisObjectDestructor
, /* key destructor */
800 NULL
/* val destructor */
803 static dictType zsetDictType
= {
804 dictEncObjHash
, /* hash function */
807 dictEncObjKeyCompare
, /* key compare */
808 dictRedisObjectDestructor
, /* key destructor */
809 dictVanillaFree
/* val destructor of malloc(sizeof(double)) */
812 static dictType hashDictType
= {
813 dictObjHash
, /* hash function */
816 dictObjKeyCompare
, /* key compare */
817 dictRedisObjectDestructor
, /* key destructor */
818 dictRedisObjectDestructor
/* val destructor */
821 /* ========================= Random utility functions ======================= */
823 /* Redis generally does not try to recover from out of memory conditions
824 * when allocating objects or strings, it is not clear if it will be possible
825 * to report this condition to the client since the networking layer itself
826 * is based on heap allocation for send buffers, so we simply abort.
827 * At least the code will be simpler to read... */
828 static void oom(const char *msg
) {
829 redisLog(REDIS_WARNING
, "%s: Out of memory\n",msg
);
834 /* ====================== Redis server networking stuff ===================== */
835 static void closeTimedoutClients(void) {
838 time_t now
= time(NULL
);
840 listRewind(server
.clients
);
841 while ((ln
= listYield(server
.clients
)) != NULL
) {
842 c
= listNodeValue(ln
);
843 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
844 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
845 (now
- c
->lastinteraction
> server
.maxidletime
)) {
846 redisLog(REDIS_DEBUG
,"Closing idle client");
852 static int htNeedsResize(dict
*dict
) {
853 long long size
, used
;
855 size
= dictSlots(dict
);
856 used
= dictSize(dict
);
857 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
858 (used
*100/size
< REDIS_HT_MINFILL
));
861 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
862 * we resize the hash table to save memory */
863 static void tryResizeHashTables(void) {
866 for (j
= 0; j
< server
.dbnum
; j
++) {
867 if (htNeedsResize(server
.db
[j
].dict
)) {
868 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
869 dictResize(server
.db
[j
].dict
);
870 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
872 if (htNeedsResize(server
.db
[j
].expires
))
873 dictResize(server
.db
[j
].expires
);
877 /* A background saving child (BGSAVE) terminated its work. Handle this. */
878 void backgroundSaveDoneHandler(int statloc
) {
879 int exitcode
= WEXITSTATUS(statloc
);
880 int bysignal
= WIFSIGNALED(statloc
);
882 if (!bysignal
&& exitcode
== 0) {
883 redisLog(REDIS_NOTICE
,
884 "Background saving terminated with success");
886 server
.lastsave
= time(NULL
);
887 } else if (!bysignal
&& exitcode
!= 0) {
888 redisLog(REDIS_WARNING
, "Background saving error");
890 redisLog(REDIS_WARNING
,
891 "Background saving terminated by signal");
892 rdbRemoveTempFile(server
.bgsavechildpid
);
894 server
.bgsavechildpid
= -1;
895 /* Possibly there are slaves waiting for a BGSAVE in order to be served
896 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
897 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
900 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
902 void backgroundRewriteDoneHandler(int statloc
) {
903 int exitcode
= WEXITSTATUS(statloc
);
904 int bysignal
= WIFSIGNALED(statloc
);
906 if (!bysignal
&& exitcode
== 0) {
910 redisLog(REDIS_NOTICE
,
911 "Background append only file rewriting terminated with success");
912 /* Now it's time to flush the differences accumulated by the parent */
913 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) server
.bgrewritechildpid
);
914 fd
= open(tmpfile
,O_WRONLY
|O_APPEND
);
916 redisLog(REDIS_WARNING
, "Not able to open the temp append only file produced by the child: %s", strerror(errno
));
919 /* Flush our data... */
920 if (write(fd
,server
.bgrewritebuf
,sdslen(server
.bgrewritebuf
)) !=
921 (signed) sdslen(server
.bgrewritebuf
)) {
922 redisLog(REDIS_WARNING
, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno
));
926 redisLog(REDIS_NOTICE
,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server
.bgrewritebuf
));
927 /* Now our work is to rename the temp file into the stable file. And
928 * switch the file descriptor used by the server for append only. */
929 if (rename(tmpfile
,server
.appendfilename
) == -1) {
930 redisLog(REDIS_WARNING
,"Can't rename the temp append only file into the stable one: %s", strerror(errno
));
934 /* Mission completed... almost */
935 redisLog(REDIS_NOTICE
,"Append only file successfully rewritten.");
936 if (server
.appendfd
!= -1) {
937 /* If append only is actually enabled... */
938 close(server
.appendfd
);
939 server
.appendfd
= fd
;
941 server
.appendseldb
= -1; /* Make sure it will issue SELECT */
942 redisLog(REDIS_NOTICE
,"The new append only file was selected for future appends.");
944 /* If append only is disabled we just generate a dump in this
945 * format. Why not? */
948 } else if (!bysignal
&& exitcode
!= 0) {
949 redisLog(REDIS_WARNING
, "Background append only file rewriting error");
951 redisLog(REDIS_WARNING
,
952 "Background append only file rewriting terminated by signal");
955 sdsfree(server
.bgrewritebuf
);
956 server
.bgrewritebuf
= sdsempty();
957 aofRemoveTempFile(server
.bgrewritechildpid
);
958 server
.bgrewritechildpid
= -1;
961 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
962 int j
, loops
= server
.cronloops
++;
963 REDIS_NOTUSED(eventLoop
);
965 REDIS_NOTUSED(clientData
);
967 /* Update the global state with the amount of used memory */
968 server
.usedmemory
= zmalloc_used_memory();
970 /* Show some info about non-empty databases */
971 for (j
= 0; j
< server
.dbnum
; j
++) {
972 long long size
, used
, vkeys
;
974 size
= dictSlots(server
.db
[j
].dict
);
975 used
= dictSize(server
.db
[j
].dict
);
976 vkeys
= dictSize(server
.db
[j
].expires
);
977 if (!(loops
% 5) && (used
|| vkeys
)) {
978 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
979 /* dictPrintStats(server.dict); */
983 /* We don't want to resize the hash tables while a background saving
984 * is in progress: the saving child is created using fork() that is
985 * implemented with a copy-on-write semantic in most modern systems, so
986 * if we resize the HT while there is the saving child at work actually
987 * a lot of memory movements in the parent will cause a lot of pages
989 if (server
.bgsavechildpid
== -1) tryResizeHashTables();
991 /* Show information about connected clients */
993 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
994 listLength(server
.clients
)-listLength(server
.slaves
),
995 listLength(server
.slaves
),
997 dictSize(server
.sharingpool
));
1000 /* Close connections of timedout clients */
1001 if (server
.maxidletime
&& !(loops
% 10))
1002 closeTimedoutClients();
1004 /* Check if a background saving or AOF rewrite in progress terminated */
1005 if (server
.bgsavechildpid
!= -1 || server
.bgrewritechildpid
!= -1) {
1009 if ((pid
= wait3(&statloc
,WNOHANG
,NULL
)) != 0) {
1010 if (pid
== server
.bgsavechildpid
) {
1011 backgroundSaveDoneHandler(statloc
);
1013 backgroundRewriteDoneHandler(statloc
);
1017 /* If there is not a background saving in progress check if
1018 * we have to save now */
1019 time_t now
= time(NULL
);
1020 for (j
= 0; j
< server
.saveparamslen
; j
++) {
1021 struct saveparam
*sp
= server
.saveparams
+j
;
1023 if (server
.dirty
>= sp
->changes
&&
1024 now
-server
.lastsave
> sp
->seconds
) {
1025 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
1026 sp
->changes
, sp
->seconds
);
1027 rdbSaveBackground(server
.dbfilename
);
1033 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1034 * will use few CPU cycles if there are few expiring keys, otherwise
1035 * it will get more aggressive to avoid that too much memory is used by
1036 * keys that can be removed from the keyspace. */
1037 for (j
= 0; j
< server
.dbnum
; j
++) {
1039 redisDb
*db
= server
.db
+j
;
1041 /* Continue to expire if at the end of the cycle more than 25%
1042 * of the keys were expired. */
1044 int num
= dictSize(db
->expires
);
1045 time_t now
= time(NULL
);
1048 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
1049 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
1054 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
1055 t
= (time_t) dictGetEntryVal(de
);
1057 deleteKey(db
,dictGetEntryKey(de
));
1061 } while (expired
> REDIS_EXPIRELOOKUPS_PER_CRON
/4);
1064 /* Check if we should connect to a MASTER */
1065 if (server
.replstate
== REDIS_REPL_CONNECT
) {
1066 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
1067 if (syncWithMaster() == REDIS_OK
) {
1068 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
1074 static void createSharedObjects(void) {
1075 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
1076 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
1077 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
1078 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
1079 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
1080 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
1081 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
1082 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
1083 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
1085 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
1086 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
1087 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1088 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
1089 "-ERR no such key\r\n"));
1090 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
1091 "-ERR syntax error\r\n"));
1092 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
1093 "-ERR source and destination objects are the same\r\n"));
1094 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
1095 "-ERR index out of range\r\n"));
1096 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
1097 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
1098 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
1099 shared
.select0
= createStringObject("select 0\r\n",10);
1100 shared
.select1
= createStringObject("select 1\r\n",10);
1101 shared
.select2
= createStringObject("select 2\r\n",10);
1102 shared
.select3
= createStringObject("select 3\r\n",10);
1103 shared
.select4
= createStringObject("select 4\r\n",10);
1104 shared
.select5
= createStringObject("select 5\r\n",10);
1105 shared
.select6
= createStringObject("select 6\r\n",10);
1106 shared
.select7
= createStringObject("select 7\r\n",10);
1107 shared
.select8
= createStringObject("select 8\r\n",10);
1108 shared
.select9
= createStringObject("select 9\r\n",10);
1111 static void appendServerSaveParams(time_t seconds
, int changes
) {
1112 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1113 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1114 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1115 server
.saveparamslen
++;
1118 static void resetServerSaveParams() {
1119 zfree(server
.saveparams
);
1120 server
.saveparams
= NULL
;
1121 server
.saveparamslen
= 0;
1124 static void initServerConfig() {
1125 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1126 server
.port
= REDIS_SERVERPORT
;
1127 server
.verbosity
= REDIS_DEBUG
;
1128 server
.maxidletime
= REDIS_MAXIDLETIME
;
1129 server
.saveparams
= NULL
;
1130 server
.logfile
= NULL
; /* NULL = log on standard output */
1131 server
.bindaddr
= NULL
;
1132 server
.glueoutputbuf
= 1;
1133 server
.daemonize
= 0;
1134 server
.appendonly
= 0;
1135 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1136 server
.lastfsync
= time(NULL
);
1137 server
.appendfd
= -1;
1138 server
.appendseldb
= -1; /* Make sure the first time will not match */
1139 server
.pidfile
= "/var/run/redis.pid";
1140 server
.dbfilename
= "dump.rdb";
1141 server
.appendfilename
= "appendonly.aof";
1142 server
.requirepass
= NULL
;
1143 server
.shareobjects
= 0;
1144 server
.sharingpoolsize
= 1024;
1145 server
.maxclients
= 0;
1146 server
.maxmemory
= 0;
1147 resetServerSaveParams();
1149 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1150 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1151 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1152 /* Replication related */
1154 server
.masterauth
= NULL
;
1155 server
.masterhost
= NULL
;
1156 server
.masterport
= 6379;
1157 server
.master
= NULL
;
1158 server
.replstate
= REDIS_REPL_NONE
;
1160 /* Double constants initialization */
1162 R_PosInf
= 1.0/R_Zero
;
1163 R_NegInf
= -1.0/R_Zero
;
1164 R_Nan
= R_Zero
/R_Zero
;
1167 static void initServer() {
1170 signal(SIGHUP
, SIG_IGN
);
1171 signal(SIGPIPE
, SIG_IGN
);
1172 setupSigSegvAction();
1174 server
.clients
= listCreate();
1175 server
.slaves
= listCreate();
1176 server
.monitors
= listCreate();
1177 server
.objfreelist
= listCreate();
1178 createSharedObjects();
1179 server
.el
= aeCreateEventLoop();
1180 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1181 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
1182 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1183 if (server
.fd
== -1) {
1184 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
1187 for (j
= 0; j
< server
.dbnum
; j
++) {
1188 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1189 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1190 server
.db
[j
].id
= j
;
1192 server
.cronloops
= 0;
1193 server
.bgsavechildpid
= -1;
1194 server
.bgrewritechildpid
= -1;
1195 server
.bgrewritebuf
= sdsempty();
1196 server
.lastsave
= time(NULL
);
1198 server
.usedmemory
= 0;
1199 server
.stat_numcommands
= 0;
1200 server
.stat_numconnections
= 0;
1201 server
.stat_starttime
= time(NULL
);
1202 aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
);
1204 if (server
.appendonly
) {
1205 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644);
1206 if (server
.appendfd
== -1) {
1207 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1214 /* Empty the whole database */
1215 static long long emptyDb() {
1217 long long removed
= 0;
1219 for (j
= 0; j
< server
.dbnum
; j
++) {
1220 removed
+= dictSize(server
.db
[j
].dict
);
1221 dictEmpty(server
.db
[j
].dict
);
1222 dictEmpty(server
.db
[j
].expires
);
1227 static int yesnotoi(char *s
) {
1228 if (!strcasecmp(s
,"yes")) return 1;
1229 else if (!strcasecmp(s
,"no")) return 0;
1233 /* I agree, this is a very rudimental way to load a configuration...
1234 will improve later if the config gets more complex */
1235 static void loadServerConfig(char *filename
) {
1237 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1241 if (filename
[0] == '-' && filename
[1] == '\0')
1244 if ((fp
= fopen(filename
,"r")) == NULL
) {
1245 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1250 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1256 line
= sdstrim(line
," \t\r\n");
1258 /* Skip comments and blank lines*/
1259 if (line
[0] == '#' || line
[0] == '\0') {
1264 /* Split into arguments */
1265 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1266 sdstolower(argv
[0]);
1268 /* Execute config directives */
1269 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1270 server
.maxidletime
= atoi(argv
[1]);
1271 if (server
.maxidletime
< 0) {
1272 err
= "Invalid timeout value"; goto loaderr
;
1274 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1275 server
.port
= atoi(argv
[1]);
1276 if (server
.port
< 1 || server
.port
> 65535) {
1277 err
= "Invalid port"; goto loaderr
;
1279 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1280 server
.bindaddr
= zstrdup(argv
[1]);
1281 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1282 int seconds
= atoi(argv
[1]);
1283 int changes
= atoi(argv
[2]);
1284 if (seconds
< 1 || changes
< 0) {
1285 err
= "Invalid save parameters"; goto loaderr
;
1287 appendServerSaveParams(seconds
,changes
);
1288 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1289 if (chdir(argv
[1]) == -1) {
1290 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1291 argv
[1], strerror(errno
));
1294 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1295 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1296 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1297 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1299 err
= "Invalid log level. Must be one of debug, notice, warning";
1302 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1305 server
.logfile
= zstrdup(argv
[1]);
1306 if (!strcasecmp(server
.logfile
,"stdout")) {
1307 zfree(server
.logfile
);
1308 server
.logfile
= NULL
;
1310 if (server
.logfile
) {
1311 /* Test if we are able to open the file. The server will not
1312 * be able to abort just for this problem later... */
1313 logfp
= fopen(server
.logfile
,"a");
1314 if (logfp
== NULL
) {
1315 err
= sdscatprintf(sdsempty(),
1316 "Can't open the log file: %s", strerror(errno
));
1321 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1322 server
.dbnum
= atoi(argv
[1]);
1323 if (server
.dbnum
< 1) {
1324 err
= "Invalid number of databases"; goto loaderr
;
1326 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1327 server
.maxclients
= atoi(argv
[1]);
1328 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1329 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1330 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1331 server
.masterhost
= sdsnew(argv
[1]);
1332 server
.masterport
= atoi(argv
[2]);
1333 server
.replstate
= REDIS_REPL_CONNECT
;
1334 } else if (!strcasecmp(argv
[0],"masterauth") && argc
== 2) {
1335 server
.masterauth
= zstrdup(argv
[1]);
1336 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1337 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1338 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1340 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1341 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1342 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1344 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1345 server
.sharingpoolsize
= atoi(argv
[1]);
1346 if (server
.sharingpoolsize
< 1) {
1347 err
= "invalid object sharing pool size"; goto loaderr
;
1349 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1350 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1351 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1353 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1354 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1355 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1357 } else if (!strcasecmp(argv
[0],"appendfsync") && argc
== 2) {
1358 if (!strcasecmp(argv
[1],"no")) {
1359 server
.appendfsync
= APPENDFSYNC_NO
;
1360 } else if (!strcasecmp(argv
[1],"always")) {
1361 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1362 } else if (!strcasecmp(argv
[1],"everysec")) {
1363 server
.appendfsync
= APPENDFSYNC_EVERYSEC
;
1365 err
= "argument must be 'no', 'always' or 'everysec'";
1368 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1369 server
.requirepass
= zstrdup(argv
[1]);
1370 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1371 server
.pidfile
= zstrdup(argv
[1]);
1372 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1373 server
.dbfilename
= zstrdup(argv
[1]);
1375 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1377 for (j
= 0; j
< argc
; j
++)
1382 if (fp
!= stdin
) fclose(fp
);
1386 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1387 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1388 fprintf(stderr
, ">>> '%s'\n", line
);
1389 fprintf(stderr
, "%s\n", err
);
1393 static void freeClientArgv(redisClient
*c
) {
1396 for (j
= 0; j
< c
->argc
; j
++)
1397 decrRefCount(c
->argv
[j
]);
1398 for (j
= 0; j
< c
->mbargc
; j
++)
1399 decrRefCount(c
->mbargv
[j
]);
1404 static void freeClient(redisClient
*c
) {
1407 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1408 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1409 sdsfree(c
->querybuf
);
1410 listRelease(c
->reply
);
1413 ln
= listSearchKey(server
.clients
,c
);
1414 redisAssert(ln
!= NULL
);
1415 listDelNode(server
.clients
,ln
);
1416 if (c
->flags
& REDIS_SLAVE
) {
1417 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1419 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1420 ln
= listSearchKey(l
,c
);
1421 redisAssert(ln
!= NULL
);
1424 if (c
->flags
& REDIS_MASTER
) {
1425 server
.master
= NULL
;
1426 server
.replstate
= REDIS_REPL_CONNECT
;
1433 #define GLUEREPLY_UP_TO (1024)
1434 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1436 char buf
[GLUEREPLY_UP_TO
];
1440 listRewind(c
->reply
);
1441 while((ln
= listYield(c
->reply
))) {
1445 objlen
= sdslen(o
->ptr
);
1446 if (copylen
+ objlen
<= GLUEREPLY_UP_TO
) {
1447 memcpy(buf
+copylen
,o
->ptr
,objlen
);
1449 listDelNode(c
->reply
,ln
);
1451 if (copylen
== 0) return;
1455 /* Now the output buffer is empty, add the new single element */
1456 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,copylen
));
1457 listAddNodeHead(c
->reply
,o
);
1460 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1461 redisClient
*c
= privdata
;
1462 int nwritten
= 0, totwritten
= 0, objlen
;
1465 REDIS_NOTUSED(mask
);
1467 /* Use writev() if we have enough buffers to send */
1468 if (!server
.glueoutputbuf
&&
1469 listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD
&&
1470 !(c
->flags
& REDIS_MASTER
))
1472 sendReplyToClientWritev(el
, fd
, privdata
, mask
);
1476 while(listLength(c
->reply
)) {
1477 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1478 glueReplyBuffersIfNeeded(c
);
1480 o
= listNodeValue(listFirst(c
->reply
));
1481 objlen
= sdslen(o
->ptr
);
1484 listDelNode(c
->reply
,listFirst(c
->reply
));
1488 if (c
->flags
& REDIS_MASTER
) {
1489 /* Don't reply to a master */
1490 nwritten
= objlen
- c
->sentlen
;
1492 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1493 if (nwritten
<= 0) break;
1495 c
->sentlen
+= nwritten
;
1496 totwritten
+= nwritten
;
1497 /* If we fully sent the object on head go to the next one */
1498 if (c
->sentlen
== objlen
) {
1499 listDelNode(c
->reply
,listFirst(c
->reply
));
1502 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1503 * bytes, in a single threaded server it's a good idea to serve
1504 * other clients as well, even if a very large request comes from
1505 * super fast link that is always able to accept data (in real world
1506 * scenario think about 'KEYS *' against the loopback interfae) */
1507 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1509 if (nwritten
== -1) {
1510 if (errno
== EAGAIN
) {
1513 redisLog(REDIS_DEBUG
,
1514 "Error writing to client: %s", strerror(errno
));
1519 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1520 if (listLength(c
->reply
) == 0) {
1522 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1526 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
)
1528 redisClient
*c
= privdata
;
1529 int nwritten
= 0, totwritten
= 0, objlen
, willwrite
;
1531 struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
];
1532 int offset
, ion
= 0;
1534 REDIS_NOTUSED(mask
);
1537 while (listLength(c
->reply
)) {
1538 offset
= c
->sentlen
;
1542 /* fill-in the iov[] array */
1543 for(node
= listFirst(c
->reply
); node
; node
= listNextNode(node
)) {
1544 o
= listNodeValue(node
);
1545 objlen
= sdslen(o
->ptr
);
1547 if (totwritten
+ objlen
- offset
> REDIS_MAX_WRITE_PER_EVENT
)
1550 if(ion
== REDIS_WRITEV_IOVEC_COUNT
)
1551 break; /* no more iovecs */
1553 iov
[ion
].iov_base
= ((char*)o
->ptr
) + offset
;
1554 iov
[ion
].iov_len
= objlen
- offset
;
1555 willwrite
+= objlen
- offset
;
1556 offset
= 0; /* just for the first item */
1563 /* write all collected blocks at once */
1564 if((nwritten
= writev(fd
, iov
, ion
)) < 0) {
1565 if (errno
!= EAGAIN
) {
1566 redisLog(REDIS_DEBUG
,
1567 "Error writing to client: %s", strerror(errno
));
1574 totwritten
+= nwritten
;
1575 offset
= c
->sentlen
;
1577 /* remove written robjs from c->reply */
1578 while (nwritten
&& listLength(c
->reply
)) {
1579 o
= listNodeValue(listFirst(c
->reply
));
1580 objlen
= sdslen(o
->ptr
);
1582 if(nwritten
>= objlen
- offset
) {
1583 listDelNode(c
->reply
, listFirst(c
->reply
));
1584 nwritten
-= objlen
- offset
;
1588 c
->sentlen
+= nwritten
;
1596 c
->lastinteraction
= time(NULL
);
1598 if (listLength(c
->reply
) == 0) {
1600 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1604 static struct redisCommand
*lookupCommand(char *name
) {
1606 while(cmdTable
[j
].name
!= NULL
) {
1607 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1613 /* resetClient prepare the client to process the next command */
1614 static void resetClient(redisClient
*c
) {
1620 /* If this function gets called we already read a whole
1621 * command, argments are in the client argv/argc fields.
1622 * processCommand() execute the command or prepare the
1623 * server for a bulk read from the client.
1625 * If 1 is returned the client is still alive and valid and
1626 * and other operations can be performed by the caller. Otherwise
1627 * if 0 is returned the client was destroied (i.e. after QUIT). */
1628 static int processCommand(redisClient
*c
) {
1629 struct redisCommand
*cmd
;
1632 /* Free some memory if needed (maxmemory setting) */
1633 if (server
.maxmemory
) freeMemoryIfNeeded();
1635 /* Handle the multi bulk command type. This is an alternative protocol
1636 * supported by Redis in order to receive commands that are composed of
1637 * multiple binary-safe "bulk" arguments. The latency of processing is
1638 * a bit higher but this allows things like multi-sets, so if this
1639 * protocol is used only for MSET and similar commands this is a big win. */
1640 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1641 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1642 if (c
->multibulk
<= 0) {
1646 decrRefCount(c
->argv
[c
->argc
-1]);
1650 } else if (c
->multibulk
) {
1651 if (c
->bulklen
== -1) {
1652 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1653 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1657 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1658 decrRefCount(c
->argv
[0]);
1659 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1661 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1666 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1670 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1671 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1675 if (c
->multibulk
== 0) {
1679 /* Here we need to swap the multi-bulk argc/argv with the
1680 * normal argc/argv of the client structure. */
1682 c
->argv
= c
->mbargv
;
1683 c
->mbargv
= auxargv
;
1686 c
->argc
= c
->mbargc
;
1687 c
->mbargc
= auxargc
;
1689 /* We need to set bulklen to something different than -1
1690 * in order for the code below to process the command without
1691 * to try to read the last argument of a bulk command as
1692 * a special argument. */
1694 /* continue below and process the command */
1701 /* -- end of multi bulk commands processing -- */
1703 /* The QUIT command is handled as a special case. Normal command
1704 * procs are unable to close the client connection safely */
1705 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1709 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1711 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1714 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1715 (c
->argc
< -cmd
->arity
)) {
1717 sdscatprintf(sdsempty(),
1718 "-ERR wrong number of arguments for '%s' command\r\n",
1722 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1723 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1726 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1727 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1729 decrRefCount(c
->argv
[c
->argc
-1]);
1730 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1732 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1737 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1738 /* It is possible that the bulk read is already in the
1739 * buffer. Check this condition and handle it accordingly.
1740 * This is just a fast path, alternative to call processInputBuffer().
1741 * It's a good idea since the code is small and this condition
1742 * happens most of the times. */
1743 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1744 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1746 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1751 /* Let's try to share objects on the command arguments vector */
1752 if (server
.shareobjects
) {
1754 for(j
= 1; j
< c
->argc
; j
++)
1755 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1757 /* Let's try to encode the bulk object to save space. */
1758 if (cmd
->flags
& REDIS_CMD_BULK
)
1759 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1761 /* Check if the user is authenticated */
1762 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1763 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1768 /* Exec the command */
1769 dirty
= server
.dirty
;
1771 if (server
.appendonly
&& server
.dirty
-dirty
)
1772 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1773 if (server
.dirty
-dirty
&& listLength(server
.slaves
))
1774 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1775 if (listLength(server
.monitors
))
1776 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1777 server
.stat_numcommands
++;
1779 /* Prepare the client for the next command */
1780 if (c
->flags
& REDIS_CLOSE
) {
1788 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1792 /* (args*2)+1 is enough room for args, spaces, newlines */
1793 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1795 if (argc
<= REDIS_STATIC_ARGS
) {
1798 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1801 for (j
= 0; j
< argc
; j
++) {
1802 if (j
!= 0) outv
[outc
++] = shared
.space
;
1803 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1806 lenobj
= createObject(REDIS_STRING
,
1807 sdscatprintf(sdsempty(),"%lu\r\n",
1808 (unsigned long) stringObjectLen(argv
[j
])));
1809 lenobj
->refcount
= 0;
1810 outv
[outc
++] = lenobj
;
1812 outv
[outc
++] = argv
[j
];
1814 outv
[outc
++] = shared
.crlf
;
1816 /* Increment all the refcounts at start and decrement at end in order to
1817 * be sure to free objects if there is no slave in a replication state
1818 * able to be feed with commands */
1819 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1821 while((ln
= listYield(slaves
))) {
1822 redisClient
*slave
= ln
->value
;
1824 /* Don't feed slaves that are still waiting for BGSAVE to start */
1825 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1827 /* Feed all the other slaves, MONITORs and so on */
1828 if (slave
->slaveseldb
!= dictid
) {
1832 case 0: selectcmd
= shared
.select0
; break;
1833 case 1: selectcmd
= shared
.select1
; break;
1834 case 2: selectcmd
= shared
.select2
; break;
1835 case 3: selectcmd
= shared
.select3
; break;
1836 case 4: selectcmd
= shared
.select4
; break;
1837 case 5: selectcmd
= shared
.select5
; break;
1838 case 6: selectcmd
= shared
.select6
; break;
1839 case 7: selectcmd
= shared
.select7
; break;
1840 case 8: selectcmd
= shared
.select8
; break;
1841 case 9: selectcmd
= shared
.select9
; break;
1843 selectcmd
= createObject(REDIS_STRING
,
1844 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1845 selectcmd
->refcount
= 0;
1848 addReply(slave
,selectcmd
);
1849 slave
->slaveseldb
= dictid
;
1851 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1853 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1854 if (outv
!= static_outv
) zfree(outv
);
1857 static void processInputBuffer(redisClient
*c
) {
1859 if (c
->bulklen
== -1) {
1860 /* Read the first line of the query */
1861 char *p
= strchr(c
->querybuf
,'\n');
1868 query
= c
->querybuf
;
1869 c
->querybuf
= sdsempty();
1870 querylen
= 1+(p
-(query
));
1871 if (sdslen(query
) > querylen
) {
1872 /* leave data after the first line of the query in the buffer */
1873 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1875 *p
= '\0'; /* remove "\n" */
1876 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1877 sdsupdatelen(query
);
1879 /* Now we can split the query in arguments */
1880 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1883 if (c
->argv
) zfree(c
->argv
);
1884 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1886 for (j
= 0; j
< argc
; j
++) {
1887 if (sdslen(argv
[j
])) {
1888 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1896 /* Execute the command. If the client is still valid
1897 * after processCommand() return and there is something
1898 * on the query buffer try to process the next command. */
1899 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1901 /* Nothing to process, argc == 0. Just process the query
1902 * buffer if it's not empty or return to the caller */
1903 if (sdslen(c
->querybuf
)) goto again
;
1906 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
1907 redisLog(REDIS_DEBUG
, "Client protocol error");
1912 /* Bulk read handling. Note that if we are at this point
1913 the client already sent a command terminated with a newline,
1914 we are reading the bulk data that is actually the last
1915 argument of the command. */
1916 int qbl
= sdslen(c
->querybuf
);
1918 if (c
->bulklen
<= qbl
) {
1919 /* Copy everything but the final CRLF as final argument */
1920 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1922 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1923 /* Process the command. If the client is still valid after
1924 * the processing and there is more data in the buffer
1925 * try to parse it. */
1926 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1932 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1933 redisClient
*c
= (redisClient
*) privdata
;
1934 char buf
[REDIS_IOBUF_LEN
];
1937 REDIS_NOTUSED(mask
);
1939 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1941 if (errno
== EAGAIN
) {
1944 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1948 } else if (nread
== 0) {
1949 redisLog(REDIS_DEBUG
, "Client closed connection");
1954 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1955 c
->lastinteraction
= time(NULL
);
1959 processInputBuffer(c
);
1962 static int selectDb(redisClient
*c
, int id
) {
1963 if (id
< 0 || id
>= server
.dbnum
)
1965 c
->db
= &server
.db
[id
];
1969 static void *dupClientReplyValue(void *o
) {
1970 incrRefCount((robj
*)o
);
1974 static redisClient
*createClient(int fd
) {
1975 redisClient
*c
= zmalloc(sizeof(*c
));
1977 anetNonBlock(NULL
,fd
);
1978 anetTcpNoDelay(NULL
,fd
);
1979 if (!c
) return NULL
;
1982 c
->querybuf
= sdsempty();
1991 c
->lastinteraction
= time(NULL
);
1992 c
->authenticated
= 0;
1993 c
->replstate
= REDIS_REPL_NONE
;
1994 c
->reply
= listCreate();
1995 listSetFreeMethod(c
->reply
,decrRefCount
);
1996 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1997 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1998 readQueryFromClient
, c
) == AE_ERR
) {
2002 listAddNodeTail(server
.clients
,c
);
2006 static void addReply(redisClient
*c
, robj
*obj
) {
2007 if (listLength(c
->reply
) == 0 &&
2008 (c
->replstate
== REDIS_REPL_NONE
||
2009 c
->replstate
== REDIS_REPL_ONLINE
) &&
2010 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
2011 sendReplyToClient
, c
) == AE_ERR
) return;
2012 listAddNodeTail(c
->reply
,getDecodedObject(obj
));
2015 static void addReplySds(redisClient
*c
, sds s
) {
2016 robj
*o
= createObject(REDIS_STRING
,s
);
2021 static void addReplyDouble(redisClient
*c
, double d
) {
2024 snprintf(buf
,sizeof(buf
),"%.17g",d
);
2025 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2026 (unsigned long) strlen(buf
),buf
));
2029 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
2032 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
2033 len
= sdslen(obj
->ptr
);
2035 long n
= (long)obj
->ptr
;
2037 /* Compute how many bytes will take this integer as a radix 10 string */
2043 while((n
= n
/10) != 0) {
2047 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len
));
2050 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2055 REDIS_NOTUSED(mask
);
2056 REDIS_NOTUSED(privdata
);
2058 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
2059 if (cfd
== AE_ERR
) {
2060 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
2063 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
2064 if ((c
= createClient(cfd
)) == NULL
) {
2065 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
2066 close(cfd
); /* May be already closed, just ingore errors */
2069 /* If maxclient directive is set and this is one client more... close the
2070 * connection. Note that we create the client instead to check before
2071 * for this condition, since now the socket is already set in nonblocking
2072 * mode and we can send an error for free using the Kernel I/O */
2073 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
2074 char *err
= "-ERR max number of clients reached\r\n";
2076 /* That's a best effort error message, don't check write errors */
2077 if (write(c
->fd
,err
,strlen(err
)) == -1) {
2078 /* Nothing to do, Just to avoid the warning... */
2083 server
.stat_numconnections
++;
2086 /* ======================= Redis objects implementation ===================== */
2088 static robj
*createObject(int type
, void *ptr
) {
2091 if (listLength(server
.objfreelist
)) {
2092 listNode
*head
= listFirst(server
.objfreelist
);
2093 o
= listNodeValue(head
);
2094 listDelNode(server
.objfreelist
,head
);
2096 o
= zmalloc(sizeof(*o
));
2099 o
->encoding
= REDIS_ENCODING_RAW
;
2105 static robj
*createStringObject(char *ptr
, size_t len
) {
2106 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
2109 static robj
*createListObject(void) {
2110 list
*l
= listCreate();
2112 listSetFreeMethod(l
,decrRefCount
);
2113 return createObject(REDIS_LIST
,l
);
2116 static robj
*createSetObject(void) {
2117 dict
*d
= dictCreate(&setDictType
,NULL
);
2118 return createObject(REDIS_SET
,d
);
2121 static robj
*createZsetObject(void) {
2122 zset
*zs
= zmalloc(sizeof(*zs
));
2124 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
2125 zs
->zsl
= zslCreate();
2126 return createObject(REDIS_ZSET
,zs
);
2129 static void freeStringObject(robj
*o
) {
2130 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2135 static void freeListObject(robj
*o
) {
2136 listRelease((list
*) o
->ptr
);
2139 static void freeSetObject(robj
*o
) {
2140 dictRelease((dict
*) o
->ptr
);
2143 static void freeZsetObject(robj
*o
) {
2146 dictRelease(zs
->dict
);
2151 static void freeHashObject(robj
*o
) {
2152 dictRelease((dict
*) o
->ptr
);
2155 static void incrRefCount(robj
*o
) {
2157 #ifdef DEBUG_REFCOUNT
2158 if (o
->type
== REDIS_STRING
)
2159 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
2163 static void decrRefCount(void *obj
) {
2166 #ifdef DEBUG_REFCOUNT
2167 if (o
->type
== REDIS_STRING
)
2168 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
2170 if (--(o
->refcount
) == 0) {
2172 case REDIS_STRING
: freeStringObject(o
); break;
2173 case REDIS_LIST
: freeListObject(o
); break;
2174 case REDIS_SET
: freeSetObject(o
); break;
2175 case REDIS_ZSET
: freeZsetObject(o
); break;
2176 case REDIS_HASH
: freeHashObject(o
); break;
2177 default: redisAssert(0 != 0); break;
2179 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2180 !listAddNodeHead(server
.objfreelist
,o
))
2185 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2186 dictEntry
*de
= dictFind(db
->dict
,key
);
2187 return de
? dictGetEntryVal(de
) : NULL
;
2190 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2191 expireIfNeeded(db
,key
);
2192 return lookupKey(db
,key
);
2195 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2196 deleteIfVolatile(db
,key
);
2197 return lookupKey(db
,key
);
2200 static int deleteKey(redisDb
*db
, robj
*key
) {
2203 /* We need to protect key from destruction: after the first dictDelete()
2204 * it may happen that 'key' is no longer valid if we don't increment
2205 * it's count. This may happen when we get the object reference directly
2206 * from the hash table with dictRandomKey() or dict iterators */
2208 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2209 retval
= dictDelete(db
->dict
,key
);
2212 return retval
== DICT_OK
;
2215 /* Try to share an object against the shared objects pool */
2216 static robj
*tryObjectSharing(robj
*o
) {
2217 struct dictEntry
*de
;
2220 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2222 redisAssert(o
->type
== REDIS_STRING
);
2223 de
= dictFind(server
.sharingpool
,o
);
2225 robj
*shared
= dictGetEntryKey(de
);
2227 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2228 dictGetEntryVal(de
) = (void*) c
;
2229 incrRefCount(shared
);
2233 /* Here we are using a stream algorihtm: Every time an object is
2234 * shared we increment its count, everytime there is a miss we
2235 * recrement the counter of a random object. If this object reaches
2236 * zero we remove the object and put the current object instead. */
2237 if (dictSize(server
.sharingpool
) >=
2238 server
.sharingpoolsize
) {
2239 de
= dictGetRandomKey(server
.sharingpool
);
2240 redisAssert(de
!= NULL
);
2241 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2242 dictGetEntryVal(de
) = (void*) c
;
2244 dictDelete(server
.sharingpool
,de
->key
);
2247 c
= 0; /* If the pool is empty we want to add this object */
2252 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2253 redisAssert(retval
== DICT_OK
);
2260 /* Check if the nul-terminated string 's' can be represented by a long
2261 * (that is, is a number that fits into long without any other space or
2262 * character before or after the digits).
2264 * If so, the function returns REDIS_OK and *longval is set to the value
2265 * of the number. Otherwise REDIS_ERR is returned */
2266 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2267 char buf
[32], *endptr
;
2271 value
= strtol(s
, &endptr
, 10);
2272 if (endptr
[0] != '\0') return REDIS_ERR
;
2273 slen
= snprintf(buf
,32,"%ld",value
);
2275 /* If the number converted back into a string is not identical
2276 * then it's not possible to encode the string as integer */
2277 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2278 if (longval
) *longval
= value
;
2282 /* Try to encode a string object in order to save space */
2283 static int tryObjectEncoding(robj
*o
) {
2287 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2288 return REDIS_ERR
; /* Already encoded */
2290 /* It's not save to encode shared objects: shared objects can be shared
2291 * everywhere in the "object space" of Redis. Encoded objects can only
2292 * appear as "values" (and not, for instance, as keys) */
2293 if (o
->refcount
> 1) return REDIS_ERR
;
2295 /* Currently we try to encode only strings */
2296 redisAssert(o
->type
== REDIS_STRING
);
2298 /* Check if we can represent this string as a long integer */
2299 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2301 /* Ok, this object can be encoded */
2302 o
->encoding
= REDIS_ENCODING_INT
;
2304 o
->ptr
= (void*) value
;
2308 /* Get a decoded version of an encoded object (returned as a new object).
2309 * If the object is already raw-encoded just increment the ref count. */
2310 static robj
*getDecodedObject(robj
*o
) {
2313 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2317 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2320 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2321 dec
= createStringObject(buf
,strlen(buf
));
2324 redisAssert(1 != 1);
2328 /* Compare two string objects via strcmp() or alike.
2329 * Note that the objects may be integer-encoded. In such a case we
2330 * use snprintf() to get a string representation of the numbers on the stack
2331 * and compare the strings, it's much faster than calling getDecodedObject().
2333 * Important note: if objects are not integer encoded, but binary-safe strings,
2334 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2336 static int compareStringObjects(robj
*a
, robj
*b
) {
2337 redisAssert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2338 char bufa
[128], bufb
[128], *astr
, *bstr
;
2341 if (a
== b
) return 0;
2342 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2343 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2349 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2350 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2356 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2359 static size_t stringObjectLen(robj
*o
) {
2360 redisAssert(o
->type
== REDIS_STRING
);
2361 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2362 return sdslen(o
->ptr
);
2366 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2370 /*============================ DB saving/loading ============================ */
2372 static int rdbSaveType(FILE *fp
, unsigned char type
) {
2373 if (fwrite(&type
,1,1,fp
) == 0) return -1;
2377 static int rdbSaveTime(FILE *fp
, time_t t
) {
2378 int32_t t32
= (int32_t) t
;
2379 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
2383 /* check rdbLoadLen() comments for more info */
2384 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2385 unsigned char buf
[2];
2388 /* Save a 6 bit len */
2389 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2390 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2391 } else if (len
< (1<<14)) {
2392 /* Save a 14 bit len */
2393 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2395 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2397 /* Save a 32 bit len */
2398 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2399 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2401 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2406 /* String objects in the form "2391" "-100" without any space and with a
2407 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2408 * encoded as integers to save space */
2409 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2411 char *endptr
, buf
[32];
2413 /* Check if it's possible to encode this value as a number */
2414 value
= strtoll(s
, &endptr
, 10);
2415 if (endptr
[0] != '\0') return 0;
2416 snprintf(buf
,32,"%lld",value
);
2418 /* If the number converted back into a string is not identical
2419 * then it's not possible to encode the string as integer */
2420 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2422 /* Finally check if it fits in our ranges */
2423 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2424 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2425 enc
[1] = value
&0xFF;
2427 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2428 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2429 enc
[1] = value
&0xFF;
2430 enc
[2] = (value
>>8)&0xFF;
2432 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2433 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2434 enc
[1] = value
&0xFF;
2435 enc
[2] = (value
>>8)&0xFF;
2436 enc
[3] = (value
>>16)&0xFF;
2437 enc
[4] = (value
>>24)&0xFF;
2444 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2445 unsigned int comprlen
, outlen
;
2449 /* We require at least four bytes compression for this to be worth it */
2450 outlen
= sdslen(obj
->ptr
)-4;
2451 if (outlen
<= 0) return 0;
2452 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2453 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2454 if (comprlen
== 0) {
2458 /* Data compressed! Let's save it on disk */
2459 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2460 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2461 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2462 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2463 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2472 /* Save a string objet as [len][data] on disk. If the object is a string
2473 * representation of an integer value we try to safe it in a special form */
2474 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2478 len
= sdslen(obj
->ptr
);
2480 /* Try integer encoding */
2482 unsigned char buf
[5];
2483 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2484 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2489 /* Try LZF compression - under 20 bytes it's unable to compress even
2490 * aaaaaaaaaaaaaaaaaa so skip it */
2494 retval
= rdbSaveLzfStringObject(fp
,obj
);
2495 if (retval
== -1) return -1;
2496 if (retval
> 0) return 0;
2497 /* retval == 0 means data can't be compressed, save the old way */
2500 /* Store verbatim */
2501 if (rdbSaveLen(fp
,len
) == -1) return -1;
2502 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2506 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2507 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2510 obj
= getDecodedObject(obj
);
2511 retval
= rdbSaveStringObjectRaw(fp
,obj
);
2516 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
2517 * 8 bit integer specifing the length of the representation.
2518 * This 8 bit integer has special values in order to specify the following
2524 static int rdbSaveDoubleValue(FILE *fp
, double val
) {
2525 unsigned char buf
[128];
2531 } else if (!isfinite(val
)) {
2533 buf
[0] = (val
< 0) ? 255 : 254;
2535 snprintf((char*)buf
+1,sizeof(buf
)-1,"%.17g",val
);
2536 buf
[0] = strlen((char*)buf
+1);
2539 if (fwrite(buf
,len
,1,fp
) == 0) return -1;
2543 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2544 static int rdbSave(char *filename
) {
2545 dictIterator
*di
= NULL
;
2550 time_t now
= time(NULL
);
2552 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2553 fp
= fopen(tmpfile
,"w");
2555 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2558 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2559 for (j
= 0; j
< server
.dbnum
; j
++) {
2560 redisDb
*db
= server
.db
+j
;
2562 if (dictSize(d
) == 0) continue;
2563 di
= dictGetIterator(d
);
2569 /* Write the SELECT DB opcode */
2570 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2571 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2573 /* Iterate this DB writing every entry */
2574 while((de
= dictNext(di
)) != NULL
) {
2575 robj
*key
= dictGetEntryKey(de
);
2576 robj
*o
= dictGetEntryVal(de
);
2577 time_t expiretime
= getExpire(db
,key
);
2579 /* Save the expire time */
2580 if (expiretime
!= -1) {
2581 /* If this key is already expired skip it */
2582 if (expiretime
< now
) continue;
2583 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2584 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2586 /* Save the key and associated value */
2587 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2588 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2589 if (o
->type
== REDIS_STRING
) {
2590 /* Save a string value */
2591 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
2592 } else if (o
->type
== REDIS_LIST
) {
2593 /* Save a list value */
2594 list
*list
= o
->ptr
;
2598 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
2599 while((ln
= listYield(list
))) {
2600 robj
*eleobj
= listNodeValue(ln
);
2602 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2604 } else if (o
->type
== REDIS_SET
) {
2605 /* Save a set value */
2607 dictIterator
*di
= dictGetIterator(set
);
2610 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
2611 while((de
= dictNext(di
)) != NULL
) {
2612 robj
*eleobj
= dictGetEntryKey(de
);
2614 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2616 dictReleaseIterator(di
);
2617 } else if (o
->type
== REDIS_ZSET
) {
2618 /* Save a set value */
2620 dictIterator
*di
= dictGetIterator(zs
->dict
);
2623 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) goto werr
;
2624 while((de
= dictNext(di
)) != NULL
) {
2625 robj
*eleobj
= dictGetEntryKey(de
);
2626 double *score
= dictGetEntryVal(de
);
2628 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2629 if (rdbSaveDoubleValue(fp
,*score
) == -1) goto werr
;
2631 dictReleaseIterator(di
);
2633 redisAssert(0 != 0);
2636 dictReleaseIterator(di
);
2639 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2641 /* Make sure data will not remain on the OS's output buffers */
2646 /* Use RENAME to make sure the DB file is changed atomically only
2647 * if the generate DB file is ok. */
2648 if (rename(tmpfile
,filename
) == -1) {
2649 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
2653 redisLog(REDIS_NOTICE
,"DB saved on disk");
2655 server
.lastsave
= time(NULL
);
2661 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2662 if (di
) dictReleaseIterator(di
);
2666 static int rdbSaveBackground(char *filename
) {
2669 if (server
.bgsavechildpid
!= -1) return REDIS_ERR
;
2670 if ((childpid
= fork()) == 0) {
2673 if (rdbSave(filename
) == REDIS_OK
) {
2680 if (childpid
== -1) {
2681 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2685 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2686 server
.bgsavechildpid
= childpid
;
2689 return REDIS_OK
; /* unreached */
2692 static void rdbRemoveTempFile(pid_t childpid
) {
2695 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
2699 static int rdbLoadType(FILE *fp
) {
2701 if (fread(&type
,1,1,fp
) == 0) return -1;
2705 static time_t rdbLoadTime(FILE *fp
) {
2707 if (fread(&t32
,4,1,fp
) == 0) return -1;
2708 return (time_t) t32
;
2711 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2712 * of this file for a description of how this are stored on disk.
2714 * isencoded is set to 1 if the readed length is not actually a length but
2715 * an "encoding type", check the above comments for more info */
2716 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2717 unsigned char buf
[2];
2720 if (isencoded
) *isencoded
= 0;
2722 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2727 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2728 type
= (buf
[0]&0xC0)>>6;
2729 if (type
== REDIS_RDB_6BITLEN
) {
2730 /* Read a 6 bit len */
2732 } else if (type
== REDIS_RDB_ENCVAL
) {
2733 /* Read a 6 bit len encoding type */
2734 if (isencoded
) *isencoded
= 1;
2736 } else if (type
== REDIS_RDB_14BITLEN
) {
2737 /* Read a 14 bit len */
2738 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2739 return ((buf
[0]&0x3F)<<8)|buf
[1];
2741 /* Read a 32 bit len */
2742 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2748 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2749 unsigned char enc
[4];
2752 if (enctype
== REDIS_RDB_ENC_INT8
) {
2753 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2754 val
= (signed char)enc
[0];
2755 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2757 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2758 v
= enc
[0]|(enc
[1]<<8);
2760 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2762 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2763 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2766 val
= 0; /* anti-warning */
2769 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2772 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2773 unsigned int len
, clen
;
2774 unsigned char *c
= NULL
;
2777 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2778 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2779 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2780 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2781 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2782 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2784 return createObject(REDIS_STRING
,val
);
2791 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2796 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2799 case REDIS_RDB_ENC_INT8
:
2800 case REDIS_RDB_ENC_INT16
:
2801 case REDIS_RDB_ENC_INT32
:
2802 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2803 case REDIS_RDB_ENC_LZF
:
2804 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2810 if (len
== REDIS_RDB_LENERR
) return NULL
;
2811 val
= sdsnewlen(NULL
,len
);
2812 if (len
&& fread(val
,len
,1,fp
) == 0) {
2816 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2819 /* For information about double serialization check rdbSaveDoubleValue() */
2820 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
2824 if (fread(&len
,1,1,fp
) == 0) return -1;
2826 case 255: *val
= R_NegInf
; return 0;
2827 case 254: *val
= R_PosInf
; return 0;
2828 case 253: *val
= R_Nan
; return 0;
2830 if (fread(buf
,len
,1,fp
) == 0) return -1;
2832 sscanf(buf
, "%lg", val
);
2837 static int rdbLoad(char *filename
) {
2839 robj
*keyobj
= NULL
;
2841 int type
, retval
, rdbver
;
2842 dict
*d
= server
.db
[0].dict
;
2843 redisDb
*db
= server
.db
+0;
2845 time_t expiretime
= -1, now
= time(NULL
);
2847 fp
= fopen(filename
,"r");
2848 if (!fp
) return REDIS_ERR
;
2849 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2851 if (memcmp(buf
,"REDIS",5) != 0) {
2853 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2856 rdbver
= atoi(buf
+5);
2859 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2866 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2867 if (type
== REDIS_EXPIRETIME
) {
2868 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2869 /* We read the time so we need to read the object type again */
2870 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2872 if (type
== REDIS_EOF
) break;
2873 /* Handle SELECT DB opcode as a special case */
2874 if (type
== REDIS_SELECTDB
) {
2875 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2877 if (dbid
>= (unsigned)server
.dbnum
) {
2878 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2881 db
= server
.db
+dbid
;
2886 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2888 if (type
== REDIS_STRING
) {
2889 /* Read string value */
2890 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2891 tryObjectEncoding(o
);
2892 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2893 /* Read list/set value */
2896 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2898 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2899 /* Load every single element of the list/set */
2903 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2904 tryObjectEncoding(ele
);
2905 if (type
== REDIS_LIST
) {
2906 listAddNodeTail((list
*)o
->ptr
,ele
);
2908 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
2911 } else if (type
== REDIS_ZSET
) {
2912 /* Read list/set value */
2916 if ((zsetlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2918 o
= createZsetObject();
2920 /* Load every single element of the list/set */
2923 double *score
= zmalloc(sizeof(double));
2925 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2926 tryObjectEncoding(ele
);
2927 if (rdbLoadDoubleValue(fp
,score
) == -1) goto eoferr
;
2928 dictAdd(zs
->dict
,ele
,score
);
2929 zslInsert(zs
->zsl
,*score
,ele
);
2930 incrRefCount(ele
); /* added to skiplist */
2933 redisAssert(0 != 0);
2935 /* Add the new object in the hash table */
2936 retval
= dictAdd(d
,keyobj
,o
);
2937 if (retval
== DICT_ERR
) {
2938 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2941 /* Set the expire time if needed */
2942 if (expiretime
!= -1) {
2943 setExpire(db
,keyobj
,expiretime
);
2944 /* Delete this key if already expired */
2945 if (expiretime
< now
) deleteKey(db
,keyobj
);
2953 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2954 if (keyobj
) decrRefCount(keyobj
);
2955 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2957 return REDIS_ERR
; /* Just to avoid warning */
2960 /*================================== Commands =============================== */
2962 static void authCommand(redisClient
*c
) {
2963 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2964 c
->authenticated
= 1;
2965 addReply(c
,shared
.ok
);
2967 c
->authenticated
= 0;
2968 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2972 static void pingCommand(redisClient
*c
) {
2973 addReply(c
,shared
.pong
);
2976 static void echoCommand(redisClient
*c
) {
2977 addReplyBulkLen(c
,c
->argv
[1]);
2978 addReply(c
,c
->argv
[1]);
2979 addReply(c
,shared
.crlf
);
2982 /*=================================== Strings =============================== */
2984 static void setGenericCommand(redisClient
*c
, int nx
) {
2987 if (nx
) deleteIfVolatile(c
->db
,c
->argv
[1]);
2988 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2989 if (retval
== DICT_ERR
) {
2991 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2992 incrRefCount(c
->argv
[2]);
2994 addReply(c
,shared
.czero
);
2998 incrRefCount(c
->argv
[1]);
2999 incrRefCount(c
->argv
[2]);
3002 removeExpire(c
->db
,c
->argv
[1]);
3003 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3006 static void setCommand(redisClient
*c
) {
3007 setGenericCommand(c
,0);
3010 static void setnxCommand(redisClient
*c
) {
3011 setGenericCommand(c
,1);
3014 static void getCommand(redisClient
*c
) {
3015 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3018 addReply(c
,shared
.nullbulk
);
3020 if (o
->type
!= REDIS_STRING
) {
3021 addReply(c
,shared
.wrongtypeerr
);
3023 addReplyBulkLen(c
,o
);
3025 addReply(c
,shared
.crlf
);
3030 static void getsetCommand(redisClient
*c
) {
3032 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
3033 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3035 incrRefCount(c
->argv
[1]);
3037 incrRefCount(c
->argv
[2]);
3039 removeExpire(c
->db
,c
->argv
[1]);
3042 static void mgetCommand(redisClient
*c
) {
3045 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
3046 for (j
= 1; j
< c
->argc
; j
++) {
3047 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
3049 addReply(c
,shared
.nullbulk
);
3051 if (o
->type
!= REDIS_STRING
) {
3052 addReply(c
,shared
.nullbulk
);
3054 addReplyBulkLen(c
,o
);
3056 addReply(c
,shared
.crlf
);
3062 static void msetGenericCommand(redisClient
*c
, int nx
) {
3063 int j
, busykeys
= 0;
3065 if ((c
->argc
% 2) == 0) {
3066 addReplySds(c
,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3069 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3070 * set nothing at all if at least one already key exists. */
3072 for (j
= 1; j
< c
->argc
; j
+= 2) {
3073 if (lookupKeyWrite(c
->db
,c
->argv
[j
]) != NULL
) {
3079 addReply(c
, shared
.czero
);
3083 for (j
= 1; j
< c
->argc
; j
+= 2) {
3086 tryObjectEncoding(c
->argv
[j
+1]);
3087 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3088 if (retval
== DICT_ERR
) {
3089 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3090 incrRefCount(c
->argv
[j
+1]);
3092 incrRefCount(c
->argv
[j
]);
3093 incrRefCount(c
->argv
[j
+1]);
3095 removeExpire(c
->db
,c
->argv
[j
]);
3097 server
.dirty
+= (c
->argc
-1)/2;
3098 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3101 static void msetCommand(redisClient
*c
) {
3102 msetGenericCommand(c
,0);
3105 static void msetnxCommand(redisClient
*c
) {
3106 msetGenericCommand(c
,1);
3109 static void incrDecrCommand(redisClient
*c
, long long incr
) {
3114 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3118 if (o
->type
!= REDIS_STRING
) {
3123 if (o
->encoding
== REDIS_ENCODING_RAW
)
3124 value
= strtoll(o
->ptr
, &eptr
, 10);
3125 else if (o
->encoding
== REDIS_ENCODING_INT
)
3126 value
= (long)o
->ptr
;
3128 redisAssert(1 != 1);
3133 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
3134 tryObjectEncoding(o
);
3135 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
3136 if (retval
== DICT_ERR
) {
3137 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
3138 removeExpire(c
->db
,c
->argv
[1]);
3140 incrRefCount(c
->argv
[1]);
3143 addReply(c
,shared
.colon
);
3145 addReply(c
,shared
.crlf
);
3148 static void incrCommand(redisClient
*c
) {
3149 incrDecrCommand(c
,1);
3152 static void decrCommand(redisClient
*c
) {
3153 incrDecrCommand(c
,-1);
3156 static void incrbyCommand(redisClient
*c
) {
3157 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3158 incrDecrCommand(c
,incr
);
3161 static void decrbyCommand(redisClient
*c
) {
3162 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3163 incrDecrCommand(c
,-incr
);
3166 /* ========================= Type agnostic commands ========================= */
3168 static void delCommand(redisClient
*c
) {
3171 for (j
= 1; j
< c
->argc
; j
++) {
3172 if (deleteKey(c
->db
,c
->argv
[j
])) {
3179 addReply(c
,shared
.czero
);
3182 addReply(c
,shared
.cone
);
3185 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
3190 static void existsCommand(redisClient
*c
) {
3191 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
3194 static void selectCommand(redisClient
*c
) {
3195 int id
= atoi(c
->argv
[1]->ptr
);
3197 if (selectDb(c
,id
) == REDIS_ERR
) {
3198 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
3200 addReply(c
,shared
.ok
);
3204 static void randomkeyCommand(redisClient
*c
) {
3208 de
= dictGetRandomKey(c
->db
->dict
);
3209 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
3212 addReply(c
,shared
.plus
);
3213 addReply(c
,shared
.crlf
);
3215 addReply(c
,shared
.plus
);
3216 addReply(c
,dictGetEntryKey(de
));
3217 addReply(c
,shared
.crlf
);
3221 static void keysCommand(redisClient
*c
) {
3224 sds pattern
= c
->argv
[1]->ptr
;
3225 int plen
= sdslen(pattern
);
3226 unsigned long numkeys
= 0, keyslen
= 0;
3227 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3229 di
= dictGetIterator(c
->db
->dict
);
3231 decrRefCount(lenobj
);
3232 while((de
= dictNext(di
)) != NULL
) {
3233 robj
*keyobj
= dictGetEntryKey(de
);
3235 sds key
= keyobj
->ptr
;
3236 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3237 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3238 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3240 addReply(c
,shared
.space
);
3243 keyslen
+= sdslen(key
);
3247 dictReleaseIterator(di
);
3248 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3249 addReply(c
,shared
.crlf
);
3252 static void dbsizeCommand(redisClient
*c
) {
3254 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
3257 static void lastsaveCommand(redisClient
*c
) {
3259 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
3262 static void typeCommand(redisClient
*c
) {
3266 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3271 case REDIS_STRING
: type
= "+string"; break;
3272 case REDIS_LIST
: type
= "+list"; break;
3273 case REDIS_SET
: type
= "+set"; break;
3274 case REDIS_ZSET
: type
= "+zset"; break;
3275 default: type
= "unknown"; break;
3278 addReplySds(c
,sdsnew(type
));
3279 addReply(c
,shared
.crlf
);
3282 static void saveCommand(redisClient
*c
) {
3283 if (server
.bgsavechildpid
!= -1) {
3284 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3287 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3288 addReply(c
,shared
.ok
);
3290 addReply(c
,shared
.err
);
3294 static void bgsaveCommand(redisClient
*c
) {
3295 if (server
.bgsavechildpid
!= -1) {
3296 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3299 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3300 char *status
= "+Background saving started\r\n";
3301 addReplySds(c
,sdsnew(status
));
3303 addReply(c
,shared
.err
);
3307 static void shutdownCommand(redisClient
*c
) {
3308 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
3309 /* Kill the saving child if there is a background saving in progress.
3310 We want to avoid race conditions, for instance our saving child may
3311 overwrite the synchronous saving did by SHUTDOWN. */
3312 if (server
.bgsavechildpid
!= -1) {
3313 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3314 kill(server
.bgsavechildpid
,SIGKILL
);
3315 rdbRemoveTempFile(server
.bgsavechildpid
);
3318 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3319 if (server
.daemonize
)
3320 unlink(server
.pidfile
);
3321 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3322 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3325 /* Ooops.. error saving! The best we can do is to continue operating.
3326 * Note that if there was a background saving process, in the next
3327 * cron() Redis will be notified that the background saving aborted,
3328 * handling special stuff like slaves pending for synchronization... */
3329 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3330 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3334 static void renameGenericCommand(redisClient
*c
, int nx
) {
3337 /* To use the same key as src and dst is probably an error */
3338 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3339 addReply(c
,shared
.sameobjecterr
);
3343 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3345 addReply(c
,shared
.nokeyerr
);
3349 deleteIfVolatile(c
->db
,c
->argv
[2]);
3350 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3353 addReply(c
,shared
.czero
);
3356 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3358 incrRefCount(c
->argv
[2]);
3360 deleteKey(c
->db
,c
->argv
[1]);
3362 addReply(c
,nx
? shared
.cone
: shared
.ok
);
3365 static void renameCommand(redisClient
*c
) {
3366 renameGenericCommand(c
,0);
3369 static void renamenxCommand(redisClient
*c
) {
3370 renameGenericCommand(c
,1);
3373 static void moveCommand(redisClient
*c
) {
3378 /* Obtain source and target DB pointers */
3381 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3382 addReply(c
,shared
.outofrangeerr
);
3386 selectDb(c
,srcid
); /* Back to the source DB */
3388 /* If the user is moving using as target the same
3389 * DB as the source DB it is probably an error. */
3391 addReply(c
,shared
.sameobjecterr
);
3395 /* Check if the element exists and get a reference */
3396 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3398 addReply(c
,shared
.czero
);
3402 /* Try to add the element to the target DB */
3403 deleteIfVolatile(dst
,c
->argv
[1]);
3404 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3405 addReply(c
,shared
.czero
);
3408 incrRefCount(c
->argv
[1]);
3411 /* OK! key moved, free the entry in the source DB */
3412 deleteKey(src
,c
->argv
[1]);
3414 addReply(c
,shared
.cone
);
3417 /* =================================== Lists ================================ */
3418 static void pushGenericCommand(redisClient
*c
, int where
) {
3422 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3424 lobj
= createListObject();
3426 if (where
== REDIS_HEAD
) {
3427 listAddNodeHead(list
,c
->argv
[2]);
3429 listAddNodeTail(list
,c
->argv
[2]);
3431 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
3432 incrRefCount(c
->argv
[1]);
3433 incrRefCount(c
->argv
[2]);
3435 if (lobj
->type
!= REDIS_LIST
) {
3436 addReply(c
,shared
.wrongtypeerr
);
3440 if (where
== REDIS_HEAD
) {
3441 listAddNodeHead(list
,c
->argv
[2]);
3443 listAddNodeTail(list
,c
->argv
[2]);
3445 incrRefCount(c
->argv
[2]);
3448 addReply(c
,shared
.ok
);
3451 static void lpushCommand(redisClient
*c
) {
3452 pushGenericCommand(c
,REDIS_HEAD
);
3455 static void rpushCommand(redisClient
*c
) {
3456 pushGenericCommand(c
,REDIS_TAIL
);
3459 static void llenCommand(redisClient
*c
) {
3463 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3465 addReply(c
,shared
.czero
);
3468 if (o
->type
!= REDIS_LIST
) {
3469 addReply(c
,shared
.wrongtypeerr
);
3472 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3477 static void lindexCommand(redisClient
*c
) {
3479 int index
= atoi(c
->argv
[2]->ptr
);
3481 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3483 addReply(c
,shared
.nullbulk
);
3485 if (o
->type
!= REDIS_LIST
) {
3486 addReply(c
,shared
.wrongtypeerr
);
3488 list
*list
= o
->ptr
;
3491 ln
= listIndex(list
, index
);
3493 addReply(c
,shared
.nullbulk
);
3495 robj
*ele
= listNodeValue(ln
);
3496 addReplyBulkLen(c
,ele
);
3498 addReply(c
,shared
.crlf
);
3504 static void lsetCommand(redisClient
*c
) {
3506 int index
= atoi(c
->argv
[2]->ptr
);
3508 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3510 addReply(c
,shared
.nokeyerr
);
3512 if (o
->type
!= REDIS_LIST
) {
3513 addReply(c
,shared
.wrongtypeerr
);
3515 list
*list
= o
->ptr
;
3518 ln
= listIndex(list
, index
);
3520 addReply(c
,shared
.outofrangeerr
);
3522 robj
*ele
= listNodeValue(ln
);
3525 listNodeValue(ln
) = c
->argv
[3];
3526 incrRefCount(c
->argv
[3]);
3527 addReply(c
,shared
.ok
);
3534 static void popGenericCommand(redisClient
*c
, int where
) {
3537 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3539 addReply(c
,shared
.nullbulk
);
3541 if (o
->type
!= REDIS_LIST
) {
3542 addReply(c
,shared
.wrongtypeerr
);
3544 list
*list
= o
->ptr
;
3547 if (where
== REDIS_HEAD
)
3548 ln
= listFirst(list
);
3550 ln
= listLast(list
);
3553 addReply(c
,shared
.nullbulk
);
3555 robj
*ele
= listNodeValue(ln
);
3556 addReplyBulkLen(c
,ele
);
3558 addReply(c
,shared
.crlf
);
3559 listDelNode(list
,ln
);
3566 static void lpopCommand(redisClient
*c
) {
3567 popGenericCommand(c
,REDIS_HEAD
);
3570 static void rpopCommand(redisClient
*c
) {
3571 popGenericCommand(c
,REDIS_TAIL
);
3574 static void lrangeCommand(redisClient
*c
) {
3576 int start
= atoi(c
->argv
[2]->ptr
);
3577 int end
= atoi(c
->argv
[3]->ptr
);
3579 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3581 addReply(c
,shared
.nullmultibulk
);
3583 if (o
->type
!= REDIS_LIST
) {
3584 addReply(c
,shared
.wrongtypeerr
);
3586 list
*list
= o
->ptr
;
3588 int llen
= listLength(list
);
3592 /* convert negative indexes */
3593 if (start
< 0) start
= llen
+start
;
3594 if (end
< 0) end
= llen
+end
;
3595 if (start
< 0) start
= 0;
3596 if (end
< 0) end
= 0;
3598 /* indexes sanity checks */
3599 if (start
> end
|| start
>= llen
) {
3600 /* Out of range start or start > end result in empty list */
3601 addReply(c
,shared
.emptymultibulk
);
3604 if (end
>= llen
) end
= llen
-1;
3605 rangelen
= (end
-start
)+1;
3607 /* Return the result in form of a multi-bulk reply */
3608 ln
= listIndex(list
, start
);
3609 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3610 for (j
= 0; j
< rangelen
; j
++) {
3611 ele
= listNodeValue(ln
);
3612 addReplyBulkLen(c
,ele
);
3614 addReply(c
,shared
.crlf
);
3621 static void ltrimCommand(redisClient
*c
) {
3623 int start
= atoi(c
->argv
[2]->ptr
);
3624 int end
= atoi(c
->argv
[3]->ptr
);
3626 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3628 addReply(c
,shared
.nokeyerr
);
3630 if (o
->type
!= REDIS_LIST
) {
3631 addReply(c
,shared
.wrongtypeerr
);
3633 list
*list
= o
->ptr
;
3635 int llen
= listLength(list
);
3636 int j
, ltrim
, rtrim
;
3638 /* convert negative indexes */
3639 if (start
< 0) start
= llen
+start
;
3640 if (end
< 0) end
= llen
+end
;
3641 if (start
< 0) start
= 0;
3642 if (end
< 0) end
= 0;
3644 /* indexes sanity checks */
3645 if (start
> end
|| start
>= llen
) {
3646 /* Out of range start or start > end result in empty list */
3650 if (end
>= llen
) end
= llen
-1;
3655 /* Remove list elements to perform the trim */
3656 for (j
= 0; j
< ltrim
; j
++) {
3657 ln
= listFirst(list
);
3658 listDelNode(list
,ln
);
3660 for (j
= 0; j
< rtrim
; j
++) {
3661 ln
= listLast(list
);
3662 listDelNode(list
,ln
);
3665 addReply(c
,shared
.ok
);
3670 static void lremCommand(redisClient
*c
) {
3673 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3675 addReply(c
,shared
.czero
);
3677 if (o
->type
!= REDIS_LIST
) {
3678 addReply(c
,shared
.wrongtypeerr
);
3680 list
*list
= o
->ptr
;
3681 listNode
*ln
, *next
;
3682 int toremove
= atoi(c
->argv
[2]->ptr
);
3687 toremove
= -toremove
;
3690 ln
= fromtail
? list
->tail
: list
->head
;
3692 robj
*ele
= listNodeValue(ln
);
3694 next
= fromtail
? ln
->prev
: ln
->next
;
3695 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3696 listDelNode(list
,ln
);
3699 if (toremove
&& removed
== toremove
) break;
3703 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3708 /* This is the semantic of this command:
3709 * RPOPLPUSH srclist dstlist:
3710 * IF LLEN(srclist) > 0
3711 * element = RPOP srclist
3712 * LPUSH dstlist element
3719 * The idea is to be able to get an element from a list in a reliable way
3720 * since the element is not just returned but pushed against another list
3721 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3723 static void rpoplpushcommand(redisClient
*c
) {
3726 sobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3728 addReply(c
,shared
.nullbulk
);
3730 if (sobj
->type
!= REDIS_LIST
) {
3731 addReply(c
,shared
.wrongtypeerr
);
3733 list
*srclist
= sobj
->ptr
;
3734 listNode
*ln
= listLast(srclist
);
3737 addReply(c
,shared
.nullbulk
);
3739 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3740 robj
*ele
= listNodeValue(ln
);
3745 /* Create the list if the key does not exist */
3746 dobj
= createListObject();
3747 dictAdd(c
->db
->dict
,c
->argv
[2],dobj
);
3748 incrRefCount(c
->argv
[2]);
3749 } else if (dobj
->type
!= REDIS_LIST
) {
3750 addReply(c
,shared
.wrongtypeerr
);
3753 /* Add the element to the target list */
3754 dstlist
= dobj
->ptr
;
3755 listAddNodeHead(dstlist
,ele
);
3758 /* Send the element to the client as reply as well */
3759 addReplyBulkLen(c
,ele
);
3761 addReply(c
,shared
.crlf
);
3763 /* Finally remove the element from the source list */
3764 listDelNode(srclist
,ln
);
3772 /* ==================================== Sets ================================ */
3774 static void saddCommand(redisClient
*c
) {
3777 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3779 set
= createSetObject();
3780 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3781 incrRefCount(c
->argv
[1]);
3783 if (set
->type
!= REDIS_SET
) {
3784 addReply(c
,shared
.wrongtypeerr
);
3788 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3789 incrRefCount(c
->argv
[2]);
3791 addReply(c
,shared
.cone
);
3793 addReply(c
,shared
.czero
);
3797 static void sremCommand(redisClient
*c
) {
3800 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3802 addReply(c
,shared
.czero
);
3804 if (set
->type
!= REDIS_SET
) {
3805 addReply(c
,shared
.wrongtypeerr
);
3808 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
3810 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3811 addReply(c
,shared
.cone
);
3813 addReply(c
,shared
.czero
);
3818 static void smoveCommand(redisClient
*c
) {
3819 robj
*srcset
, *dstset
;
3821 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3822 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3824 /* If the source key does not exist return 0, if it's of the wrong type
3826 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
3827 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
3830 /* Error if the destination key is not a set as well */
3831 if (dstset
&& dstset
->type
!= REDIS_SET
) {
3832 addReply(c
,shared
.wrongtypeerr
);
3835 /* Remove the element from the source set */
3836 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
3837 /* Key not found in the src set! return zero */
3838 addReply(c
,shared
.czero
);
3842 /* Add the element to the destination set */
3844 dstset
= createSetObject();
3845 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
3846 incrRefCount(c
->argv
[2]);
3848 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
3849 incrRefCount(c
->argv
[3]);
3850 addReply(c
,shared
.cone
);
3853 static void sismemberCommand(redisClient
*c
) {
3856 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3858 addReply(c
,shared
.czero
);
3860 if (set
->type
!= REDIS_SET
) {
3861 addReply(c
,shared
.wrongtypeerr
);
3864 if (dictFind(set
->ptr
,c
->argv
[2]))
3865 addReply(c
,shared
.cone
);
3867 addReply(c
,shared
.czero
);
3871 static void scardCommand(redisClient
*c
) {
3875 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3877 addReply(c
,shared
.czero
);
3880 if (o
->type
!= REDIS_SET
) {
3881 addReply(c
,shared
.wrongtypeerr
);
3884 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
3890 static void spopCommand(redisClient
*c
) {
3894 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3896 addReply(c
,shared
.nullbulk
);
3898 if (set
->type
!= REDIS_SET
) {
3899 addReply(c
,shared
.wrongtypeerr
);
3902 de
= dictGetRandomKey(set
->ptr
);
3904 addReply(c
,shared
.nullbulk
);
3906 robj
*ele
= dictGetEntryKey(de
);
3908 addReplyBulkLen(c
,ele
);
3910 addReply(c
,shared
.crlf
);
3911 dictDelete(set
->ptr
,ele
);
3912 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3918 static void srandmemberCommand(redisClient
*c
) {
3922 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3924 addReply(c
,shared
.nullbulk
);
3926 if (set
->type
!= REDIS_SET
) {
3927 addReply(c
,shared
.wrongtypeerr
);
3930 de
= dictGetRandomKey(set
->ptr
);
3932 addReply(c
,shared
.nullbulk
);
3934 robj
*ele
= dictGetEntryKey(de
);
3936 addReplyBulkLen(c
,ele
);
3938 addReply(c
,shared
.crlf
);
3943 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3944 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3946 return dictSize(*d1
)-dictSize(*d2
);
3949 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, unsigned long setsnum
, robj
*dstkey
) {
3950 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3953 robj
*lenobj
= NULL
, *dstset
= NULL
;
3954 unsigned long j
, cardinality
= 0;
3956 for (j
= 0; j
< setsnum
; j
++) {
3960 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3961 lookupKeyRead(c
->db
,setskeys
[j
]);
3965 deleteKey(c
->db
,dstkey
);
3966 addReply(c
,shared
.czero
);
3968 addReply(c
,shared
.nullmultibulk
);
3972 if (setobj
->type
!= REDIS_SET
) {
3974 addReply(c
,shared
.wrongtypeerr
);
3977 dv
[j
] = setobj
->ptr
;
3979 /* Sort sets from the smallest to largest, this will improve our
3980 * algorithm's performace */
3981 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3983 /* The first thing we should output is the total number of elements...
3984 * since this is a multi-bulk write, but at this stage we don't know
3985 * the intersection set size, so we use a trick, append an empty object
3986 * to the output list and save the pointer to later modify it with the
3989 lenobj
= createObject(REDIS_STRING
,NULL
);
3991 decrRefCount(lenobj
);
3993 /* If we have a target key where to store the resulting set
3994 * create this key with an empty set inside */
3995 dstset
= createSetObject();
3998 /* Iterate all the elements of the first (smallest) set, and test
3999 * the element against all the other sets, if at least one set does
4000 * not include the element it is discarded */
4001 di
= dictGetIterator(dv
[0]);
4003 while((de
= dictNext(di
)) != NULL
) {
4006 for (j
= 1; j
< setsnum
; j
++)
4007 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
4009 continue; /* at least one set does not contain the member */
4010 ele
= dictGetEntryKey(de
);
4012 addReplyBulkLen(c
,ele
);
4014 addReply(c
,shared
.crlf
);
4017 dictAdd(dstset
->ptr
,ele
,NULL
);
4021 dictReleaseIterator(di
);
4024 /* Store the resulting set into the target */
4025 deleteKey(c
->db
,dstkey
);
4026 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4027 incrRefCount(dstkey
);
4031 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%lu\r\n",cardinality
);
4033 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4034 dictSize((dict
*)dstset
->ptr
)));
4040 static void sinterCommand(redisClient
*c
) {
4041 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
4044 static void sinterstoreCommand(redisClient
*c
) {
4045 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
4048 #define REDIS_OP_UNION 0
4049 #define REDIS_OP_DIFF 1
4051 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
4052 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4055 robj
*dstset
= NULL
;
4056 int j
, cardinality
= 0;
4058 for (j
= 0; j
< setsnum
; j
++) {
4062 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4063 lookupKeyRead(c
->db
,setskeys
[j
]);
4068 if (setobj
->type
!= REDIS_SET
) {
4070 addReply(c
,shared
.wrongtypeerr
);
4073 dv
[j
] = setobj
->ptr
;
4076 /* We need a temp set object to store our union. If the dstkey
4077 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4078 * this set object will be the resulting object to set into the target key*/
4079 dstset
= createSetObject();
4081 /* Iterate all the elements of all the sets, add every element a single
4082 * time to the result set */
4083 for (j
= 0; j
< setsnum
; j
++) {
4084 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
4085 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
4087 di
= dictGetIterator(dv
[j
]);
4089 while((de
= dictNext(di
)) != NULL
) {
4092 /* dictAdd will not add the same element multiple times */
4093 ele
= dictGetEntryKey(de
);
4094 if (op
== REDIS_OP_UNION
|| j
== 0) {
4095 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
4099 } else if (op
== REDIS_OP_DIFF
) {
4100 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
4105 dictReleaseIterator(di
);
4107 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
4110 /* Output the content of the resulting set, if not in STORE mode */
4112 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
4113 di
= dictGetIterator(dstset
->ptr
);
4114 while((de
= dictNext(di
)) != NULL
) {
4117 ele
= dictGetEntryKey(de
);
4118 addReplyBulkLen(c
,ele
);
4120 addReply(c
,shared
.crlf
);
4122 dictReleaseIterator(di
);
4124 /* If we have a target key where to store the resulting set
4125 * create this key with the result set inside */
4126 deleteKey(c
->db
,dstkey
);
4127 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4128 incrRefCount(dstkey
);
4133 decrRefCount(dstset
);
4135 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4136 dictSize((dict
*)dstset
->ptr
)));
4142 static void sunionCommand(redisClient
*c
) {
4143 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
4146 static void sunionstoreCommand(redisClient
*c
) {
4147 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
4150 static void sdiffCommand(redisClient
*c
) {
4151 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
4154 static void sdiffstoreCommand(redisClient
*c
) {
4155 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
4158 /* ==================================== ZSets =============================== */
4160 /* ZSETs are ordered sets using two data structures to hold the same elements
4161 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4164 * The elements are added to an hash table mapping Redis objects to scores.
4165 * At the same time the elements are added to a skip list mapping scores
4166 * to Redis objects (so objects are sorted by scores in this "view"). */
4168 /* This skiplist implementation is almost a C translation of the original
4169 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4170 * Alternative to Balanced Trees", modified in three ways:
4171 * a) this implementation allows for repeated values.
4172 * b) the comparison is not just by key (our 'score') but by satellite data.
4173 * c) there is a back pointer, so it's a doubly linked list with the back
4174 * pointers being only at "level 1". This allows to traverse the list
4175 * from tail to head, useful for ZREVRANGE. */
4177 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
4178 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
4180 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
4186 static zskiplist
*zslCreate(void) {
4190 zsl
= zmalloc(sizeof(*zsl
));
4193 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
4194 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
4195 zsl
->header
->forward
[j
] = NULL
;
4196 zsl
->header
->backward
= NULL
;
4201 static void zslFreeNode(zskiplistNode
*node
) {
4202 decrRefCount(node
->obj
);
4203 zfree(node
->forward
);
4207 static void zslFree(zskiplist
*zsl
) {
4208 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
4210 zfree(zsl
->header
->forward
);
4213 next
= node
->forward
[0];
4220 static int zslRandomLevel(void) {
4222 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
4227 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
4228 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4232 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4233 while (x
->forward
[i
] &&
4234 (x
->forward
[i
]->score
< score
||
4235 (x
->forward
[i
]->score
== score
&&
4236 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4240 /* we assume the key is not already inside, since we allow duplicated
4241 * scores, and the re-insertion of score and redis object should never
4242 * happpen since the caller of zslInsert() should test in the hash table
4243 * if the element is already inside or not. */
4244 level
= zslRandomLevel();
4245 if (level
> zsl
->level
) {
4246 for (i
= zsl
->level
; i
< level
; i
++)
4247 update
[i
] = zsl
->header
;
4250 x
= zslCreateNode(level
,score
,obj
);
4251 for (i
= 0; i
< level
; i
++) {
4252 x
->forward
[i
] = update
[i
]->forward
[i
];
4253 update
[i
]->forward
[i
] = x
;
4255 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
4257 x
->forward
[0]->backward
= x
;
4263 /* Delete an element with matching score/object from the skiplist. */
4264 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
4265 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4269 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4270 while (x
->forward
[i
] &&
4271 (x
->forward
[i
]->score
< score
||
4272 (x
->forward
[i
]->score
== score
&&
4273 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4277 /* We may have multiple elements with the same score, what we need
4278 * is to find the element with both the right score and object. */
4280 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
4281 for (i
= 0; i
< zsl
->level
; i
++) {
4282 if (update
[i
]->forward
[i
] != x
) break;
4283 update
[i
]->forward
[i
] = x
->forward
[i
];
4285 if (x
->forward
[0]) {
4286 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4289 zsl
->tail
= x
->backward
;
4292 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4297 return 0; /* not found */
4299 return 0; /* not found */
4302 /* Delete all the elements with score between min and max from the skiplist.
4303 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4304 * Note that this function takes the reference to the hash table view of the
4305 * sorted set, in order to remove the elements from the hash table too. */
4306 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4307 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4308 unsigned long removed
= 0;
4312 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4313 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
4317 /* We may have multiple elements with the same score, what we need
4318 * is to find the element with both the right score and object. */
4320 while (x
&& x
->score
<= max
) {
4321 zskiplistNode
*next
;
4323 for (i
= 0; i
< zsl
->level
; i
++) {
4324 if (update
[i
]->forward
[i
] != x
) break;
4325 update
[i
]->forward
[i
] = x
->forward
[i
];
4327 if (x
->forward
[0]) {
4328 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4331 zsl
->tail
= x
->backward
;
4333 next
= x
->forward
[0];
4334 dictDelete(dict
,x
->obj
);
4336 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4342 return removed
; /* not found */
4345 /* Find the first node having a score equal or greater than the specified one.
4346 * Returns NULL if there is no match. */
4347 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4352 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4353 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
4356 /* We may have multiple elements with the same score, what we need
4357 * is to find the element with both the right score and object. */
4358 return x
->forward
[0];
4361 /* The actual Z-commands implementations */
4363 /* This generic command implements both ZADD and ZINCRBY.
4364 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4365 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4366 static void zaddGenericCommand(redisClient
*c
, robj
*key
, robj
*ele
, double scoreval
, int doincrement
) {
4371 zsetobj
= lookupKeyWrite(c
->db
,key
);
4372 if (zsetobj
== NULL
) {
4373 zsetobj
= createZsetObject();
4374 dictAdd(c
->db
->dict
,key
,zsetobj
);
4377 if (zsetobj
->type
!= REDIS_ZSET
) {
4378 addReply(c
,shared
.wrongtypeerr
);
4384 /* Ok now since we implement both ZADD and ZINCRBY here the code
4385 * needs to handle the two different conditions. It's all about setting
4386 * '*score', that is, the new score to set, to the right value. */
4387 score
= zmalloc(sizeof(double));
4391 /* Read the old score. If the element was not present starts from 0 */
4392 de
= dictFind(zs
->dict
,ele
);
4394 double *oldscore
= dictGetEntryVal(de
);
4395 *score
= *oldscore
+ scoreval
;
4403 /* What follows is a simple remove and re-insert operation that is common
4404 * to both ZADD and ZINCRBY... */
4405 if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) {
4406 /* case 1: New element */
4407 incrRefCount(ele
); /* added to hash */
4408 zslInsert(zs
->zsl
,*score
,ele
);
4409 incrRefCount(ele
); /* added to skiplist */
4412 addReplyDouble(c
,*score
);
4414 addReply(c
,shared
.cone
);
4419 /* case 2: Score update operation */
4420 de
= dictFind(zs
->dict
,ele
);
4421 redisAssert(de
!= NULL
);
4422 oldscore
= dictGetEntryVal(de
);
4423 if (*score
!= *oldscore
) {
4426 /* Remove and insert the element in the skip list with new score */
4427 deleted
= zslDelete(zs
->zsl
,*oldscore
,ele
);
4428 redisAssert(deleted
!= 0);
4429 zslInsert(zs
->zsl
,*score
,ele
);
4431 /* Update the score in the hash table */
4432 dictReplace(zs
->dict
,ele
,score
);
4438 addReplyDouble(c
,*score
);
4440 addReply(c
,shared
.czero
);
4444 static void zaddCommand(redisClient
*c
) {
4447 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4448 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0);
4451 static void zincrbyCommand(redisClient
*c
) {
4454 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4455 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1);
4458 static void zremCommand(redisClient
*c
) {
4462 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4463 if (zsetobj
== NULL
) {
4464 addReply(c
,shared
.czero
);
4470 if (zsetobj
->type
!= REDIS_ZSET
) {
4471 addReply(c
,shared
.wrongtypeerr
);
4475 de
= dictFind(zs
->dict
,c
->argv
[2]);
4477 addReply(c
,shared
.czero
);
4480 /* Delete from the skiplist */
4481 oldscore
= dictGetEntryVal(de
);
4482 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
4483 redisAssert(deleted
!= 0);
4485 /* Delete from the hash table */
4486 dictDelete(zs
->dict
,c
->argv
[2]);
4487 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4489 addReply(c
,shared
.cone
);
4493 static void zremrangebyscoreCommand(redisClient
*c
) {
4494 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4495 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4499 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4500 if (zsetobj
== NULL
) {
4501 addReply(c
,shared
.czero
);
4505 if (zsetobj
->type
!= REDIS_ZSET
) {
4506 addReply(c
,shared
.wrongtypeerr
);
4510 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
4511 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4512 server
.dirty
+= deleted
;
4513 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
4517 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
4519 int start
= atoi(c
->argv
[2]->ptr
);
4520 int end
= atoi(c
->argv
[3]->ptr
);
4522 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4524 addReply(c
,shared
.nullmultibulk
);
4526 if (o
->type
!= REDIS_ZSET
) {
4527 addReply(c
,shared
.wrongtypeerr
);
4529 zset
*zsetobj
= o
->ptr
;
4530 zskiplist
*zsl
= zsetobj
->zsl
;
4533 int llen
= zsl
->length
;
4537 /* convert negative indexes */
4538 if (start
< 0) start
= llen
+start
;
4539 if (end
< 0) end
= llen
+end
;
4540 if (start
< 0) start
= 0;
4541 if (end
< 0) end
= 0;
4543 /* indexes sanity checks */
4544 if (start
> end
|| start
>= llen
) {
4545 /* Out of range start or start > end result in empty list */
4546 addReply(c
,shared
.emptymultibulk
);
4549 if (end
>= llen
) end
= llen
-1;
4550 rangelen
= (end
-start
)+1;
4552 /* Return the result in form of a multi-bulk reply */
4558 ln
= zsl
->header
->forward
[0];
4560 ln
= ln
->forward
[0];
4563 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
4564 for (j
= 0; j
< rangelen
; j
++) {
4566 addReplyBulkLen(c
,ele
);
4568 addReply(c
,shared
.crlf
);
4569 ln
= reverse
? ln
->backward
: ln
->forward
[0];
4575 static void zrangeCommand(redisClient
*c
) {
4576 zrangeGenericCommand(c
,0);
4579 static void zrevrangeCommand(redisClient
*c
) {
4580 zrangeGenericCommand(c
,1);
4583 static void zrangebyscoreCommand(redisClient
*c
) {
4585 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4586 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4587 int offset
= 0, limit
= -1;
4589 if (c
->argc
!= 4 && c
->argc
!= 7) {
4591 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4593 } else if (c
->argc
== 7 && strcasecmp(c
->argv
[4]->ptr
,"limit")) {
4594 addReply(c
,shared
.syntaxerr
);
4596 } else if (c
->argc
== 7) {
4597 offset
= atoi(c
->argv
[5]->ptr
);
4598 limit
= atoi(c
->argv
[6]->ptr
);
4599 if (offset
< 0) offset
= 0;
4602 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4604 addReply(c
,shared
.nullmultibulk
);
4606 if (o
->type
!= REDIS_ZSET
) {
4607 addReply(c
,shared
.wrongtypeerr
);
4609 zset
*zsetobj
= o
->ptr
;
4610 zskiplist
*zsl
= zsetobj
->zsl
;
4613 unsigned int rangelen
= 0;
4615 /* Get the first node with the score >= min */
4616 ln
= zslFirstWithScore(zsl
,min
);
4618 /* No element matching the speciifed interval */
4619 addReply(c
,shared
.emptymultibulk
);
4623 /* We don't know in advance how many matching elements there
4624 * are in the list, so we push this object that will represent
4625 * the multi-bulk length in the output buffer, and will "fix"
4627 lenobj
= createObject(REDIS_STRING
,NULL
);
4629 decrRefCount(lenobj
);
4631 while(ln
&& ln
->score
<= max
) {
4634 ln
= ln
->forward
[0];
4637 if (limit
== 0) break;
4639 addReplyBulkLen(c
,ele
);
4641 addReply(c
,shared
.crlf
);
4642 ln
= ln
->forward
[0];
4644 if (limit
> 0) limit
--;
4646 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
4651 static void zcardCommand(redisClient
*c
) {
4655 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4657 addReply(c
,shared
.czero
);
4660 if (o
->type
!= REDIS_ZSET
) {
4661 addReply(c
,shared
.wrongtypeerr
);
4664 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",zs
->zsl
->length
));
4669 static void zscoreCommand(redisClient
*c
) {
4673 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4675 addReply(c
,shared
.nullbulk
);
4678 if (o
->type
!= REDIS_ZSET
) {
4679 addReply(c
,shared
.wrongtypeerr
);
4684 de
= dictFind(zs
->dict
,c
->argv
[2]);
4686 addReply(c
,shared
.nullbulk
);
4688 double *score
= dictGetEntryVal(de
);
4690 addReplyDouble(c
,*score
);
4696 /* ========================= Non type-specific commands ==================== */
4698 static void flushdbCommand(redisClient
*c
) {
4699 server
.dirty
+= dictSize(c
->db
->dict
);
4700 dictEmpty(c
->db
->dict
);
4701 dictEmpty(c
->db
->expires
);
4702 addReply(c
,shared
.ok
);
4705 static void flushallCommand(redisClient
*c
) {
4706 server
.dirty
+= emptyDb();
4707 addReply(c
,shared
.ok
);
4708 rdbSave(server
.dbfilename
);
4712 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
4713 redisSortOperation
*so
= zmalloc(sizeof(*so
));
4715 so
->pattern
= pattern
;
4719 /* Return the value associated to the key with a name obtained
4720 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4721 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
4725 int prefixlen
, sublen
, postfixlen
;
4726 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4730 char buf
[REDIS_SORTKEY_MAX
+1];
4733 /* If the pattern is "#" return the substitution object itself in order
4734 * to implement the "SORT ... GET #" feature. */
4735 spat
= pattern
->ptr
;
4736 if (spat
[0] == '#' && spat
[1] == '\0') {
4740 /* The substitution object may be specially encoded. If so we create
4741 * a decoded object on the fly. Otherwise getDecodedObject will just
4742 * increment the ref count, that we'll decrement later. */
4743 subst
= getDecodedObject(subst
);
4746 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
4747 p
= strchr(spat
,'*');
4749 decrRefCount(subst
);
4754 sublen
= sdslen(ssub
);
4755 postfixlen
= sdslen(spat
)-(prefixlen
+1);
4756 memcpy(keyname
.buf
,spat
,prefixlen
);
4757 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
4758 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
4759 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
4760 keyname
.len
= prefixlen
+sublen
+postfixlen
;
4762 initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(long)*2))
4763 decrRefCount(subst
);
4765 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4766 return lookupKeyRead(db
,&keyobj
);
4769 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4770 * the additional parameter is not standard but a BSD-specific we have to
4771 * pass sorting parameters via the global 'server' structure */
4772 static int sortCompare(const void *s1
, const void *s2
) {
4773 const redisSortObject
*so1
= s1
, *so2
= s2
;
4776 if (!server
.sort_alpha
) {
4777 /* Numeric sorting. Here it's trivial as we precomputed scores */
4778 if (so1
->u
.score
> so2
->u
.score
) {
4780 } else if (so1
->u
.score
< so2
->u
.score
) {
4786 /* Alphanumeric sorting */
4787 if (server
.sort_bypattern
) {
4788 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
4789 /* At least one compare object is NULL */
4790 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
4792 else if (so1
->u
.cmpobj
== NULL
)
4797 /* We have both the objects, use strcoll */
4798 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
4801 /* Compare elements directly */
4804 dec1
= getDecodedObject(so1
->obj
);
4805 dec2
= getDecodedObject(so2
->obj
);
4806 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
4811 return server
.sort_desc
? -cmp
: cmp
;
4814 /* The SORT command is the most complex command in Redis. Warning: this code
4815 * is optimized for speed and a bit less for readability */
4816 static void sortCommand(redisClient
*c
) {
4819 int desc
= 0, alpha
= 0;
4820 int limit_start
= 0, limit_count
= -1, start
, end
;
4821 int j
, dontsort
= 0, vectorlen
;
4822 int getop
= 0; /* GET operation counter */
4823 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
4824 redisSortObject
*vector
; /* Resulting vector to sort */
4826 /* Lookup the key to sort. It must be of the right types */
4827 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
4828 if (sortval
== NULL
) {
4829 addReply(c
,shared
.nokeyerr
);
4832 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
4833 sortval
->type
!= REDIS_ZSET
)
4835 addReply(c
,shared
.wrongtypeerr
);
4839 /* Create a list of operations to perform for every sorted element.
4840 * Operations can be GET/DEL/INCR/DECR */
4841 operations
= listCreate();
4842 listSetFreeMethod(operations
,zfree
);
4845 /* Now we need to protect sortval incrementing its count, in the future
4846 * SORT may have options able to overwrite/delete keys during the sorting
4847 * and the sorted key itself may get destroied */
4848 incrRefCount(sortval
);
4850 /* The SORT command has an SQL-alike syntax, parse it */
4851 while(j
< c
->argc
) {
4852 int leftargs
= c
->argc
-j
-1;
4853 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
4855 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
4857 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
4859 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
4860 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
4861 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
4863 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
4864 storekey
= c
->argv
[j
+1];
4866 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
4867 sortby
= c
->argv
[j
+1];
4868 /* If the BY pattern does not contain '*', i.e. it is constant,
4869 * we don't need to sort nor to lookup the weight keys. */
4870 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
4872 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
4873 listAddNodeTail(operations
,createSortOperation(
4874 REDIS_SORT_GET
,c
->argv
[j
+1]));
4878 decrRefCount(sortval
);
4879 listRelease(operations
);
4880 addReply(c
,shared
.syntaxerr
);
4886 /* Load the sorting vector with all the objects to sort */
4887 switch(sortval
->type
) {
4888 case REDIS_LIST
: vectorlen
= listLength((list
*)sortval
->ptr
); break;
4889 case REDIS_SET
: vectorlen
= dictSize((dict
*)sortval
->ptr
); break;
4890 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
4891 default: vectorlen
= 0; redisAssert(0); /* Avoid GCC warning */
4893 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
4896 if (sortval
->type
== REDIS_LIST
) {
4897 list
*list
= sortval
->ptr
;
4901 while((ln
= listYield(list
))) {
4902 robj
*ele
= ln
->value
;
4903 vector
[j
].obj
= ele
;
4904 vector
[j
].u
.score
= 0;
4905 vector
[j
].u
.cmpobj
= NULL
;
4913 if (sortval
->type
== REDIS_SET
) {
4916 zset
*zs
= sortval
->ptr
;
4920 di
= dictGetIterator(set
);
4921 while((setele
= dictNext(di
)) != NULL
) {
4922 vector
[j
].obj
= dictGetEntryKey(setele
);
4923 vector
[j
].u
.score
= 0;
4924 vector
[j
].u
.cmpobj
= NULL
;
4927 dictReleaseIterator(di
);
4929 redisAssert(j
== vectorlen
);
4931 /* Now it's time to load the right scores in the sorting vector */
4932 if (dontsort
== 0) {
4933 for (j
= 0; j
< vectorlen
; j
++) {
4937 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
4938 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
4940 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
4942 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
4943 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
4945 /* Don't need to decode the object if it's
4946 * integer-encoded (the only encoding supported) so
4947 * far. We can just cast it */
4948 if (byval
->encoding
== REDIS_ENCODING_INT
) {
4949 vector
[j
].u
.score
= (long)byval
->ptr
;
4951 redisAssert(1 != 1);
4956 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
4957 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
4959 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
4960 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
4962 redisAssert(1 != 1);
4969 /* We are ready to sort the vector... perform a bit of sanity check
4970 * on the LIMIT option too. We'll use a partial version of quicksort. */
4971 start
= (limit_start
< 0) ? 0 : limit_start
;
4972 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
4973 if (start
>= vectorlen
) {
4974 start
= vectorlen
-1;
4977 if (end
>= vectorlen
) end
= vectorlen
-1;
4979 if (dontsort
== 0) {
4980 server
.sort_desc
= desc
;
4981 server
.sort_alpha
= alpha
;
4982 server
.sort_bypattern
= sortby
? 1 : 0;
4983 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
4984 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
4986 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
4989 /* Send command output to the output buffer, performing the specified
4990 * GET/DEL/INCR/DECR operations if any. */
4991 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
4992 if (storekey
== NULL
) {
4993 /* STORE option not specified, sent the sorting result to client */
4994 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
4995 for (j
= start
; j
<= end
; j
++) {
4998 addReplyBulkLen(c
,vector
[j
].obj
);
4999 addReply(c
,vector
[j
].obj
);
5000 addReply(c
,shared
.crlf
);
5002 listRewind(operations
);
5003 while((ln
= listYield(operations
))) {
5004 redisSortOperation
*sop
= ln
->value
;
5005 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5008 if (sop
->type
== REDIS_SORT_GET
) {
5009 if (!val
|| val
->type
!= REDIS_STRING
) {
5010 addReply(c
,shared
.nullbulk
);
5012 addReplyBulkLen(c
,val
);
5014 addReply(c
,shared
.crlf
);
5017 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5022 robj
*listObject
= createListObject();
5023 list
*listPtr
= (list
*) listObject
->ptr
;
5025 /* STORE option specified, set the sorting result as a List object */
5026 for (j
= start
; j
<= end
; j
++) {
5029 listAddNodeTail(listPtr
,vector
[j
].obj
);
5030 incrRefCount(vector
[j
].obj
);
5032 listRewind(operations
);
5033 while((ln
= listYield(operations
))) {
5034 redisSortOperation
*sop
= ln
->value
;
5035 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5038 if (sop
->type
== REDIS_SORT_GET
) {
5039 if (!val
|| val
->type
!= REDIS_STRING
) {
5040 listAddNodeTail(listPtr
,createStringObject("",0));
5042 listAddNodeTail(listPtr
,val
);
5046 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5050 if (dictReplace(c
->db
->dict
,storekey
,listObject
)) {
5051 incrRefCount(storekey
);
5053 /* Note: we add 1 because the DB is dirty anyway since even if the
5054 * SORT result is empty a new key is set and maybe the old content
5056 server
.dirty
+= 1+outputlen
;
5057 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",outputlen
));
5061 decrRefCount(sortval
);
5062 listRelease(operations
);
5063 for (j
= 0; j
< vectorlen
; j
++) {
5064 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
5065 decrRefCount(vector
[j
].u
.cmpobj
);
5070 /* Create the string returned by the INFO command. This is decoupled
5071 * by the INFO command itself as we need to report the same information
5072 * on memory corruption problems. */
5073 static sds
genRedisInfoString(void) {
5075 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5078 info
= sdscatprintf(sdsempty(),
5079 "redis_version:%s\r\n"
5081 "multiplexing_api:%s\r\n"
5082 "uptime_in_seconds:%ld\r\n"
5083 "uptime_in_days:%ld\r\n"
5084 "connected_clients:%d\r\n"
5085 "connected_slaves:%d\r\n"
5086 "used_memory:%zu\r\n"
5087 "changes_since_last_save:%lld\r\n"
5088 "bgsave_in_progress:%d\r\n"
5089 "last_save_time:%ld\r\n"
5090 "bgrewriteaof_in_progress:%d\r\n"
5091 "total_connections_received:%lld\r\n"
5092 "total_commands_processed:%lld\r\n"
5095 (sizeof(long) == 8) ? "64" : "32",
5099 listLength(server
.clients
)-listLength(server
.slaves
),
5100 listLength(server
.slaves
),
5103 server
.bgsavechildpid
!= -1,
5105 server
.bgrewritechildpid
!= -1,
5106 server
.stat_numconnections
,
5107 server
.stat_numcommands
,
5108 server
.masterhost
== NULL
? "master" : "slave"
5110 if (server
.masterhost
) {
5111 info
= sdscatprintf(info
,
5112 "master_host:%s\r\n"
5113 "master_port:%d\r\n"
5114 "master_link_status:%s\r\n"
5115 "master_last_io_seconds_ago:%d\r\n"
5118 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
5120 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
5123 for (j
= 0; j
< server
.dbnum
; j
++) {
5124 long long keys
, vkeys
;
5126 keys
= dictSize(server
.db
[j
].dict
);
5127 vkeys
= dictSize(server
.db
[j
].expires
);
5128 if (keys
|| vkeys
) {
5129 info
= sdscatprintf(info
, "db%d:keys=%lld,expires=%lld\r\n",
5136 static void infoCommand(redisClient
*c
) {
5137 sds info
= genRedisInfoString();
5138 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
5139 (unsigned long)sdslen(info
)));
5140 addReplySds(c
,info
);
5141 addReply(c
,shared
.crlf
);
5144 static void monitorCommand(redisClient
*c
) {
5145 /* ignore MONITOR if aleady slave or in monitor mode */
5146 if (c
->flags
& REDIS_SLAVE
) return;
5148 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
5150 listAddNodeTail(server
.monitors
,c
);
5151 addReply(c
,shared
.ok
);
5154 /* ================================= Expire ================================= */
5155 static int removeExpire(redisDb
*db
, robj
*key
) {
5156 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
5163 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
5164 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
5172 /* Return the expire time of the specified key, or -1 if no expire
5173 * is associated with this key (i.e. the key is non volatile) */
5174 static time_t getExpire(redisDb
*db
, robj
*key
) {
5177 /* No expire? return ASAP */
5178 if (dictSize(db
->expires
) == 0 ||
5179 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
5181 return (time_t) dictGetEntryVal(de
);
5184 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
5188 /* No expire? return ASAP */
5189 if (dictSize(db
->expires
) == 0 ||
5190 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5192 /* Lookup the expire */
5193 when
= (time_t) dictGetEntryVal(de
);
5194 if (time(NULL
) <= when
) return 0;
5196 /* Delete the key */
5197 dictDelete(db
->expires
,key
);
5198 return dictDelete(db
->dict
,key
) == DICT_OK
;
5201 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
5204 /* No expire? return ASAP */
5205 if (dictSize(db
->expires
) == 0 ||
5206 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5208 /* Delete the key */
5210 dictDelete(db
->expires
,key
);
5211 return dictDelete(db
->dict
,key
) == DICT_OK
;
5214 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
5217 de
= dictFind(c
->db
->dict
,key
);
5219 addReply(c
,shared
.czero
);
5223 if (deleteKey(c
->db
,key
)) server
.dirty
++;
5224 addReply(c
, shared
.cone
);
5227 time_t when
= time(NULL
)+seconds
;
5228 if (setExpire(c
->db
,key
,when
)) {
5229 addReply(c
,shared
.cone
);
5232 addReply(c
,shared
.czero
);
5238 static void expireCommand(redisClient
*c
) {
5239 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
5242 static void expireatCommand(redisClient
*c
) {
5243 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
5246 static void ttlCommand(redisClient
*c
) {
5250 expire
= getExpire(c
->db
,c
->argv
[1]);
5252 ttl
= (int) (expire
-time(NULL
));
5253 if (ttl
< 0) ttl
= -1;
5255 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
5258 /* =============================== Replication ============================= */
5260 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5261 ssize_t nwritten
, ret
= size
;
5262 time_t start
= time(NULL
);
5266 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
5267 nwritten
= write(fd
,ptr
,size
);
5268 if (nwritten
== -1) return -1;
5272 if ((time(NULL
)-start
) > timeout
) {
5280 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5281 ssize_t nread
, totread
= 0;
5282 time_t start
= time(NULL
);
5286 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
5287 nread
= read(fd
,ptr
,size
);
5288 if (nread
== -1) return -1;
5293 if ((time(NULL
)-start
) > timeout
) {
5301 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5308 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
5311 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
5322 static void syncCommand(redisClient
*c
) {
5323 /* ignore SYNC if aleady slave or in monitor mode */
5324 if (c
->flags
& REDIS_SLAVE
) return;
5326 /* SYNC can't be issued when the server has pending data to send to
5327 * the client about already issued commands. We need a fresh reply
5328 * buffer registering the differences between the BGSAVE and the current
5329 * dataset, so that we can copy to other slaves if needed. */
5330 if (listLength(c
->reply
) != 0) {
5331 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5335 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
5336 /* Here we need to check if there is a background saving operation
5337 * in progress, or if it is required to start one */
5338 if (server
.bgsavechildpid
!= -1) {
5339 /* Ok a background save is in progress. Let's check if it is a good
5340 * one for replication, i.e. if there is another slave that is
5341 * registering differences since the server forked to save */
5345 listRewind(server
.slaves
);
5346 while((ln
= listYield(server
.slaves
))) {
5348 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
5351 /* Perfect, the server is already registering differences for
5352 * another slave. Set the right state, and copy the buffer. */
5353 listRelease(c
->reply
);
5354 c
->reply
= listDup(slave
->reply
);
5355 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5356 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
5358 /* No way, we need to wait for the next BGSAVE in order to
5359 * register differences */
5360 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5361 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
5364 /* Ok we don't have a BGSAVE in progress, let's start one */
5365 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
5366 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5367 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
5368 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
5371 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5374 c
->flags
|= REDIS_SLAVE
;
5376 listAddNodeTail(server
.slaves
,c
);
5380 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
5381 redisClient
*slave
= privdata
;
5383 REDIS_NOTUSED(mask
);
5384 char buf
[REDIS_IOBUF_LEN
];
5385 ssize_t nwritten
, buflen
;
5387 if (slave
->repldboff
== 0) {
5388 /* Write the bulk write count before to transfer the DB. In theory here
5389 * we don't know how much room there is in the output buffer of the
5390 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5391 * operations) will never be smaller than the few bytes we need. */
5394 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5396 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
5404 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
5405 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
5407 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
5408 (buflen
== 0) ? "premature EOF" : strerror(errno
));
5412 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
5413 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
5418 slave
->repldboff
+= nwritten
;
5419 if (slave
->repldboff
== slave
->repldbsize
) {
5420 close(slave
->repldbfd
);
5421 slave
->repldbfd
= -1;
5422 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5423 slave
->replstate
= REDIS_REPL_ONLINE
;
5424 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
5425 sendReplyToClient
, slave
) == AE_ERR
) {
5429 addReplySds(slave
,sdsempty());
5430 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
5434 /* This function is called at the end of every backgrond saving.
5435 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5436 * otherwise REDIS_ERR is passed to the function.
5438 * The goal of this function is to handle slaves waiting for a successful
5439 * background saving in order to perform non-blocking synchronization. */
5440 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
5442 int startbgsave
= 0;
5444 listRewind(server
.slaves
);
5445 while((ln
= listYield(server
.slaves
))) {
5446 redisClient
*slave
= ln
->value
;
5448 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
5450 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5451 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
5452 struct redis_stat buf
;
5454 if (bgsaveerr
!= REDIS_OK
) {
5456 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
5459 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
5460 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
5462 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
5465 slave
->repldboff
= 0;
5466 slave
->repldbsize
= buf
.st_size
;
5467 slave
->replstate
= REDIS_REPL_SEND_BULK
;
5468 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5469 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
5476 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5477 listRewind(server
.slaves
);
5478 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
5479 while((ln
= listYield(server
.slaves
))) {
5480 redisClient
*slave
= ln
->value
;
5482 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
5489 static int syncWithMaster(void) {
5490 char buf
[1024], tmpfile
[256], authcmd
[1024];
5492 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
5496 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
5501 /* AUTH with the master if required. */
5502 if(server
.masterauth
) {
5503 snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
);
5504 if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) {
5506 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",
5510 /* Read the AUTH result. */
5511 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5513 redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s",
5517 if (buf
[0] != '+') {
5519 redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?");
5524 /* Issue the SYNC command */
5525 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
5527 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
5531 /* Read the bulk write count */
5532 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5534 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
5538 if (buf
[0] != '$') {
5540 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5543 dumpsize
= atoi(buf
+1);
5544 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
5545 /* Read the bulk write data on a temp file */
5546 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
5547 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
5550 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
5554 int nread
, nwritten
;
5556 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
5558 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
5564 nwritten
= write(dfd
,buf
,nread
);
5565 if (nwritten
== -1) {
5566 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
5574 if (rename(tmpfile
,server
.dbfilename
) == -1) {
5575 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
5581 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
5582 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
5586 server
.master
= createClient(fd
);
5587 server
.master
->flags
|= REDIS_MASTER
;
5588 server
.master
->authenticated
= 1;
5589 server
.replstate
= REDIS_REPL_CONNECTED
;
5593 static void slaveofCommand(redisClient
*c
) {
5594 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
5595 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
5596 if (server
.masterhost
) {
5597 sdsfree(server
.masterhost
);
5598 server
.masterhost
= NULL
;
5599 if (server
.master
) freeClient(server
.master
);
5600 server
.replstate
= REDIS_REPL_NONE
;
5601 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
5604 sdsfree(server
.masterhost
);
5605 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
5606 server
.masterport
= atoi(c
->argv
[2]->ptr
);
5607 if (server
.master
) freeClient(server
.master
);
5608 server
.replstate
= REDIS_REPL_CONNECT
;
5609 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
5610 server
.masterhost
, server
.masterport
);
5612 addReply(c
,shared
.ok
);
5615 /* ============================ Maxmemory directive ======================== */
5617 /* This function gets called when 'maxmemory' is set on the config file to limit
5618 * the max memory used by the server, and we are out of memory.
5619 * This function will try to, in order:
5621 * - Free objects from the free list
5622 * - Try to remove keys with an EXPIRE set
5624 * It is not possible to free enough memory to reach used-memory < maxmemory
5625 * the server will start refusing commands that will enlarge even more the
5628 static void freeMemoryIfNeeded(void) {
5629 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
5630 if (listLength(server
.objfreelist
)) {
5633 listNode
*head
= listFirst(server
.objfreelist
);
5634 o
= listNodeValue(head
);
5635 listDelNode(server
.objfreelist
,head
);
5638 int j
, k
, freed
= 0;
5640 for (j
= 0; j
< server
.dbnum
; j
++) {
5642 robj
*minkey
= NULL
;
5643 struct dictEntry
*de
;
5645 if (dictSize(server
.db
[j
].expires
)) {
5647 /* From a sample of three keys drop the one nearest to
5648 * the natural expire */
5649 for (k
= 0; k
< 3; k
++) {
5652 de
= dictGetRandomKey(server
.db
[j
].expires
);
5653 t
= (time_t) dictGetEntryVal(de
);
5654 if (minttl
== -1 || t
< minttl
) {
5655 minkey
= dictGetEntryKey(de
);
5659 deleteKey(server
.db
+j
,minkey
);
5662 if (!freed
) return; /* nothing to free... */
5667 /* ============================== Append Only file ========================== */
5669 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
5670 sds buf
= sdsempty();
5676 /* The DB this command was targetting is not the same as the last command
5677 * we appendend. To issue a SELECT command is needed. */
5678 if (dictid
!= server
.appendseldb
) {
5681 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
5682 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
5683 (unsigned long)strlen(seldb
),seldb
);
5684 server
.appendseldb
= dictid
;
5687 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5688 * EXPIREs into EXPIREATs calls */
5689 if (cmd
->proc
== expireCommand
) {
5692 tmpargv
[0] = createStringObject("EXPIREAT",8);
5693 tmpargv
[1] = argv
[1];
5694 incrRefCount(argv
[1]);
5695 when
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10);
5696 tmpargv
[2] = createObject(REDIS_STRING
,
5697 sdscatprintf(sdsempty(),"%ld",when
));
5701 /* Append the actual command */
5702 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
5703 for (j
= 0; j
< argc
; j
++) {
5706 o
= getDecodedObject(o
);
5707 buf
= sdscatprintf(buf
,"$%lu\r\n",(unsigned long)sdslen(o
->ptr
));
5708 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
5709 buf
= sdscatlen(buf
,"\r\n",2);
5713 /* Free the objects from the modified argv for EXPIREAT */
5714 if (cmd
->proc
== expireCommand
) {
5715 for (j
= 0; j
< 3; j
++)
5716 decrRefCount(argv
[j
]);
5719 /* We want to perform a single write. This should be guaranteed atomic
5720 * at least if the filesystem we are writing is a real physical one.
5721 * While this will save us against the server being killed I don't think
5722 * there is much to do about the whole server stopping for power problems
5724 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
5725 if (nwritten
!= (signed)sdslen(buf
)) {
5726 /* Ooops, we are in troubles. The best thing to do for now is
5727 * to simply exit instead to give the illusion that everything is
5728 * working as expected. */
5729 if (nwritten
== -1) {
5730 redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
));
5732 redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
));
5736 /* If a background append only file rewriting is in progress we want to
5737 * accumulate the differences between the child DB and the current one
5738 * in a buffer, so that when the child process will do its work we
5739 * can append the differences to the new append only file. */
5740 if (server
.bgrewritechildpid
!= -1)
5741 server
.bgrewritebuf
= sdscatlen(server
.bgrewritebuf
,buf
,sdslen(buf
));
5745 if (server
.appendfsync
== APPENDFSYNC_ALWAYS
||
5746 (server
.appendfsync
== APPENDFSYNC_EVERYSEC
&&
5747 now
-server
.lastfsync
> 1))
5749 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
5750 server
.lastfsync
= now
;
5754 /* In Redis commands are always executed in the context of a client, so in
5755 * order to load the append only file we need to create a fake client. */
5756 static struct redisClient
*createFakeClient(void) {
5757 struct redisClient
*c
= zmalloc(sizeof(*c
));
5761 c
->querybuf
= sdsempty();
5765 /* We set the fake client as a slave waiting for the synchronization
5766 * so that Redis will not try to send replies to this client. */
5767 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5768 c
->reply
= listCreate();
5769 listSetFreeMethod(c
->reply
,decrRefCount
);
5770 listSetDupMethod(c
->reply
,dupClientReplyValue
);
5774 static void freeFakeClient(struct redisClient
*c
) {
5775 sdsfree(c
->querybuf
);
5776 listRelease(c
->reply
);
5780 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5781 * error (the append only file is zero-length) REDIS_ERR is returned. On
5782 * fatal error an error message is logged and the program exists. */
5783 int loadAppendOnlyFile(char *filename
) {
5784 struct redisClient
*fakeClient
;
5785 FILE *fp
= fopen(filename
,"r");
5786 struct redis_stat sb
;
5788 if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size
== 0)
5792 redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
));
5796 fakeClient
= createFakeClient();
5803 struct redisCommand
*cmd
;
5805 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) {
5811 if (buf
[0] != '*') goto fmterr
;
5813 argv
= zmalloc(sizeof(robj
*)*argc
);
5814 for (j
= 0; j
< argc
; j
++) {
5815 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
;
5816 if (buf
[0] != '$') goto fmterr
;
5817 len
= strtol(buf
+1,NULL
,10);
5818 argsds
= sdsnewlen(NULL
,len
);
5819 if (len
&& fread(argsds
,len
,1,fp
) == 0) goto fmterr
;
5820 argv
[j
] = createObject(REDIS_STRING
,argsds
);
5821 if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */
5824 /* Command lookup */
5825 cmd
= lookupCommand(argv
[0]->ptr
);
5827 redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
);
5830 /* Try object sharing and encoding */
5831 if (server
.shareobjects
) {
5833 for(j
= 1; j
< argc
; j
++)
5834 argv
[j
] = tryObjectSharing(argv
[j
]);
5836 if (cmd
->flags
& REDIS_CMD_BULK
)
5837 tryObjectEncoding(argv
[argc
-1]);
5838 /* Run the command in the context of a fake client */
5839 fakeClient
->argc
= argc
;
5840 fakeClient
->argv
= argv
;
5841 cmd
->proc(fakeClient
);
5842 /* Discard the reply objects list from the fake client */
5843 while(listLength(fakeClient
->reply
))
5844 listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
));
5845 /* Clean up, ready for the next command */
5846 for (j
= 0; j
< argc
; j
++) decrRefCount(argv
[j
]);
5850 freeFakeClient(fakeClient
);
5855 redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file");
5857 redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
));
5861 redisLog(REDIS_WARNING
,"Bad file format reading the append only file");
5865 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
5866 static int fwriteBulk(FILE *fp
, robj
*obj
) {
5868 obj
= getDecodedObject(obj
);
5869 snprintf(buf
,sizeof(buf
),"$%ld\r\n",(long)sdslen(obj
->ptr
));
5870 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) goto err
;
5871 if (fwrite(obj
->ptr
,sdslen(obj
->ptr
),1,fp
) == 0) goto err
;
5872 if (fwrite("\r\n",2,1,fp
) == 0) goto err
;
5880 /* Write a double value in bulk format $<count>\r\n<payload>\r\n */
5881 static int fwriteBulkDouble(FILE *fp
, double d
) {
5882 char buf
[128], dbuf
[128];
5884 snprintf(dbuf
,sizeof(dbuf
),"%.17g\r\n",d
);
5885 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(dbuf
)-2);
5886 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
5887 if (fwrite(dbuf
,strlen(dbuf
),1,fp
) == 0) return 0;
5891 /* Write a long value in bulk format $<count>\r\n<payload>\r\n */
5892 static int fwriteBulkLong(FILE *fp
, long l
) {
5893 char buf
[128], lbuf
[128];
5895 snprintf(lbuf
,sizeof(lbuf
),"%ld\r\n",l
);
5896 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(lbuf
)-2);
5897 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
5898 if (fwrite(lbuf
,strlen(lbuf
),1,fp
) == 0) return 0;
5902 /* Write a sequence of commands able to fully rebuild the dataset into
5903 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
5904 static int rewriteAppendOnlyFile(char *filename
) {
5905 dictIterator
*di
= NULL
;
5910 time_t now
= time(NULL
);
5912 /* Note that we have to use a different temp name here compared to the
5913 * one used by rewriteAppendOnlyFileBackground() function. */
5914 snprintf(tmpfile
,256,"temp-rewriteaof-%d.aof", (int) getpid());
5915 fp
= fopen(tmpfile
,"w");
5917 redisLog(REDIS_WARNING
, "Failed rewriting the append only file: %s", strerror(errno
));
5920 for (j
= 0; j
< server
.dbnum
; j
++) {
5921 char selectcmd
[] = "*2\r\n$6\r\nSELECT\r\n";
5922 redisDb
*db
= server
.db
+j
;
5924 if (dictSize(d
) == 0) continue;
5925 di
= dictGetIterator(d
);
5931 /* SELECT the new DB */
5932 if (fwrite(selectcmd
,sizeof(selectcmd
)-1,1,fp
) == 0) goto werr
;
5933 if (fwriteBulkLong(fp
,j
) == 0) goto werr
;
5935 /* Iterate this DB writing every entry */
5936 while((de
= dictNext(di
)) != NULL
) {
5937 robj
*key
= dictGetEntryKey(de
);
5938 robj
*o
= dictGetEntryVal(de
);
5939 time_t expiretime
= getExpire(db
,key
);
5941 /* Save the key and associated value */
5942 if (o
->type
== REDIS_STRING
) {
5943 /* Emit a SET command */
5944 char cmd
[]="*3\r\n$3\r\nSET\r\n";
5945 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5947 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5948 if (fwriteBulk(fp
,o
) == 0) goto werr
;
5949 } else if (o
->type
== REDIS_LIST
) {
5950 /* Emit the RPUSHes needed to rebuild the list */
5951 list
*list
= o
->ptr
;
5955 while((ln
= listYield(list
))) {
5956 char cmd
[]="*3\r\n$5\r\nRPUSH\r\n";
5957 robj
*eleobj
= listNodeValue(ln
);
5959 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5960 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5961 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
5963 } else if (o
->type
== REDIS_SET
) {
5964 /* Emit the SADDs needed to rebuild the set */
5966 dictIterator
*di
= dictGetIterator(set
);
5969 while((de
= dictNext(di
)) != NULL
) {
5970 char cmd
[]="*3\r\n$4\r\nSADD\r\n";
5971 robj
*eleobj
= dictGetEntryKey(de
);
5973 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5974 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5975 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
5977 dictReleaseIterator(di
);
5978 } else if (o
->type
== REDIS_ZSET
) {
5979 /* Emit the ZADDs needed to rebuild the sorted set */
5981 dictIterator
*di
= dictGetIterator(zs
->dict
);
5984 while((de
= dictNext(di
)) != NULL
) {
5985 char cmd
[]="*4\r\n$4\r\nZADD\r\n";
5986 robj
*eleobj
= dictGetEntryKey(de
);
5987 double *score
= dictGetEntryVal(de
);
5989 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5990 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5991 if (fwriteBulkDouble(fp
,*score
) == 0) goto werr
;
5992 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
5994 dictReleaseIterator(di
);
5996 redisAssert(0 != 0);
5998 /* Save the expire time */
5999 if (expiretime
!= -1) {
6000 char cmd
[]="*3\r\n$6\r\nEXPIRE\r\n";
6001 /* If this key is already expired skip it */
6002 if (expiretime
< now
) continue;
6003 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6004 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6005 if (fwriteBulkLong(fp
,expiretime
) == 0) goto werr
;
6008 dictReleaseIterator(di
);
6011 /* Make sure data will not remain on the OS's output buffers */
6016 /* Use RENAME to make sure the DB file is changed atomically only
6017 * if the generate DB file is ok. */
6018 if (rename(tmpfile
,filename
) == -1) {
6019 redisLog(REDIS_WARNING
,"Error moving temp append only file on the final destination: %s", strerror(errno
));
6023 redisLog(REDIS_NOTICE
,"SYNC append only file rewrite performed");
6029 redisLog(REDIS_WARNING
,"Write error writing append only fileon disk: %s", strerror(errno
));
6030 if (di
) dictReleaseIterator(di
);
6034 /* This is how rewriting of the append only file in background works:
6036 * 1) The user calls BGREWRITEAOF
6037 * 2) Redis calls this function, that forks():
6038 * 2a) the child rewrite the append only file in a temp file.
6039 * 2b) the parent accumulates differences in server.bgrewritebuf.
6040 * 3) When the child finished '2a' exists.
6041 * 4) The parent will trap the exit code, if it's OK, will append the
6042 * data accumulated into server.bgrewritebuf into the temp file, and
6043 * finally will rename(2) the temp file in the actual file name.
6044 * The the new file is reopened as the new append only file. Profit!
6046 static int rewriteAppendOnlyFileBackground(void) {
6049 if (server
.bgrewritechildpid
!= -1) return REDIS_ERR
;
6050 if ((childpid
= fork()) == 0) {
6055 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6056 if (rewriteAppendOnlyFile(tmpfile
) == REDIS_OK
) {
6063 if (childpid
== -1) {
6064 redisLog(REDIS_WARNING
,
6065 "Can't rewrite append only file in background: fork: %s",
6069 redisLog(REDIS_NOTICE
,
6070 "Background append only file rewriting started by pid %d",childpid
);
6071 server
.bgrewritechildpid
= childpid
;
6072 /* We set appendseldb to -1 in order to force the next call to the
6073 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6074 * accumulated by the parent into server.bgrewritebuf will start
6075 * with a SELECT statement and it will be safe to merge. */
6076 server
.appendseldb
= -1;
6079 return REDIS_OK
; /* unreached */
6082 static void bgrewriteaofCommand(redisClient
*c
) {
6083 if (server
.bgrewritechildpid
!= -1) {
6084 addReplySds(c
,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6087 if (rewriteAppendOnlyFileBackground() == REDIS_OK
) {
6088 char *status
= "+Background append only file rewriting started\r\n";
6089 addReplySds(c
,sdsnew(status
));
6091 addReply(c
,shared
.err
);
6095 static void aofRemoveTempFile(pid_t childpid
) {
6098 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) childpid
);
6102 /* ================================= Debugging ============================== */
6104 static void debugCommand(redisClient
*c
) {
6105 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
6107 } else if (!strcasecmp(c
->argv
[1]->ptr
,"reload")) {
6108 if (rdbSave(server
.dbfilename
) != REDIS_OK
) {
6109 addReply(c
,shared
.err
);
6113 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6114 addReply(c
,shared
.err
);
6117 redisLog(REDIS_WARNING
,"DB reloaded by DEBUG RELOAD");
6118 addReply(c
,shared
.ok
);
6119 } else if (!strcasecmp(c
->argv
[1]->ptr
,"loadaof")) {
6121 if (loadAppendOnlyFile(server
.appendfilename
) != REDIS_OK
) {
6122 addReply(c
,shared
.err
);
6125 redisLog(REDIS_WARNING
,"Append Only File loaded by DEBUG LOADAOF");
6126 addReply(c
,shared
.ok
);
6127 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
6128 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
6132 addReply(c
,shared
.nokeyerr
);
6135 key
= dictGetEntryKey(de
);
6136 val
= dictGetEntryVal(de
);
6137 addReplySds(c
,sdscatprintf(sdsempty(),
6138 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6139 (void*)key
, key
->refcount
, (void*)val
, val
->refcount
,
6142 addReplySds(c
,sdsnew(
6143 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6147 static void _redisAssert(char *estr
) {
6148 redisLog(REDIS_WARNING
,"=== ASSERTION FAILED ===");
6149 redisLog(REDIS_WARNING
,"==> %s\n",estr
);
6150 #ifdef HAVE_BACKTRACE
6151 redisLog(REDIS_WARNING
,"(forcing SIGSEGV in order to print the stack trace)");
6156 /* =================================== Main! ================================ */
6159 int linuxOvercommitMemoryValue(void) {
6160 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
6164 if (fgets(buf
,64,fp
) == NULL
) {
6173 void linuxOvercommitMemoryWarning(void) {
6174 if (linuxOvercommitMemoryValue() == 0) {
6175 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6178 #endif /* __linux__ */
6180 static void daemonize(void) {
6184 if (fork() != 0) exit(0); /* parent exits */
6185 printf("New pid: %d\n", getpid());
6186 setsid(); /* create a new session */
6188 /* Every output goes to /dev/null. If Redis is daemonized but
6189 * the 'logfile' is set to 'stdout' in the configuration file
6190 * it will not log at all. */
6191 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
6192 dup2(fd
, STDIN_FILENO
);
6193 dup2(fd
, STDOUT_FILENO
);
6194 dup2(fd
, STDERR_FILENO
);
6195 if (fd
> STDERR_FILENO
) close(fd
);
6197 /* Try to write the pid file */
6198 fp
= fopen(server
.pidfile
,"w");
6200 fprintf(fp
,"%d\n",getpid());
6205 int main(int argc
, char **argv
) {
6208 resetServerSaveParams();
6209 loadServerConfig(argv
[1]);
6210 } else if (argc
> 2) {
6211 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
6214 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6216 if (server
.daemonize
) daemonize();
6218 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
6220 linuxOvercommitMemoryWarning();
6222 if (server
.appendonly
) {
6223 if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
)
6224 redisLog(REDIS_NOTICE
,"DB loaded from append only file");
6226 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
6227 redisLog(REDIS_NOTICE
,"DB loaded from disk");
6229 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
6230 acceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
6231 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
6233 aeDeleteEventLoop(server
.el
);
6237 /* ============================= Backtrace support ========================= */
6239 #ifdef HAVE_BACKTRACE
6240 static char *findFuncName(void *pointer
, unsigned long *offset
);
6242 static void *getMcontextEip(ucontext_t
*uc
) {
6243 #if defined(__FreeBSD__)
6244 return (void*) uc
->uc_mcontext
.mc_eip
;
6245 #elif defined(__dietlibc__)
6246 return (void*) uc
->uc_mcontext
.eip
;
6247 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6249 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6251 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6253 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6254 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6255 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6257 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6259 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
6260 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
];
6261 #elif defined(__ia64__) /* Linux IA64 */
6262 return (void*) uc
->uc_mcontext
.sc_ip
;
6268 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
6270 char **messages
= NULL
;
6271 int i
, trace_size
= 0;
6272 unsigned long offset
=0;
6273 ucontext_t
*uc
= (ucontext_t
*) secret
;
6275 REDIS_NOTUSED(info
);
6277 redisLog(REDIS_WARNING
,
6278 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
6279 infostring
= genRedisInfoString();
6280 redisLog(REDIS_WARNING
, "%s",infostring
);
6281 /* It's not safe to sdsfree() the returned string under memory
6282 * corruption conditions. Let it leak as we are going to abort */
6284 trace_size
= backtrace(trace
, 100);
6285 /* overwrite sigaction with caller's address */
6286 if (getMcontextEip(uc
) != NULL
) {
6287 trace
[1] = getMcontextEip(uc
);
6289 messages
= backtrace_symbols(trace
, trace_size
);
6291 for (i
=1; i
<trace_size
; ++i
) {
6292 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
6294 p
= strchr(messages
[i
],'+');
6295 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
6296 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
6298 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
6301 // free(messages); Don't call free() with possibly corrupted memory.
6305 static void setupSigSegvAction(void) {
6306 struct sigaction act
;
6308 sigemptyset (&act
.sa_mask
);
6309 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6310 * is used. Otherwise, sa_handler is used */
6311 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
6312 act
.sa_sigaction
= segvHandler
;
6313 sigaction (SIGSEGV
, &act
, NULL
);
6314 sigaction (SIGBUS
, &act
, NULL
);
6315 sigaction (SIGFPE
, &act
, NULL
);
6316 sigaction (SIGILL
, &act
, NULL
);
6317 sigaction (SIGBUS
, &act
, NULL
);
6321 #include "staticsymbols.h"
6322 /* This function try to convert a pointer into a function name. It's used in
6323 * oreder to provide a backtrace under segmentation fault that's able to
6324 * display functions declared as static (otherwise the backtrace is useless). */
6325 static char *findFuncName(void *pointer
, unsigned long *offset
){
6327 unsigned long off
, minoff
= 0;
6329 /* Try to match against the Symbol with the smallest offset */
6330 for (i
=0; symsTable
[i
].pointer
; i
++) {
6331 unsigned long lp
= (unsigned long) pointer
;
6333 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
6334 off
=lp
-symsTable
[i
].pointer
;
6335 if (ret
< 0 || off
< minoff
) {
6341 if (ret
== -1) return NULL
;
6343 return symsTable
[ret
].name
;
6345 #else /* HAVE_BACKTRACE */
6346 static void setupSigSegvAction(void) {
6348 #endif /* HAVE_BACKTRACE */