2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.1.93"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
64 #include "solarisfixes.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
95 /* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
107 this flag will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short these commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
113 #define REDIS_STRING 0
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
128 /* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpret the length:
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
139 * Lengths up to 63 are stored using a single byte, most DB keys, and many
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining six bits specify a special encoding for the object
149 * according to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
156 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157 #define REDIS_SLAVE 2 /* This client is a slave server */
158 #define REDIS_MASTER 4 /* This client is a master server */
159 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
161 /* Slave replication state - slave side */
162 #define REDIS_REPL_NONE 0 /* No active replication */
163 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
164 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
166 /* Slave replication state - from the point of view of master
167 * Note that in SEND_BULK and ONLINE state the slave receives new updates
168 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
169 * to start the next background saving in order to send updates to it. */
170 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
171 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
172 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
173 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
175 /* List related stuff */
179 /* Sort operations */
180 #define REDIS_SORT_GET 0
181 #define REDIS_SORT_ASC 1
182 #define REDIS_SORT_DESC 2
183 #define REDIS_SORTKEY_MAX 1024
186 #define REDIS_DEBUG 0
187 #define REDIS_NOTICE 1
188 #define REDIS_WARNING 2
190 /* Anti-warning macro... */
191 #define REDIS_NOTUSED(V) ((void) V)
193 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
194 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
196 /* Append only defines */
197 #define APPENDFSYNC_NO 0
198 #define APPENDFSYNC_ALWAYS 1
199 #define APPENDFSYNC_EVERYSEC 2
201 /* We can print the stacktrace, so our assert is defined this way: */
202 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
203 static void _redisAssert(char *estr
);
205 /*================================= Data types ============================== */
207 /* A redis object, that is a type able to hold a string / list / set */
208 typedef struct redisObject
{
211 unsigned char encoding
;
212 unsigned char notused
[2];
216 /* Macro used to initialize a Redis object allocated on the stack.
217 * Note that this macro is kept near the structure definition to make sure
218 * we'll update it when the structure is changed, to avoid bugs like
219 * bug #85 introduced exactly in this way. */
220 #define initStaticStringObject(_var,_ptr) do { \
222 _var.type = REDIS_STRING; \
223 _var.encoding = REDIS_ENCODING_RAW; \
227 typedef struct redisDb
{
233 /* With multiplexing we need to keep per-client state.
234 * Clients are kept in a linked list. */
235 typedef struct redisClient
{
240 robj
**argv
, **mbargv
;
242 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
243 int multibulk
; /* multi bulk command format active */
246 time_t lastinteraction
; /* time of the last interaction, used for timeout */
247 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
248 int slaveseldb
; /* slave selected db, if this client is a slave */
249 int authenticated
; /* when requirepass is non-NULL */
250 int replstate
; /* replication state if this is a slave */
251 int repldbfd
; /* replication DB file descriptor */
252 long repldboff
; /* replication DB file offset */
253 off_t repldbsize
; /* replication DB file size */
261 /* Global server state structure */
267 unsigned int sharingpoolsize
;
268 long long dirty
; /* changes to DB from the last save */
270 list
*slaves
, *monitors
;
271 char neterr
[ANET_ERR_LEN
];
273 int cronloops
; /* number of times the cron function run */
274 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
275 time_t lastsave
; /* Unix time of last successful save */
276 size_t usedmemory
; /* Used memory in bytes */
277 /* Fields used only for stats */
278 time_t stat_starttime
; /* server start time */
279 long long stat_numcommands
; /* number of processed commands */
280 long long stat_numconnections
; /* number of connections received */
293 pid_t bgsavechildpid
;
294 pid_t bgrewritechildpid
;
295 sds bgrewritebuf
; /* buffer taken by parent during append only rewrite */
296 struct saveparam
*saveparams
;
301 char *appendfilename
;
305 /* Replication related */
310 redisClient
*master
; /* client that is master for this slave */
312 unsigned int maxclients
;
313 unsigned long maxmemory
;
314 /* Sort parameters - qsort_r() is only available under BSD so we
315 * have to take this state global, in order to pass it to sortCompare() */
321 typedef void redisCommandProc(redisClient
*c
);
322 struct redisCommand
{
324 redisCommandProc
*proc
;
329 struct redisFunctionSym
{
331 unsigned long pointer
;
334 typedef struct _redisSortObject
{
342 typedef struct _redisSortOperation
{
345 } redisSortOperation
;
347 /* ZSETs use a specialized version of Skiplists */
349 typedef struct zskiplistNode
{
350 struct zskiplistNode
**forward
;
351 struct zskiplistNode
*backward
;
356 typedef struct zskiplist
{
357 struct zskiplistNode
*header
, *tail
;
358 unsigned long length
;
362 typedef struct zset
{
367 /* Our shared "common" objects */
369 struct sharedObjectsStruct
{
370 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
371 *colon
, *nullbulk
, *nullmultibulk
,
372 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
373 *outofrangeerr
, *plus
,
374 *select0
, *select1
, *select2
, *select3
, *select4
,
375 *select5
, *select6
, *select7
, *select8
, *select9
;
378 /* Global vars that are actually used as constants. The following double
379 * values are used for double on-disk serialization, and are initialized
380 * at runtime to avoid strange compiler optimizations. */
382 static double R_Zero
, R_PosInf
, R_NegInf
, R_Nan
;
384 /*================================ Prototypes =============================== */
386 static void freeStringObject(robj
*o
);
387 static void freeListObject(robj
*o
);
388 static void freeSetObject(robj
*o
);
389 static void decrRefCount(void *o
);
390 static robj
*createObject(int type
, void *ptr
);
391 static void freeClient(redisClient
*c
);
392 static int rdbLoad(char *filename
);
393 static void addReply(redisClient
*c
, robj
*obj
);
394 static void addReplySds(redisClient
*c
, sds s
);
395 static void incrRefCount(robj
*o
);
396 static int rdbSaveBackground(char *filename
);
397 static robj
*createStringObject(char *ptr
, size_t len
);
398 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
399 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
400 static int syncWithMaster(void);
401 static robj
*tryObjectSharing(robj
*o
);
402 static int tryObjectEncoding(robj
*o
);
403 static robj
*getDecodedObject(robj
*o
);
404 static int removeExpire(redisDb
*db
, robj
*key
);
405 static int expireIfNeeded(redisDb
*db
, robj
*key
);
406 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
407 static int deleteKey(redisDb
*db
, robj
*key
);
408 static time_t getExpire(redisDb
*db
, robj
*key
);
409 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
410 static void updateSlavesWaitingBgsave(int bgsaveerr
);
411 static void freeMemoryIfNeeded(void);
412 static int processCommand(redisClient
*c
);
413 static void setupSigSegvAction(void);
414 static void rdbRemoveTempFile(pid_t childpid
);
415 static void aofRemoveTempFile(pid_t childpid
);
416 static size_t stringObjectLen(robj
*o
);
417 static void processInputBuffer(redisClient
*c
);
418 static zskiplist
*zslCreate(void);
419 static void zslFree(zskiplist
*zsl
);
420 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
421 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
423 static void authCommand(redisClient
*c
);
424 static void pingCommand(redisClient
*c
);
425 static void echoCommand(redisClient
*c
);
426 static void setCommand(redisClient
*c
);
427 static void setnxCommand(redisClient
*c
);
428 static void getCommand(redisClient
*c
);
429 static void delCommand(redisClient
*c
);
430 static void existsCommand(redisClient
*c
);
431 static void incrCommand(redisClient
*c
);
432 static void decrCommand(redisClient
*c
);
433 static void incrbyCommand(redisClient
*c
);
434 static void decrbyCommand(redisClient
*c
);
435 static void selectCommand(redisClient
*c
);
436 static void randomkeyCommand(redisClient
*c
);
437 static void keysCommand(redisClient
*c
);
438 static void dbsizeCommand(redisClient
*c
);
439 static void lastsaveCommand(redisClient
*c
);
440 static void saveCommand(redisClient
*c
);
441 static void bgsaveCommand(redisClient
*c
);
442 static void bgrewriteaofCommand(redisClient
*c
);
443 static void shutdownCommand(redisClient
*c
);
444 static void moveCommand(redisClient
*c
);
445 static void renameCommand(redisClient
*c
);
446 static void renamenxCommand(redisClient
*c
);
447 static void lpushCommand(redisClient
*c
);
448 static void rpushCommand(redisClient
*c
);
449 static void lpopCommand(redisClient
*c
);
450 static void rpopCommand(redisClient
*c
);
451 static void llenCommand(redisClient
*c
);
452 static void lindexCommand(redisClient
*c
);
453 static void lrangeCommand(redisClient
*c
);
454 static void ltrimCommand(redisClient
*c
);
455 static void typeCommand(redisClient
*c
);
456 static void lsetCommand(redisClient
*c
);
457 static void saddCommand(redisClient
*c
);
458 static void sremCommand(redisClient
*c
);
459 static void smoveCommand(redisClient
*c
);
460 static void sismemberCommand(redisClient
*c
);
461 static void scardCommand(redisClient
*c
);
462 static void spopCommand(redisClient
*c
);
463 static void srandmemberCommand(redisClient
*c
);
464 static void sinterCommand(redisClient
*c
);
465 static void sinterstoreCommand(redisClient
*c
);
466 static void sunionCommand(redisClient
*c
);
467 static void sunionstoreCommand(redisClient
*c
);
468 static void sdiffCommand(redisClient
*c
);
469 static void sdiffstoreCommand(redisClient
*c
);
470 static void syncCommand(redisClient
*c
);
471 static void flushdbCommand(redisClient
*c
);
472 static void flushallCommand(redisClient
*c
);
473 static void sortCommand(redisClient
*c
);
474 static void lremCommand(redisClient
*c
);
475 static void rpoplpushcommand(redisClient
*c
);
476 static void infoCommand(redisClient
*c
);
477 static void mgetCommand(redisClient
*c
);
478 static void monitorCommand(redisClient
*c
);
479 static void expireCommand(redisClient
*c
);
480 static void expireatCommand(redisClient
*c
);
481 static void getsetCommand(redisClient
*c
);
482 static void ttlCommand(redisClient
*c
);
483 static void slaveofCommand(redisClient
*c
);
484 static void debugCommand(redisClient
*c
);
485 static void msetCommand(redisClient
*c
);
486 static void msetnxCommand(redisClient
*c
);
487 static void zaddCommand(redisClient
*c
);
488 static void zincrbyCommand(redisClient
*c
);
489 static void zrangeCommand(redisClient
*c
);
490 static void zrangebyscoreCommand(redisClient
*c
);
491 static void zrevrangeCommand(redisClient
*c
);
492 static void zcardCommand(redisClient
*c
);
493 static void zremCommand(redisClient
*c
);
494 static void zscoreCommand(redisClient
*c
);
495 static void zremrangebyscoreCommand(redisClient
*c
);
497 /*================================= Globals ================================= */
500 static struct redisServer server
; /* server global state */
501 static struct redisCommand cmdTable
[] = {
502 {"get",getCommand
,2,REDIS_CMD_INLINE
},
503 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
504 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
505 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
506 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
507 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
508 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
509 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
510 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
511 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
512 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
513 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
514 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
515 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
516 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
517 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
518 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
519 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
520 {"rpoplpush",rpoplpushcommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
521 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
522 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
523 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
524 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
525 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
526 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
527 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
528 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
529 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
530 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
531 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
532 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
533 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
534 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
535 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
536 {"zincrby",zincrbyCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
537 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
538 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
539 {"zrange",zrangeCommand
,-4,REDIS_CMD_INLINE
},
540 {"zrangebyscore",zrangebyscoreCommand
,-4,REDIS_CMD_INLINE
},
541 {"zrevrange",zrevrangeCommand
,-4,REDIS_CMD_INLINE
},
542 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
543 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
544 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
545 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
546 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
547 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
548 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
549 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
550 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
551 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
552 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
553 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
554 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
555 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
556 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
557 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
558 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
559 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
560 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
561 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
562 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
563 {"bgrewriteaof",bgrewriteaofCommand
,1,REDIS_CMD_INLINE
},
564 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
565 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
566 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
567 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
568 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
569 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
570 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
571 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
572 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
573 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
574 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
575 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
579 /*============================ Utility functions ============================ */
581 /* Glob-style pattern matching. */
582 int stringmatchlen(const char *pattern
, int patternLen
,
583 const char *string
, int stringLen
, int nocase
)
588 while (pattern
[1] == '*') {
593 return 1; /* match */
595 if (stringmatchlen(pattern
+1, patternLen
-1,
596 string
, stringLen
, nocase
))
597 return 1; /* match */
601 return 0; /* no match */
605 return 0; /* no match */
615 not = pattern
[0] == '^';
622 if (pattern
[0] == '\\') {
625 if (pattern
[0] == string
[0])
627 } else if (pattern
[0] == ']') {
629 } else if (patternLen
== 0) {
633 } else if (pattern
[1] == '-' && patternLen
>= 3) {
634 int start
= pattern
[0];
635 int end
= pattern
[2];
643 start
= tolower(start
);
649 if (c
>= start
&& c
<= end
)
653 if (pattern
[0] == string
[0])
656 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
666 return 0; /* no match */
672 if (patternLen
>= 2) {
679 if (pattern
[0] != string
[0])
680 return 0; /* no match */
682 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
683 return 0; /* no match */
691 if (stringLen
== 0) {
692 while(*pattern
== '*') {
699 if (patternLen
== 0 && stringLen
== 0)
704 static void redisLog(int level
, const char *fmt
, ...) {
708 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
712 if (level
>= server
.verbosity
) {
718 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
719 fprintf(fp
,"%s %c ",buf
,c
[level
]);
720 vfprintf(fp
, fmt
, ap
);
726 if (server
.logfile
) fclose(fp
);
729 /*====================== Hash table type implementation ==================== */
731 /* This is a hash table type that uses the SDS dynamic strings library as
732 * keys and redis objects as values (objects can hold SDS strings,
735 static void dictVanillaFree(void *privdata
, void *val
)
737 DICT_NOTUSED(privdata
);
741 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
745 DICT_NOTUSED(privdata
);
747 l1
= sdslen((sds
)key1
);
748 l2
= sdslen((sds
)key2
);
749 if (l1
!= l2
) return 0;
750 return memcmp(key1
, key2
, l1
) == 0;
753 static void dictRedisObjectDestructor(void *privdata
, void *val
)
755 DICT_NOTUSED(privdata
);
760 static int dictObjKeyCompare(void *privdata
, const void *key1
,
763 const robj
*o1
= key1
, *o2
= key2
;
764 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
767 static unsigned int dictObjHash(const void *key
) {
769 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
772 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
775 robj
*o1
= (robj
*) key1
, *o2
= (robj
*) key2
;
778 o1
= getDecodedObject(o1
);
779 o2
= getDecodedObject(o2
);
780 cmp
= sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
786 static unsigned int dictEncObjHash(const void *key
) {
787 robj
*o
= (robj
*) key
;
789 o
= getDecodedObject(o
);
790 unsigned int hash
= dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
795 static dictType setDictType
= {
796 dictEncObjHash
, /* hash function */
799 dictEncObjKeyCompare
, /* key compare */
800 dictRedisObjectDestructor
, /* key destructor */
801 NULL
/* val destructor */
804 static dictType zsetDictType
= {
805 dictEncObjHash
, /* hash function */
808 dictEncObjKeyCompare
, /* key compare */
809 dictRedisObjectDestructor
, /* key destructor */
810 dictVanillaFree
/* val destructor of malloc(sizeof(double)) */
813 static dictType hashDictType
= {
814 dictObjHash
, /* hash function */
817 dictObjKeyCompare
, /* key compare */
818 dictRedisObjectDestructor
, /* key destructor */
819 dictRedisObjectDestructor
/* val destructor */
822 /* ========================= Random utility functions ======================= */
824 /* Redis generally does not try to recover from out of memory conditions
825 * when allocating objects or strings, it is not clear if it will be possible
826 * to report this condition to the client since the networking layer itself
827 * is based on heap allocation for send buffers, so we simply abort.
828 * At least the code will be simpler to read... */
829 static void oom(const char *msg
) {
830 redisLog(REDIS_WARNING
, "%s: Out of memory\n",msg
);
835 /* ====================== Redis server networking stuff ===================== */
836 static void closeTimedoutClients(void) {
839 time_t now
= time(NULL
);
841 listRewind(server
.clients
);
842 while ((ln
= listYield(server
.clients
)) != NULL
) {
843 c
= listNodeValue(ln
);
844 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
845 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
846 (now
- c
->lastinteraction
> server
.maxidletime
)) {
847 redisLog(REDIS_DEBUG
,"Closing idle client");
853 static int htNeedsResize(dict
*dict
) {
854 long long size
, used
;
856 size
= dictSlots(dict
);
857 used
= dictSize(dict
);
858 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
859 (used
*100/size
< REDIS_HT_MINFILL
));
862 /* If the percentage of used slots in the HT drops below REDIS_HT_MINFILL
863 * we resize the hash table to save memory */
864 static void tryResizeHashTables(void) {
867 for (j
= 0; j
< server
.dbnum
; j
++) {
868 if (htNeedsResize(server
.db
[j
].dict
)) {
869 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
870 dictResize(server
.db
[j
].dict
);
871 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
873 if (htNeedsResize(server
.db
[j
].expires
))
874 dictResize(server
.db
[j
].expires
);
878 /* A background saving child (BGSAVE) terminated its work. Handle this. */
879 void backgroundSaveDoneHandler(int statloc
) {
880 int exitcode
= WEXITSTATUS(statloc
);
881 int bysignal
= WIFSIGNALED(statloc
);
883 if (!bysignal
&& exitcode
== 0) {
884 redisLog(REDIS_NOTICE
,
885 "Background saving terminated with success");
887 server
.lastsave
= time(NULL
);
888 } else if (!bysignal
&& exitcode
!= 0) {
889 redisLog(REDIS_WARNING
, "Background saving error");
891 redisLog(REDIS_WARNING
,
892 "Background saving terminated by signal");
893 rdbRemoveTempFile(server
.bgsavechildpid
);
895 server
.bgsavechildpid
= -1;
896 /* Possibly there are slaves waiting for a BGSAVE in order to be served
897 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
898 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
901 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
903 void backgroundRewriteDoneHandler(int statloc
) {
904 int exitcode
= WEXITSTATUS(statloc
);
905 int bysignal
= WIFSIGNALED(statloc
);
907 if (!bysignal
&& exitcode
== 0) {
911 redisLog(REDIS_NOTICE
,
912 "Background append only file rewriting terminated with success");
913 /* Now it's time to flush the differences accumulated by the parent */
914 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) server
.bgrewritechildpid
);
915 fd
= open(tmpfile
,O_WRONLY
|O_APPEND
);
917 redisLog(REDIS_WARNING
, "Not able to open the temp append only file produced by the child: %s", strerror(errno
));
920 /* Flush our data... */
921 if (write(fd
,server
.bgrewritebuf
,sdslen(server
.bgrewritebuf
)) !=
922 (signed) sdslen(server
.bgrewritebuf
)) {
923 redisLog(REDIS_WARNING
, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno
));
927 redisLog(REDIS_NOTICE
,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server
.bgrewritebuf
));
928 /* Now our work is to rename the temp file into the stable file. And
929 * switch the file descriptor used by the server for append only. */
930 if (rename(tmpfile
,server
.appendfilename
) == -1) {
931 redisLog(REDIS_WARNING
,"Can't rename the temp append only file into the stable one: %s", strerror(errno
));
935 /* Mission completed... almost */
936 redisLog(REDIS_NOTICE
,"Append only file successfully rewritten.");
937 if (server
.appendfd
!= -1) {
938 /* If append only is actually enabled... */
939 close(server
.appendfd
);
940 server
.appendfd
= fd
;
942 server
.appendseldb
= -1; /* Make sure it will issue SELECT */
943 redisLog(REDIS_NOTICE
,"The new append only file was selected for future appends.");
945 /* If append only is disabled we just generate a dump in this
946 * format. Why not? */
949 } else if (!bysignal
&& exitcode
!= 0) {
950 redisLog(REDIS_WARNING
, "Background append only file rewriting error");
952 redisLog(REDIS_WARNING
,
953 "Background append only file rewriting terminated by signal");
956 sdsfree(server
.bgrewritebuf
);
957 server
.bgrewritebuf
= sdsempty();
958 aofRemoveTempFile(server
.bgrewritechildpid
);
959 server
.bgrewritechildpid
= -1;
962 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
963 int j
, loops
= server
.cronloops
++;
964 REDIS_NOTUSED(eventLoop
);
966 REDIS_NOTUSED(clientData
);
968 /* Update the global state with the amount of used memory */
969 server
.usedmemory
= zmalloc_used_memory();
971 /* Show some info about non-empty databases */
972 for (j
= 0; j
< server
.dbnum
; j
++) {
973 long long size
, used
, vkeys
;
975 size
= dictSlots(server
.db
[j
].dict
);
976 used
= dictSize(server
.db
[j
].dict
);
977 vkeys
= dictSize(server
.db
[j
].expires
);
978 if (!(loops
% 5) && (used
|| vkeys
)) {
979 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
980 /* dictPrintStats(server.dict); */
984 /* We don't want to resize the hash tables while a background saving
985 * is in progress: the saving child is created using fork() that is
986 * implemented with a copy-on-write semantic in most modern systems, so
987 * if we resize the HT while there is the saving child at work actually
988 * a lot of memory movements in the parent will cause a lot of pages
990 if (server
.bgsavechildpid
== -1) tryResizeHashTables();
992 /* Show information about connected clients */
994 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
995 listLength(server
.clients
)-listLength(server
.slaves
),
996 listLength(server
.slaves
),
998 dictSize(server
.sharingpool
));
1001 /* Close connections of timedout clients */
1002 if (server
.maxidletime
&& !(loops
% 10))
1003 closeTimedoutClients();
1005 /* Check if a background saving or AOF rewrite in progress terminated */
1006 if (server
.bgsavechildpid
!= -1 || server
.bgrewritechildpid
!= -1) {
1010 if ((pid
= wait3(&statloc
,WNOHANG
,NULL
)) != 0) {
1011 if (pid
== server
.bgsavechildpid
) {
1012 backgroundSaveDoneHandler(statloc
);
1014 backgroundRewriteDoneHandler(statloc
);
1018 /* If there is not a background saving in progress check if
1019 * we have to save now */
1020 time_t now
= time(NULL
);
1021 for (j
= 0; j
< server
.saveparamslen
; j
++) {
1022 struct saveparam
*sp
= server
.saveparams
+j
;
1024 if (server
.dirty
>= sp
->changes
&&
1025 now
-server
.lastsave
> sp
->seconds
) {
1026 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
1027 sp
->changes
, sp
->seconds
);
1028 rdbSaveBackground(server
.dbfilename
);
1034 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1035 * will use few CPU cycles if there are few expiring keys, otherwise
1036 * it will get more aggressive to avoid that too much memory is used by
1037 * keys that can be removed from the keyspace. */
1038 for (j
= 0; j
< server
.dbnum
; j
++) {
1040 redisDb
*db
= server
.db
+j
;
1042 /* Continue to expire if at the end of the cycle more than 25%
1043 * of the keys were expired. */
1045 int num
= dictSize(db
->expires
);
1046 time_t now
= time(NULL
);
1049 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
1050 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
1055 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
1056 t
= (time_t) dictGetEntryVal(de
);
1058 deleteKey(db
,dictGetEntryKey(de
));
1062 } while (expired
> REDIS_EXPIRELOOKUPS_PER_CRON
/4);
1065 /* Check if we should connect to a MASTER */
1066 if (server
.replstate
== REDIS_REPL_CONNECT
) {
1067 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
1068 if (syncWithMaster() == REDIS_OK
) {
1069 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
1075 static void createSharedObjects(void) {
1076 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
1077 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
1078 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
1079 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
1080 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
1081 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
1082 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
1083 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
1084 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
1086 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
1087 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
1088 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1089 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
1090 "-ERR no such key\r\n"));
1091 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
1092 "-ERR syntax error\r\n"));
1093 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
1094 "-ERR source and destination objects are the same\r\n"));
1095 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
1096 "-ERR index out of range\r\n"));
1097 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
1098 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
1099 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
1100 shared
.select0
= createStringObject("select 0\r\n",10);
1101 shared
.select1
= createStringObject("select 1\r\n",10);
1102 shared
.select2
= createStringObject("select 2\r\n",10);
1103 shared
.select3
= createStringObject("select 3\r\n",10);
1104 shared
.select4
= createStringObject("select 4\r\n",10);
1105 shared
.select5
= createStringObject("select 5\r\n",10);
1106 shared
.select6
= createStringObject("select 6\r\n",10);
1107 shared
.select7
= createStringObject("select 7\r\n",10);
1108 shared
.select8
= createStringObject("select 8\r\n",10);
1109 shared
.select9
= createStringObject("select 9\r\n",10);
1112 static void appendServerSaveParams(time_t seconds
, int changes
) {
1113 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1114 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1115 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1116 server
.saveparamslen
++;
1119 static void resetServerSaveParams() {
1120 zfree(server
.saveparams
);
1121 server
.saveparams
= NULL
;
1122 server
.saveparamslen
= 0;
1125 static void initServerConfig() {
1126 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1127 server
.port
= REDIS_SERVERPORT
;
1128 server
.verbosity
= REDIS_DEBUG
;
1129 server
.maxidletime
= REDIS_MAXIDLETIME
;
1130 server
.saveparams
= NULL
;
1131 server
.logfile
= NULL
; /* NULL = log on standard output */
1132 server
.bindaddr
= NULL
;
1133 server
.glueoutputbuf
= 1;
1134 server
.daemonize
= 0;
1135 server
.appendonly
= 0;
1136 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1137 server
.lastfsync
= time(NULL
);
1138 server
.appendfd
= -1;
1139 server
.appendseldb
= -1; /* Make sure the first time will not match */
1140 server
.pidfile
= "/var/run/redis.pid";
1141 server
.dbfilename
= "dump.rdb";
1142 server
.appendfilename
= "appendonly.aof";
1143 server
.requirepass
= NULL
;
1144 server
.shareobjects
= 0;
1145 server
.rdbcompression
= 1;
1146 server
.sharingpoolsize
= 1024;
1147 server
.maxclients
= 0;
1148 server
.maxmemory
= 0;
1149 resetServerSaveParams();
1151 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1152 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1153 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1154 /* Replication related */
1156 server
.masterauth
= NULL
;
1157 server
.masterhost
= NULL
;
1158 server
.masterport
= 6379;
1159 server
.master
= NULL
;
1160 server
.replstate
= REDIS_REPL_NONE
;
1162 /* Double constants initialization */
1164 R_PosInf
= 1.0/R_Zero
;
1165 R_NegInf
= -1.0/R_Zero
;
1166 R_Nan
= R_Zero
/R_Zero
;
1169 static void initServer() {
1172 signal(SIGHUP
, SIG_IGN
);
1173 signal(SIGPIPE
, SIG_IGN
);
1174 setupSigSegvAction();
1176 server
.clients
= listCreate();
1177 server
.slaves
= listCreate();
1178 server
.monitors
= listCreate();
1179 server
.objfreelist
= listCreate();
1180 createSharedObjects();
1181 server
.el
= aeCreateEventLoop();
1182 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1183 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
1184 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1185 if (server
.fd
== -1) {
1186 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
1189 for (j
= 0; j
< server
.dbnum
; j
++) {
1190 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1191 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1192 server
.db
[j
].id
= j
;
1194 server
.cronloops
= 0;
1195 server
.bgsavechildpid
= -1;
1196 server
.bgrewritechildpid
= -1;
1197 server
.bgrewritebuf
= sdsempty();
1198 server
.lastsave
= time(NULL
);
1200 server
.usedmemory
= 0;
1201 server
.stat_numcommands
= 0;
1202 server
.stat_numconnections
= 0;
1203 server
.stat_starttime
= time(NULL
);
1204 aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
);
1206 if (server
.appendonly
) {
1207 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644);
1208 if (server
.appendfd
== -1) {
1209 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1216 /* Empty the whole database */
1217 static long long emptyDb() {
1219 long long removed
= 0;
1221 for (j
= 0; j
< server
.dbnum
; j
++) {
1222 removed
+= dictSize(server
.db
[j
].dict
);
1223 dictEmpty(server
.db
[j
].dict
);
1224 dictEmpty(server
.db
[j
].expires
);
/* Convert a "yes"/"no" configuration argument into an integer.
 *
 * The comparison is case insensitive.
 * Returns 1 for "yes", 0 for "no" and -1 for any other string, which is
 * the value the configuration loader tests for to report a bad argument. */
static int yesnotoi(char *s) {
    if (!strcasecmp(s,"yes")) return 1;
    if (!strcasecmp(s,"no")) return 0;
    /* Neither "yes" nor "no": signal the error explicitly */
    return -1;
}
1235 /* I agree, this is a very rudimentary way to load a configuration...
1236 will improve later if the config gets more complex */
1237 static void loadServerConfig(char *filename
) {
1239 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1243 if (filename
[0] == '-' && filename
[1] == '\0')
1246 if ((fp
= fopen(filename
,"r")) == NULL
) {
1247 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1252 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1258 line
= sdstrim(line
," \t\r\n");
1260 /* Skip comments and blank lines*/
1261 if (line
[0] == '#' || line
[0] == '\0') {
1266 /* Split into arguments */
1267 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1268 sdstolower(argv
[0]);
1270 /* Execute config directives */
1271 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1272 server
.maxidletime
= atoi(argv
[1]);
1273 if (server
.maxidletime
< 0) {
1274 err
= "Invalid timeout value"; goto loaderr
;
1276 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1277 server
.port
= atoi(argv
[1]);
1278 if (server
.port
< 1 || server
.port
> 65535) {
1279 err
= "Invalid port"; goto loaderr
;
1281 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1282 server
.bindaddr
= zstrdup(argv
[1]);
1283 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1284 int seconds
= atoi(argv
[1]);
1285 int changes
= atoi(argv
[2]);
1286 if (seconds
< 1 || changes
< 0) {
1287 err
= "Invalid save parameters"; goto loaderr
;
1289 appendServerSaveParams(seconds
,changes
);
1290 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1291 if (chdir(argv
[1]) == -1) {
1292 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1293 argv
[1], strerror(errno
));
1296 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1297 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1298 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1299 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1301 err
= "Invalid log level. Must be one of debug, notice, warning";
1304 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1307 server
.logfile
= zstrdup(argv
[1]);
1308 if (!strcasecmp(server
.logfile
,"stdout")) {
1309 zfree(server
.logfile
);
1310 server
.logfile
= NULL
;
1312 if (server
.logfile
) {
1313 /* Test if we are able to open the file. The server will not
1314 * be able to abort just for this problem later... */
1315 logfp
= fopen(server
.logfile
,"a");
1316 if (logfp
== NULL
) {
1317 err
= sdscatprintf(sdsempty(),
1318 "Can't open the log file: %s", strerror(errno
));
1323 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1324 server
.dbnum
= atoi(argv
[1]);
1325 if (server
.dbnum
< 1) {
1326 err
= "Invalid number of databases"; goto loaderr
;
1328 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1329 server
.maxclients
= atoi(argv
[1]);
1330 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1331 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1332 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1333 server
.masterhost
= sdsnew(argv
[1]);
1334 server
.masterport
= atoi(argv
[2]);
1335 server
.replstate
= REDIS_REPL_CONNECT
;
1336 } else if (!strcasecmp(argv
[0],"masterauth") && argc
== 2) {
1337 server
.masterauth
= zstrdup(argv
[1]);
1338 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1339 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1340 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1342 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1343 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1344 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1346 } else if (!strcasecmp(argv
[0],"rdbcompression") && argc
== 2) {
1347 if ((server
.rdbcompression
= yesnotoi(argv
[1])) == -1) {
1348 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1350 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1351 server
.sharingpoolsize
= atoi(argv
[1]);
1352 if (server
.sharingpoolsize
< 1) {
1353 err
= "invalid object sharing pool size"; goto loaderr
;
1355 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1356 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1357 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1359 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1360 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1361 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1363 } else if (!strcasecmp(argv
[0],"appendfsync") && argc
== 2) {
1364 if (!strcasecmp(argv
[1],"no")) {
1365 server
.appendfsync
= APPENDFSYNC_NO
;
1366 } else if (!strcasecmp(argv
[1],"always")) {
1367 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1368 } else if (!strcasecmp(argv
[1],"everysec")) {
1369 server
.appendfsync
= APPENDFSYNC_EVERYSEC
;
1371 err
= "argument must be 'no', 'always' or 'everysec'";
1374 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1375 server
.requirepass
= zstrdup(argv
[1]);
1376 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1377 server
.pidfile
= zstrdup(argv
[1]);
1378 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1379 server
.dbfilename
= zstrdup(argv
[1]);
1381 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1383 for (j
= 0; j
< argc
; j
++)
1388 if (fp
!= stdin
) fclose(fp
);
1392 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1393 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1394 fprintf(stderr
, ">>> '%s'\n", line
);
1395 fprintf(stderr
, "%s\n", err
);
1399 static void freeClientArgv(redisClient
*c
) {
1402 for (j
= 0; j
< c
->argc
; j
++)
1403 decrRefCount(c
->argv
[j
]);
1404 for (j
= 0; j
< c
->mbargc
; j
++)
1405 decrRefCount(c
->mbargv
[j
]);
1410 static void freeClient(redisClient
*c
) {
1413 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1414 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1415 sdsfree(c
->querybuf
);
1416 listRelease(c
->reply
);
1419 ln
= listSearchKey(server
.clients
,c
);
1420 redisAssert(ln
!= NULL
);
1421 listDelNode(server
.clients
,ln
);
1422 if (c
->flags
& REDIS_SLAVE
) {
1423 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1425 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1426 ln
= listSearchKey(l
,c
);
1427 redisAssert(ln
!= NULL
);
1430 if (c
->flags
& REDIS_MASTER
) {
1431 server
.master
= NULL
;
1432 server
.replstate
= REDIS_REPL_CONNECT
;
1439 #define GLUEREPLY_UP_TO (1024)
1440 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1442 char buf
[GLUEREPLY_UP_TO
];
1446 listRewind(c
->reply
);
1447 while((ln
= listYield(c
->reply
))) {
1451 objlen
= sdslen(o
->ptr
);
1452 if (copylen
+ objlen
<= GLUEREPLY_UP_TO
) {
1453 memcpy(buf
+copylen
,o
->ptr
,objlen
);
1455 listDelNode(c
->reply
,ln
);
1457 if (copylen
== 0) return;
1461 /* Now the output buffer is empty, add the new single element */
1462 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,copylen
));
1463 listAddNodeHead(c
->reply
,o
);
1466 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1467 redisClient
*c
= privdata
;
1468 int nwritten
= 0, totwritten
= 0, objlen
;
1471 REDIS_NOTUSED(mask
);
1473 /* Use writev() if we have enough buffers to send */
1474 if (!server
.glueoutputbuf
&&
1475 listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD
&&
1476 !(c
->flags
& REDIS_MASTER
))
1478 sendReplyToClientWritev(el
, fd
, privdata
, mask
);
1482 while(listLength(c
->reply
)) {
1483 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1484 glueReplyBuffersIfNeeded(c
);
1486 o
= listNodeValue(listFirst(c
->reply
));
1487 objlen
= sdslen(o
->ptr
);
1490 listDelNode(c
->reply
,listFirst(c
->reply
));
1494 if (c
->flags
& REDIS_MASTER
) {
1495 /* Don't reply to a master */
1496 nwritten
= objlen
- c
->sentlen
;
1498 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1499 if (nwritten
<= 0) break;
1501 c
->sentlen
+= nwritten
;
1502 totwritten
+= nwritten
;
1503 /* If we fully sent the object on head go to the next one */
1504 if (c
->sentlen
== objlen
) {
1505 listDelNode(c
->reply
,listFirst(c
->reply
));
1508 /* Note that we avoid sending more than REDIS_MAX_WRITE_PER_EVENT
1509 * bytes, in a single threaded server it's a good idea to serve
1510 * other clients as well, even if a very large request comes from a
1511 * super fast link that is always able to accept data (in real world
1512 * scenario think about 'KEYS *' against the loopback interface) */
1513 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1515 if (nwritten
== -1) {
1516 if (errno
== EAGAIN
) {
1519 redisLog(REDIS_DEBUG
,
1520 "Error writing to client: %s", strerror(errno
));
1525 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1526 if (listLength(c
->reply
) == 0) {
1528 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1532 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
)
1534 redisClient
*c
= privdata
;
1535 int nwritten
= 0, totwritten
= 0, objlen
, willwrite
;
1537 struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
];
1538 int offset
, ion
= 0;
1540 REDIS_NOTUSED(mask
);
1543 while (listLength(c
->reply
)) {
1544 offset
= c
->sentlen
;
1548 /* fill-in the iov[] array */
1549 for(node
= listFirst(c
->reply
); node
; node
= listNextNode(node
)) {
1550 o
= listNodeValue(node
);
1551 objlen
= sdslen(o
->ptr
);
1553 if (totwritten
+ objlen
- offset
> REDIS_MAX_WRITE_PER_EVENT
)
1556 if(ion
== REDIS_WRITEV_IOVEC_COUNT
)
1557 break; /* no more iovecs */
1559 iov
[ion
].iov_base
= ((char*)o
->ptr
) + offset
;
1560 iov
[ion
].iov_len
= objlen
- offset
;
1561 willwrite
+= objlen
- offset
;
1562 offset
= 0; /* just for the first item */
1569 /* write all collected blocks at once */
1570 if((nwritten
= writev(fd
, iov
, ion
)) < 0) {
1571 if (errno
!= EAGAIN
) {
1572 redisLog(REDIS_DEBUG
,
1573 "Error writing to client: %s", strerror(errno
));
1580 totwritten
+= nwritten
;
1581 offset
= c
->sentlen
;
1583 /* remove written robjs from c->reply */
1584 while (nwritten
&& listLength(c
->reply
)) {
1585 o
= listNodeValue(listFirst(c
->reply
));
1586 objlen
= sdslen(o
->ptr
);
1588 if(nwritten
>= objlen
- offset
) {
1589 listDelNode(c
->reply
, listFirst(c
->reply
));
1590 nwritten
-= objlen
- offset
;
1594 c
->sentlen
+= nwritten
;
1602 c
->lastinteraction
= time(NULL
);
1604 if (listLength(c
->reply
) == 0) {
1606 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1610 static struct redisCommand
*lookupCommand(char *name
) {
1612 while(cmdTable
[j
].name
!= NULL
) {
1613 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1619 /* resetClient prepares the client to process the next command */
1620 static void resetClient(redisClient
*c
) {
1626 /* If this function gets called we already read a whole
1627 * command, arguments are in the client argv/argc fields.
1628 * processCommand() executes the command or prepares the
1629 * server for a bulk read from the client.
1631 * If 1 is returned the client is still alive and valid and
1632 * other operations can be performed by the caller. Otherwise
1633 * if 0 is returned the client was destroyed (i.e. after QUIT). */
1634 static int processCommand(redisClient
*c
) {
1635 struct redisCommand
*cmd
;
1638 /* Free some memory if needed (maxmemory setting) */
1639 if (server
.maxmemory
) freeMemoryIfNeeded();
1641 /* Handle the multi bulk command type. This is an alternative protocol
1642 * supported by Redis in order to receive commands that are composed of
1643 * multiple binary-safe "bulk" arguments. The latency of processing is
1644 * a bit higher but this allows things like multi-sets, so if this
1645 * protocol is used only for MSET and similar commands this is a big win. */
1646 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1647 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1648 if (c
->multibulk
<= 0) {
1652 decrRefCount(c
->argv
[c
->argc
-1]);
1656 } else if (c
->multibulk
) {
1657 if (c
->bulklen
== -1) {
1658 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1659 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1663 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1664 decrRefCount(c
->argv
[0]);
1665 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1667 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1672 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1676 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1677 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1681 if (c
->multibulk
== 0) {
1685 /* Here we need to swap the multi-bulk argc/argv with the
1686 * normal argc/argv of the client structure. */
1688 c
->argv
= c
->mbargv
;
1689 c
->mbargv
= auxargv
;
1692 c
->argc
= c
->mbargc
;
1693 c
->mbargc
= auxargc
;
1695 /* We need to set bulklen to something different than -1
1696 * in order for the code below to process the command without
1697 * to try to read the last argument of a bulk command as
1698 * a special argument. */
1700 /* continue below and process the command */
1707 /* -- end of multi bulk commands processing -- */
1709 /* The QUIT command is handled as a special case. Normal command
1710 * procs are unable to close the client connection safely */
1711 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1715 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1718 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1719 (char*)c
->argv
[0]->ptr
));
1722 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1723 (c
->argc
< -cmd
->arity
)) {
1725 sdscatprintf(sdsempty(),
1726 "-ERR wrong number of arguments for '%s' command\r\n",
1730 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1731 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1734 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1735 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1737 decrRefCount(c
->argv
[c
->argc
-1]);
1738 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1740 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1745 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1746 /* It is possible that the bulk read is already in the
1747 * buffer. Check this condition and handle it accordingly.
1748 * This is just a fast path, alternative to call processInputBuffer().
1749 * It's a good idea since the code is small and this condition
1750 * happens most of the times. */
1751 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1752 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1754 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1759 /* Let's try to share objects on the command arguments vector */
1760 if (server
.shareobjects
) {
1762 for(j
= 1; j
< c
->argc
; j
++)
1763 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1765 /* Let's try to encode the bulk object to save space. */
1766 if (cmd
->flags
& REDIS_CMD_BULK
)
1767 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1769 /* Check if the user is authenticated */
1770 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1771 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1776 /* Exec the command */
1777 dirty
= server
.dirty
;
1779 if (server
.appendonly
&& server
.dirty
-dirty
)
1780 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1781 if (server
.dirty
-dirty
&& listLength(server
.slaves
))
1782 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1783 if (listLength(server
.monitors
))
1784 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1785 server
.stat_numcommands
++;
1787 /* Prepare the client for the next command */
1788 if (c
->flags
& REDIS_CLOSE
) {
1796 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1800 /* (args*2)+1 is enough room for args, spaces, newlines */
1801 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1803 if (argc
<= REDIS_STATIC_ARGS
) {
1806 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1809 for (j
= 0; j
< argc
; j
++) {
1810 if (j
!= 0) outv
[outc
++] = shared
.space
;
1811 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1814 lenobj
= createObject(REDIS_STRING
,
1815 sdscatprintf(sdsempty(),"%lu\r\n",
1816 (unsigned long) stringObjectLen(argv
[j
])));
1817 lenobj
->refcount
= 0;
1818 outv
[outc
++] = lenobj
;
1820 outv
[outc
++] = argv
[j
];
1822 outv
[outc
++] = shared
.crlf
;
1824 /* Increment all the refcounts at start and decrement at end in order to
1825 * be sure to free objects if there is no slave in a replication state
1826 * able to be fed with commands */
1827 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1829 while((ln
= listYield(slaves
))) {
1830 redisClient
*slave
= ln
->value
;
1832 /* Don't feed slaves that are still waiting for BGSAVE to start */
1833 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1835 /* Feed all the other slaves, MONITORs and so on */
1836 if (slave
->slaveseldb
!= dictid
) {
1840 case 0: selectcmd
= shared
.select0
; break;
1841 case 1: selectcmd
= shared
.select1
; break;
1842 case 2: selectcmd
= shared
.select2
; break;
1843 case 3: selectcmd
= shared
.select3
; break;
1844 case 4: selectcmd
= shared
.select4
; break;
1845 case 5: selectcmd
= shared
.select5
; break;
1846 case 6: selectcmd
= shared
.select6
; break;
1847 case 7: selectcmd
= shared
.select7
; break;
1848 case 8: selectcmd
= shared
.select8
; break;
1849 case 9: selectcmd
= shared
.select9
; break;
1851 selectcmd
= createObject(REDIS_STRING
,
1852 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1853 selectcmd
->refcount
= 0;
1856 addReply(slave
,selectcmd
);
1857 slave
->slaveseldb
= dictid
;
1859 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1861 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1862 if (outv
!= static_outv
) zfree(outv
);
1865 static void processInputBuffer(redisClient
*c
) {
1867 if (c
->bulklen
== -1) {
1868 /* Read the first line of the query */
1869 char *p
= strchr(c
->querybuf
,'\n');
1876 query
= c
->querybuf
;
1877 c
->querybuf
= sdsempty();
1878 querylen
= 1+(p
-(query
));
1879 if (sdslen(query
) > querylen
) {
1880 /* leave data after the first line of the query in the buffer */
1881 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1883 *p
= '\0'; /* remove "\n" */
1884 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1885 sdsupdatelen(query
);
1887 /* Now we can split the query in arguments */
1888 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1891 if (c
->argv
) zfree(c
->argv
);
1892 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1894 for (j
= 0; j
< argc
; j
++) {
1895 if (sdslen(argv
[j
])) {
1896 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1904 /* Execute the command. If the client is still valid
1905 * after processCommand() return and there is something
1906 * on the query buffer try to process the next command. */
1907 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1909 /* Nothing to process, argc == 0. Just process the query
1910 * buffer if it's not empty or return to the caller */
1911 if (sdslen(c
->querybuf
)) goto again
;
1914 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
1915 redisLog(REDIS_DEBUG
, "Client protocol error");
1920 /* Bulk read handling. Note that if we are at this point
1921 the client already sent a command terminated with a newline,
1922 we are reading the bulk data that is actually the last
1923 argument of the command. */
1924 int qbl
= sdslen(c
->querybuf
);
1926 if (c
->bulklen
<= qbl
) {
1927 /* Copy everything but the final CRLF as final argument */
1928 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1930 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1931 /* Process the command. If the client is still valid after
1932 * the processing and there is more data in the buffer
1933 * try to parse it. */
1934 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1940 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1941 redisClient
*c
= (redisClient
*) privdata
;
1942 char buf
[REDIS_IOBUF_LEN
];
1945 REDIS_NOTUSED(mask
);
1947 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1949 if (errno
== EAGAIN
) {
1952 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1956 } else if (nread
== 0) {
1957 redisLog(REDIS_DEBUG
, "Client closed connection");
1962 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1963 c
->lastinteraction
= time(NULL
);
1967 processInputBuffer(c
);
1970 static int selectDb(redisClient
*c
, int id
) {
1971 if (id
< 0 || id
>= server
.dbnum
)
1973 c
->db
= &server
.db
[id
];
1977 static void *dupClientReplyValue(void *o
) {
1978 incrRefCount((robj
*)o
);
1982 static redisClient
*createClient(int fd
) {
1983 redisClient
*c
= zmalloc(sizeof(*c
));
1985 anetNonBlock(NULL
,fd
);
1986 anetTcpNoDelay(NULL
,fd
);
1987 if (!c
) return NULL
;
1990 c
->querybuf
= sdsempty();
1999 c
->lastinteraction
= time(NULL
);
2000 c
->authenticated
= 0;
2001 c
->replstate
= REDIS_REPL_NONE
;
2002 c
->reply
= listCreate();
2003 listSetFreeMethod(c
->reply
,decrRefCount
);
2004 listSetDupMethod(c
->reply
,dupClientReplyValue
);
2005 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
2006 readQueryFromClient
, c
) == AE_ERR
) {
2010 listAddNodeTail(server
.clients
,c
);
2014 static void addReply(redisClient
*c
, robj
*obj
) {
2015 if (listLength(c
->reply
) == 0 &&
2016 (c
->replstate
== REDIS_REPL_NONE
||
2017 c
->replstate
== REDIS_REPL_ONLINE
) &&
2018 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
2019 sendReplyToClient
, c
) == AE_ERR
) return;
2020 listAddNodeTail(c
->reply
,getDecodedObject(obj
));
2023 static void addReplySds(redisClient
*c
, sds s
) {
2024 robj
*o
= createObject(REDIS_STRING
,s
);
2029 static void addReplyDouble(redisClient
*c
, double d
) {
2032 snprintf(buf
,sizeof(buf
),"%.17g",d
);
2033 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2034 (unsigned long) strlen(buf
),buf
));
2037 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
2040 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
2041 len
= sdslen(obj
->ptr
);
2043 long n
= (long)obj
->ptr
;
2045 /* Compute how many bytes will take this integer as a radix 10 string */
2051 while((n
= n
/10) != 0) {
2055 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len
));
2058 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2063 REDIS_NOTUSED(mask
);
2064 REDIS_NOTUSED(privdata
);
2066 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
2067 if (cfd
== AE_ERR
) {
2068 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
2071 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
2072 if ((c
= createClient(cfd
)) == NULL
) {
2073 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
2074 close(cfd
); /* May be already closed, just ingore errors */
2077 /* If maxclient directive is set and this is one client more... close the
2078 * connection. Note that we create the client instead to check before
2079 * for this condition, since now the socket is already set in nonblocking
2080 * mode and we can send an error for free using the Kernel I/O */
2081 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
2082 char *err
= "-ERR max number of clients reached\r\n";
2084 /* That's a best effort error message, don't check write errors */
2085 if (write(c
->fd
,err
,strlen(err
)) == -1) {
2086 /* Nothing to do, Just to avoid the warning... */
2091 server
.stat_numconnections
++;
2094 /* ======================= Redis objects implementation ===================== */
2096 static robj
*createObject(int type
, void *ptr
) {
2099 if (listLength(server
.objfreelist
)) {
2100 listNode
*head
= listFirst(server
.objfreelist
);
2101 o
= listNodeValue(head
);
2102 listDelNode(server
.objfreelist
,head
);
2104 o
= zmalloc(sizeof(*o
));
2107 o
->encoding
= REDIS_ENCODING_RAW
;
2113 static robj
*createStringObject(char *ptr
, size_t len
) {
2114 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
2117 static robj
*createListObject(void) {
2118 list
*l
= listCreate();
2120 listSetFreeMethod(l
,decrRefCount
);
2121 return createObject(REDIS_LIST
,l
);
2124 static robj
*createSetObject(void) {
2125 dict
*d
= dictCreate(&setDictType
,NULL
);
2126 return createObject(REDIS_SET
,d
);
2129 static robj
*createZsetObject(void) {
2130 zset
*zs
= zmalloc(sizeof(*zs
));
2132 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
2133 zs
->zsl
= zslCreate();
2134 return createObject(REDIS_ZSET
,zs
);
2137 static void freeStringObject(robj
*o
) {
2138 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2143 static void freeListObject(robj
*o
) {
2144 listRelease((list
*) o
->ptr
);
2147 static void freeSetObject(robj
*o
) {
2148 dictRelease((dict
*) o
->ptr
);
2151 static void freeZsetObject(robj
*o
) {
2154 dictRelease(zs
->dict
);
2159 static void freeHashObject(robj
*o
) {
2160 dictRelease((dict
*) o
->ptr
);
2163 static void incrRefCount(robj
*o
) {
2165 #ifdef DEBUG_REFCOUNT
2166 if (o
->type
== REDIS_STRING
)
2167 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
2171 static void decrRefCount(void *obj
) {
2174 #ifdef DEBUG_REFCOUNT
2175 if (o
->type
== REDIS_STRING
)
2176 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
2178 if (--(o
->refcount
) == 0) {
2180 case REDIS_STRING
: freeStringObject(o
); break;
2181 case REDIS_LIST
: freeListObject(o
); break;
2182 case REDIS_SET
: freeSetObject(o
); break;
2183 case REDIS_ZSET
: freeZsetObject(o
); break;
2184 case REDIS_HASH
: freeHashObject(o
); break;
2185 default: redisAssert(0 != 0); break;
2187 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2188 !listAddNodeHead(server
.objfreelist
,o
))
2193 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2194 dictEntry
*de
= dictFind(db
->dict
,key
);
2195 return de
? dictGetEntryVal(de
) : NULL
;
2198 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2199 expireIfNeeded(db
,key
);
2200 return lookupKey(db
,key
);
2203 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2204 deleteIfVolatile(db
,key
);
2205 return lookupKey(db
,key
);
2208 static int deleteKey(redisDb
*db
, robj
*key
) {
2211 /* We need to protect key from destruction: after the first dictDelete()
2212 * it may happen that 'key' is no longer valid if we don't increment
2213 * it's count. This may happen when we get the object reference directly
2214 * from the hash table with dictRandomKey() or dict iterators */
2216 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2217 retval
= dictDelete(db
->dict
,key
);
2220 return retval
== DICT_OK
;
2223 /* Try to share an object against the shared objects pool */
2224 static robj
*tryObjectSharing(robj
*o
) {
2225 struct dictEntry
*de
;
2228 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2230 redisAssert(o
->type
== REDIS_STRING
);
2231 de
= dictFind(server
.sharingpool
,o
);
2233 robj
*shared
= dictGetEntryKey(de
);
2235 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2236 dictGetEntryVal(de
) = (void*) c
;
2237 incrRefCount(shared
);
2241 /* Here we are using a stream algorihtm: Every time an object is
2242 * shared we increment its count, everytime there is a miss we
2243 * recrement the counter of a random object. If this object reaches
2244 * zero we remove the object and put the current object instead. */
2245 if (dictSize(server
.sharingpool
) >=
2246 server
.sharingpoolsize
) {
2247 de
= dictGetRandomKey(server
.sharingpool
);
2248 redisAssert(de
!= NULL
);
2249 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2250 dictGetEntryVal(de
) = (void*) c
;
2252 dictDelete(server
.sharingpool
,de
->key
);
2255 c
= 0; /* If the pool is empty we want to add this object */
2260 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2261 redisAssert(retval
== DICT_OK
);
2268 /* Check if the nul-terminated string 's' can be represented by a long
2269 * (that is, is a number that fits into long without any other space or
2270 * character before or after the digits).
2272 * If so, the function returns REDIS_OK and *longval is set to the value
2273 * of the number. Otherwise REDIS_ERR is returned */
2274 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2275 char buf
[32], *endptr
;
2279 value
= strtol(s
, &endptr
, 10);
2280 if (endptr
[0] != '\0') return REDIS_ERR
;
2281 slen
= snprintf(buf
,32,"%ld",value
);
2283 /* If the number converted back into a string is not identical
2284 * then it's not possible to encode the string as integer */
2285 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2286 if (longval
) *longval
= value
;
2290 /* Try to encode a string object in order to save space */
2291 static int tryObjectEncoding(robj
*o
) {
2295 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2296 return REDIS_ERR
; /* Already encoded */
2298 /* It's not save to encode shared objects: shared objects can be shared
2299 * everywhere in the "object space" of Redis. Encoded objects can only
2300 * appear as "values" (and not, for instance, as keys) */
2301 if (o
->refcount
> 1) return REDIS_ERR
;
2303 /* Currently we try to encode only strings */
2304 redisAssert(o
->type
== REDIS_STRING
);
2306 /* Check if we can represent this string as a long integer */
2307 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2309 /* Ok, this object can be encoded */
2310 o
->encoding
= REDIS_ENCODING_INT
;
2312 o
->ptr
= (void*) value
;
2316 /* Get a decoded version of an encoded object (returned as a new object).
2317 * If the object is already raw-encoded just increment the ref count. */
2318 static robj
*getDecodedObject(robj
*o
) {
2321 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2325 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2328 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2329 dec
= createStringObject(buf
,strlen(buf
));
2332 redisAssert(1 != 1);
2336 /* Compare two string objects via strcmp() or alike.
2337 * Note that the objects may be integer-encoded. In such a case we
2338 * use snprintf() to get a string representation of the numbers on the stack
2339 * and compare the strings, it's much faster than calling getDecodedObject().
2341 * Important note: if objects are not integer encoded, but binary-safe strings,
2342 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2344 static int compareStringObjects(robj
*a
, robj
*b
) {
2345 redisAssert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2346 char bufa
[128], bufb
[128], *astr
, *bstr
;
2349 if (a
== b
) return 0;
2350 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2351 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2357 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2358 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2364 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2367 static size_t stringObjectLen(robj
*o
) {
2368 redisAssert(o
->type
== REDIS_STRING
);
2369 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2370 return sdslen(o
->ptr
);
2374 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2378 /*============================ DB saving/loading ============================ */
/* Write the single byte 'type' to 'fp'.
 * Returns 0 on success, -1 if fwrite() wrote nothing. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
/* Write 't' to 'fp' as a 4 byte int32_t in host byte order (the value is
 * narrowed with a plain cast). Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t secs = (int32_t) t;

    if (fwrite(&secs,sizeof(secs),1,fp) != 1) return -1;
    return 0;
}
2391 /* check rdbLoadLen() comments for more info */
2392 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2393 unsigned char buf
[2];
2396 /* Save a 6 bit len */
2397 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2398 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2399 } else if (len
< (1<<14)) {
2400 /* Save a 14 bit len */
2401 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2403 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2405 /* Save a 32 bit len */
2406 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2407 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2409 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2414 /* String objects in the form "2391" "-100" without any space and with a
2415 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2416 * encoded as integers to save space */
2417 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2419 char *endptr
, buf
[32];
2421 /* Check if it's possible to encode this value as a number */
2422 value
= strtoll(s
, &endptr
, 10);
2423 if (endptr
[0] != '\0') return 0;
2424 snprintf(buf
,32,"%lld",value
);
2426 /* If the number converted back into a string is not identical
2427 * then it's not possible to encode the string as integer */
2428 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2430 /* Finally check if it fits in our ranges */
2431 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2432 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2433 enc
[1] = value
&0xFF;
2435 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2436 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2437 enc
[1] = value
&0xFF;
2438 enc
[2] = (value
>>8)&0xFF;
2440 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2441 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2442 enc
[1] = value
&0xFF;
2443 enc
[2] = (value
>>8)&0xFF;
2444 enc
[3] = (value
>>16)&0xFF;
2445 enc
[4] = (value
>>24)&0xFF;
2452 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2453 unsigned int comprlen
, outlen
;
2457 /* We require at least four bytes compression for this to be worth it */
2458 outlen
= sdslen(obj
->ptr
)-4;
2459 if (outlen
<= 0) return 0;
2460 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2461 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2462 if (comprlen
== 0) {
2466 /* Data compressed! Let's save it on disk */
2467 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2468 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2469 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2470 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2471 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
/* Save a string object as [len][data] on disk. If the object is a string
 * representation of an integer value we try to save it in a special form */
2482 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2486 len
= sdslen(obj
->ptr
);
2488 /* Try integer encoding */
2490 unsigned char buf
[5];
2491 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2492 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2497 /* Try LZF compression - under 20 bytes it's unable to compress even
2498 * aaaaaaaaaaaaaaaaaa so skip it */
2499 if (server
.rdbcompression
&& len
> 20) {
2502 retval
= rdbSaveLzfStringObject(fp
,obj
);
2503 if (retval
== -1) return -1;
2504 if (retval
> 0) return 0;
2505 /* retval == 0 means data can't be compressed, save the old way */
2508 /* Store verbatim */
2509 if (rdbSaveLen(fp
,len
) == -1) return -1;
2510 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2514 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2515 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2518 obj
= getDecodedObject(obj
);
2519 retval
= rdbSaveStringObjectRaw(fp
,obj
);
2524 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
2525 * 8 bit integer specifing the length of the representation.
2526 * This 8 bit integer has special values in order to specify the following
2532 static int rdbSaveDoubleValue(FILE *fp
, double val
) {
2533 unsigned char buf
[128];
2539 } else if (!isfinite(val
)) {
2541 buf
[0] = (val
< 0) ? 255 : 254;
2543 snprintf((char*)buf
+1,sizeof(buf
)-1,"%.17g",val
);
2544 buf
[0] = strlen((char*)buf
+1);
2547 if (fwrite(buf
,len
,1,fp
) == 0) return -1;
2551 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2552 static int rdbSave(char *filename
) {
2553 dictIterator
*di
= NULL
;
2558 time_t now
= time(NULL
);
2560 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2561 fp
= fopen(tmpfile
,"w");
2563 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2566 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2567 for (j
= 0; j
< server
.dbnum
; j
++) {
2568 redisDb
*db
= server
.db
+j
;
2570 if (dictSize(d
) == 0) continue;
2571 di
= dictGetIterator(d
);
2577 /* Write the SELECT DB opcode */
2578 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2579 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2581 /* Iterate this DB writing every entry */
2582 while((de
= dictNext(di
)) != NULL
) {
2583 robj
*key
= dictGetEntryKey(de
);
2584 robj
*o
= dictGetEntryVal(de
);
2585 time_t expiretime
= getExpire(db
,key
);
2587 /* Save the expire time */
2588 if (expiretime
!= -1) {
2589 /* If this key is already expired skip it */
2590 if (expiretime
< now
) continue;
2591 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2592 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2594 /* Save the key and associated value */
2595 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2596 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2597 if (o
->type
== REDIS_STRING
) {
2598 /* Save a string value */
2599 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
2600 } else if (o
->type
== REDIS_LIST
) {
2601 /* Save a list value */
2602 list
*list
= o
->ptr
;
2606 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
2607 while((ln
= listYield(list
))) {
2608 robj
*eleobj
= listNodeValue(ln
);
2610 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2612 } else if (o
->type
== REDIS_SET
) {
2613 /* Save a set value */
2615 dictIterator
*di
= dictGetIterator(set
);
2618 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
2619 while((de
= dictNext(di
)) != NULL
) {
2620 robj
*eleobj
= dictGetEntryKey(de
);
2622 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2624 dictReleaseIterator(di
);
2625 } else if (o
->type
== REDIS_ZSET
) {
2626 /* Save a set value */
2628 dictIterator
*di
= dictGetIterator(zs
->dict
);
2631 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) goto werr
;
2632 while((de
= dictNext(di
)) != NULL
) {
2633 robj
*eleobj
= dictGetEntryKey(de
);
2634 double *score
= dictGetEntryVal(de
);
2636 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2637 if (rdbSaveDoubleValue(fp
,*score
) == -1) goto werr
;
2639 dictReleaseIterator(di
);
2641 redisAssert(0 != 0);
2644 dictReleaseIterator(di
);
2647 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2649 /* Make sure data will not remain on the OS's output buffers */
2654 /* Use RENAME to make sure the DB file is changed atomically only
2655 * if the generate DB file is ok. */
2656 if (rename(tmpfile
,filename
) == -1) {
2657 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
2661 redisLog(REDIS_NOTICE
,"DB saved on disk");
2663 server
.lastsave
= time(NULL
);
2669 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2670 if (di
) dictReleaseIterator(di
);
2674 static int rdbSaveBackground(char *filename
) {
2677 if (server
.bgsavechildpid
!= -1) return REDIS_ERR
;
2678 if ((childpid
= fork()) == 0) {
2681 if (rdbSave(filename
) == REDIS_OK
) {
2688 if (childpid
== -1) {
2689 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2693 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2694 server
.bgsavechildpid
= childpid
;
2697 return REDIS_OK
; /* unreached */
2700 static void rdbRemoveTempFile(pid_t childpid
) {
2703 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
/* Read one byte from 'fp' and return it as a non-negative int.
 * Returns -1 if fread() could not read the byte. */
static int rdbLoadType(FILE *fp) {
    unsigned char opcode;

    return (fread(&opcode,1,1,fp) == 0) ? -1 : opcode;
}
/* Read a 4 byte int32_t (host byte order) from 'fp' and return it as a
 * time_t. Returns -1 if the four bytes could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t secs;

    if (fread(&secs,sizeof(secs),1,fp) != 1) return -1;
    return (time_t) secs;
}
/* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
 * of this file for a description of how these are stored on disk.
 *
 * isencoded is set to 1 if the length read is not actually a length but
 * an "encoding type", check the above comments for more info */
2724 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2725 unsigned char buf
[2];
2728 if (isencoded
) *isencoded
= 0;
2730 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2735 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2736 type
= (buf
[0]&0xC0)>>6;
2737 if (type
== REDIS_RDB_6BITLEN
) {
2738 /* Read a 6 bit len */
2740 } else if (type
== REDIS_RDB_ENCVAL
) {
2741 /* Read a 6 bit len encoding type */
2742 if (isencoded
) *isencoded
= 1;
2744 } else if (type
== REDIS_RDB_14BITLEN
) {
2745 /* Read a 14 bit len */
2746 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2747 return ((buf
[0]&0x3F)<<8)|buf
[1];
2749 /* Read a 32 bit len */
2750 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2756 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2757 unsigned char enc
[4];
2760 if (enctype
== REDIS_RDB_ENC_INT8
) {
2761 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2762 val
= (signed char)enc
[0];
2763 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2765 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2766 v
= enc
[0]|(enc
[1]<<8);
2768 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2770 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2771 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2774 val
= 0; /* anti-warning */
2777 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2780 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2781 unsigned int len
, clen
;
2782 unsigned char *c
= NULL
;
2785 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2786 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2787 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2788 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2789 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2790 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2792 return createObject(REDIS_STRING
,val
);
2799 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2804 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2807 case REDIS_RDB_ENC_INT8
:
2808 case REDIS_RDB_ENC_INT16
:
2809 case REDIS_RDB_ENC_INT32
:
2810 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2811 case REDIS_RDB_ENC_LZF
:
2812 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2818 if (len
== REDIS_RDB_LENERR
) return NULL
;
2819 val
= sdsnewlen(NULL
,len
);
2820 if (len
&& fread(val
,len
,1,fp
) == 0) {
2824 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2827 /* For information about double serialization check rdbSaveDoubleValue() */
2828 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
2832 if (fread(&len
,1,1,fp
) == 0) return -1;
2834 case 255: *val
= R_NegInf
; return 0;
2835 case 254: *val
= R_PosInf
; return 0;
2836 case 253: *val
= R_Nan
; return 0;
2838 if (fread(buf
,len
,1,fp
) == 0) return -1;
2840 sscanf(buf
, "%lg", val
);
2845 static int rdbLoad(char *filename
) {
2847 robj
*keyobj
= NULL
;
2849 int type
, retval
, rdbver
;
2850 dict
*d
= server
.db
[0].dict
;
2851 redisDb
*db
= server
.db
+0;
2853 time_t expiretime
= -1, now
= time(NULL
);
2855 fp
= fopen(filename
,"r");
2856 if (!fp
) return REDIS_ERR
;
2857 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2859 if (memcmp(buf
,"REDIS",5) != 0) {
2861 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2864 rdbver
= atoi(buf
+5);
2867 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2874 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2875 if (type
== REDIS_EXPIRETIME
) {
2876 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2877 /* We read the time so we need to read the object type again */
2878 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2880 if (type
== REDIS_EOF
) break;
2881 /* Handle SELECT DB opcode as a special case */
2882 if (type
== REDIS_SELECTDB
) {
2883 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2885 if (dbid
>= (unsigned)server
.dbnum
) {
2886 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2889 db
= server
.db
+dbid
;
2894 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2896 if (type
== REDIS_STRING
) {
2897 /* Read string value */
2898 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2899 tryObjectEncoding(o
);
2900 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2901 /* Read list/set value */
2904 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2906 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2907 /* Load every single element of the list/set */
2911 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2912 tryObjectEncoding(ele
);
2913 if (type
== REDIS_LIST
) {
2914 listAddNodeTail((list
*)o
->ptr
,ele
);
2916 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
2919 } else if (type
== REDIS_ZSET
) {
2920 /* Read list/set value */
2924 if ((zsetlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2926 o
= createZsetObject();
2928 /* Load every single element of the list/set */
2931 double *score
= zmalloc(sizeof(double));
2933 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2934 tryObjectEncoding(ele
);
2935 if (rdbLoadDoubleValue(fp
,score
) == -1) goto eoferr
;
2936 dictAdd(zs
->dict
,ele
,score
);
2937 zslInsert(zs
->zsl
,*score
,ele
);
2938 incrRefCount(ele
); /* added to skiplist */
2941 redisAssert(0 != 0);
2943 /* Add the new object in the hash table */
2944 retval
= dictAdd(d
,keyobj
,o
);
2945 if (retval
== DICT_ERR
) {
2946 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2949 /* Set the expire time if needed */
2950 if (expiretime
!= -1) {
2951 setExpire(db
,keyobj
,expiretime
);
2952 /* Delete this key if already expired */
2953 if (expiretime
< now
) deleteKey(db
,keyobj
);
2961 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2962 if (keyobj
) decrRefCount(keyobj
);
2963 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2965 return REDIS_ERR
; /* Just to avoid warning */
2968 /*================================== Commands =============================== */
2970 static void authCommand(redisClient
*c
) {
2971 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2972 c
->authenticated
= 1;
2973 addReply(c
,shared
.ok
);
2975 c
->authenticated
= 0;
2976 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2980 static void pingCommand(redisClient
*c
) {
2981 addReply(c
,shared
.pong
);
2984 static void echoCommand(redisClient
*c
) {
2985 addReplyBulkLen(c
,c
->argv
[1]);
2986 addReply(c
,c
->argv
[1]);
2987 addReply(c
,shared
.crlf
);
2990 /*=================================== Strings =============================== */
2992 static void setGenericCommand(redisClient
*c
, int nx
) {
2995 if (nx
) deleteIfVolatile(c
->db
,c
->argv
[1]);
2996 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2997 if (retval
== DICT_ERR
) {
2999 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3000 incrRefCount(c
->argv
[2]);
3002 addReply(c
,shared
.czero
);
3006 incrRefCount(c
->argv
[1]);
3007 incrRefCount(c
->argv
[2]);
3010 removeExpire(c
->db
,c
->argv
[1]);
3011 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3014 static void setCommand(redisClient
*c
) {
3015 setGenericCommand(c
,0);
3018 static void setnxCommand(redisClient
*c
) {
3019 setGenericCommand(c
,1);
3022 static void getCommand(redisClient
*c
) {
3023 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3026 addReply(c
,shared
.nullbulk
);
3028 if (o
->type
!= REDIS_STRING
) {
3029 addReply(c
,shared
.wrongtypeerr
);
3031 addReplyBulkLen(c
,o
);
3033 addReply(c
,shared
.crlf
);
3038 static void getsetCommand(redisClient
*c
) {
3040 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
3041 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3043 incrRefCount(c
->argv
[1]);
3045 incrRefCount(c
->argv
[2]);
3047 removeExpire(c
->db
,c
->argv
[1]);
3050 static void mgetCommand(redisClient
*c
) {
3053 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
3054 for (j
= 1; j
< c
->argc
; j
++) {
3055 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
3057 addReply(c
,shared
.nullbulk
);
3059 if (o
->type
!= REDIS_STRING
) {
3060 addReply(c
,shared
.nullbulk
);
3062 addReplyBulkLen(c
,o
);
3064 addReply(c
,shared
.crlf
);
3070 static void msetGenericCommand(redisClient
*c
, int nx
) {
3071 int j
, busykeys
= 0;
3073 if ((c
->argc
% 2) == 0) {
3074 addReplySds(c
,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3077 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3078 * set nothing at all if at least one already key exists. */
3080 for (j
= 1; j
< c
->argc
; j
+= 2) {
3081 if (lookupKeyWrite(c
->db
,c
->argv
[j
]) != NULL
) {
3087 addReply(c
, shared
.czero
);
3091 for (j
= 1; j
< c
->argc
; j
+= 2) {
3094 tryObjectEncoding(c
->argv
[j
+1]);
3095 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3096 if (retval
== DICT_ERR
) {
3097 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3098 incrRefCount(c
->argv
[j
+1]);
3100 incrRefCount(c
->argv
[j
]);
3101 incrRefCount(c
->argv
[j
+1]);
3103 removeExpire(c
->db
,c
->argv
[j
]);
3105 server
.dirty
+= (c
->argc
-1)/2;
3106 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3109 static void msetCommand(redisClient
*c
) {
3110 msetGenericCommand(c
,0);
3113 static void msetnxCommand(redisClient
*c
) {
3114 msetGenericCommand(c
,1);
3117 static void incrDecrCommand(redisClient
*c
, long long incr
) {
3122 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3126 if (o
->type
!= REDIS_STRING
) {
3131 if (o
->encoding
== REDIS_ENCODING_RAW
)
3132 value
= strtoll(o
->ptr
, &eptr
, 10);
3133 else if (o
->encoding
== REDIS_ENCODING_INT
)
3134 value
= (long)o
->ptr
;
3136 redisAssert(1 != 1);
3141 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
3142 tryObjectEncoding(o
);
3143 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
3144 if (retval
== DICT_ERR
) {
3145 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
3146 removeExpire(c
->db
,c
->argv
[1]);
3148 incrRefCount(c
->argv
[1]);
3151 addReply(c
,shared
.colon
);
3153 addReply(c
,shared
.crlf
);
3156 static void incrCommand(redisClient
*c
) {
3157 incrDecrCommand(c
,1);
3160 static void decrCommand(redisClient
*c
) {
3161 incrDecrCommand(c
,-1);
3164 static void incrbyCommand(redisClient
*c
) {
3165 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3166 incrDecrCommand(c
,incr
);
3169 static void decrbyCommand(redisClient
*c
) {
3170 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3171 incrDecrCommand(c
,-incr
);
3174 /* ========================= Type agnostic commands ========================= */
3176 static void delCommand(redisClient
*c
) {
3179 for (j
= 1; j
< c
->argc
; j
++) {
3180 if (deleteKey(c
->db
,c
->argv
[j
])) {
3187 addReply(c
,shared
.czero
);
3190 addReply(c
,shared
.cone
);
3193 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
3198 static void existsCommand(redisClient
*c
) {
3199 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
3202 static void selectCommand(redisClient
*c
) {
3203 int id
= atoi(c
->argv
[1]->ptr
);
3205 if (selectDb(c
,id
) == REDIS_ERR
) {
3206 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
3208 addReply(c
,shared
.ok
);
3212 static void randomkeyCommand(redisClient
*c
) {
3216 de
= dictGetRandomKey(c
->db
->dict
);
3217 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
3220 addReply(c
,shared
.plus
);
3221 addReply(c
,shared
.crlf
);
3223 addReply(c
,shared
.plus
);
3224 addReply(c
,dictGetEntryKey(de
));
3225 addReply(c
,shared
.crlf
);
3229 static void keysCommand(redisClient
*c
) {
3232 sds pattern
= c
->argv
[1]->ptr
;
3233 int plen
= sdslen(pattern
);
3234 unsigned long numkeys
= 0, keyslen
= 0;
3235 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3237 di
= dictGetIterator(c
->db
->dict
);
3239 decrRefCount(lenobj
);
3240 while((de
= dictNext(di
)) != NULL
) {
3241 robj
*keyobj
= dictGetEntryKey(de
);
3243 sds key
= keyobj
->ptr
;
3244 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3245 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3246 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3248 addReply(c
,shared
.space
);
3251 keyslen
+= sdslen(key
);
3255 dictReleaseIterator(di
);
3256 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3257 addReply(c
,shared
.crlf
);
3260 static void dbsizeCommand(redisClient
*c
) {
3262 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
3265 static void lastsaveCommand(redisClient
*c
) {
3267 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
3270 static void typeCommand(redisClient
*c
) {
3274 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3279 case REDIS_STRING
: type
= "+string"; break;
3280 case REDIS_LIST
: type
= "+list"; break;
3281 case REDIS_SET
: type
= "+set"; break;
3282 case REDIS_ZSET
: type
= "+zset"; break;
3283 default: type
= "unknown"; break;
3286 addReplySds(c
,sdsnew(type
));
3287 addReply(c
,shared
.crlf
);
3290 static void saveCommand(redisClient
*c
) {
3291 if (server
.bgsavechildpid
!= -1) {
3292 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3295 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3296 addReply(c
,shared
.ok
);
3298 addReply(c
,shared
.err
);
3302 static void bgsaveCommand(redisClient
*c
) {
3303 if (server
.bgsavechildpid
!= -1) {
3304 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3307 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3308 char *status
= "+Background saving started\r\n";
3309 addReplySds(c
,sdsnew(status
));
3311 addReply(c
,shared
.err
);
3315 static void shutdownCommand(redisClient
*c
) {
3316 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
3317 /* Kill the saving child if there is a background saving in progress.
3318 We want to avoid race conditions, for instance our saving child may
3319 overwrite the synchronous saving did by SHUTDOWN. */
3320 if (server
.bgsavechildpid
!= -1) {
3321 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3322 kill(server
.bgsavechildpid
,SIGKILL
);
3323 rdbRemoveTempFile(server
.bgsavechildpid
);
3325 if (server
.appendonly
) {
3326 /* Append only file: fsync() the AOF and exit */
3327 fsync(server
.appendfd
);
3330 /* Snapshotting. Perform a SYNC SAVE and exit */
3331 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3332 if (server
.daemonize
)
3333 unlink(server
.pidfile
);
3334 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3335 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3338 /* Ooops.. error saving! The best we can do is to continue operating.
3339 * Note that if there was a background saving process, in the next
3340 * cron() Redis will be notified that the background saving aborted,
3341 * handling special stuff like slaves pending for synchronization... */
3342 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3343 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3348 static void renameGenericCommand(redisClient
*c
, int nx
) {
3351 /* To use the same key as src and dst is probably an error */
3352 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3353 addReply(c
,shared
.sameobjecterr
);
3357 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3359 addReply(c
,shared
.nokeyerr
);
3363 deleteIfVolatile(c
->db
,c
->argv
[2]);
3364 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3367 addReply(c
,shared
.czero
);
3370 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3372 incrRefCount(c
->argv
[2]);
3374 deleteKey(c
->db
,c
->argv
[1]);
3376 addReply(c
,nx
? shared
.cone
: shared
.ok
);
3379 static void renameCommand(redisClient
*c
) {
3380 renameGenericCommand(c
,0);
3383 static void renamenxCommand(redisClient
*c
) {
3384 renameGenericCommand(c
,1);
3387 static void moveCommand(redisClient
*c
) {
3392 /* Obtain source and target DB pointers */
3395 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3396 addReply(c
,shared
.outofrangeerr
);
3400 selectDb(c
,srcid
); /* Back to the source DB */
3402 /* If the user is moving using as target the same
3403 * DB as the source DB it is probably an error. */
3405 addReply(c
,shared
.sameobjecterr
);
3409 /* Check if the element exists and get a reference */
3410 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3412 addReply(c
,shared
.czero
);
3416 /* Try to add the element to the target DB */
3417 deleteIfVolatile(dst
,c
->argv
[1]);
3418 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3419 addReply(c
,shared
.czero
);
3422 incrRefCount(c
->argv
[1]);
3425 /* OK! key moved, free the entry in the source DB */
3426 deleteKey(src
,c
->argv
[1]);
3428 addReply(c
,shared
.cone
);
3431 /* =================================== Lists ================================ */
3432 static void pushGenericCommand(redisClient
*c
, int where
) {
3436 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3438 lobj
= createListObject();
3440 if (where
== REDIS_HEAD
) {
3441 listAddNodeHead(list
,c
->argv
[2]);
3443 listAddNodeTail(list
,c
->argv
[2]);
3445 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
3446 incrRefCount(c
->argv
[1]);
3447 incrRefCount(c
->argv
[2]);
3449 if (lobj
->type
!= REDIS_LIST
) {
3450 addReply(c
,shared
.wrongtypeerr
);
3454 if (where
== REDIS_HEAD
) {
3455 listAddNodeHead(list
,c
->argv
[2]);
3457 listAddNodeTail(list
,c
->argv
[2]);
3459 incrRefCount(c
->argv
[2]);
3462 addReply(c
,shared
.ok
);
3465 static void lpushCommand(redisClient
*c
) {
3466 pushGenericCommand(c
,REDIS_HEAD
);
3469 static void rpushCommand(redisClient
*c
) {
3470 pushGenericCommand(c
,REDIS_TAIL
);
3473 static void llenCommand(redisClient
*c
) {
3477 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3479 addReply(c
,shared
.czero
);
3482 if (o
->type
!= REDIS_LIST
) {
3483 addReply(c
,shared
.wrongtypeerr
);
3486 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3491 static void lindexCommand(redisClient
*c
) {
3493 int index
= atoi(c
->argv
[2]->ptr
);
3495 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3497 addReply(c
,shared
.nullbulk
);
3499 if (o
->type
!= REDIS_LIST
) {
3500 addReply(c
,shared
.wrongtypeerr
);
3502 list
*list
= o
->ptr
;
3505 ln
= listIndex(list
, index
);
3507 addReply(c
,shared
.nullbulk
);
3509 robj
*ele
= listNodeValue(ln
);
3510 addReplyBulkLen(c
,ele
);
3512 addReply(c
,shared
.crlf
);
3518 static void lsetCommand(redisClient
*c
) {
3520 int index
= atoi(c
->argv
[2]->ptr
);
3522 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3524 addReply(c
,shared
.nokeyerr
);
3526 if (o
->type
!= REDIS_LIST
) {
3527 addReply(c
,shared
.wrongtypeerr
);
3529 list
*list
= o
->ptr
;
3532 ln
= listIndex(list
, index
);
3534 addReply(c
,shared
.outofrangeerr
);
3536 robj
*ele
= listNodeValue(ln
);
3539 listNodeValue(ln
) = c
->argv
[3];
3540 incrRefCount(c
->argv
[3]);
3541 addReply(c
,shared
.ok
);
3548 static void popGenericCommand(redisClient
*c
, int where
) {
3551 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3553 addReply(c
,shared
.nullbulk
);
3555 if (o
->type
!= REDIS_LIST
) {
3556 addReply(c
,shared
.wrongtypeerr
);
3558 list
*list
= o
->ptr
;
3561 if (where
== REDIS_HEAD
)
3562 ln
= listFirst(list
);
3564 ln
= listLast(list
);
3567 addReply(c
,shared
.nullbulk
);
3569 robj
*ele
= listNodeValue(ln
);
3570 addReplyBulkLen(c
,ele
);
3572 addReply(c
,shared
.crlf
);
3573 listDelNode(list
,ln
);
3580 static void lpopCommand(redisClient
*c
) {
3581 popGenericCommand(c
,REDIS_HEAD
);
3584 static void rpopCommand(redisClient
*c
) {
3585 popGenericCommand(c
,REDIS_TAIL
);
3588 static void lrangeCommand(redisClient
*c
) {
3590 int start
= atoi(c
->argv
[2]->ptr
);
3591 int end
= atoi(c
->argv
[3]->ptr
);
3593 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3595 addReply(c
,shared
.nullmultibulk
);
3597 if (o
->type
!= REDIS_LIST
) {
3598 addReply(c
,shared
.wrongtypeerr
);
3600 list
*list
= o
->ptr
;
3602 int llen
= listLength(list
);
3606 /* convert negative indexes */
3607 if (start
< 0) start
= llen
+start
;
3608 if (end
< 0) end
= llen
+end
;
3609 if (start
< 0) start
= 0;
3610 if (end
< 0) end
= 0;
3612 /* indexes sanity checks */
3613 if (start
> end
|| start
>= llen
) {
3614 /* Out of range start or start > end result in empty list */
3615 addReply(c
,shared
.emptymultibulk
);
3618 if (end
>= llen
) end
= llen
-1;
3619 rangelen
= (end
-start
)+1;
3621 /* Return the result in form of a multi-bulk reply */
3622 ln
= listIndex(list
, start
);
3623 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3624 for (j
= 0; j
< rangelen
; j
++) {
3625 ele
= listNodeValue(ln
);
3626 addReplyBulkLen(c
,ele
);
3628 addReply(c
,shared
.crlf
);
3635 static void ltrimCommand(redisClient
*c
) {
3637 int start
= atoi(c
->argv
[2]->ptr
);
3638 int end
= atoi(c
->argv
[3]->ptr
);
3640 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3642 addReply(c
,shared
.ok
);
3644 if (o
->type
!= REDIS_LIST
) {
3645 addReply(c
,shared
.wrongtypeerr
);
3647 list
*list
= o
->ptr
;
3649 int llen
= listLength(list
);
3650 int j
, ltrim
, rtrim
;
3652 /* convert negative indexes */
3653 if (start
< 0) start
= llen
+start
;
3654 if (end
< 0) end
= llen
+end
;
3655 if (start
< 0) start
= 0;
3656 if (end
< 0) end
= 0;
3658 /* indexes sanity checks */
3659 if (start
> end
|| start
>= llen
) {
3660 /* Out of range start or start > end result in empty list */
3664 if (end
>= llen
) end
= llen
-1;
3669 /* Remove list elements to perform the trim */
3670 for (j
= 0; j
< ltrim
; j
++) {
3671 ln
= listFirst(list
);
3672 listDelNode(list
,ln
);
3674 for (j
= 0; j
< rtrim
; j
++) {
3675 ln
= listLast(list
);
3676 listDelNode(list
,ln
);
3679 addReply(c
,shared
.ok
);
3684 static void lremCommand(redisClient
*c
) {
3687 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3689 addReply(c
,shared
.czero
);
3691 if (o
->type
!= REDIS_LIST
) {
3692 addReply(c
,shared
.wrongtypeerr
);
3694 list
*list
= o
->ptr
;
3695 listNode
*ln
, *next
;
3696 int toremove
= atoi(c
->argv
[2]->ptr
);
3701 toremove
= -toremove
;
3704 ln
= fromtail
? list
->tail
: list
->head
;
3706 robj
*ele
= listNodeValue(ln
);
3708 next
= fromtail
? ln
->prev
: ln
->next
;
3709 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3710 listDelNode(list
,ln
);
3713 if (toremove
&& removed
== toremove
) break;
3717 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3722 /* This is the semantic of this command:
3723 * RPOPLPUSH srclist dstlist:
3724 * IF LLEN(srclist) > 0
3725 * element = RPOP srclist
3726 * LPUSH dstlist element
3733 * The idea is to be able to get an element from a list in a reliable way
3734 * since the element is not just returned but pushed against another list
3735 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3737 static void rpoplpushcommand(redisClient
*c
) {
3740 sobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3742 addReply(c
,shared
.nullbulk
);
3744 if (sobj
->type
!= REDIS_LIST
) {
3745 addReply(c
,shared
.wrongtypeerr
);
3747 list
*srclist
= sobj
->ptr
;
3748 listNode
*ln
= listLast(srclist
);
3751 addReply(c
,shared
.nullbulk
);
3753 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3754 robj
*ele
= listNodeValue(ln
);
3759 /* Create the list if the key does not exist */
3760 dobj
= createListObject();
3761 dictAdd(c
->db
->dict
,c
->argv
[2],dobj
);
3762 incrRefCount(c
->argv
[2]);
3763 } else if (dobj
->type
!= REDIS_LIST
) {
3764 addReply(c
,shared
.wrongtypeerr
);
3767 /* Add the element to the target list */
3768 dstlist
= dobj
->ptr
;
3769 listAddNodeHead(dstlist
,ele
);
3772 /* Send the element to the client as reply as well */
3773 addReplyBulkLen(c
,ele
);
3775 addReply(c
,shared
.crlf
);
3777 /* Finally remove the element from the source list */
3778 listDelNode(srclist
,ln
);
3786 /* ==================================== Sets ================================ */
3788 static void saddCommand(redisClient
*c
) {
3791 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3793 set
= createSetObject();
3794 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3795 incrRefCount(c
->argv
[1]);
3797 if (set
->type
!= REDIS_SET
) {
3798 addReply(c
,shared
.wrongtypeerr
);
3802 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3803 incrRefCount(c
->argv
[2]);
3805 addReply(c
,shared
.cone
);
3807 addReply(c
,shared
.czero
);
3811 static void sremCommand(redisClient
*c
) {
3814 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3816 addReply(c
,shared
.czero
);
3818 if (set
->type
!= REDIS_SET
) {
3819 addReply(c
,shared
.wrongtypeerr
);
3822 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
3824 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3825 addReply(c
,shared
.cone
);
3827 addReply(c
,shared
.czero
);
3832 static void smoveCommand(redisClient
*c
) {
3833 robj
*srcset
, *dstset
;
3835 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3836 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3838 /* If the source key does not exist return 0, if it's of the wrong type
3840 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
3841 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
3844 /* Error if the destination key is not a set as well */
3845 if (dstset
&& dstset
->type
!= REDIS_SET
) {
3846 addReply(c
,shared
.wrongtypeerr
);
3849 /* Remove the element from the source set */
3850 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
3851 /* Key not found in the src set! return zero */
3852 addReply(c
,shared
.czero
);
3856 /* Add the element to the destination set */
3858 dstset
= createSetObject();
3859 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
3860 incrRefCount(c
->argv
[2]);
3862 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
3863 incrRefCount(c
->argv
[3]);
3864 addReply(c
,shared
.cone
);
3867 static void sismemberCommand(redisClient
*c
) {
3870 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3872 addReply(c
,shared
.czero
);
3874 if (set
->type
!= REDIS_SET
) {
3875 addReply(c
,shared
.wrongtypeerr
);
3878 if (dictFind(set
->ptr
,c
->argv
[2]))
3879 addReply(c
,shared
.cone
);
3881 addReply(c
,shared
.czero
);
3885 static void scardCommand(redisClient
*c
) {
3889 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3891 addReply(c
,shared
.czero
);
3894 if (o
->type
!= REDIS_SET
) {
3895 addReply(c
,shared
.wrongtypeerr
);
3898 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
3904 static void spopCommand(redisClient
*c
) {
3908 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3910 addReply(c
,shared
.nullbulk
);
3912 if (set
->type
!= REDIS_SET
) {
3913 addReply(c
,shared
.wrongtypeerr
);
3916 de
= dictGetRandomKey(set
->ptr
);
3918 addReply(c
,shared
.nullbulk
);
3920 robj
*ele
= dictGetEntryKey(de
);
3922 addReplyBulkLen(c
,ele
);
3924 addReply(c
,shared
.crlf
);
3925 dictDelete(set
->ptr
,ele
);
3926 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3932 static void srandmemberCommand(redisClient
*c
) {
3936 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3938 addReply(c
,shared
.nullbulk
);
3940 if (set
->type
!= REDIS_SET
) {
3941 addReply(c
,shared
.wrongtypeerr
);
3944 de
= dictGetRandomKey(set
->ptr
);
3946 addReply(c
,shared
.nullbulk
);
3948 robj
*ele
= dictGetEntryKey(de
);
3950 addReplyBulkLen(c
,ele
);
3952 addReply(c
,shared
.crlf
);
3957 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3958 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3960 return dictSize(*d1
)-dictSize(*d2
);
3963 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, unsigned long setsnum
, robj
*dstkey
) {
3964 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3967 robj
*lenobj
= NULL
, *dstset
= NULL
;
3968 unsigned long j
, cardinality
= 0;
3970 for (j
= 0; j
< setsnum
; j
++) {
3974 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3975 lookupKeyRead(c
->db
,setskeys
[j
]);
3979 if (deleteKey(c
->db
,dstkey
))
3981 addReply(c
,shared
.czero
);
3983 addReply(c
,shared
.nullmultibulk
);
3987 if (setobj
->type
!= REDIS_SET
) {
3989 addReply(c
,shared
.wrongtypeerr
);
3992 dv
[j
] = setobj
->ptr
;
3994 /* Sort sets from the smallest to largest, this will improve our
3995 * algorithm's performace */
3996 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3998 /* The first thing we should output is the total number of elements...
3999 * since this is a multi-bulk write, but at this stage we don't know
4000 * the intersection set size, so we use a trick, append an empty object
4001 * to the output list and save the pointer to later modify it with the
4004 lenobj
= createObject(REDIS_STRING
,NULL
);
4006 decrRefCount(lenobj
);
4008 /* If we have a target key where to store the resulting set
4009 * create this key with an empty set inside */
4010 dstset
= createSetObject();
4013 /* Iterate all the elements of the first (smallest) set, and test
4014 * the element against all the other sets, if at least one set does
4015 * not include the element it is discarded */
4016 di
= dictGetIterator(dv
[0]);
4018 while((de
= dictNext(di
)) != NULL
) {
4021 for (j
= 1; j
< setsnum
; j
++)
4022 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
4024 continue; /* at least one set does not contain the member */
4025 ele
= dictGetEntryKey(de
);
4027 addReplyBulkLen(c
,ele
);
4029 addReply(c
,shared
.crlf
);
4032 dictAdd(dstset
->ptr
,ele
,NULL
);
4036 dictReleaseIterator(di
);
4039 /* Store the resulting set into the target */
4040 deleteKey(c
->db
,dstkey
);
4041 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4042 incrRefCount(dstkey
);
4046 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%lu\r\n",cardinality
);
4048 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4049 dictSize((dict
*)dstset
->ptr
)));
4055 static void sinterCommand(redisClient
*c
) {
4056 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
4059 static void sinterstoreCommand(redisClient
*c
) {
4060 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
4063 #define REDIS_OP_UNION 0
4064 #define REDIS_OP_DIFF 1
4066 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
4067 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4070 robj
*dstset
= NULL
;
4071 int j
, cardinality
= 0;
4073 for (j
= 0; j
< setsnum
; j
++) {
4077 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4078 lookupKeyRead(c
->db
,setskeys
[j
]);
4083 if (setobj
->type
!= REDIS_SET
) {
4085 addReply(c
,shared
.wrongtypeerr
);
4088 dv
[j
] = setobj
->ptr
;
4091 /* We need a temp set object to store our union. If the dstkey
4092 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4093 * this set object will be the resulting object to set into the target key*/
4094 dstset
= createSetObject();
4096 /* Iterate all the elements of all the sets, add every element a single
4097 * time to the result set */
4098 for (j
= 0; j
< setsnum
; j
++) {
4099 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
4100 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
4102 di
= dictGetIterator(dv
[j
]);
4104 while((de
= dictNext(di
)) != NULL
) {
4107 /* dictAdd will not add the same element multiple times */
4108 ele
= dictGetEntryKey(de
);
4109 if (op
== REDIS_OP_UNION
|| j
== 0) {
4110 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
4114 } else if (op
== REDIS_OP_DIFF
) {
4115 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
4120 dictReleaseIterator(di
);
4122 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
4125 /* Output the content of the resulting set, if not in STORE mode */
4127 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
4128 di
= dictGetIterator(dstset
->ptr
);
4129 while((de
= dictNext(di
)) != NULL
) {
4132 ele
= dictGetEntryKey(de
);
4133 addReplyBulkLen(c
,ele
);
4135 addReply(c
,shared
.crlf
);
4137 dictReleaseIterator(di
);
4139 /* If we have a target key where to store the resulting set
4140 * create this key with the result set inside */
4141 deleteKey(c
->db
,dstkey
);
4142 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4143 incrRefCount(dstkey
);
4148 decrRefCount(dstset
);
4150 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4151 dictSize((dict
*)dstset
->ptr
)));
4157 static void sunionCommand(redisClient
*c
) {
4158 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
4161 static void sunionstoreCommand(redisClient
*c
) {
4162 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
4165 static void sdiffCommand(redisClient
*c
) {
4166 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
4169 static void sdiffstoreCommand(redisClient
*c
) {
4170 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
4173 /* ==================================== ZSets =============================== */
4175 /* ZSETs are ordered sets using two data structures to hold the same elements
4176 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4179 * The elements are added to a hash table mapping Redis objects to scores.
4180 * At the same time the elements are added to a skip list mapping scores
4181 * to Redis objects (so objects are sorted by scores in this "view"). */
4183 /* This skiplist implementation is almost a C translation of the original
4184 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4185 * Alternative to Balanced Trees", modified in three ways:
4186 * a) this implementation allows for repeated values.
4187 * b) the comparison is not just by key (our 'score') but by satellite data.
4188 * c) there is a back pointer, so it's a doubly linked list with the back
4189 * pointers being only at "level 1". This allows to traverse the list
4190 * from tail to head, useful for ZREVRANGE. */
4192 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
4193 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
4195 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
4201 static zskiplist
*zslCreate(void) {
4205 zsl
= zmalloc(sizeof(*zsl
));
4208 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
4209 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
4210 zsl
->header
->forward
[j
] = NULL
;
4211 zsl
->header
->backward
= NULL
;
4216 static void zslFreeNode(zskiplistNode
*node
) {
4217 decrRefCount(node
->obj
);
4218 zfree(node
->forward
);
4222 static void zslFree(zskiplist
*zsl
) {
4223 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
4225 zfree(zsl
->header
->forward
);
4228 next
= node
->forward
[0];
4235 static int zslRandomLevel(void) {
4237 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
4242 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
4243 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4247 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4248 while (x
->forward
[i
] &&
4249 (x
->forward
[i
]->score
< score
||
4250 (x
->forward
[i
]->score
== score
&&
4251 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4255 /* we assume the key is not already inside, since we allow duplicated
4256 * scores, and the re-insertion of score and redis object should never
4257 * happen since the caller of zslInsert() should test in the hash table
4258 * if the element is already inside or not. */
4259 level
= zslRandomLevel();
4260 if (level
> zsl
->level
) {
4261 for (i
= zsl
->level
; i
< level
; i
++)
4262 update
[i
] = zsl
->header
;
4265 x
= zslCreateNode(level
,score
,obj
);
4266 for (i
= 0; i
< level
; i
++) {
4267 x
->forward
[i
] = update
[i
]->forward
[i
];
4268 update
[i
]->forward
[i
] = x
;
4270 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
4272 x
->forward
[0]->backward
= x
;
4278 /* Delete an element with matching score/object from the skiplist. */
4279 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
4280 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4284 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4285 while (x
->forward
[i
] &&
4286 (x
->forward
[i
]->score
< score
||
4287 (x
->forward
[i
]->score
== score
&&
4288 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4292 /* We may have multiple elements with the same score, what we need
4293 * is to find the element with both the right score and object. */
4295 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
4296 for (i
= 0; i
< zsl
->level
; i
++) {
4297 if (update
[i
]->forward
[i
] != x
) break;
4298 update
[i
]->forward
[i
] = x
->forward
[i
];
4300 if (x
->forward
[0]) {
4301 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4304 zsl
->tail
= x
->backward
;
4307 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4312 return 0; /* not found */
4314 return 0; /* not found */
4317 /* Delete all the elements with score between min and max from the skiplist.
4318 * Min and max are inclusive, so a score >= min && score <= max is deleted.
4319 * Note that this function takes the reference to the hash table view of the
4320 * sorted set, in order to remove the elements from the hash table too. */
4321 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4322 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4323 unsigned long removed
= 0;
4327 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4328 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
4332 /* Starting from the first node whose score is >= min, delete every
4333 * node for as long as its score is still <= max. */
4335 while (x
&& x
->score
<= max
) {
4336 zskiplistNode
*next
;
4338 for (i
= 0; i
< zsl
->level
; i
++) {
4339 if (update
[i
]->forward
[i
] != x
) break;
4340 update
[i
]->forward
[i
] = x
->forward
[i
];
4342 if (x
->forward
[0]) {
4343 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4346 zsl
->tail
= x
->backward
;
4348 next
= x
->forward
[0];
4349 dictDelete(dict
,x
->obj
);
4351 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4357 return removed
; /* not found */
4360 /* Find the first node having a score equal or greater than the specified one.
4361 * Returns NULL if there is no match. */
4362 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4367 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4368 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
4371 /* The level-0 successor of x is the first node with a score >= the
4372 * requested one, or NULL if there is no such node. */
4373 return x
->forward
[0];
4376 /* The actual Z-commands implementations */
4378 /* This generic command implements both ZADD and ZINCRBY.
4379 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4380 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4381 static void zaddGenericCommand(redisClient
*c
, robj
*key
, robj
*ele
, double scoreval
, int doincrement
) {
4386 zsetobj
= lookupKeyWrite(c
->db
,key
);
4387 if (zsetobj
== NULL
) {
4388 zsetobj
= createZsetObject();
4389 dictAdd(c
->db
->dict
,key
,zsetobj
);
4392 if (zsetobj
->type
!= REDIS_ZSET
) {
4393 addReply(c
,shared
.wrongtypeerr
);
4399 /* Ok now since we implement both ZADD and ZINCRBY here the code
4400 * needs to handle the two different conditions. It's all about setting
4401 * '*score', that is, the new score to set, to the right value. */
4402 score
= zmalloc(sizeof(double));
4406 /* Read the old score. If the element was not present starts from 0 */
4407 de
= dictFind(zs
->dict
,ele
);
4409 double *oldscore
= dictGetEntryVal(de
);
4410 *score
= *oldscore
+ scoreval
;
4418 /* What follows is a simple remove and re-insert operation that is common
4419 * to both ZADD and ZINCRBY... */
4420 if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) {
4421 /* case 1: New element */
4422 incrRefCount(ele
); /* added to hash */
4423 zslInsert(zs
->zsl
,*score
,ele
);
4424 incrRefCount(ele
); /* added to skiplist */
4427 addReplyDouble(c
,*score
);
4429 addReply(c
,shared
.cone
);
4434 /* case 2: Score update operation */
4435 de
= dictFind(zs
->dict
,ele
);
4436 redisAssert(de
!= NULL
);
4437 oldscore
= dictGetEntryVal(de
);
4438 if (*score
!= *oldscore
) {
4441 /* Remove and insert the element in the skip list with new score */
4442 deleted
= zslDelete(zs
->zsl
,*oldscore
,ele
);
4443 redisAssert(deleted
!= 0);
4444 zslInsert(zs
->zsl
,*score
,ele
);
4446 /* Update the score in the hash table */
4447 dictReplace(zs
->dict
,ele
,score
);
4453 addReplyDouble(c
,*score
);
4455 addReply(c
,shared
.czero
);
4459 static void zaddCommand(redisClient
*c
) {
4462 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4463 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0);
4466 static void zincrbyCommand(redisClient
*c
) {
4469 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4470 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1);
4473 static void zremCommand(redisClient
*c
) {
4477 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4478 if (zsetobj
== NULL
) {
4479 addReply(c
,shared
.czero
);
4485 if (zsetobj
->type
!= REDIS_ZSET
) {
4486 addReply(c
,shared
.wrongtypeerr
);
4490 de
= dictFind(zs
->dict
,c
->argv
[2]);
4492 addReply(c
,shared
.czero
);
4495 /* Delete from the skiplist */
4496 oldscore
= dictGetEntryVal(de
);
4497 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
4498 redisAssert(deleted
!= 0);
4500 /* Delete from the hash table */
4501 dictDelete(zs
->dict
,c
->argv
[2]);
4502 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4504 addReply(c
,shared
.cone
);
4508 static void zremrangebyscoreCommand(redisClient
*c
) {
4509 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4510 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4514 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4515 if (zsetobj
== NULL
) {
4516 addReply(c
,shared
.czero
);
4520 if (zsetobj
->type
!= REDIS_ZSET
) {
4521 addReply(c
,shared
.wrongtypeerr
);
4525 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
4526 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4527 server
.dirty
+= deleted
;
4528 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
4532 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
4534 int start
= atoi(c
->argv
[2]->ptr
);
4535 int end
= atoi(c
->argv
[3]->ptr
);
4538 if (c
->argc
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) {
4540 } else if (c
->argc
>= 5) {
4541 addReply(c
,shared
.syntaxerr
);
4545 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4547 addReply(c
,shared
.nullmultibulk
);
4549 if (o
->type
!= REDIS_ZSET
) {
4550 addReply(c
,shared
.wrongtypeerr
);
4552 zset
*zsetobj
= o
->ptr
;
4553 zskiplist
*zsl
= zsetobj
->zsl
;
4556 int llen
= zsl
->length
;
4560 /* convert negative indexes */
4561 if (start
< 0) start
= llen
+start
;
4562 if (end
< 0) end
= llen
+end
;
4563 if (start
< 0) start
= 0;
4564 if (end
< 0) end
= 0;
4566 /* indexes sanity checks */
4567 if (start
> end
|| start
>= llen
) {
4568 /* Out of range start or start > end result in empty list */
4569 addReply(c
,shared
.emptymultibulk
);
4572 if (end
>= llen
) end
= llen
-1;
4573 rangelen
= (end
-start
)+1;
4575 /* Return the result in form of a multi-bulk reply */
4581 ln
= zsl
->header
->forward
[0];
4583 ln
= ln
->forward
[0];
4586 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",
4587 withscores
? (rangelen
*2) : rangelen
));
4588 for (j
= 0; j
< rangelen
; j
++) {
4590 addReplyBulkLen(c
,ele
);
4592 addReply(c
,shared
.crlf
);
4594 addReplyDouble(c
,ln
->score
);
4595 ln
= reverse
? ln
->backward
: ln
->forward
[0];
4601 static void zrangeCommand(redisClient
*c
) {
4602 zrangeGenericCommand(c
,0);
4605 static void zrevrangeCommand(redisClient
*c
) {
4606 zrangeGenericCommand(c
,1);
4609 static void zrangebyscoreCommand(redisClient
*c
) {
4611 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4612 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4613 int offset
= 0, limit
= -1;
4615 if (c
->argc
!= 4 && c
->argc
!= 7) {
4617 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4619 } else if (c
->argc
== 7 && strcasecmp(c
->argv
[4]->ptr
,"limit")) {
4620 addReply(c
,shared
.syntaxerr
);
4622 } else if (c
->argc
== 7) {
4623 offset
= atoi(c
->argv
[5]->ptr
);
4624 limit
= atoi(c
->argv
[6]->ptr
);
4625 if (offset
< 0) offset
= 0;
4628 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4630 addReply(c
,shared
.nullmultibulk
);
4632 if (o
->type
!= REDIS_ZSET
) {
4633 addReply(c
,shared
.wrongtypeerr
);
4635 zset
*zsetobj
= o
->ptr
;
4636 zskiplist
*zsl
= zsetobj
->zsl
;
4639 unsigned int rangelen
= 0;
4641 /* Get the first node with the score >= min */
4642 ln
= zslFirstWithScore(zsl
,min
);
4644 /* No element matching the specified interval */
4645 addReply(c
,shared
.emptymultibulk
);
4649 /* We don't know in advance how many matching elements there
4650 * are in the list, so we push this object that will represent
4651 * the multi-bulk length in the output buffer, and will "fix"
4653 lenobj
= createObject(REDIS_STRING
,NULL
);
4655 decrRefCount(lenobj
);
4657 while(ln
&& ln
->score
<= max
) {
4660 ln
= ln
->forward
[0];
4663 if (limit
== 0) break;
4665 addReplyBulkLen(c
,ele
);
4667 addReply(c
,shared
.crlf
);
4668 ln
= ln
->forward
[0];
4670 if (limit
> 0) limit
--;
4672 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
4677 static void zcardCommand(redisClient
*c
) {
4681 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4683 addReply(c
,shared
.czero
);
4686 if (o
->type
!= REDIS_ZSET
) {
4687 addReply(c
,shared
.wrongtypeerr
);
4690 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",zs
->zsl
->length
));
4695 static void zscoreCommand(redisClient
*c
) {
4699 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4701 addReply(c
,shared
.nullbulk
);
4704 if (o
->type
!= REDIS_ZSET
) {
4705 addReply(c
,shared
.wrongtypeerr
);
4710 de
= dictFind(zs
->dict
,c
->argv
[2]);
4712 addReply(c
,shared
.nullbulk
);
4714 double *score
= dictGetEntryVal(de
);
4716 addReplyDouble(c
,*score
);
4722 /* ========================= Non type-specific commands ==================== */
4724 static void flushdbCommand(redisClient
*c
) {
4725 server
.dirty
+= dictSize(c
->db
->dict
);
4726 dictEmpty(c
->db
->dict
);
4727 dictEmpty(c
->db
->expires
);
4728 addReply(c
,shared
.ok
);
4731 static void flushallCommand(redisClient
*c
) {
4732 server
.dirty
+= emptyDb();
4733 addReply(c
,shared
.ok
);
4734 rdbSave(server
.dbfilename
);
4738 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
4739 redisSortOperation
*so
= zmalloc(sizeof(*so
));
4741 so
->pattern
= pattern
;
4745 /* Return the value associated to the key with a name obtained
4746 * substituting the first occurrence of '*' in 'pattern' with 'subst' */
4747 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
4751 int prefixlen
, sublen
, postfixlen
;
4752 /* Exploit the internal sds representation to create an sds string allocated on the stack in order to make this function faster */
4756 char buf
[REDIS_SORTKEY_MAX
+1];
4759 /* If the pattern is "#" return the substitution object itself in order
4760 * to implement the "SORT ... GET #" feature. */
4761 spat
= pattern
->ptr
;
4762 if (spat
[0] == '#' && spat
[1] == '\0') {
4766 /* The substitution object may be specially encoded. If so we create
4767 * a decoded object on the fly. Otherwise getDecodedObject will just
4768 * increment the ref count, that we'll decrement later. */
4769 subst
= getDecodedObject(subst
);
4772 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
4773 p
= strchr(spat
,'*');
4775 decrRefCount(subst
);
4780 sublen
= sdslen(ssub
);
4781 postfixlen
= sdslen(spat
)-(prefixlen
+1);
4782 memcpy(keyname
.buf
,spat
,prefixlen
);
4783 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
4784 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
4785 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
4786 keyname
.len
= prefixlen
+sublen
+postfixlen
;
4788 initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(long)*2))
4789 decrRefCount(subst
);
4791 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4792 return lookupKeyRead(db
,&keyobj
);
4795 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4796 * the additional parameter is not standard but BSD-specific, we have to
4797 * pass sorting parameters via the global 'server' structure */
4798 static int sortCompare(const void *s1
, const void *s2
) {
4799 const redisSortObject
*so1
= s1
, *so2
= s2
;
4802 if (!server
.sort_alpha
) {
4803 /* Numeric sorting. Here it's trivial as we precomputed scores */
4804 if (so1
->u
.score
> so2
->u
.score
) {
4806 } else if (so1
->u
.score
< so2
->u
.score
) {
4812 /* Alphanumeric sorting */
4813 if (server
.sort_bypattern
) {
4814 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
4815 /* At least one compare object is NULL */
4816 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
4818 else if (so1
->u
.cmpobj
== NULL
)
4823 /* We have both the objects, use strcoll */
4824 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
4827 /* Compare elements directly */
4830 dec1
= getDecodedObject(so1
->obj
);
4831 dec2
= getDecodedObject(so2
->obj
);
4832 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
4837 return server
.sort_desc
? -cmp
: cmp
;
4840 /* The SORT command is the most complex command in Redis. Warning: this code
4841 * is optimized for speed and a bit less for readability */
4842 static void sortCommand(redisClient
*c
) {
4845 int desc
= 0, alpha
= 0;
4846 int limit_start
= 0, limit_count
= -1, start
, end
;
4847 int j
, dontsort
= 0, vectorlen
;
4848 int getop
= 0; /* GET operation counter */
4849 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
4850 redisSortObject
*vector
; /* Resulting vector to sort */
4852 /* Lookup the key to sort. It must be of the right types */
4853 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
4854 if (sortval
== NULL
) {
4855 addReply(c
,shared
.nullmultibulk
);
4858 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
4859 sortval
->type
!= REDIS_ZSET
)
4861 addReply(c
,shared
.wrongtypeerr
);
4865 /* Create a list of operations to perform for every sorted element.
4866 * Operations can be GET/DEL/INCR/DECR */
4867 operations
= listCreate();
4868 listSetFreeMethod(operations
,zfree
);
4871 /* Now we need to protect sortval by incrementing its count: in the future
4872 * SORT may have options able to overwrite/delete keys during the sorting
4873 * and the sorted key itself may get destroyed */
4874 incrRefCount(sortval
);
4876 /* The SORT command has an SQL-alike syntax, parse it */
4877 while(j
< c
->argc
) {
4878 int leftargs
= c
->argc
-j
-1;
4879 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
4881 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
4883 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
4885 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
4886 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
4887 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
4889 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
4890 storekey
= c
->argv
[j
+1];
4892 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
4893 sortby
= c
->argv
[j
+1];
4894 /* If the BY pattern does not contain '*', i.e. it is constant,
4895 * we don't need to sort nor to lookup the weight keys. */
4896 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
4898 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
4899 listAddNodeTail(operations
,createSortOperation(
4900 REDIS_SORT_GET
,c
->argv
[j
+1]));
4904 decrRefCount(sortval
);
4905 listRelease(operations
);
4906 addReply(c
,shared
.syntaxerr
);
4912 /* Load the sorting vector with all the objects to sort */
4913 switch(sortval
->type
) {
4914 case REDIS_LIST
: vectorlen
= listLength((list
*)sortval
->ptr
); break;
4915 case REDIS_SET
: vectorlen
= dictSize((dict
*)sortval
->ptr
); break;
4916 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
4917 default: vectorlen
= 0; redisAssert(0); /* Avoid GCC warning */
4919 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
4922 if (sortval
->type
== REDIS_LIST
) {
4923 list
*list
= sortval
->ptr
;
4927 while((ln
= listYield(list
))) {
4928 robj
*ele
= ln
->value
;
4929 vector
[j
].obj
= ele
;
4930 vector
[j
].u
.score
= 0;
4931 vector
[j
].u
.cmpobj
= NULL
;
4939 if (sortval
->type
== REDIS_SET
) {
4942 zset
*zs
= sortval
->ptr
;
4946 di
= dictGetIterator(set
);
4947 while((setele
= dictNext(di
)) != NULL
) {
4948 vector
[j
].obj
= dictGetEntryKey(setele
);
4949 vector
[j
].u
.score
= 0;
4950 vector
[j
].u
.cmpobj
= NULL
;
4953 dictReleaseIterator(di
);
4955 redisAssert(j
== vectorlen
);
4957 /* Now it's time to load the right scores in the sorting vector */
4958 if (dontsort
== 0) {
4959 for (j
= 0; j
< vectorlen
; j
++) {
4963 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
4964 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
4966 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
4968 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
4969 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
4971 /* Don't need to decode the object if it's
4972 * integer-encoded (the only encoding supported so
4973 * far). We can just cast it */
4974 if (byval
->encoding
== REDIS_ENCODING_INT
) {
4975 vector
[j
].u
.score
= (long)byval
->ptr
;
4977 redisAssert(1 != 1);
4982 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
4983 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
4985 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
4986 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
4988 redisAssert(1 != 1);
4995 /* We are ready to sort the vector... perform a bit of sanity check
4996 * on the LIMIT option too. We'll use a partial version of quicksort. */
4997 start
= (limit_start
< 0) ? 0 : limit_start
;
4998 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
4999 if (start
>= vectorlen
) {
5000 start
= vectorlen
-1;
5003 if (end
>= vectorlen
) end
= vectorlen
-1;
5005 if (dontsort
== 0) {
5006 server
.sort_desc
= desc
;
5007 server
.sort_alpha
= alpha
;
5008 server
.sort_bypattern
= sortby
? 1 : 0;
5009 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
5010 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
5012 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
5015 /* Send command output to the output buffer, performing the specified
5016 * GET/DEL/INCR/DECR operations if any. */
5017 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
5018 if (storekey
== NULL
) {
5019 /* STORE option not specified, send the sorting result to the client */
5020 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
5021 for (j
= start
; j
<= end
; j
++) {
5024 addReplyBulkLen(c
,vector
[j
].obj
);
5025 addReply(c
,vector
[j
].obj
);
5026 addReply(c
,shared
.crlf
);
5028 listRewind(operations
);
5029 while((ln
= listYield(operations
))) {
5030 redisSortOperation
*sop
= ln
->value
;
5031 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5034 if (sop
->type
== REDIS_SORT_GET
) {
5035 if (!val
|| val
->type
!= REDIS_STRING
) {
5036 addReply(c
,shared
.nullbulk
);
5038 addReplyBulkLen(c
,val
);
5040 addReply(c
,shared
.crlf
);
5043 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5048 robj
*listObject
= createListObject();
5049 list
*listPtr
= (list
*) listObject
->ptr
;
5051 /* STORE option specified, set the sorting result as a List object */
5052 for (j
= start
; j
<= end
; j
++) {
5055 listAddNodeTail(listPtr
,vector
[j
].obj
);
5056 incrRefCount(vector
[j
].obj
);
5058 listRewind(operations
);
5059 while((ln
= listYield(operations
))) {
5060 redisSortOperation
*sop
= ln
->value
;
5061 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5064 if (sop
->type
== REDIS_SORT_GET
) {
5065 if (!val
|| val
->type
!= REDIS_STRING
) {
5066 listAddNodeTail(listPtr
,createStringObject("",0));
5068 listAddNodeTail(listPtr
,val
);
5072 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5076 if (dictReplace(c
->db
->dict
,storekey
,listObject
)) {
5077 incrRefCount(storekey
);
5079 /* Note: we add 1 because the DB is dirty anyway since even if the
5080 * SORT result is empty a new key is set and maybe the old content
5082 server
.dirty
+= 1+outputlen
;
5083 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",outputlen
));
5087 decrRefCount(sortval
);
5088 listRelease(operations
);
5089 for (j
= 0; j
< vectorlen
; j
++) {
5090 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
5091 decrRefCount(vector
[j
].u
.cmpobj
);
5096 /* Create the string returned by the INFO command. This is decoupled
5097 * from the INFO command itself as we need to report the same information
5098 * on memory corruption problems. */
5099 static sds
genRedisInfoString(void) {
5101 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5104 info
= sdscatprintf(sdsempty(),
5105 "redis_version:%s\r\n"
5107 "multiplexing_api:%s\r\n"
5108 "uptime_in_seconds:%ld\r\n"
5109 "uptime_in_days:%ld\r\n"
5110 "connected_clients:%d\r\n"
5111 "connected_slaves:%d\r\n"
5112 "used_memory:%zu\r\n"
5113 "changes_since_last_save:%lld\r\n"
5114 "bgsave_in_progress:%d\r\n"
5115 "last_save_time:%ld\r\n"
5116 "bgrewriteaof_in_progress:%d\r\n"
5117 "total_connections_received:%lld\r\n"
5118 "total_commands_processed:%lld\r\n"
5121 (sizeof(long) == 8) ? "64" : "32",
5125 listLength(server
.clients
)-listLength(server
.slaves
),
5126 listLength(server
.slaves
),
5129 server
.bgsavechildpid
!= -1,
5131 server
.bgrewritechildpid
!= -1,
5132 server
.stat_numconnections
,
5133 server
.stat_numcommands
,
5134 server
.masterhost
== NULL
? "master" : "slave"
5136 if (server
.masterhost
) {
5137 info
= sdscatprintf(info
,
5138 "master_host:%s\r\n"
5139 "master_port:%d\r\n"
5140 "master_link_status:%s\r\n"
5141 "master_last_io_seconds_ago:%d\r\n"
5144 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
5146 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
5149 for (j
= 0; j
< server
.dbnum
; j
++) {
5150 long long keys
, vkeys
;
5152 keys
= dictSize(server
.db
[j
].dict
);
5153 vkeys
= dictSize(server
.db
[j
].expires
);
5154 if (keys
|| vkeys
) {
5155 info
= sdscatprintf(info
, "db%d:keys=%lld,expires=%lld\r\n",
5162 static void infoCommand(redisClient
*c
) {
5163 sds info
= genRedisInfoString();
5164 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
5165 (unsigned long)sdslen(info
)));
5166 addReplySds(c
,info
);
5167 addReply(c
,shared
.crlf
);
5170 static void monitorCommand(redisClient
*c
) {
5171 /* ignore MONITOR if already slave or in monitor mode */
5172 if (c
->flags
& REDIS_SLAVE
) return;
5174 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
5176 listAddNodeTail(server
.monitors
,c
);
5177 addReply(c
,shared
.ok
);
5180 /* ================================= Expire ================================= */
5181 static int removeExpire(redisDb
*db
, robj
*key
) {
5182 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
5189 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
5190 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
5198 /* Return the expire time of the specified key, or -1 if no expire
5199 * is associated with this key (i.e. the key is non volatile) */
5200 static time_t getExpire(redisDb
*db
, robj
*key
) {
5203 /* No expire? return ASAP */
5204 if (dictSize(db
->expires
) == 0 ||
5205 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
5207 return (time_t) dictGetEntryVal(de
);
5210 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
5214 /* No expire? return ASAP */
5215 if (dictSize(db
->expires
) == 0 ||
5216 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5218 /* Lookup the expire */
5219 when
= (time_t) dictGetEntryVal(de
);
5220 if (time(NULL
) <= when
) return 0;
5222 /* Delete the key */
5223 dictDelete(db
->expires
,key
);
5224 return dictDelete(db
->dict
,key
) == DICT_OK
;
5227 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
5230 /* No expire? return ASAP */
5231 if (dictSize(db
->expires
) == 0 ||
5232 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5234 /* Delete the key */
5236 dictDelete(db
->expires
,key
);
5237 return dictDelete(db
->dict
,key
) == DICT_OK
;
5240 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
5243 de
= dictFind(c
->db
->dict
,key
);
5245 addReply(c
,shared
.czero
);
5249 if (deleteKey(c
->db
,key
)) server
.dirty
++;
5250 addReply(c
, shared
.cone
);
5253 time_t when
= time(NULL
)+seconds
;
5254 if (setExpire(c
->db
,key
,when
)) {
5255 addReply(c
,shared
.cone
);
5258 addReply(c
,shared
.czero
);
5264 static void expireCommand(redisClient
*c
) {
5265 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
5268 static void expireatCommand(redisClient
*c
) {
5269 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
5272 static void ttlCommand(redisClient
*c
) {
5276 expire
= getExpire(c
->db
,c
->argv
[1]);
5278 ttl
= (int) (expire
-time(NULL
));
5279 if (ttl
< 0) ttl
= -1;
5281 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
5284 /* =============================== Replication ============================= */
5286 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5287 ssize_t nwritten
, ret
= size
;
5288 time_t start
= time(NULL
);
5292 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
5293 nwritten
= write(fd
,ptr
,size
);
5294 if (nwritten
== -1) return -1;
5298 if ((time(NULL
)-start
) > timeout
) {
5306 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5307 ssize_t nread
, totread
= 0;
5308 time_t start
= time(NULL
);
5312 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
5313 nread
= read(fd
,ptr
,size
);
5314 if (nread
== -1) return -1;
5319 if ((time(NULL
)-start
) > timeout
) {
5327 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5334 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
5337 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
5348 static void syncCommand(redisClient
*c
) {
5349 /* ignore SYNC if already slave or in monitor mode */
5350 if (c
->flags
& REDIS_SLAVE
) return;
5352 /* SYNC can't be issued when the server has pending data to send to
5353 * the client about already issued commands. We need a fresh reply
5354 * buffer registering the differences between the BGSAVE and the current
5355 * dataset, so that we can copy to other slaves if needed. */
5356 if (listLength(c
->reply
) != 0) {
5357 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5361 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
5362 /* Here we need to check if there is a background saving operation
5363 * in progress, or if it is required to start one */
5364 if (server
.bgsavechildpid
!= -1) {
5365 /* Ok a background save is in progress. Let's check if it is a good
5366 * one for replication, i.e. if there is another slave that is
5367 * registering differences since the server forked to save */
5371 listRewind(server
.slaves
);
5372 while((ln
= listYield(server
.slaves
))) {
5374 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
5377 /* Perfect, the server is already registering differences for
5378 * another slave. Set the right state, and copy the buffer. */
5379 listRelease(c
->reply
);
5380 c
->reply
= listDup(slave
->reply
);
5381 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5382 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
5384 /* No way, we need to wait for the next BGSAVE in order to
5385 * register differences */
5386 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5387 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
5390 /* Ok we don't have a BGSAVE in progress, let's start one */
5391 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
5392 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5393 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
5394 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
5397 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5400 c
->flags
|= REDIS_SLAVE
;
5402 listAddNodeTail(server
.slaves
,c
);
5406 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
5407 redisClient
*slave
= privdata
;
5409 REDIS_NOTUSED(mask
);
5410 char buf
[REDIS_IOBUF_LEN
];
5411 ssize_t nwritten
, buflen
;
5413 if (slave
->repldboff
== 0) {
5414 /* Write the bulk write count before transferring the DB. In theory here
5415 * we don't know how much room there is in the output buffer of the
5416 * socket, but in practice SO_SNDLOWAT (the minimum count for output
5417 * operations) will never be smaller than the few bytes we need. */
5420 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5422 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
5430 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
5431 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
5433 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
5434 (buflen
== 0) ? "premature EOF" : strerror(errno
));
5438 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
5439 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
5444 slave
->repldboff
+= nwritten
;
5445 if (slave
->repldboff
== slave
->repldbsize
) {
5446 close(slave
->repldbfd
);
5447 slave
->repldbfd
= -1;
5448 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5449 slave
->replstate
= REDIS_REPL_ONLINE
;
5450 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
5451 sendReplyToClient
, slave
) == AE_ERR
) {
5455 addReplySds(slave
,sdsempty());
5456 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
5460 /* This function is called at the end of every background saving.
5461 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5462 * otherwise REDIS_ERR is passed to the function.
5464 * The goal of this function is to handle slaves waiting for a successful
5465 * background saving in order to perform non-blocking synchronization. */
5466 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
5468 int startbgsave
= 0;
5470 listRewind(server
.slaves
);
5471 while((ln
= listYield(server
.slaves
))) {
5472 redisClient
*slave
= ln
->value
;
5474 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
5476 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5477 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
5478 struct redis_stat buf
;
5480 if (bgsaveerr
!= REDIS_OK
) {
5482 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
5485 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
5486 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
5488 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
5491 slave
->repldboff
= 0;
5492 slave
->repldbsize
= buf
.st_size
;
5493 slave
->replstate
= REDIS_REPL_SEND_BULK
;
5494 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5495 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
5502 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5503 listRewind(server
.slaves
);
5504 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
5505 while((ln
= listYield(server
.slaves
))) {
5506 redisClient
*slave
= ln
->value
;
5508 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
5515 static int syncWithMaster(void) {
5516 char buf
[1024], tmpfile
[256], authcmd
[1024];
5518 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
5522 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
5527 /* AUTH with the master if required. */
5528 if(server
.masterauth
) {
5529 snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
);
5530 if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) {
5532 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",
5536 /* Read the AUTH result. */
5537 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5539 redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s",
5543 if (buf
[0] != '+') {
5545 redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?");
5550 /* Issue the SYNC command */
5551 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
5553 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
5557 /* Read the bulk write count */
5558 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5560 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
5564 if (buf
[0] != '$') {
5566 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5569 dumpsize
= atoi(buf
+1);
5570 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
5571 /* Read the bulk write data on a temp file */
5572 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
5573 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
5576 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
5580 int nread
, nwritten
;
5582 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
5584 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
5590 nwritten
= write(dfd
,buf
,nread
);
5591 if (nwritten
== -1) {
5592 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
5600 if (rename(tmpfile
,server
.dbfilename
) == -1) {
5601 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
5607 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
5608 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
5612 server
.master
= createClient(fd
);
5613 server
.master
->flags
|= REDIS_MASTER
;
5614 server
.master
->authenticated
= 1;
5615 server
.replstate
= REDIS_REPL_CONNECTED
;
5619 static void slaveofCommand(redisClient
*c
) {
5620 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
5621 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
5622 if (server
.masterhost
) {
5623 sdsfree(server
.masterhost
);
5624 server
.masterhost
= NULL
;
5625 if (server
.master
) freeClient(server
.master
);
5626 server
.replstate
= REDIS_REPL_NONE
;
5627 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
5630 sdsfree(server
.masterhost
);
5631 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
5632 server
.masterport
= atoi(c
->argv
[2]->ptr
);
5633 if (server
.master
) freeClient(server
.master
);
5634 server
.replstate
= REDIS_REPL_CONNECT
;
5635 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
5636 server
.masterhost
, server
.masterport
);
5638 addReply(c
,shared
.ok
);
5641 /* ============================ Maxmemory directive ======================== */
5643 /* This function gets called when 'maxmemory' is set on the config file to limit
5644 * the max memory used by the server, and we are out of memory.
5645 * This function will try to, in order:
5647 * - Free objects from the free list
5648 * - Try to remove keys with an EXPIRE set
5650 * If it is not possible to free enough memory to reach used-memory < maxmemory
5651 * the server will start refusing commands that will enlarge even more the
5654 static void freeMemoryIfNeeded(void) {
5655 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
5656 if (listLength(server
.objfreelist
)) {
5659 listNode
*head
= listFirst(server
.objfreelist
);
5660 o
= listNodeValue(head
);
5661 listDelNode(server
.objfreelist
,head
);
5664 int j
, k
, freed
= 0;
5666 for (j
= 0; j
< server
.dbnum
; j
++) {
5668 robj
*minkey
= NULL
;
5669 struct dictEntry
*de
;
5671 if (dictSize(server
.db
[j
].expires
)) {
5673 /* From a sample of three keys drop the one nearest to
5674 * the natural expire */
5675 for (k
= 0; k
< 3; k
++) {
5678 de
= dictGetRandomKey(server
.db
[j
].expires
);
5679 t
= (time_t) dictGetEntryVal(de
);
5680 if (minttl
== -1 || t
< minttl
) {
5681 minkey
= dictGetEntryKey(de
);
5685 deleteKey(server
.db
+j
,minkey
);
5688 if (!freed
) return; /* nothing to free... */
5693 /* ============================== Append Only file ========================== */
5695 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
5696 sds buf
= sdsempty();
5702 /* The DB this command was targeting is not the same as the last command
5703 * we appended, so a SELECT command needs to be issued. */
5704 if (dictid
!= server
.appendseldb
) {
5707 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
5708 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
5709 (unsigned long)strlen(seldb
),seldb
);
5710 server
.appendseldb
= dictid
;
5713 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5714 * EXPIREs into EXPIREAT calls */
5715 if (cmd
->proc
== expireCommand
) {
5718 tmpargv
[0] = createStringObject("EXPIREAT",8);
5719 tmpargv
[1] = argv
[1];
5720 incrRefCount(argv
[1]);
5721 when
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10);
5722 tmpargv
[2] = createObject(REDIS_STRING
,
5723 sdscatprintf(sdsempty(),"%ld",when
));
5727 /* Append the actual command */
5728 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
5729 for (j
= 0; j
< argc
; j
++) {
5732 o
= getDecodedObject(o
);
5733 buf
= sdscatprintf(buf
,"$%lu\r\n",(unsigned long)sdslen(o
->ptr
));
5734 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
5735 buf
= sdscatlen(buf
,"\r\n",2);
5739 /* Free the objects from the modified argv for EXPIREAT */
5740 if (cmd
->proc
== expireCommand
) {
5741 for (j
= 0; j
< 3; j
++)
5742 decrRefCount(argv
[j
]);
5745 /* We want to perform a single write. This should be guaranteed atomic
5746 * at least if the filesystem we are writing is a real physical one.
5747 * While this will save us against the server being killed I don't think
5748 * there is much to do about the whole server stopping for power problems
5750 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
5751 if (nwritten
!= (signed)sdslen(buf
)) {
5752 /* Ooops, we are in trouble. The best thing to do for now is
5753 * to simply exit instead of giving the illusion that everything is
5754 * working as expected. */
5755 if (nwritten
== -1) {
5756 redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
));
5758 redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
));
5762 /* If a background append only file rewriting is in progress we want to
5763 * accumulate the differences between the child DB and the current one
5764 * in a buffer, so that when the child process will do its work we
5765 * can append the differences to the new append only file. */
5766 if (server
.bgrewritechildpid
!= -1)
5767 server
.bgrewritebuf
= sdscatlen(server
.bgrewritebuf
,buf
,sdslen(buf
));
5771 if (server
.appendfsync
== APPENDFSYNC_ALWAYS
||
5772 (server
.appendfsync
== APPENDFSYNC_EVERYSEC
&&
5773 now
-server
.lastfsync
> 1))
5775 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
5776 server
.lastfsync
= now
;
5780 /* In Redis commands are always executed in the context of a client, so in
5781 * order to load the append only file we need to create a fake client. */
5782 static struct redisClient
*createFakeClient(void) {
5783 struct redisClient
*c
= zmalloc(sizeof(*c
));
5787 c
->querybuf
= sdsempty();
5791 /* We set the fake client as a slave waiting for the synchronization
5792 * so that Redis will not try to send replies to this client. */
5793 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5794 c
->reply
= listCreate();
5795 listSetFreeMethod(c
->reply
,decrRefCount
);
5796 listSetDupMethod(c
->reply
,dupClientReplyValue
);
5800 static void freeFakeClient(struct redisClient
*c
) {
5801 sdsfree(c
->querybuf
);
5802 listRelease(c
->reply
);
5806 /* Replay the append log file. On success REDIS_OK is returned. On non fatal
5807 * error (the append only file is zero-length) REDIS_ERR is returned. On
5808 * fatal error an error message is logged and the program exits. */
5809 int loadAppendOnlyFile(char *filename
) {
5810 struct redisClient
*fakeClient
;
5811 FILE *fp
= fopen(filename
,"r");
5812 struct redis_stat sb
;
5814 if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size
== 0)
5818 redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
));
5822 fakeClient
= createFakeClient();
5829 struct redisCommand
*cmd
;
5831 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) {
5837 if (buf
[0] != '*') goto fmterr
;
5839 argv
= zmalloc(sizeof(robj
*)*argc
);
5840 for (j
= 0; j
< argc
; j
++) {
5841 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
;
5842 if (buf
[0] != '$') goto fmterr
;
5843 len
= strtol(buf
+1,NULL
,10);
5844 argsds
= sdsnewlen(NULL
,len
);
5845 if (len
&& fread(argsds
,len
,1,fp
) == 0) goto fmterr
;
5846 argv
[j
] = createObject(REDIS_STRING
,argsds
);
5847 if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */
5850 /* Command lookup */
5851 cmd
= lookupCommand(argv
[0]->ptr
);
5853 redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
);
5856 /* Try object sharing and encoding */
5857 if (server
.shareobjects
) {
5859 for(j
= 1; j
< argc
; j
++)
5860 argv
[j
] = tryObjectSharing(argv
[j
]);
5862 if (cmd
->flags
& REDIS_CMD_BULK
)
5863 tryObjectEncoding(argv
[argc
-1]);
5864 /* Run the command in the context of a fake client */
5865 fakeClient
->argc
= argc
;
5866 fakeClient
->argv
= argv
;
5867 cmd
->proc(fakeClient
);
5868 /* Discard the reply objects list from the fake client */
5869 while(listLength(fakeClient
->reply
))
5870 listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
));
5871 /* Clean up, ready for the next command */
5872 for (j
= 0; j
< argc
; j
++) decrRefCount(argv
[j
]);
5876 freeFakeClient(fakeClient
);
5881 redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file");
5883 redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
));
5887 redisLog(REDIS_WARNING
,"Bad file format reading the append only file");
5891 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
5892 static int fwriteBulk(FILE *fp
, robj
*obj
) {
5894 obj
= getDecodedObject(obj
);
5895 snprintf(buf
,sizeof(buf
),"$%ld\r\n",(long)sdslen(obj
->ptr
));
5896 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) goto err
;
5897 if (sdslen(obj
->ptr
) && fwrite(obj
->ptr
,sdslen(obj
->ptr
),1,fp
) == 0)
5899 if (fwrite("\r\n",2,1,fp
) == 0) goto err
;
5907 /* Write a double value in bulk format $<count>\r\n<payload>\r\n */
5908 static int fwriteBulkDouble(FILE *fp
, double d
) {
5909 char buf
[128], dbuf
[128];
5911 snprintf(dbuf
,sizeof(dbuf
),"%.17g\r\n",d
);
5912 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(dbuf
)-2);
5913 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
5914 if (fwrite(dbuf
,strlen(dbuf
),1,fp
) == 0) return 0;
5918 /* Write a long value in bulk format $<count>\r\n<payload>\r\n */
5919 static int fwriteBulkLong(FILE *fp
, long l
) {
5920 char buf
[128], lbuf
[128];
5922 snprintf(lbuf
,sizeof(lbuf
),"%ld\r\n",l
);
5923 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(lbuf
)-2);
5924 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
5925 if (fwrite(lbuf
,strlen(lbuf
),1,fp
) == 0) return 0;
5929 /* Write a sequence of commands able to fully rebuild the dataset into
5930 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
5931 static int rewriteAppendOnlyFile(char *filename
) {
5932 dictIterator
*di
= NULL
;
5937 time_t now
= time(NULL
);
5939 /* Note that we have to use a different temp name here compared to the
5940 * one used by rewriteAppendOnlyFileBackground() function. */
5941 snprintf(tmpfile
,256,"temp-rewriteaof-%d.aof", (int) getpid());
5942 fp
= fopen(tmpfile
,"w");
5944 redisLog(REDIS_WARNING
, "Failed rewriting the append only file: %s", strerror(errno
));
5947 for (j
= 0; j
< server
.dbnum
; j
++) {
5948 char selectcmd
[] = "*2\r\n$6\r\nSELECT\r\n";
5949 redisDb
*db
= server
.db
+j
;
5951 if (dictSize(d
) == 0) continue;
5952 di
= dictGetIterator(d
);
5958 /* SELECT the new DB */
5959 if (fwrite(selectcmd
,sizeof(selectcmd
)-1,1,fp
) == 0) goto werr
;
5960 if (fwriteBulkLong(fp
,j
) == 0) goto werr
;
5962 /* Iterate this DB writing every entry */
5963 while((de
= dictNext(di
)) != NULL
) {
5964 robj
*key
= dictGetEntryKey(de
);
5965 robj
*o
= dictGetEntryVal(de
);
5966 time_t expiretime
= getExpire(db
,key
);
5968 /* Save the key and associated value */
5969 if (o
->type
== REDIS_STRING
) {
5970 /* Emit a SET command */
5971 char cmd
[]="*3\r\n$3\r\nSET\r\n";
5972 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5974 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5975 if (fwriteBulk(fp
,o
) == 0) goto werr
;
5976 } else if (o
->type
== REDIS_LIST
) {
5977 /* Emit the RPUSHes needed to rebuild the list */
5978 list
*list
= o
->ptr
;
5982 while((ln
= listYield(list
))) {
5983 char cmd
[]="*3\r\n$5\r\nRPUSH\r\n";
5984 robj
*eleobj
= listNodeValue(ln
);
5986 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5987 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5988 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
5990 } else if (o
->type
== REDIS_SET
) {
5991 /* Emit the SADDs needed to rebuild the set */
5993 dictIterator
*di
= dictGetIterator(set
);
5996 while((de
= dictNext(di
)) != NULL
) {
5997 char cmd
[]="*3\r\n$4\r\nSADD\r\n";
5998 robj
*eleobj
= dictGetEntryKey(de
);
6000 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6001 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6002 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6004 dictReleaseIterator(di
);
6005 } else if (o
->type
== REDIS_ZSET
) {
6006 /* Emit the ZADDs needed to rebuild the sorted set */
6008 dictIterator
*di
= dictGetIterator(zs
->dict
);
6011 while((de
= dictNext(di
)) != NULL
) {
6012 char cmd
[]="*4\r\n$4\r\nZADD\r\n";
6013 robj
*eleobj
= dictGetEntryKey(de
);
6014 double *score
= dictGetEntryVal(de
);
6016 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6017 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6018 if (fwriteBulkDouble(fp
,*score
) == 0) goto werr
;
6019 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6021 dictReleaseIterator(di
);
6023 redisAssert(0 != 0);
6025 /* Save the expire time */
6026 if (expiretime
!= -1) {
6027 char cmd
[]="*3\r\n$8\r\nEXPIREAT\r\n";
6028 /* If this key is already expired skip it */
6029 if (expiretime
< now
) continue;
6030 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6031 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6032 if (fwriteBulkLong(fp
,expiretime
) == 0) goto werr
;
6035 dictReleaseIterator(di
);
6038 /* Make sure data will not remain on the OS's output buffers */
6043 /* Use RENAME to make sure the DB file is changed atomically only
6044 * if the generated DB file is ok. */
6045 if (rename(tmpfile
,filename
) == -1) {
6046 redisLog(REDIS_WARNING
,"Error moving temp append only file on the final destination: %s", strerror(errno
));
6050 redisLog(REDIS_NOTICE
,"SYNC append only file rewrite performed");
6056 redisLog(REDIS_WARNING
,"Write error writing append only file on disk: %s", strerror(errno
));
6057 if (di
) dictReleaseIterator(di
);
6061 /* This is how rewriting of the append only file in background works:
6063 * 1) The user calls BGREWRITEAOF
6064 * 2) Redis calls this function, that forks():
6065 * 2a) the child rewrites the append only file in a temp file.
6066 * 2b) the parent accumulates differences in server.bgrewritebuf.
6067 * 3) When the child finishes '2a' it exits.
6068 * 4) The parent will trap the exit code, if it's OK, will append the
6069 * data accumulated into server.bgrewritebuf into the temp file, and
6070 * finally will rename(2) the temp file in the actual file name.
6071 * Then the new file is reopened as the new append only file. Profit!
6073 static int rewriteAppendOnlyFileBackground(void) {
6076 if (server
.bgrewritechildpid
!= -1) return REDIS_ERR
;
6077 if ((childpid
= fork()) == 0) {
6082 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6083 if (rewriteAppendOnlyFile(tmpfile
) == REDIS_OK
) {
6090 if (childpid
== -1) {
6091 redisLog(REDIS_WARNING
,
6092 "Can't rewrite append only file in background: fork: %s",
6096 redisLog(REDIS_NOTICE
,
6097 "Background append only file rewriting started by pid %d",childpid
);
6098 server
.bgrewritechildpid
= childpid
;
6099 /* We set appendseldb to -1 in order to force the next call to the
6100 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6101 * accumulated by the parent into server.bgrewritebuf will start
6102 * with a SELECT statement and it will be safe to merge. */
6103 server
.appendseldb
= -1;
6106 return REDIS_OK
; /* unreached */
6109 static void bgrewriteaofCommand(redisClient
*c
) {
6110 if (server
.bgrewritechildpid
!= -1) {
6111 addReplySds(c
,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6114 if (rewriteAppendOnlyFileBackground() == REDIS_OK
) {
6115 char *status
= "+Background append only file rewriting started\r\n";
6116 addReplySds(c
,sdsnew(status
));
6118 addReply(c
,shared
.err
);
6122 static void aofRemoveTempFile(pid_t childpid
) {
6125 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) childpid
);
6129 /* ================================= Debugging ============================== */
6131 static void debugCommand(redisClient
*c
) {
6132 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
6134 } else if (!strcasecmp(c
->argv
[1]->ptr
,"reload")) {
6135 if (rdbSave(server
.dbfilename
) != REDIS_OK
) {
6136 addReply(c
,shared
.err
);
6140 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6141 addReply(c
,shared
.err
);
6144 redisLog(REDIS_WARNING
,"DB reloaded by DEBUG RELOAD");
6145 addReply(c
,shared
.ok
);
6146 } else if (!strcasecmp(c
->argv
[1]->ptr
,"loadaof")) {
6148 if (loadAppendOnlyFile(server
.appendfilename
) != REDIS_OK
) {
6149 addReply(c
,shared
.err
);
6152 redisLog(REDIS_WARNING
,"Append Only File loaded by DEBUG LOADAOF");
6153 addReply(c
,shared
.ok
);
6154 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
6155 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
6159 addReply(c
,shared
.nokeyerr
);
6162 key
= dictGetEntryKey(de
);
6163 val
= dictGetEntryVal(de
);
6164 addReplySds(c
,sdscatprintf(sdsempty(),
6165 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6166 (void*)key
, key
->refcount
, (void*)val
, val
->refcount
,
6169 addReplySds(c
,sdsnew(
6170 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6174 static void _redisAssert(char *estr
) {
6175 redisLog(REDIS_WARNING
,"=== ASSERTION FAILED ===");
6176 redisLog(REDIS_WARNING
,"==> %s\n",estr
);
6177 #ifdef HAVE_BACKTRACE
6178 redisLog(REDIS_WARNING
,"(forcing SIGSEGV in order to print the stack trace)");
6183 /* =================================== Main! ================================ */
6186 int linuxOvercommitMemoryValue(void) {
6187 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
6191 if (fgets(buf
,64,fp
) == NULL
) {
6200 void linuxOvercommitMemoryWarning(void) {
6201 if (linuxOvercommitMemoryValue() == 0) {
6202 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6205 #endif /* __linux__ */
6207 static void daemonize(void) {
6211 if (fork() != 0) exit(0); /* parent exits */
6212 printf("New pid: %d\n", getpid());
6213 setsid(); /* create a new session */
6215 /* Every output goes to /dev/null. If Redis is daemonized but
6216 * the 'logfile' is set to 'stdout' in the configuration file
6217 * it will not log at all. */
6218 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
6219 dup2(fd
, STDIN_FILENO
);
6220 dup2(fd
, STDOUT_FILENO
);
6221 dup2(fd
, STDERR_FILENO
);
6222 if (fd
> STDERR_FILENO
) close(fd
);
6224 /* Try to write the pid file */
6225 fp
= fopen(server
.pidfile
,"w");
6227 fprintf(fp
,"%d\n",getpid());
6232 int main(int argc
, char **argv
) {
6235 resetServerSaveParams();
6236 loadServerConfig(argv
[1]);
6237 } else if (argc
> 2) {
6238 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
6241 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6243 if (server
.daemonize
) daemonize();
6245 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
6247 linuxOvercommitMemoryWarning();
6249 if (server
.appendonly
) {
6250 if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
)
6251 redisLog(REDIS_NOTICE
,"DB loaded from append only file");
6253 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
6254 redisLog(REDIS_NOTICE
,"DB loaded from disk");
6256 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
6257 acceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
6258 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
6260 aeDeleteEventLoop(server
.el
);
6264 /* ============================= Backtrace support ========================= */
6266 #ifdef HAVE_BACKTRACE
6267 static char *findFuncName(void *pointer
, unsigned long *offset
);
6269 static void *getMcontextEip(ucontext_t
*uc
) {
6270 #if defined(__FreeBSD__)
6271 return (void*) uc
->uc_mcontext
.mc_eip
;
6272 #elif defined(__dietlibc__)
6273 return (void*) uc
->uc_mcontext
.eip
;
6274 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6276 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6278 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6280 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6281 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6282 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6284 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6286 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
6287 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
];
6288 #elif defined(__ia64__) /* Linux IA64 */
6289 return (void*) uc
->uc_mcontext
.sc_ip
;
6295 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
6297 char **messages
= NULL
;
6298 int i
, trace_size
= 0;
6299 unsigned long offset
=0;
6300 ucontext_t
*uc
= (ucontext_t
*) secret
;
6302 REDIS_NOTUSED(info
);
6304 redisLog(REDIS_WARNING
,
6305 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
6306 infostring
= genRedisInfoString();
6307 redisLog(REDIS_WARNING
, "%s",infostring
);
6308 /* It's not safe to sdsfree() the returned string under memory
6309 * corruption conditions. Let it leak as we are going to abort */
6311 trace_size
= backtrace(trace
, 100);
6312 /* overwrite sigaction with caller's address */
6313 if (getMcontextEip(uc
) != NULL
) {
6314 trace
[1] = getMcontextEip(uc
);
6316 messages
= backtrace_symbols(trace
, trace_size
);
6318 for (i
=1; i
<trace_size
; ++i
) {
6319 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
6321 p
= strchr(messages
[i
],'+');
6322 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
6323 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
6325 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
6328 // free(messages); Don't call free() with possibly corrupted memory.
6332 static void setupSigSegvAction(void) {
6333 struct sigaction act
;
6335 sigemptyset (&act
.sa_mask
);
6336 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6337 * is used. Otherwise, sa_handler is used */
6338 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
6339 act
.sa_sigaction
= segvHandler
;
6340 sigaction (SIGSEGV
, &act
, NULL
);
6341 sigaction (SIGBUS
, &act
, NULL
);
6342 sigaction (SIGFPE
, &act
, NULL
);
6343 sigaction (SIGILL
, &act
, NULL
);
6344 sigaction (SIGBUS
, &act
, NULL
);
6348 #include "staticsymbols.h"
6349 /* This function tries to convert a pointer into a function name. It's used in
6350 * order to provide a backtrace under segmentation fault that's able to
6351 * display functions declared as static (otherwise the backtrace is useless). */
6352 static char *findFuncName(void *pointer
, unsigned long *offset
){
6354 unsigned long off
, minoff
= 0;
6356 /* Try to match against the Symbol with the smallest offset */
6357 for (i
=0; symsTable
[i
].pointer
; i
++) {
6358 unsigned long lp
= (unsigned long) pointer
;
6360 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
6361 off
=lp
-symsTable
[i
].pointer
;
6362 if (ret
< 0 || off
< minoff
) {
6368 if (ret
== -1) return NULL
;
6370 return symsTable
[ret
].name
;
6372 #else /* HAVE_BACKTRACE */
6373 static void setupSigSegvAction(void) {
6375 #endif /* HAVE_BACKTRACE */