2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.1.93"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
64 #include "solarisfixes.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
95 /* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
107 this flags will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short this commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
113 #define REDIS_STRING 0
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
128 /* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpreter the length:
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
139 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining two bits specify a special encoding for the object
149 * accordingly to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
156 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157 #define REDIS_SLAVE 2 /* This client is a slave server */
158 #define REDIS_MASTER 4 /* This client is a master server */
159 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
161 /* Slave replication state - slave side */
162 #define REDIS_REPL_NONE 0 /* No active replication */
163 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
164 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
166 /* Slave replication state - from the point of view of master
167 * Note that in SEND_BULK and ONLINE state the slave receives new updates
168 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
169 * to start the next background saving in order to send updates to it. */
170 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
171 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
172 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
173 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
175 /* List related stuff */
179 /* Sort operations */
180 #define REDIS_SORT_GET 0
181 #define REDIS_SORT_ASC 1
182 #define REDIS_SORT_DESC 2
183 #define REDIS_SORTKEY_MAX 1024
186 #define REDIS_DEBUG 0
187 #define REDIS_NOTICE 1
188 #define REDIS_WARNING 2
190 /* Anti-warning macro... */
191 #define REDIS_NOTUSED(V) ((void) V)
193 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
194 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
196 /* Append only defines */
197 #define APPENDFSYNC_NO 0
198 #define APPENDFSYNC_ALWAYS 1
199 #define APPENDFSYNC_EVERYSEC 2
201 /* We can print the stacktrace, so our assert is defined this way: */
202 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
203 static void _redisAssert(char *estr
);
205 /*================================= Data types ============================== */
207 /* A redis object, that is a type able to hold a string / list / set */
208 typedef struct redisObject
{
211 unsigned char encoding
;
212 unsigned char notused
[2];
216 /* Macro used to initalize a Redis object allocated on the stack.
217 * Note that this macro is taken near the structure definition to make sure
218 * we'll update it when the structure is changed, to avoid bugs like
219 * bug #85 introduced exactly in this way. */
220 #define initStaticStringObject(_var,_ptr) do { \
222 _var.type = REDIS_STRING; \
223 _var.encoding = REDIS_ENCODING_RAW; \
227 typedef struct redisDb
{
233 /* With multiplexing we need to take per-clinet state.
234 * Clients are taken in a liked list. */
235 typedef struct redisClient
{
240 robj
**argv
, **mbargv
;
242 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
243 int multibulk
; /* multi bulk command format active */
246 time_t lastinteraction
; /* time of the last interaction, used for timeout */
247 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
248 int slaveseldb
; /* slave selected db, if this client is a slave */
249 int authenticated
; /* when requirepass is non-NULL */
250 int replstate
; /* replication state if this is a slave */
251 int repldbfd
; /* replication DB file descriptor */
252 long repldboff
; /* replication DB file offset */
253 off_t repldbsize
; /* replication DB file size */
261 /* Global server state structure */
267 unsigned int sharingpoolsize
;
268 long long dirty
; /* changes to DB from the last save */
270 list
*slaves
, *monitors
;
271 char neterr
[ANET_ERR_LEN
];
273 int cronloops
; /* number of times the cron function run */
274 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
275 time_t lastsave
; /* Unix time of last save succeeede */
276 size_t usedmemory
; /* Used memory in megabytes */
277 /* Fields used only for stats */
278 time_t stat_starttime
; /* server start time */
279 long long stat_numcommands
; /* number of processed commands */
280 long long stat_numconnections
; /* number of connections received */
293 pid_t bgsavechildpid
;
294 pid_t bgrewritechildpid
;
295 sds bgrewritebuf
; /* buffer taken by parent during oppend only rewrite */
296 struct saveparam
*saveparams
;
301 char *appendfilename
;
305 /* Replication related */
310 redisClient
*master
; /* client that is master for this slave */
312 unsigned int maxclients
;
313 unsigned long maxmemory
;
314 /* Sort parameters - qsort_r() is only available under BSD so we
315 * have to take this state global, in order to pass it to sortCompare() */
321 typedef void redisCommandProc(redisClient
*c
);
322 struct redisCommand
{
324 redisCommandProc
*proc
;
329 struct redisFunctionSym
{
331 unsigned long pointer
;
334 typedef struct _redisSortObject
{
342 typedef struct _redisSortOperation
{
345 } redisSortOperation
;
347 /* ZSETs use a specialized version of Skiplists */
349 typedef struct zskiplistNode
{
350 struct zskiplistNode
**forward
;
351 struct zskiplistNode
*backward
;
356 typedef struct zskiplist
{
357 struct zskiplistNode
*header
, *tail
;
358 unsigned long length
;
362 typedef struct zset
{
367 /* Our shared "common" objects */
369 struct sharedObjectsStruct
{
370 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
371 *colon
, *nullbulk
, *nullmultibulk
,
372 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
373 *outofrangeerr
, *plus
,
374 *select0
, *select1
, *select2
, *select3
, *select4
,
375 *select5
, *select6
, *select7
, *select8
, *select9
;
378 /* Global vars that are actally used as constants. The following double
379 * values are used for double on-disk serialization, and are initialized
380 * at runtime to avoid strange compiler optimizations. */
382 static double R_Zero
, R_PosInf
, R_NegInf
, R_Nan
;
384 /*================================ Prototypes =============================== */
386 static void freeStringObject(robj
*o
);
387 static void freeListObject(robj
*o
);
388 static void freeSetObject(robj
*o
);
389 static void decrRefCount(void *o
);
390 static robj
*createObject(int type
, void *ptr
);
391 static void freeClient(redisClient
*c
);
392 static int rdbLoad(char *filename
);
393 static void addReply(redisClient
*c
, robj
*obj
);
394 static void addReplySds(redisClient
*c
, sds s
);
395 static void incrRefCount(robj
*o
);
396 static int rdbSaveBackground(char *filename
);
397 static robj
*createStringObject(char *ptr
, size_t len
);
398 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
399 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
400 static int syncWithMaster(void);
401 static robj
*tryObjectSharing(robj
*o
);
402 static int tryObjectEncoding(robj
*o
);
403 static robj
*getDecodedObject(robj
*o
);
404 static int removeExpire(redisDb
*db
, robj
*key
);
405 static int expireIfNeeded(redisDb
*db
, robj
*key
);
406 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
407 static int deleteKey(redisDb
*db
, robj
*key
);
408 static time_t getExpire(redisDb
*db
, robj
*key
);
409 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
410 static void updateSlavesWaitingBgsave(int bgsaveerr
);
411 static void freeMemoryIfNeeded(void);
412 static int processCommand(redisClient
*c
);
413 static void setupSigSegvAction(void);
414 static void rdbRemoveTempFile(pid_t childpid
);
415 static void aofRemoveTempFile(pid_t childpid
);
416 static size_t stringObjectLen(robj
*o
);
417 static void processInputBuffer(redisClient
*c
);
418 static zskiplist
*zslCreate(void);
419 static void zslFree(zskiplist
*zsl
);
420 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
421 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
423 static void authCommand(redisClient
*c
);
424 static void pingCommand(redisClient
*c
);
425 static void echoCommand(redisClient
*c
);
426 static void setCommand(redisClient
*c
);
427 static void setnxCommand(redisClient
*c
);
428 static void getCommand(redisClient
*c
);
429 static void delCommand(redisClient
*c
);
430 static void existsCommand(redisClient
*c
);
431 static void incrCommand(redisClient
*c
);
432 static void decrCommand(redisClient
*c
);
433 static void incrbyCommand(redisClient
*c
);
434 static void decrbyCommand(redisClient
*c
);
435 static void selectCommand(redisClient
*c
);
436 static void randomkeyCommand(redisClient
*c
);
437 static void keysCommand(redisClient
*c
);
438 static void dbsizeCommand(redisClient
*c
);
439 static void lastsaveCommand(redisClient
*c
);
440 static void saveCommand(redisClient
*c
);
441 static void bgsaveCommand(redisClient
*c
);
442 static void bgrewriteaofCommand(redisClient
*c
);
443 static void shutdownCommand(redisClient
*c
);
444 static void moveCommand(redisClient
*c
);
445 static void renameCommand(redisClient
*c
);
446 static void renamenxCommand(redisClient
*c
);
447 static void lpushCommand(redisClient
*c
);
448 static void rpushCommand(redisClient
*c
);
449 static void lpopCommand(redisClient
*c
);
450 static void rpopCommand(redisClient
*c
);
451 static void llenCommand(redisClient
*c
);
452 static void lindexCommand(redisClient
*c
);
453 static void lrangeCommand(redisClient
*c
);
454 static void ltrimCommand(redisClient
*c
);
455 static void typeCommand(redisClient
*c
);
456 static void lsetCommand(redisClient
*c
);
457 static void saddCommand(redisClient
*c
);
458 static void sremCommand(redisClient
*c
);
459 static void smoveCommand(redisClient
*c
);
460 static void sismemberCommand(redisClient
*c
);
461 static void scardCommand(redisClient
*c
);
462 static void spopCommand(redisClient
*c
);
463 static void srandmemberCommand(redisClient
*c
);
464 static void sinterCommand(redisClient
*c
);
465 static void sinterstoreCommand(redisClient
*c
);
466 static void sunionCommand(redisClient
*c
);
467 static void sunionstoreCommand(redisClient
*c
);
468 static void sdiffCommand(redisClient
*c
);
469 static void sdiffstoreCommand(redisClient
*c
);
470 static void syncCommand(redisClient
*c
);
471 static void flushdbCommand(redisClient
*c
);
472 static void flushallCommand(redisClient
*c
);
473 static void sortCommand(redisClient
*c
);
474 static void lremCommand(redisClient
*c
);
475 static void rpoplpushcommand(redisClient
*c
);
476 static void infoCommand(redisClient
*c
);
477 static void mgetCommand(redisClient
*c
);
478 static void monitorCommand(redisClient
*c
);
479 static void expireCommand(redisClient
*c
);
480 static void expireatCommand(redisClient
*c
);
481 static void getsetCommand(redisClient
*c
);
482 static void ttlCommand(redisClient
*c
);
483 static void slaveofCommand(redisClient
*c
);
484 static void debugCommand(redisClient
*c
);
485 static void msetCommand(redisClient
*c
);
486 static void msetnxCommand(redisClient
*c
);
487 static void zaddCommand(redisClient
*c
);
488 static void zincrbyCommand(redisClient
*c
);
489 static void zrangeCommand(redisClient
*c
);
490 static void zrangebyscoreCommand(redisClient
*c
);
491 static void zrevrangeCommand(redisClient
*c
);
492 static void zcardCommand(redisClient
*c
);
493 static void zremCommand(redisClient
*c
);
494 static void zscoreCommand(redisClient
*c
);
495 static void zremrangebyscoreCommand(redisClient
*c
);
497 /*================================= Globals ================================= */
500 static struct redisServer server
; /* server global state */
501 static struct redisCommand cmdTable
[] = {
502 {"get",getCommand
,2,REDIS_CMD_INLINE
},
503 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
504 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
505 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
506 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
507 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
508 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
509 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
510 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
511 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
512 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
513 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
514 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
515 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
516 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
517 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
518 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
519 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
520 {"rpoplpush",rpoplpushcommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
521 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
522 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
523 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
524 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
525 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
526 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
527 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
528 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
529 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
530 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
531 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
532 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
533 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
534 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
535 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
536 {"zincrby",zincrbyCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
537 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
538 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
539 {"zrange",zrangeCommand
,-4,REDIS_CMD_INLINE
},
540 {"zrangebyscore",zrangebyscoreCommand
,-4,REDIS_CMD_INLINE
},
541 {"zrevrange",zrevrangeCommand
,-4,REDIS_CMD_INLINE
},
542 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
543 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
544 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
545 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
546 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
547 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
548 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
549 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
550 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
551 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
552 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
553 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
554 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
555 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
556 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
557 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
558 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
559 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
560 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
561 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
562 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
563 {"bgrewriteaof",bgrewriteaofCommand
,1,REDIS_CMD_INLINE
},
564 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
565 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
566 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
567 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
568 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
569 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
570 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
571 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
572 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
573 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
574 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
575 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
579 /*============================ Utility functions ============================ */
581 /* Glob-style pattern matching. */
582 int stringmatchlen(const char *pattern
, int patternLen
,
583 const char *string
, int stringLen
, int nocase
)
588 while (pattern
[1] == '*') {
593 return 1; /* match */
595 if (stringmatchlen(pattern
+1, patternLen
-1,
596 string
, stringLen
, nocase
))
597 return 1; /* match */
601 return 0; /* no match */
605 return 0; /* no match */
615 not = pattern
[0] == '^';
622 if (pattern
[0] == '\\') {
625 if (pattern
[0] == string
[0])
627 } else if (pattern
[0] == ']') {
629 } else if (patternLen
== 0) {
633 } else if (pattern
[1] == '-' && patternLen
>= 3) {
634 int start
= pattern
[0];
635 int end
= pattern
[2];
643 start
= tolower(start
);
649 if (c
>= start
&& c
<= end
)
653 if (pattern
[0] == string
[0])
656 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
666 return 0; /* no match */
672 if (patternLen
>= 2) {
679 if (pattern
[0] != string
[0])
680 return 0; /* no match */
682 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
683 return 0; /* no match */
691 if (stringLen
== 0) {
692 while(*pattern
== '*') {
699 if (patternLen
== 0 && stringLen
== 0)
704 static void redisLog(int level
, const char *fmt
, ...) {
708 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
712 if (level
>= server
.verbosity
) {
718 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
719 fprintf(fp
,"%s %c ",buf
,c
[level
]);
720 vfprintf(fp
, fmt
, ap
);
726 if (server
.logfile
) fclose(fp
);
729 /*====================== Hash table type implementation ==================== */
731 /* This is an hash table type that uses the SDS dynamic strings libary as
732 * keys and radis objects as values (objects can hold SDS strings,
735 static void dictVanillaFree(void *privdata
, void *val
)
737 DICT_NOTUSED(privdata
);
741 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
745 DICT_NOTUSED(privdata
);
747 l1
= sdslen((sds
)key1
);
748 l2
= sdslen((sds
)key2
);
749 if (l1
!= l2
) return 0;
750 return memcmp(key1
, key2
, l1
) == 0;
753 static void dictRedisObjectDestructor(void *privdata
, void *val
)
755 DICT_NOTUSED(privdata
);
760 static int dictObjKeyCompare(void *privdata
, const void *key1
,
763 const robj
*o1
= key1
, *o2
= key2
;
764 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
767 static unsigned int dictObjHash(const void *key
) {
769 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
772 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
775 robj
*o1
= (robj
*) key1
, *o2
= (robj
*) key2
;
778 o1
= getDecodedObject(o1
);
779 o2
= getDecodedObject(o2
);
780 cmp
= sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
786 static unsigned int dictEncObjHash(const void *key
) {
787 robj
*o
= (robj
*) key
;
789 o
= getDecodedObject(o
);
790 unsigned int hash
= dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
795 static dictType setDictType
= {
796 dictEncObjHash
, /* hash function */
799 dictEncObjKeyCompare
, /* key compare */
800 dictRedisObjectDestructor
, /* key destructor */
801 NULL
/* val destructor */
804 static dictType zsetDictType
= {
805 dictEncObjHash
, /* hash function */
808 dictEncObjKeyCompare
, /* key compare */
809 dictRedisObjectDestructor
, /* key destructor */
810 dictVanillaFree
/* val destructor of malloc(sizeof(double)) */
813 static dictType hashDictType
= {
814 dictObjHash
, /* hash function */
817 dictObjKeyCompare
, /* key compare */
818 dictRedisObjectDestructor
, /* key destructor */
819 dictRedisObjectDestructor
/* val destructor */
822 /* ========================= Random utility functions ======================= */
824 /* Redis generally does not try to recover from out of memory conditions
825 * when allocating objects or strings, it is not clear if it will be possible
826 * to report this condition to the client since the networking layer itself
827 * is based on heap allocation for send buffers, so we simply abort.
828 * At least the code will be simpler to read... */
829 static void oom(const char *msg
) {
830 redisLog(REDIS_WARNING
, "%s: Out of memory\n",msg
);
835 /* ====================== Redis server networking stuff ===================== */
836 static void closeTimedoutClients(void) {
839 time_t now
= time(NULL
);
841 listRewind(server
.clients
);
842 while ((ln
= listYield(server
.clients
)) != NULL
) {
843 c
= listNodeValue(ln
);
844 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
845 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
846 (now
- c
->lastinteraction
> server
.maxidletime
)) {
847 redisLog(REDIS_DEBUG
,"Closing idle client");
853 static int htNeedsResize(dict
*dict
) {
854 long long size
, used
;
856 size
= dictSlots(dict
);
857 used
= dictSize(dict
);
858 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
859 (used
*100/size
< REDIS_HT_MINFILL
));
862 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
863 * we resize the hash table to save memory */
864 static void tryResizeHashTables(void) {
867 for (j
= 0; j
< server
.dbnum
; j
++) {
868 if (htNeedsResize(server
.db
[j
].dict
)) {
869 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
870 dictResize(server
.db
[j
].dict
);
871 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
873 if (htNeedsResize(server
.db
[j
].expires
))
874 dictResize(server
.db
[j
].expires
);
878 /* A background saving child (BGSAVE) terminated its work. Handle this. */
879 void backgroundSaveDoneHandler(int statloc
) {
880 int exitcode
= WEXITSTATUS(statloc
);
881 int bysignal
= WIFSIGNALED(statloc
);
883 if (!bysignal
&& exitcode
== 0) {
884 redisLog(REDIS_NOTICE
,
885 "Background saving terminated with success");
887 server
.lastsave
= time(NULL
);
888 } else if (!bysignal
&& exitcode
!= 0) {
889 redisLog(REDIS_WARNING
, "Background saving error");
891 redisLog(REDIS_WARNING
,
892 "Background saving terminated by signal");
893 rdbRemoveTempFile(server
.bgsavechildpid
);
895 server
.bgsavechildpid
= -1;
896 /* Possibly there are slaves waiting for a BGSAVE in order to be served
897 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
898 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
901 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
903 void backgroundRewriteDoneHandler(int statloc
) {
904 int exitcode
= WEXITSTATUS(statloc
);
905 int bysignal
= WIFSIGNALED(statloc
);
907 if (!bysignal
&& exitcode
== 0) {
911 redisLog(REDIS_NOTICE
,
912 "Background append only file rewriting terminated with success");
913 /* Now it's time to flush the differences accumulated by the parent */
914 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) server
.bgrewritechildpid
);
915 fd
= open(tmpfile
,O_WRONLY
|O_APPEND
);
917 redisLog(REDIS_WARNING
, "Not able to open the temp append only file produced by the child: %s", strerror(errno
));
920 /* Flush our data... */
921 if (write(fd
,server
.bgrewritebuf
,sdslen(server
.bgrewritebuf
)) !=
922 (signed) sdslen(server
.bgrewritebuf
)) {
923 redisLog(REDIS_WARNING
, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno
));
927 redisLog(REDIS_NOTICE
,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server
.bgrewritebuf
));
928 /* Now our work is to rename the temp file into the stable file. And
929 * switch the file descriptor used by the server for append only. */
930 if (rename(tmpfile
,server
.appendfilename
) == -1) {
931 redisLog(REDIS_WARNING
,"Can't rename the temp append only file into the stable one: %s", strerror(errno
));
935 /* Mission completed... almost */
936 redisLog(REDIS_NOTICE
,"Append only file successfully rewritten.");
937 if (server
.appendfd
!= -1) {
938 /* If append only is actually enabled... */
939 close(server
.appendfd
);
940 server
.appendfd
= fd
;
942 server
.appendseldb
= -1; /* Make sure it will issue SELECT */
943 redisLog(REDIS_NOTICE
,"The new append only file was selected for future appends.");
945 /* If append only is disabled we just generate a dump in this
946 * format. Why not? */
949 } else if (!bysignal
&& exitcode
!= 0) {
950 redisLog(REDIS_WARNING
, "Background append only file rewriting error");
952 redisLog(REDIS_WARNING
,
953 "Background append only file rewriting terminated by signal");
956 sdsfree(server
.bgrewritebuf
);
957 server
.bgrewritebuf
= sdsempty();
958 aofRemoveTempFile(server
.bgrewritechildpid
);
959 server
.bgrewritechildpid
= -1;
962 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
963 int j
, loops
= server
.cronloops
++;
964 REDIS_NOTUSED(eventLoop
);
966 REDIS_NOTUSED(clientData
);
968 /* Update the global state with the amount of used memory */
969 server
.usedmemory
= zmalloc_used_memory();
971 /* Show some info about non-empty databases */
972 for (j
= 0; j
< server
.dbnum
; j
++) {
973 long long size
, used
, vkeys
;
975 size
= dictSlots(server
.db
[j
].dict
);
976 used
= dictSize(server
.db
[j
].dict
);
977 vkeys
= dictSize(server
.db
[j
].expires
);
978 if (!(loops
% 5) && (used
|| vkeys
)) {
979 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
980 /* dictPrintStats(server.dict); */
984 /* We don't want to resize the hash tables while a bacground saving
985 * is in progress: the saving child is created using fork() that is
986 * implemented with a copy-on-write semantic in most modern systems, so
987 * if we resize the HT while there is the saving child at work actually
988 * a lot of memory movements in the parent will cause a lot of pages
990 if (server
.bgsavechildpid
== -1) tryResizeHashTables();
992 /* Show information about connected clients */
994 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
995 listLength(server
.clients
)-listLength(server
.slaves
),
996 listLength(server
.slaves
),
998 dictSize(server
.sharingpool
));
1001 /* Close connections of timedout clients */
1002 if (server
.maxidletime
&& !(loops
% 10))
1003 closeTimedoutClients();
1005 /* Check if a background saving or AOF rewrite in progress terminated */
1006 if (server
.bgsavechildpid
!= -1 || server
.bgrewritechildpid
!= -1) {
1010 if ((pid
= wait3(&statloc
,WNOHANG
,NULL
)) != 0) {
1011 if (pid
== server
.bgsavechildpid
) {
1012 backgroundSaveDoneHandler(statloc
);
1014 backgroundRewriteDoneHandler(statloc
);
1018 /* If there is not a background saving in progress check if
1019 * we have to save now */
1020 time_t now
= time(NULL
);
1021 for (j
= 0; j
< server
.saveparamslen
; j
++) {
1022 struct saveparam
*sp
= server
.saveparams
+j
;
1024 if (server
.dirty
>= sp
->changes
&&
1025 now
-server
.lastsave
> sp
->seconds
) {
1026 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
1027 sp
->changes
, sp
->seconds
);
1028 rdbSaveBackground(server
.dbfilename
);
1034 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1035 * will use few CPU cycles if there are few expiring keys, otherwise
1036 * it will get more aggressive to avoid that too much memory is used by
1037 * keys that can be removed from the keyspace. */
1038 for (j
= 0; j
< server
.dbnum
; j
++) {
1040 redisDb
*db
= server
.db
+j
;
1042 /* Continue to expire if at the end of the cycle more than 25%
1043 * of the keys were expired. */
1045 int num
= dictSize(db
->expires
);
1046 time_t now
= time(NULL
);
1049 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
1050 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
1055 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
1056 t
= (time_t) dictGetEntryVal(de
);
1058 deleteKey(db
,dictGetEntryKey(de
));
1062 } while (expired
> REDIS_EXPIRELOOKUPS_PER_CRON
/4);
1065 /* Check if we should connect to a MASTER */
1066 if (server
.replstate
== REDIS_REPL_CONNECT
) {
1067 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
1068 if (syncWithMaster() == REDIS_OK
) {
1069 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
1075 static void createSharedObjects(void) {
1076 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
1077 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
1078 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
1079 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
1080 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
1081 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
1082 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
1083 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
1084 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
1086 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
1087 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
1088 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1089 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
1090 "-ERR no such key\r\n"));
1091 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
1092 "-ERR syntax error\r\n"));
1093 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
1094 "-ERR source and destination objects are the same\r\n"));
1095 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
1096 "-ERR index out of range\r\n"));
1097 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
1098 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
1099 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
1100 shared
.select0
= createStringObject("select 0\r\n",10);
1101 shared
.select1
= createStringObject("select 1\r\n",10);
1102 shared
.select2
= createStringObject("select 2\r\n",10);
1103 shared
.select3
= createStringObject("select 3\r\n",10);
1104 shared
.select4
= createStringObject("select 4\r\n",10);
1105 shared
.select5
= createStringObject("select 5\r\n",10);
1106 shared
.select6
= createStringObject("select 6\r\n",10);
1107 shared
.select7
= createStringObject("select 7\r\n",10);
1108 shared
.select8
= createStringObject("select 8\r\n",10);
1109 shared
.select9
= createStringObject("select 9\r\n",10);
1112 static void appendServerSaveParams(time_t seconds
, int changes
) {
1113 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1114 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1115 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1116 server
.saveparamslen
++;
1119 static void resetServerSaveParams() {
1120 zfree(server
.saveparams
);
1121 server
.saveparams
= NULL
;
1122 server
.saveparamslen
= 0;
1125 static void initServerConfig() {
1126 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1127 server
.port
= REDIS_SERVERPORT
;
1128 server
.verbosity
= REDIS_DEBUG
;
1129 server
.maxidletime
= REDIS_MAXIDLETIME
;
1130 server
.saveparams
= NULL
;
1131 server
.logfile
= NULL
; /* NULL = log on standard output */
1132 server
.bindaddr
= NULL
;
1133 server
.glueoutputbuf
= 1;
1134 server
.daemonize
= 0;
1135 server
.appendonly
= 0;
1136 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1137 server
.lastfsync
= time(NULL
);
1138 server
.appendfd
= -1;
1139 server
.appendseldb
= -1; /* Make sure the first time will not match */
1140 server
.pidfile
= "/var/run/redis.pid";
1141 server
.dbfilename
= "dump.rdb";
1142 server
.appendfilename
= "appendonly.aof";
1143 server
.requirepass
= NULL
;
1144 server
.shareobjects
= 0;
1145 server
.rdbcompression
= 1;
1146 server
.sharingpoolsize
= 1024;
1147 server
.maxclients
= 0;
1148 server
.maxmemory
= 0;
1149 resetServerSaveParams();
1151 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1152 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1153 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1154 /* Replication related */
1156 server
.masterauth
= NULL
;
1157 server
.masterhost
= NULL
;
1158 server
.masterport
= 6379;
1159 server
.master
= NULL
;
1160 server
.replstate
= REDIS_REPL_NONE
;
1162 /* Double constants initialization */
1164 R_PosInf
= 1.0/R_Zero
;
1165 R_NegInf
= -1.0/R_Zero
;
1166 R_Nan
= R_Zero
/R_Zero
;
1169 static void initServer() {
1172 signal(SIGHUP
, SIG_IGN
);
1173 signal(SIGPIPE
, SIG_IGN
);
1174 setupSigSegvAction();
1176 server
.clients
= listCreate();
1177 server
.slaves
= listCreate();
1178 server
.monitors
= listCreate();
1179 server
.objfreelist
= listCreate();
1180 createSharedObjects();
1181 server
.el
= aeCreateEventLoop();
1182 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1183 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
1184 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1185 if (server
.fd
== -1) {
1186 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
1189 for (j
= 0; j
< server
.dbnum
; j
++) {
1190 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1191 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1192 server
.db
[j
].id
= j
;
1194 server
.cronloops
= 0;
1195 server
.bgsavechildpid
= -1;
1196 server
.bgrewritechildpid
= -1;
1197 server
.bgrewritebuf
= sdsempty();
1198 server
.lastsave
= time(NULL
);
1200 server
.usedmemory
= 0;
1201 server
.stat_numcommands
= 0;
1202 server
.stat_numconnections
= 0;
1203 server
.stat_starttime
= time(NULL
);
1204 aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
);
1206 if (server
.appendonly
) {
1207 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644);
1208 if (server
.appendfd
== -1) {
1209 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1216 /* Empty the whole database */
1217 static long long emptyDb() {
1219 long long removed
= 0;
1221 for (j
= 0; j
< server
.dbnum
; j
++) {
1222 removed
+= dictSize(server
.db
[j
].dict
);
1223 dictEmpty(server
.db
[j
].dict
);
1224 dictEmpty(server
.db
[j
].expires
);
1229 static int yesnotoi(char *s
) {
1230 if (!strcasecmp(s
,"yes")) return 1;
1231 else if (!strcasecmp(s
,"no")) return 0;
1235 /* I agree, this is a very rudimental way to load a configuration...
1236 will improve later if the config gets more complex */
1237 static void loadServerConfig(char *filename
) {
1239 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1243 if (filename
[0] == '-' && filename
[1] == '\0')
1246 if ((fp
= fopen(filename
,"r")) == NULL
) {
1247 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1252 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1258 line
= sdstrim(line
," \t\r\n");
1260 /* Skip comments and blank lines*/
1261 if (line
[0] == '#' || line
[0] == '\0') {
1266 /* Split into arguments */
1267 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1268 sdstolower(argv
[0]);
1270 /* Execute config directives */
1271 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1272 server
.maxidletime
= atoi(argv
[1]);
1273 if (server
.maxidletime
< 0) {
1274 err
= "Invalid timeout value"; goto loaderr
;
1276 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1277 server
.port
= atoi(argv
[1]);
1278 if (server
.port
< 1 || server
.port
> 65535) {
1279 err
= "Invalid port"; goto loaderr
;
1281 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1282 server
.bindaddr
= zstrdup(argv
[1]);
1283 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1284 int seconds
= atoi(argv
[1]);
1285 int changes
= atoi(argv
[2]);
1286 if (seconds
< 1 || changes
< 0) {
1287 err
= "Invalid save parameters"; goto loaderr
;
1289 appendServerSaveParams(seconds
,changes
);
1290 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1291 if (chdir(argv
[1]) == -1) {
1292 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1293 argv
[1], strerror(errno
));
1296 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1297 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1298 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1299 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1301 err
= "Invalid log level. Must be one of debug, notice, warning";
1304 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1307 server
.logfile
= zstrdup(argv
[1]);
1308 if (!strcasecmp(server
.logfile
,"stdout")) {
1309 zfree(server
.logfile
);
1310 server
.logfile
= NULL
;
1312 if (server
.logfile
) {
1313 /* Test if we are able to open the file. The server will not
1314 * be able to abort just for this problem later... */
1315 logfp
= fopen(server
.logfile
,"a");
1316 if (logfp
== NULL
) {
1317 err
= sdscatprintf(sdsempty(),
1318 "Can't open the log file: %s", strerror(errno
));
1323 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1324 server
.dbnum
= atoi(argv
[1]);
1325 if (server
.dbnum
< 1) {
1326 err
= "Invalid number of databases"; goto loaderr
;
1328 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1329 server
.maxclients
= atoi(argv
[1]);
1330 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1331 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1332 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1333 server
.masterhost
= sdsnew(argv
[1]);
1334 server
.masterport
= atoi(argv
[2]);
1335 server
.replstate
= REDIS_REPL_CONNECT
;
1336 } else if (!strcasecmp(argv
[0],"masterauth") && argc
== 2) {
1337 server
.masterauth
= zstrdup(argv
[1]);
1338 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1339 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1340 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1342 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1343 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1344 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1346 } else if (!strcasecmp(argv
[0],"rdbcompression") && argc
== 2) {
1347 if ((server
.rdbcompression
= yesnotoi(argv
[1])) == -1) {
1348 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1350 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1351 server
.sharingpoolsize
= atoi(argv
[1]);
1352 if (server
.sharingpoolsize
< 1) {
1353 err
= "invalid object sharing pool size"; goto loaderr
;
1355 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1356 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1357 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1359 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1360 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1361 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1363 } else if (!strcasecmp(argv
[0],"appendfsync") && argc
== 2) {
1364 if (!strcasecmp(argv
[1],"no")) {
1365 server
.appendfsync
= APPENDFSYNC_NO
;
1366 } else if (!strcasecmp(argv
[1],"always")) {
1367 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1368 } else if (!strcasecmp(argv
[1],"everysec")) {
1369 server
.appendfsync
= APPENDFSYNC_EVERYSEC
;
1371 err
= "argument must be 'no', 'always' or 'everysec'";
1374 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1375 server
.requirepass
= zstrdup(argv
[1]);
1376 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1377 server
.pidfile
= zstrdup(argv
[1]);
1378 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1379 server
.dbfilename
= zstrdup(argv
[1]);
1381 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1383 for (j
= 0; j
< argc
; j
++)
1388 if (fp
!= stdin
) fclose(fp
);
1392 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1393 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1394 fprintf(stderr
, ">>> '%s'\n", line
);
1395 fprintf(stderr
, "%s\n", err
);
1399 static void freeClientArgv(redisClient
*c
) {
1402 for (j
= 0; j
< c
->argc
; j
++)
1403 decrRefCount(c
->argv
[j
]);
1404 for (j
= 0; j
< c
->mbargc
; j
++)
1405 decrRefCount(c
->mbargv
[j
]);
1410 static void freeClient(redisClient
*c
) {
1413 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1414 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1415 sdsfree(c
->querybuf
);
1416 listRelease(c
->reply
);
1419 ln
= listSearchKey(server
.clients
,c
);
1420 redisAssert(ln
!= NULL
);
1421 listDelNode(server
.clients
,ln
);
1422 if (c
->flags
& REDIS_SLAVE
) {
1423 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1425 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1426 ln
= listSearchKey(l
,c
);
1427 redisAssert(ln
!= NULL
);
1430 if (c
->flags
& REDIS_MASTER
) {
1431 server
.master
= NULL
;
1432 server
.replstate
= REDIS_REPL_CONNECT
;
1439 #define GLUEREPLY_UP_TO (1024)
1440 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1442 char buf
[GLUEREPLY_UP_TO
];
1446 listRewind(c
->reply
);
1447 while((ln
= listYield(c
->reply
))) {
1451 objlen
= sdslen(o
->ptr
);
1452 if (copylen
+ objlen
<= GLUEREPLY_UP_TO
) {
1453 memcpy(buf
+copylen
,o
->ptr
,objlen
);
1455 listDelNode(c
->reply
,ln
);
1457 if (copylen
== 0) return;
1461 /* Now the output buffer is empty, add the new single element */
1462 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,copylen
));
1463 listAddNodeHead(c
->reply
,o
);
1466 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1467 redisClient
*c
= privdata
;
1468 int nwritten
= 0, totwritten
= 0, objlen
;
1471 REDIS_NOTUSED(mask
);
1473 /* Use writev() if we have enough buffers to send */
1474 if (!server
.glueoutputbuf
&&
1475 listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD
&&
1476 !(c
->flags
& REDIS_MASTER
))
1478 sendReplyToClientWritev(el
, fd
, privdata
, mask
);
1482 while(listLength(c
->reply
)) {
1483 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1484 glueReplyBuffersIfNeeded(c
);
1486 o
= listNodeValue(listFirst(c
->reply
));
1487 objlen
= sdslen(o
->ptr
);
1490 listDelNode(c
->reply
,listFirst(c
->reply
));
1494 if (c
->flags
& REDIS_MASTER
) {
1495 /* Don't reply to a master */
1496 nwritten
= objlen
- c
->sentlen
;
1498 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1499 if (nwritten
<= 0) break;
1501 c
->sentlen
+= nwritten
;
1502 totwritten
+= nwritten
;
1503 /* If we fully sent the object on head go to the next one */
1504 if (c
->sentlen
== objlen
) {
1505 listDelNode(c
->reply
,listFirst(c
->reply
));
1508 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1509 * bytes, in a single threaded server it's a good idea to serve
1510 * other clients as well, even if a very large request comes from
1511 * super fast link that is always able to accept data (in real world
1512 * scenario think about 'KEYS *' against the loopback interfae) */
1513 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1515 if (nwritten
== -1) {
1516 if (errno
== EAGAIN
) {
1519 redisLog(REDIS_DEBUG
,
1520 "Error writing to client: %s", strerror(errno
));
1525 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1526 if (listLength(c
->reply
) == 0) {
1528 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1532 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
)
1534 redisClient
*c
= privdata
;
1535 int nwritten
= 0, totwritten
= 0, objlen
, willwrite
;
1537 struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
];
1538 int offset
, ion
= 0;
1540 REDIS_NOTUSED(mask
);
1543 while (listLength(c
->reply
)) {
1544 offset
= c
->sentlen
;
1548 /* fill-in the iov[] array */
1549 for(node
= listFirst(c
->reply
); node
; node
= listNextNode(node
)) {
1550 o
= listNodeValue(node
);
1551 objlen
= sdslen(o
->ptr
);
1553 if (totwritten
+ objlen
- offset
> REDIS_MAX_WRITE_PER_EVENT
)
1556 if(ion
== REDIS_WRITEV_IOVEC_COUNT
)
1557 break; /* no more iovecs */
1559 iov
[ion
].iov_base
= ((char*)o
->ptr
) + offset
;
1560 iov
[ion
].iov_len
= objlen
- offset
;
1561 willwrite
+= objlen
- offset
;
1562 offset
= 0; /* just for the first item */
1569 /* write all collected blocks at once */
1570 if((nwritten
= writev(fd
, iov
, ion
)) < 0) {
1571 if (errno
!= EAGAIN
) {
1572 redisLog(REDIS_DEBUG
,
1573 "Error writing to client: %s", strerror(errno
));
1580 totwritten
+= nwritten
;
1581 offset
= c
->sentlen
;
1583 /* remove written robjs from c->reply */
1584 while (nwritten
&& listLength(c
->reply
)) {
1585 o
= listNodeValue(listFirst(c
->reply
));
1586 objlen
= sdslen(o
->ptr
);
1588 if(nwritten
>= objlen
- offset
) {
1589 listDelNode(c
->reply
, listFirst(c
->reply
));
1590 nwritten
-= objlen
- offset
;
1594 c
->sentlen
+= nwritten
;
1602 c
->lastinteraction
= time(NULL
);
1604 if (listLength(c
->reply
) == 0) {
1606 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1610 static struct redisCommand
*lookupCommand(char *name
) {
1612 while(cmdTable
[j
].name
!= NULL
) {
1613 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1619 /* resetClient prepare the client to process the next command */
1620 static void resetClient(redisClient
*c
) {
1626 /* If this function gets called we already read a whole
1627 * command, argments are in the client argv/argc fields.
1628 * processCommand() execute the command or prepare the
1629 * server for a bulk read from the client.
1631 * If 1 is returned the client is still alive and valid and
1632 * and other operations can be performed by the caller. Otherwise
1633 * if 0 is returned the client was destroied (i.e. after QUIT). */
1634 static int processCommand(redisClient
*c
) {
1635 struct redisCommand
*cmd
;
1638 /* Free some memory if needed (maxmemory setting) */
1639 if (server
.maxmemory
) freeMemoryIfNeeded();
1641 /* Handle the multi bulk command type. This is an alternative protocol
1642 * supported by Redis in order to receive commands that are composed of
1643 * multiple binary-safe "bulk" arguments. The latency of processing is
1644 * a bit higher but this allows things like multi-sets, so if this
1645 * protocol is used only for MSET and similar commands this is a big win. */
1646 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1647 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1648 if (c
->multibulk
<= 0) {
1652 decrRefCount(c
->argv
[c
->argc
-1]);
1656 } else if (c
->multibulk
) {
1657 if (c
->bulklen
== -1) {
1658 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1659 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1663 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1664 decrRefCount(c
->argv
[0]);
1665 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1667 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1672 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1676 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1677 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1681 if (c
->multibulk
== 0) {
1685 /* Here we need to swap the multi-bulk argc/argv with the
1686 * normal argc/argv of the client structure. */
1688 c
->argv
= c
->mbargv
;
1689 c
->mbargv
= auxargv
;
1692 c
->argc
= c
->mbargc
;
1693 c
->mbargc
= auxargc
;
1695 /* We need to set bulklen to something different than -1
1696 * in order for the code below to process the command without
1697 * to try to read the last argument of a bulk command as
1698 * a special argument. */
1700 /* continue below and process the command */
1707 /* -- end of multi bulk commands processing -- */
1709 /* The QUIT command is handled as a special case. Normal command
1710 * procs are unable to close the client connection safely */
1711 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1715 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1717 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1720 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1721 (c
->argc
< -cmd
->arity
)) {
1723 sdscatprintf(sdsempty(),
1724 "-ERR wrong number of arguments for '%s' command\r\n",
1728 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1729 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1732 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1733 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1735 decrRefCount(c
->argv
[c
->argc
-1]);
1736 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1738 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1743 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1744 /* It is possible that the bulk read is already in the
1745 * buffer. Check this condition and handle it accordingly.
1746 * This is just a fast path, alternative to call processInputBuffer().
1747 * It's a good idea since the code is small and this condition
1748 * happens most of the times. */
1749 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1750 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1752 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1757 /* Let's try to share objects on the command arguments vector */
1758 if (server
.shareobjects
) {
1760 for(j
= 1; j
< c
->argc
; j
++)
1761 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1763 /* Let's try to encode the bulk object to save space. */
1764 if (cmd
->flags
& REDIS_CMD_BULK
)
1765 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1767 /* Check if the user is authenticated */
1768 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1769 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1774 /* Exec the command */
1775 dirty
= server
.dirty
;
1777 if (server
.appendonly
&& server
.dirty
-dirty
)
1778 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1779 if (server
.dirty
-dirty
&& listLength(server
.slaves
))
1780 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1781 if (listLength(server
.monitors
))
1782 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1783 server
.stat_numcommands
++;
1785 /* Prepare the client for the next command */
1786 if (c
->flags
& REDIS_CLOSE
) {
1794 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1798 /* (args*2)+1 is enough room for args, spaces, newlines */
1799 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1801 if (argc
<= REDIS_STATIC_ARGS
) {
1804 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1807 for (j
= 0; j
< argc
; j
++) {
1808 if (j
!= 0) outv
[outc
++] = shared
.space
;
1809 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1812 lenobj
= createObject(REDIS_STRING
,
1813 sdscatprintf(sdsempty(),"%lu\r\n",
1814 (unsigned long) stringObjectLen(argv
[j
])));
1815 lenobj
->refcount
= 0;
1816 outv
[outc
++] = lenobj
;
1818 outv
[outc
++] = argv
[j
];
1820 outv
[outc
++] = shared
.crlf
;
1822 /* Increment all the refcounts at start and decrement at end in order to
1823 * be sure to free objects if there is no slave in a replication state
1824 * able to be feed with commands */
1825 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1827 while((ln
= listYield(slaves
))) {
1828 redisClient
*slave
= ln
->value
;
1830 /* Don't feed slaves that are still waiting for BGSAVE to start */
1831 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1833 /* Feed all the other slaves, MONITORs and so on */
1834 if (slave
->slaveseldb
!= dictid
) {
1838 case 0: selectcmd
= shared
.select0
; break;
1839 case 1: selectcmd
= shared
.select1
; break;
1840 case 2: selectcmd
= shared
.select2
; break;
1841 case 3: selectcmd
= shared
.select3
; break;
1842 case 4: selectcmd
= shared
.select4
; break;
1843 case 5: selectcmd
= shared
.select5
; break;
1844 case 6: selectcmd
= shared
.select6
; break;
1845 case 7: selectcmd
= shared
.select7
; break;
1846 case 8: selectcmd
= shared
.select8
; break;
1847 case 9: selectcmd
= shared
.select9
; break;
1849 selectcmd
= createObject(REDIS_STRING
,
1850 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1851 selectcmd
->refcount
= 0;
1854 addReply(slave
,selectcmd
);
1855 slave
->slaveseldb
= dictid
;
1857 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1859 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1860 if (outv
!= static_outv
) zfree(outv
);
1863 static void processInputBuffer(redisClient
*c
) {
1865 if (c
->bulklen
== -1) {
1866 /* Read the first line of the query */
1867 char *p
= strchr(c
->querybuf
,'\n');
1874 query
= c
->querybuf
;
1875 c
->querybuf
= sdsempty();
1876 querylen
= 1+(p
-(query
));
1877 if (sdslen(query
) > querylen
) {
1878 /* leave data after the first line of the query in the buffer */
1879 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1881 *p
= '\0'; /* remove "\n" */
1882 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1883 sdsupdatelen(query
);
1885 /* Now we can split the query in arguments */
1886 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1889 if (c
->argv
) zfree(c
->argv
);
1890 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1892 for (j
= 0; j
< argc
; j
++) {
1893 if (sdslen(argv
[j
])) {
1894 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1902 /* Execute the command. If the client is still valid
1903 * after processCommand() return and there is something
1904 * on the query buffer try to process the next command. */
1905 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1907 /* Nothing to process, argc == 0. Just process the query
1908 * buffer if it's not empty or return to the caller */
1909 if (sdslen(c
->querybuf
)) goto again
;
1912 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
1913 redisLog(REDIS_DEBUG
, "Client protocol error");
1918 /* Bulk read handling. Note that if we are at this point
1919 the client already sent a command terminated with a newline,
1920 we are reading the bulk data that is actually the last
1921 argument of the command. */
1922 int qbl
= sdslen(c
->querybuf
);
1924 if (c
->bulklen
<= qbl
) {
1925 /* Copy everything but the final CRLF as final argument */
1926 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1928 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1929 /* Process the command. If the client is still valid after
1930 * the processing and there is more data in the buffer
1931 * try to parse it. */
1932 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1938 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1939 redisClient
*c
= (redisClient
*) privdata
;
1940 char buf
[REDIS_IOBUF_LEN
];
1943 REDIS_NOTUSED(mask
);
1945 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1947 if (errno
== EAGAIN
) {
1950 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1954 } else if (nread
== 0) {
1955 redisLog(REDIS_DEBUG
, "Client closed connection");
1960 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1961 c
->lastinteraction
= time(NULL
);
1965 processInputBuffer(c
);
1968 static int selectDb(redisClient
*c
, int id
) {
1969 if (id
< 0 || id
>= server
.dbnum
)
1971 c
->db
= &server
.db
[id
];
1975 static void *dupClientReplyValue(void *o
) {
1976 incrRefCount((robj
*)o
);
1980 static redisClient
*createClient(int fd
) {
1981 redisClient
*c
= zmalloc(sizeof(*c
));
1983 anetNonBlock(NULL
,fd
);
1984 anetTcpNoDelay(NULL
,fd
);
1985 if (!c
) return NULL
;
1988 c
->querybuf
= sdsempty();
1997 c
->lastinteraction
= time(NULL
);
1998 c
->authenticated
= 0;
1999 c
->replstate
= REDIS_REPL_NONE
;
2000 c
->reply
= listCreate();
2001 listSetFreeMethod(c
->reply
,decrRefCount
);
2002 listSetDupMethod(c
->reply
,dupClientReplyValue
);
2003 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
2004 readQueryFromClient
, c
) == AE_ERR
) {
2008 listAddNodeTail(server
.clients
,c
);
2012 static void addReply(redisClient
*c
, robj
*obj
) {
2013 if (listLength(c
->reply
) == 0 &&
2014 (c
->replstate
== REDIS_REPL_NONE
||
2015 c
->replstate
== REDIS_REPL_ONLINE
) &&
2016 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
2017 sendReplyToClient
, c
) == AE_ERR
) return;
2018 listAddNodeTail(c
->reply
,getDecodedObject(obj
));
2021 static void addReplySds(redisClient
*c
, sds s
) {
2022 robj
*o
= createObject(REDIS_STRING
,s
);
2027 static void addReplyDouble(redisClient
*c
, double d
) {
2030 snprintf(buf
,sizeof(buf
),"%.17g",d
);
2031 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2032 (unsigned long) strlen(buf
),buf
));
2035 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
2038 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
2039 len
= sdslen(obj
->ptr
);
2041 long n
= (long)obj
->ptr
;
2043 /* Compute how many bytes will take this integer as a radix 10 string */
2049 while((n
= n
/10) != 0) {
2053 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len
));
2056 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2061 REDIS_NOTUSED(mask
);
2062 REDIS_NOTUSED(privdata
);
2064 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
2065 if (cfd
== AE_ERR
) {
2066 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
2069 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
2070 if ((c
= createClient(cfd
)) == NULL
) {
2071 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
2072 close(cfd
); /* May be already closed, just ingore errors */
2075 /* If maxclient directive is set and this is one client more... close the
2076 * connection. Note that we create the client instead to check before
2077 * for this condition, since now the socket is already set in nonblocking
2078 * mode and we can send an error for free using the Kernel I/O */
2079 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
2080 char *err
= "-ERR max number of clients reached\r\n";
2082 /* That's a best effort error message, don't check write errors */
2083 if (write(c
->fd
,err
,strlen(err
)) == -1) {
2084 /* Nothing to do, Just to avoid the warning... */
2089 server
.stat_numconnections
++;
2092 /* ======================= Redis objects implementation ===================== */
2094 static robj
*createObject(int type
, void *ptr
) {
2097 if (listLength(server
.objfreelist
)) {
2098 listNode
*head
= listFirst(server
.objfreelist
);
2099 o
= listNodeValue(head
);
2100 listDelNode(server
.objfreelist
,head
);
2102 o
= zmalloc(sizeof(*o
));
2105 o
->encoding
= REDIS_ENCODING_RAW
;
2111 static robj
*createStringObject(char *ptr
, size_t len
) {
2112 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
2115 static robj
*createListObject(void) {
2116 list
*l
= listCreate();
2118 listSetFreeMethod(l
,decrRefCount
);
2119 return createObject(REDIS_LIST
,l
);
2122 static robj
*createSetObject(void) {
2123 dict
*d
= dictCreate(&setDictType
,NULL
);
2124 return createObject(REDIS_SET
,d
);
2127 static robj
*createZsetObject(void) {
2128 zset
*zs
= zmalloc(sizeof(*zs
));
2130 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
2131 zs
->zsl
= zslCreate();
2132 return createObject(REDIS_ZSET
,zs
);
2135 static void freeStringObject(robj
*o
) {
2136 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2141 static void freeListObject(robj
*o
) {
2142 listRelease((list
*) o
->ptr
);
2145 static void freeSetObject(robj
*o
) {
2146 dictRelease((dict
*) o
->ptr
);
2149 static void freeZsetObject(robj
*o
) {
2152 dictRelease(zs
->dict
);
2157 static void freeHashObject(robj
*o
) {
2158 dictRelease((dict
*) o
->ptr
);
2161 static void incrRefCount(robj
*o
) {
2163 #ifdef DEBUG_REFCOUNT
2164 if (o
->type
== REDIS_STRING
)
2165 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
2169 static void decrRefCount(void *obj
) {
2172 #ifdef DEBUG_REFCOUNT
2173 if (o
->type
== REDIS_STRING
)
2174 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
2176 if (--(o
->refcount
) == 0) {
2178 case REDIS_STRING
: freeStringObject(o
); break;
2179 case REDIS_LIST
: freeListObject(o
); break;
2180 case REDIS_SET
: freeSetObject(o
); break;
2181 case REDIS_ZSET
: freeZsetObject(o
); break;
2182 case REDIS_HASH
: freeHashObject(o
); break;
2183 default: redisAssert(0 != 0); break;
2185 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2186 !listAddNodeHead(server
.objfreelist
,o
))
2191 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2192 dictEntry
*de
= dictFind(db
->dict
,key
);
2193 return de
? dictGetEntryVal(de
) : NULL
;
2196 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2197 expireIfNeeded(db
,key
);
2198 return lookupKey(db
,key
);
2201 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2202 deleteIfVolatile(db
,key
);
2203 return lookupKey(db
,key
);
2206 static int deleteKey(redisDb
*db
, robj
*key
) {
2209 /* We need to protect key from destruction: after the first dictDelete()
2210 * it may happen that 'key' is no longer valid if we don't increment
2211 * it's count. This may happen when we get the object reference directly
2212 * from the hash table with dictRandomKey() or dict iterators */
2214 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2215 retval
= dictDelete(db
->dict
,key
);
2218 return retval
== DICT_OK
;
2221 /* Try to share an object against the shared objects pool */
2222 static robj
*tryObjectSharing(robj
*o
) {
2223 struct dictEntry
*de
;
2226 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2228 redisAssert(o
->type
== REDIS_STRING
);
2229 de
= dictFind(server
.sharingpool
,o
);
2231 robj
*shared
= dictGetEntryKey(de
);
2233 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2234 dictGetEntryVal(de
) = (void*) c
;
2235 incrRefCount(shared
);
2239 /* Here we are using a stream algorihtm: Every time an object is
2240 * shared we increment its count, everytime there is a miss we
2241 * recrement the counter of a random object. If this object reaches
2242 * zero we remove the object and put the current object instead. */
2243 if (dictSize(server
.sharingpool
) >=
2244 server
.sharingpoolsize
) {
2245 de
= dictGetRandomKey(server
.sharingpool
);
2246 redisAssert(de
!= NULL
);
2247 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2248 dictGetEntryVal(de
) = (void*) c
;
2250 dictDelete(server
.sharingpool
,de
->key
);
2253 c
= 0; /* If the pool is empty we want to add this object */
2258 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2259 redisAssert(retval
== DICT_OK
);
2266 /* Check if the nul-terminated string 's' can be represented by a long
2267 * (that is, is a number that fits into long without any other space or
2268 * character before or after the digits).
2270 * If so, the function returns REDIS_OK and *longval is set to the value
2271 * of the number. Otherwise REDIS_ERR is returned */
2272 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2273 char buf
[32], *endptr
;
2277 value
= strtol(s
, &endptr
, 10);
2278 if (endptr
[0] != '\0') return REDIS_ERR
;
2279 slen
= snprintf(buf
,32,"%ld",value
);
2281 /* If the number converted back into a string is not identical
2282 * then it's not possible to encode the string as integer */
2283 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2284 if (longval
) *longval
= value
;
2288 /* Try to encode a string object in order to save space */
2289 static int tryObjectEncoding(robj
*o
) {
2293 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2294 return REDIS_ERR
; /* Already encoded */
2296 /* It's not save to encode shared objects: shared objects can be shared
2297 * everywhere in the "object space" of Redis. Encoded objects can only
2298 * appear as "values" (and not, for instance, as keys) */
2299 if (o
->refcount
> 1) return REDIS_ERR
;
2301 /* Currently we try to encode only strings */
2302 redisAssert(o
->type
== REDIS_STRING
);
2304 /* Check if we can represent this string as a long integer */
2305 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2307 /* Ok, this object can be encoded */
2308 o
->encoding
= REDIS_ENCODING_INT
;
2310 o
->ptr
= (void*) value
;
2314 /* Get a decoded version of an encoded object (returned as a new object).
2315 * If the object is already raw-encoded just increment the ref count. */
2316 static robj
*getDecodedObject(robj
*o
) {
2319 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2323 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2326 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2327 dec
= createStringObject(buf
,strlen(buf
));
2330 redisAssert(1 != 1);
2334 /* Compare two string objects via strcmp() or alike.
2335 * Note that the objects may be integer-encoded. In such a case we
2336 * use snprintf() to get a string representation of the numbers on the stack
2337 * and compare the strings, it's much faster than calling getDecodedObject().
2339 * Important note: if objects are not integer encoded, but binary-safe strings,
2340 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2342 static int compareStringObjects(robj
*a
, robj
*b
) {
2343 redisAssert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2344 char bufa
[128], bufb
[128], *astr
, *bstr
;
2347 if (a
== b
) return 0;
2348 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2349 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2355 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2356 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2362 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2365 static size_t stringObjectLen(robj
*o
) {
2366 redisAssert(o
->type
== REDIS_STRING
);
2367 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2368 return sdslen(o
->ptr
);
2372 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2376 /*============================ DB saving/loading ============================ */
2378 static int rdbSaveType(FILE *fp
, unsigned char type
) {
2379 if (fwrite(&type
,1,1,fp
) == 0) return -1;
2383 static int rdbSaveTime(FILE *fp
, time_t t
) {
2384 int32_t t32
= (int32_t) t
;
2385 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
2389 /* check rdbLoadLen() comments for more info */
2390 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2391 unsigned char buf
[2];
2394 /* Save a 6 bit len */
2395 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2396 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2397 } else if (len
< (1<<14)) {
2398 /* Save a 14 bit len */
2399 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2401 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2403 /* Save a 32 bit len */
2404 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2405 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2407 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2412 /* String objects in the form "2391" "-100" without any space and with a
2413 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2414 * encoded as integers to save space */
2415 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2417 char *endptr
, buf
[32];
2419 /* Check if it's possible to encode this value as a number */
2420 value
= strtoll(s
, &endptr
, 10);
2421 if (endptr
[0] != '\0') return 0;
2422 snprintf(buf
,32,"%lld",value
);
2424 /* If the number converted back into a string is not identical
2425 * then it's not possible to encode the string as integer */
2426 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2428 /* Finally check if it fits in our ranges */
2429 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2430 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2431 enc
[1] = value
&0xFF;
2433 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2434 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2435 enc
[1] = value
&0xFF;
2436 enc
[2] = (value
>>8)&0xFF;
2438 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2439 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2440 enc
[1] = value
&0xFF;
2441 enc
[2] = (value
>>8)&0xFF;
2442 enc
[3] = (value
>>16)&0xFF;
2443 enc
[4] = (value
>>24)&0xFF;
2450 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2451 unsigned int comprlen
, outlen
;
2455 /* We require at least four bytes compression for this to be worth it */
2456 outlen
= sdslen(obj
->ptr
)-4;
2457 if (outlen
<= 0) return 0;
2458 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2459 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2460 if (comprlen
== 0) {
2464 /* Data compressed! Let's save it on disk */
2465 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2466 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2467 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2468 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2469 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2478 /* Save a string objet as [len][data] on disk. If the object is a string
2479 * representation of an integer value we try to safe it in a special form */
2480 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2484 len
= sdslen(obj
->ptr
);
2486 /* Try integer encoding */
2488 unsigned char buf
[5];
2489 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2490 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2495 /* Try LZF compression - under 20 bytes it's unable to compress even
2496 * aaaaaaaaaaaaaaaaaa so skip it */
2497 if (server
.rdbcompression
&& len
> 20) {
2500 retval
= rdbSaveLzfStringObject(fp
,obj
);
2501 if (retval
== -1) return -1;
2502 if (retval
> 0) return 0;
2503 /* retval == 0 means data can't be compressed, save the old way */
2506 /* Store verbatim */
2507 if (rdbSaveLen(fp
,len
) == -1) return -1;
2508 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2512 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2513 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2516 obj
= getDecodedObject(obj
);
2517 retval
= rdbSaveStringObjectRaw(fp
,obj
);
2522 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
2523 * 8 bit integer specifing the length of the representation.
2524 * This 8 bit integer has special values in order to specify the following
2530 static int rdbSaveDoubleValue(FILE *fp
, double val
) {
2531 unsigned char buf
[128];
2537 } else if (!isfinite(val
)) {
2539 buf
[0] = (val
< 0) ? 255 : 254;
2541 snprintf((char*)buf
+1,sizeof(buf
)-1,"%.17g",val
);
2542 buf
[0] = strlen((char*)buf
+1);
2545 if (fwrite(buf
,len
,1,fp
) == 0) return -1;
2549 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2550 static int rdbSave(char *filename
) {
2551 dictIterator
*di
= NULL
;
2556 time_t now
= time(NULL
);
2558 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2559 fp
= fopen(tmpfile
,"w");
2561 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2564 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2565 for (j
= 0; j
< server
.dbnum
; j
++) {
2566 redisDb
*db
= server
.db
+j
;
2568 if (dictSize(d
) == 0) continue;
2569 di
= dictGetIterator(d
);
2575 /* Write the SELECT DB opcode */
2576 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2577 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2579 /* Iterate this DB writing every entry */
2580 while((de
= dictNext(di
)) != NULL
) {
2581 robj
*key
= dictGetEntryKey(de
);
2582 robj
*o
= dictGetEntryVal(de
);
2583 time_t expiretime
= getExpire(db
,key
);
2585 /* Save the expire time */
2586 if (expiretime
!= -1) {
2587 /* If this key is already expired skip it */
2588 if (expiretime
< now
) continue;
2589 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2590 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2592 /* Save the key and associated value */
2593 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2594 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2595 if (o
->type
== REDIS_STRING
) {
2596 /* Save a string value */
2597 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
2598 } else if (o
->type
== REDIS_LIST
) {
2599 /* Save a list value */
2600 list
*list
= o
->ptr
;
2604 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
2605 while((ln
= listYield(list
))) {
2606 robj
*eleobj
= listNodeValue(ln
);
2608 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2610 } else if (o
->type
== REDIS_SET
) {
2611 /* Save a set value */
2613 dictIterator
*di
= dictGetIterator(set
);
2616 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
2617 while((de
= dictNext(di
)) != NULL
) {
2618 robj
*eleobj
= dictGetEntryKey(de
);
2620 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2622 dictReleaseIterator(di
);
2623 } else if (o
->type
== REDIS_ZSET
) {
2624 /* Save a set value */
2626 dictIterator
*di
= dictGetIterator(zs
->dict
);
2629 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) goto werr
;
2630 while((de
= dictNext(di
)) != NULL
) {
2631 robj
*eleobj
= dictGetEntryKey(de
);
2632 double *score
= dictGetEntryVal(de
);
2634 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2635 if (rdbSaveDoubleValue(fp
,*score
) == -1) goto werr
;
2637 dictReleaseIterator(di
);
2639 redisAssert(0 != 0);
2642 dictReleaseIterator(di
);
2645 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2647 /* Make sure data will not remain on the OS's output buffers */
2652 /* Use RENAME to make sure the DB file is changed atomically only
2653 * if the generate DB file is ok. */
2654 if (rename(tmpfile
,filename
) == -1) {
2655 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
2659 redisLog(REDIS_NOTICE
,"DB saved on disk");
2661 server
.lastsave
= time(NULL
);
2667 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2668 if (di
) dictReleaseIterator(di
);
2672 static int rdbSaveBackground(char *filename
) {
2675 if (server
.bgsavechildpid
!= -1) return REDIS_ERR
;
2676 if ((childpid
= fork()) == 0) {
2679 if (rdbSave(filename
) == REDIS_OK
) {
2686 if (childpid
== -1) {
2687 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2691 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2692 server
.bgsavechildpid
= childpid
;
2695 return REDIS_OK
; /* unreached */
2698 static void rdbRemoveTempFile(pid_t childpid
) {
2701 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
2705 static int rdbLoadType(FILE *fp
) {
2707 if (fread(&type
,1,1,fp
) == 0) return -1;
2711 static time_t rdbLoadTime(FILE *fp
) {
2713 if (fread(&t32
,4,1,fp
) == 0) return -1;
2714 return (time_t) t32
;
2717 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2718 * of this file for a description of how this are stored on disk.
2720 * isencoded is set to 1 if the readed length is not actually a length but
2721 * an "encoding type", check the above comments for more info */
2722 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2723 unsigned char buf
[2];
2726 if (isencoded
) *isencoded
= 0;
2728 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2733 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2734 type
= (buf
[0]&0xC0)>>6;
2735 if (type
== REDIS_RDB_6BITLEN
) {
2736 /* Read a 6 bit len */
2738 } else if (type
== REDIS_RDB_ENCVAL
) {
2739 /* Read a 6 bit len encoding type */
2740 if (isencoded
) *isencoded
= 1;
2742 } else if (type
== REDIS_RDB_14BITLEN
) {
2743 /* Read a 14 bit len */
2744 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2745 return ((buf
[0]&0x3F)<<8)|buf
[1];
2747 /* Read a 32 bit len */
2748 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2754 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2755 unsigned char enc
[4];
2758 if (enctype
== REDIS_RDB_ENC_INT8
) {
2759 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2760 val
= (signed char)enc
[0];
2761 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2763 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2764 v
= enc
[0]|(enc
[1]<<8);
2766 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2768 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2769 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2772 val
= 0; /* anti-warning */
2775 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2778 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2779 unsigned int len
, clen
;
2780 unsigned char *c
= NULL
;
2783 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2784 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2785 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2786 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2787 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2788 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2790 return createObject(REDIS_STRING
,val
);
2797 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2802 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2805 case REDIS_RDB_ENC_INT8
:
2806 case REDIS_RDB_ENC_INT16
:
2807 case REDIS_RDB_ENC_INT32
:
2808 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2809 case REDIS_RDB_ENC_LZF
:
2810 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2816 if (len
== REDIS_RDB_LENERR
) return NULL
;
2817 val
= sdsnewlen(NULL
,len
);
2818 if (len
&& fread(val
,len
,1,fp
) == 0) {
2822 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2825 /* For information about double serialization check rdbSaveDoubleValue() */
2826 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
2830 if (fread(&len
,1,1,fp
) == 0) return -1;
2832 case 255: *val
= R_NegInf
; return 0;
2833 case 254: *val
= R_PosInf
; return 0;
2834 case 253: *val
= R_Nan
; return 0;
2836 if (fread(buf
,len
,1,fp
) == 0) return -1;
2838 sscanf(buf
, "%lg", val
);
2843 static int rdbLoad(char *filename
) {
2845 robj
*keyobj
= NULL
;
2847 int type
, retval
, rdbver
;
2848 dict
*d
= server
.db
[0].dict
;
2849 redisDb
*db
= server
.db
+0;
2851 time_t expiretime
= -1, now
= time(NULL
);
2853 fp
= fopen(filename
,"r");
2854 if (!fp
) return REDIS_ERR
;
2855 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2857 if (memcmp(buf
,"REDIS",5) != 0) {
2859 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2862 rdbver
= atoi(buf
+5);
2865 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2872 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2873 if (type
== REDIS_EXPIRETIME
) {
2874 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2875 /* We read the time so we need to read the object type again */
2876 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2878 if (type
== REDIS_EOF
) break;
2879 /* Handle SELECT DB opcode as a special case */
2880 if (type
== REDIS_SELECTDB
) {
2881 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2883 if (dbid
>= (unsigned)server
.dbnum
) {
2884 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2887 db
= server
.db
+dbid
;
2892 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2894 if (type
== REDIS_STRING
) {
2895 /* Read string value */
2896 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2897 tryObjectEncoding(o
);
2898 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2899 /* Read list/set value */
2902 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2904 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2905 /* Load every single element of the list/set */
2909 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2910 tryObjectEncoding(ele
);
2911 if (type
== REDIS_LIST
) {
2912 listAddNodeTail((list
*)o
->ptr
,ele
);
2914 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
2917 } else if (type
== REDIS_ZSET
) {
2918 /* Read list/set value */
2922 if ((zsetlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2924 o
= createZsetObject();
2926 /* Load every single element of the list/set */
2929 double *score
= zmalloc(sizeof(double));
2931 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2932 tryObjectEncoding(ele
);
2933 if (rdbLoadDoubleValue(fp
,score
) == -1) goto eoferr
;
2934 dictAdd(zs
->dict
,ele
,score
);
2935 zslInsert(zs
->zsl
,*score
,ele
);
2936 incrRefCount(ele
); /* added to skiplist */
2939 redisAssert(0 != 0);
2941 /* Add the new object in the hash table */
2942 retval
= dictAdd(d
,keyobj
,o
);
2943 if (retval
== DICT_ERR
) {
2944 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2947 /* Set the expire time if needed */
2948 if (expiretime
!= -1) {
2949 setExpire(db
,keyobj
,expiretime
);
2950 /* Delete this key if already expired */
2951 if (expiretime
< now
) deleteKey(db
,keyobj
);
2959 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2960 if (keyobj
) decrRefCount(keyobj
);
2961 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2963 return REDIS_ERR
; /* Just to avoid warning */
2966 /*================================== Commands =============================== */
2968 static void authCommand(redisClient
*c
) {
2969 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2970 c
->authenticated
= 1;
2971 addReply(c
,shared
.ok
);
2973 c
->authenticated
= 0;
2974 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2978 static void pingCommand(redisClient
*c
) {
2979 addReply(c
,shared
.pong
);
2982 static void echoCommand(redisClient
*c
) {
2983 addReplyBulkLen(c
,c
->argv
[1]);
2984 addReply(c
,c
->argv
[1]);
2985 addReply(c
,shared
.crlf
);
2988 /*=================================== Strings =============================== */
2990 static void setGenericCommand(redisClient
*c
, int nx
) {
2993 if (nx
) deleteIfVolatile(c
->db
,c
->argv
[1]);
2994 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2995 if (retval
== DICT_ERR
) {
2997 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2998 incrRefCount(c
->argv
[2]);
3000 addReply(c
,shared
.czero
);
3004 incrRefCount(c
->argv
[1]);
3005 incrRefCount(c
->argv
[2]);
3008 removeExpire(c
->db
,c
->argv
[1]);
3009 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3012 static void setCommand(redisClient
*c
) {
3013 setGenericCommand(c
,0);
3016 static void setnxCommand(redisClient
*c
) {
3017 setGenericCommand(c
,1);
3020 static void getCommand(redisClient
*c
) {
3021 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3024 addReply(c
,shared
.nullbulk
);
3026 if (o
->type
!= REDIS_STRING
) {
3027 addReply(c
,shared
.wrongtypeerr
);
3029 addReplyBulkLen(c
,o
);
3031 addReply(c
,shared
.crlf
);
3036 static void getsetCommand(redisClient
*c
) {
3038 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
3039 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3041 incrRefCount(c
->argv
[1]);
3043 incrRefCount(c
->argv
[2]);
3045 removeExpire(c
->db
,c
->argv
[1]);
3048 static void mgetCommand(redisClient
*c
) {
3051 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
3052 for (j
= 1; j
< c
->argc
; j
++) {
3053 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
3055 addReply(c
,shared
.nullbulk
);
3057 if (o
->type
!= REDIS_STRING
) {
3058 addReply(c
,shared
.nullbulk
);
3060 addReplyBulkLen(c
,o
);
3062 addReply(c
,shared
.crlf
);
3068 static void msetGenericCommand(redisClient
*c
, int nx
) {
3069 int j
, busykeys
= 0;
3071 if ((c
->argc
% 2) == 0) {
3072 addReplySds(c
,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3075 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3076 * set nothing at all if at least one already key exists. */
3078 for (j
= 1; j
< c
->argc
; j
+= 2) {
3079 if (lookupKeyWrite(c
->db
,c
->argv
[j
]) != NULL
) {
3085 addReply(c
, shared
.czero
);
3089 for (j
= 1; j
< c
->argc
; j
+= 2) {
3092 tryObjectEncoding(c
->argv
[j
+1]);
3093 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3094 if (retval
== DICT_ERR
) {
3095 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3096 incrRefCount(c
->argv
[j
+1]);
3098 incrRefCount(c
->argv
[j
]);
3099 incrRefCount(c
->argv
[j
+1]);
3101 removeExpire(c
->db
,c
->argv
[j
]);
3103 server
.dirty
+= (c
->argc
-1)/2;
3104 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3107 static void msetCommand(redisClient
*c
) {
3108 msetGenericCommand(c
,0);
3111 static void msetnxCommand(redisClient
*c
) {
3112 msetGenericCommand(c
,1);
3115 static void incrDecrCommand(redisClient
*c
, long long incr
) {
3120 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3124 if (o
->type
!= REDIS_STRING
) {
3129 if (o
->encoding
== REDIS_ENCODING_RAW
)
3130 value
= strtoll(o
->ptr
, &eptr
, 10);
3131 else if (o
->encoding
== REDIS_ENCODING_INT
)
3132 value
= (long)o
->ptr
;
3134 redisAssert(1 != 1);
3139 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
3140 tryObjectEncoding(o
);
3141 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
3142 if (retval
== DICT_ERR
) {
3143 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
3144 removeExpire(c
->db
,c
->argv
[1]);
3146 incrRefCount(c
->argv
[1]);
3149 addReply(c
,shared
.colon
);
3151 addReply(c
,shared
.crlf
);
3154 static void incrCommand(redisClient
*c
) {
3155 incrDecrCommand(c
,1);
3158 static void decrCommand(redisClient
*c
) {
3159 incrDecrCommand(c
,-1);
3162 static void incrbyCommand(redisClient
*c
) {
3163 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3164 incrDecrCommand(c
,incr
);
3167 static void decrbyCommand(redisClient
*c
) {
3168 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3169 incrDecrCommand(c
,-incr
);
3172 /* ========================= Type agnostic commands ========================= */
3174 static void delCommand(redisClient
*c
) {
3177 for (j
= 1; j
< c
->argc
; j
++) {
3178 if (deleteKey(c
->db
,c
->argv
[j
])) {
3185 addReply(c
,shared
.czero
);
3188 addReply(c
,shared
.cone
);
3191 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
3196 static void existsCommand(redisClient
*c
) {
3197 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
3200 static void selectCommand(redisClient
*c
) {
3201 int id
= atoi(c
->argv
[1]->ptr
);
3203 if (selectDb(c
,id
) == REDIS_ERR
) {
3204 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
3206 addReply(c
,shared
.ok
);
3210 static void randomkeyCommand(redisClient
*c
) {
3214 de
= dictGetRandomKey(c
->db
->dict
);
3215 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
3218 addReply(c
,shared
.plus
);
3219 addReply(c
,shared
.crlf
);
3221 addReply(c
,shared
.plus
);
3222 addReply(c
,dictGetEntryKey(de
));
3223 addReply(c
,shared
.crlf
);
3227 static void keysCommand(redisClient
*c
) {
3230 sds pattern
= c
->argv
[1]->ptr
;
3231 int plen
= sdslen(pattern
);
3232 unsigned long numkeys
= 0, keyslen
= 0;
3233 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3235 di
= dictGetIterator(c
->db
->dict
);
3237 decrRefCount(lenobj
);
3238 while((de
= dictNext(di
)) != NULL
) {
3239 robj
*keyobj
= dictGetEntryKey(de
);
3241 sds key
= keyobj
->ptr
;
3242 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3243 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3244 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3246 addReply(c
,shared
.space
);
3249 keyslen
+= sdslen(key
);
3253 dictReleaseIterator(di
);
3254 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3255 addReply(c
,shared
.crlf
);
3258 static void dbsizeCommand(redisClient
*c
) {
3260 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
3263 static void lastsaveCommand(redisClient
*c
) {
3265 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
3268 static void typeCommand(redisClient
*c
) {
3272 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3277 case REDIS_STRING
: type
= "+string"; break;
3278 case REDIS_LIST
: type
= "+list"; break;
3279 case REDIS_SET
: type
= "+set"; break;
3280 case REDIS_ZSET
: type
= "+zset"; break;
3281 default: type
= "unknown"; break;
3284 addReplySds(c
,sdsnew(type
));
3285 addReply(c
,shared
.crlf
);
3288 static void saveCommand(redisClient
*c
) {
3289 if (server
.bgsavechildpid
!= -1) {
3290 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3293 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3294 addReply(c
,shared
.ok
);
3296 addReply(c
,shared
.err
);
3300 static void bgsaveCommand(redisClient
*c
) {
3301 if (server
.bgsavechildpid
!= -1) {
3302 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3305 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3306 char *status
= "+Background saving started\r\n";
3307 addReplySds(c
,sdsnew(status
));
3309 addReply(c
,shared
.err
);
3313 static void shutdownCommand(redisClient
*c
) {
3314 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
3315 /* Kill the saving child if there is a background saving in progress.
3316 We want to avoid race conditions, for instance our saving child may
3317 overwrite the synchronous saving did by SHUTDOWN. */
3318 if (server
.bgsavechildpid
!= -1) {
3319 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3320 kill(server
.bgsavechildpid
,SIGKILL
);
3321 rdbRemoveTempFile(server
.bgsavechildpid
);
3323 if (server
.appendonly
) {
3324 /* Append only file: fsync() the AOF and exit */
3325 fsync(server
.appendfd
);
3328 /* Snapshotting. Perform a SYNC SAVE and exit */
3329 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3330 if (server
.daemonize
)
3331 unlink(server
.pidfile
);
3332 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3333 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3336 /* Ooops.. error saving! The best we can do is to continue operating.
3337 * Note that if there was a background saving process, in the next
3338 * cron() Redis will be notified that the background saving aborted,
3339 * handling special stuff like slaves pending for synchronization... */
3340 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3341 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3346 static void renameGenericCommand(redisClient
*c
, int nx
) {
3349 /* To use the same key as src and dst is probably an error */
3350 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3351 addReply(c
,shared
.sameobjecterr
);
3355 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3357 addReply(c
,shared
.nokeyerr
);
3361 deleteIfVolatile(c
->db
,c
->argv
[2]);
3362 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3365 addReply(c
,shared
.czero
);
3368 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3370 incrRefCount(c
->argv
[2]);
3372 deleteKey(c
->db
,c
->argv
[1]);
3374 addReply(c
,nx
? shared
.cone
: shared
.ok
);
3377 static void renameCommand(redisClient
*c
) {
3378 renameGenericCommand(c
,0);
3381 static void renamenxCommand(redisClient
*c
) {
3382 renameGenericCommand(c
,1);
3385 static void moveCommand(redisClient
*c
) {
3390 /* Obtain source and target DB pointers */
3393 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3394 addReply(c
,shared
.outofrangeerr
);
3398 selectDb(c
,srcid
); /* Back to the source DB */
3400 /* If the user is moving using as target the same
3401 * DB as the source DB it is probably an error. */
3403 addReply(c
,shared
.sameobjecterr
);
3407 /* Check if the element exists and get a reference */
3408 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3410 addReply(c
,shared
.czero
);
3414 /* Try to add the element to the target DB */
3415 deleteIfVolatile(dst
,c
->argv
[1]);
3416 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3417 addReply(c
,shared
.czero
);
3420 incrRefCount(c
->argv
[1]);
3423 /* OK! key moved, free the entry in the source DB */
3424 deleteKey(src
,c
->argv
[1]);
3426 addReply(c
,shared
.cone
);
3429 /* =================================== Lists ================================ */
3430 static void pushGenericCommand(redisClient
*c
, int where
) {
3434 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3436 lobj
= createListObject();
3438 if (where
== REDIS_HEAD
) {
3439 listAddNodeHead(list
,c
->argv
[2]);
3441 listAddNodeTail(list
,c
->argv
[2]);
3443 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
3444 incrRefCount(c
->argv
[1]);
3445 incrRefCount(c
->argv
[2]);
3447 if (lobj
->type
!= REDIS_LIST
) {
3448 addReply(c
,shared
.wrongtypeerr
);
3452 if (where
== REDIS_HEAD
) {
3453 listAddNodeHead(list
,c
->argv
[2]);
3455 listAddNodeTail(list
,c
->argv
[2]);
3457 incrRefCount(c
->argv
[2]);
3460 addReply(c
,shared
.ok
);
3463 static void lpushCommand(redisClient
*c
) {
3464 pushGenericCommand(c
,REDIS_HEAD
);
3467 static void rpushCommand(redisClient
*c
) {
3468 pushGenericCommand(c
,REDIS_TAIL
);
3471 static void llenCommand(redisClient
*c
) {
3475 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3477 addReply(c
,shared
.czero
);
3480 if (o
->type
!= REDIS_LIST
) {
3481 addReply(c
,shared
.wrongtypeerr
);
3484 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3489 static void lindexCommand(redisClient
*c
) {
3491 int index
= atoi(c
->argv
[2]->ptr
);
3493 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3495 addReply(c
,shared
.nullbulk
);
3497 if (o
->type
!= REDIS_LIST
) {
3498 addReply(c
,shared
.wrongtypeerr
);
3500 list
*list
= o
->ptr
;
3503 ln
= listIndex(list
, index
);
3505 addReply(c
,shared
.nullbulk
);
3507 robj
*ele
= listNodeValue(ln
);
3508 addReplyBulkLen(c
,ele
);
3510 addReply(c
,shared
.crlf
);
3516 static void lsetCommand(redisClient
*c
) {
3518 int index
= atoi(c
->argv
[2]->ptr
);
3520 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3522 addReply(c
,shared
.nokeyerr
);
3524 if (o
->type
!= REDIS_LIST
) {
3525 addReply(c
,shared
.wrongtypeerr
);
3527 list
*list
= o
->ptr
;
3530 ln
= listIndex(list
, index
);
3532 addReply(c
,shared
.outofrangeerr
);
3534 robj
*ele
= listNodeValue(ln
);
3537 listNodeValue(ln
) = c
->argv
[3];
3538 incrRefCount(c
->argv
[3]);
3539 addReply(c
,shared
.ok
);
3546 static void popGenericCommand(redisClient
*c
, int where
) {
3549 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3551 addReply(c
,shared
.nullbulk
);
3553 if (o
->type
!= REDIS_LIST
) {
3554 addReply(c
,shared
.wrongtypeerr
);
3556 list
*list
= o
->ptr
;
3559 if (where
== REDIS_HEAD
)
3560 ln
= listFirst(list
);
3562 ln
= listLast(list
);
3565 addReply(c
,shared
.nullbulk
);
3567 robj
*ele
= listNodeValue(ln
);
3568 addReplyBulkLen(c
,ele
);
3570 addReply(c
,shared
.crlf
);
3571 listDelNode(list
,ln
);
3578 static void lpopCommand(redisClient
*c
) {
3579 popGenericCommand(c
,REDIS_HEAD
);
3582 static void rpopCommand(redisClient
*c
) {
3583 popGenericCommand(c
,REDIS_TAIL
);
3586 static void lrangeCommand(redisClient
*c
) {
3588 int start
= atoi(c
->argv
[2]->ptr
);
3589 int end
= atoi(c
->argv
[3]->ptr
);
3591 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3593 addReply(c
,shared
.nullmultibulk
);
3595 if (o
->type
!= REDIS_LIST
) {
3596 addReply(c
,shared
.wrongtypeerr
);
3598 list
*list
= o
->ptr
;
3600 int llen
= listLength(list
);
3604 /* convert negative indexes */
3605 if (start
< 0) start
= llen
+start
;
3606 if (end
< 0) end
= llen
+end
;
3607 if (start
< 0) start
= 0;
3608 if (end
< 0) end
= 0;
3610 /* indexes sanity checks */
3611 if (start
> end
|| start
>= llen
) {
3612 /* Out of range start or start > end result in empty list */
3613 addReply(c
,shared
.emptymultibulk
);
3616 if (end
>= llen
) end
= llen
-1;
3617 rangelen
= (end
-start
)+1;
3619 /* Return the result in form of a multi-bulk reply */
3620 ln
= listIndex(list
, start
);
3621 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3622 for (j
= 0; j
< rangelen
; j
++) {
3623 ele
= listNodeValue(ln
);
3624 addReplyBulkLen(c
,ele
);
3626 addReply(c
,shared
.crlf
);
3633 static void ltrimCommand(redisClient
*c
) {
3635 int start
= atoi(c
->argv
[2]->ptr
);
3636 int end
= atoi(c
->argv
[3]->ptr
);
3638 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3640 addReply(c
,shared
.ok
);
3642 if (o
->type
!= REDIS_LIST
) {
3643 addReply(c
,shared
.wrongtypeerr
);
3645 list
*list
= o
->ptr
;
3647 int llen
= listLength(list
);
3648 int j
, ltrim
, rtrim
;
3650 /* convert negative indexes */
3651 if (start
< 0) start
= llen
+start
;
3652 if (end
< 0) end
= llen
+end
;
3653 if (start
< 0) start
= 0;
3654 if (end
< 0) end
= 0;
3656 /* indexes sanity checks */
3657 if (start
> end
|| start
>= llen
) {
3658 /* Out of range start or start > end result in empty list */
3662 if (end
>= llen
) end
= llen
-1;
3667 /* Remove list elements to perform the trim */
3668 for (j
= 0; j
< ltrim
; j
++) {
3669 ln
= listFirst(list
);
3670 listDelNode(list
,ln
);
3672 for (j
= 0; j
< rtrim
; j
++) {
3673 ln
= listLast(list
);
3674 listDelNode(list
,ln
);
3677 addReply(c
,shared
.ok
);
3682 static void lremCommand(redisClient
*c
) {
3685 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3687 addReply(c
,shared
.czero
);
3689 if (o
->type
!= REDIS_LIST
) {
3690 addReply(c
,shared
.wrongtypeerr
);
3692 list
*list
= o
->ptr
;
3693 listNode
*ln
, *next
;
3694 int toremove
= atoi(c
->argv
[2]->ptr
);
3699 toremove
= -toremove
;
3702 ln
= fromtail
? list
->tail
: list
->head
;
3704 robj
*ele
= listNodeValue(ln
);
3706 next
= fromtail
? ln
->prev
: ln
->next
;
3707 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3708 listDelNode(list
,ln
);
3711 if (toremove
&& removed
== toremove
) break;
3715 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3720 /* This is the semantic of this command:
3721 * RPOPLPUSH srclist dstlist:
3722 * IF LLEN(srclist) > 0
3723 * element = RPOP srclist
3724 * LPUSH dstlist element
3731 * The idea is to be able to get an element from a list in a reliable way
3732 * since the element is not just returned but pushed against another list
3733 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3735 static void rpoplpushcommand(redisClient
*c
) {
3738 sobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3740 addReply(c
,shared
.nullbulk
);
3742 if (sobj
->type
!= REDIS_LIST
) {
3743 addReply(c
,shared
.wrongtypeerr
);
3745 list
*srclist
= sobj
->ptr
;
3746 listNode
*ln
= listLast(srclist
);
3749 addReply(c
,shared
.nullbulk
);
3751 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3752 robj
*ele
= listNodeValue(ln
);
3757 /* Create the list if the key does not exist */
3758 dobj
= createListObject();
3759 dictAdd(c
->db
->dict
,c
->argv
[2],dobj
);
3760 incrRefCount(c
->argv
[2]);
3761 } else if (dobj
->type
!= REDIS_LIST
) {
3762 addReply(c
,shared
.wrongtypeerr
);
3765 /* Add the element to the target list */
3766 dstlist
= dobj
->ptr
;
3767 listAddNodeHead(dstlist
,ele
);
3770 /* Send the element to the client as reply as well */
3771 addReplyBulkLen(c
,ele
);
3773 addReply(c
,shared
.crlf
);
3775 /* Finally remove the element from the source list */
3776 listDelNode(srclist
,ln
);
3784 /* ==================================== Sets ================================ */
3786 static void saddCommand(redisClient
*c
) {
3789 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3791 set
= createSetObject();
3792 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3793 incrRefCount(c
->argv
[1]);
3795 if (set
->type
!= REDIS_SET
) {
3796 addReply(c
,shared
.wrongtypeerr
);
3800 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3801 incrRefCount(c
->argv
[2]);
3803 addReply(c
,shared
.cone
);
3805 addReply(c
,shared
.czero
);
3809 static void sremCommand(redisClient
*c
) {
3812 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3814 addReply(c
,shared
.czero
);
3816 if (set
->type
!= REDIS_SET
) {
3817 addReply(c
,shared
.wrongtypeerr
);
3820 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
3822 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3823 addReply(c
,shared
.cone
);
3825 addReply(c
,shared
.czero
);
3830 static void smoveCommand(redisClient
*c
) {
3831 robj
*srcset
, *dstset
;
3833 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3834 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3836 /* If the source key does not exist return 0, if it's of the wrong type
3838 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
3839 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
3842 /* Error if the destination key is not a set as well */
3843 if (dstset
&& dstset
->type
!= REDIS_SET
) {
3844 addReply(c
,shared
.wrongtypeerr
);
3847 /* Remove the element from the source set */
3848 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
3849 /* Key not found in the src set! return zero */
3850 addReply(c
,shared
.czero
);
3854 /* Add the element to the destination set */
3856 dstset
= createSetObject();
3857 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
3858 incrRefCount(c
->argv
[2]);
3860 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
3861 incrRefCount(c
->argv
[3]);
3862 addReply(c
,shared
.cone
);
3865 static void sismemberCommand(redisClient
*c
) {
3868 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3870 addReply(c
,shared
.czero
);
3872 if (set
->type
!= REDIS_SET
) {
3873 addReply(c
,shared
.wrongtypeerr
);
3876 if (dictFind(set
->ptr
,c
->argv
[2]))
3877 addReply(c
,shared
.cone
);
3879 addReply(c
,shared
.czero
);
3883 static void scardCommand(redisClient
*c
) {
3887 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3889 addReply(c
,shared
.czero
);
3892 if (o
->type
!= REDIS_SET
) {
3893 addReply(c
,shared
.wrongtypeerr
);
3896 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
3902 static void spopCommand(redisClient
*c
) {
3906 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3908 addReply(c
,shared
.nullbulk
);
3910 if (set
->type
!= REDIS_SET
) {
3911 addReply(c
,shared
.wrongtypeerr
);
3914 de
= dictGetRandomKey(set
->ptr
);
3916 addReply(c
,shared
.nullbulk
);
3918 robj
*ele
= dictGetEntryKey(de
);
3920 addReplyBulkLen(c
,ele
);
3922 addReply(c
,shared
.crlf
);
3923 dictDelete(set
->ptr
,ele
);
3924 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3930 static void srandmemberCommand(redisClient
*c
) {
3934 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3936 addReply(c
,shared
.nullbulk
);
3938 if (set
->type
!= REDIS_SET
) {
3939 addReply(c
,shared
.wrongtypeerr
);
3942 de
= dictGetRandomKey(set
->ptr
);
3944 addReply(c
,shared
.nullbulk
);
3946 robj
*ele
= dictGetEntryKey(de
);
3948 addReplyBulkLen(c
,ele
);
3950 addReply(c
,shared
.crlf
);
3955 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3956 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3958 return dictSize(*d1
)-dictSize(*d2
);
3961 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, unsigned long setsnum
, robj
*dstkey
) {
3962 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3965 robj
*lenobj
= NULL
, *dstset
= NULL
;
3966 unsigned long j
, cardinality
= 0;
3968 for (j
= 0; j
< setsnum
; j
++) {
3972 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3973 lookupKeyRead(c
->db
,setskeys
[j
]);
3977 if (deleteKey(c
->db
,dstkey
))
3979 addReply(c
,shared
.czero
);
3981 addReply(c
,shared
.nullmultibulk
);
3985 if (setobj
->type
!= REDIS_SET
) {
3987 addReply(c
,shared
.wrongtypeerr
);
3990 dv
[j
] = setobj
->ptr
;
3992 /* Sort sets from the smallest to largest, this will improve our
3993 * algorithm's performace */
3994 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3996 /* The first thing we should output is the total number of elements...
3997 * since this is a multi-bulk write, but at this stage we don't know
3998 * the intersection set size, so we use a trick, append an empty object
3999 * to the output list and save the pointer to later modify it with the
4002 lenobj
= createObject(REDIS_STRING
,NULL
);
4004 decrRefCount(lenobj
);
4006 /* If we have a target key where to store the resulting set
4007 * create this key with an empty set inside */
4008 dstset
= createSetObject();
4011 /* Iterate all the elements of the first (smallest) set, and test
4012 * the element against all the other sets, if at least one set does
4013 * not include the element it is discarded */
4014 di
= dictGetIterator(dv
[0]);
4016 while((de
= dictNext(di
)) != NULL
) {
4019 for (j
= 1; j
< setsnum
; j
++)
4020 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
4022 continue; /* at least one set does not contain the member */
4023 ele
= dictGetEntryKey(de
);
4025 addReplyBulkLen(c
,ele
);
4027 addReply(c
,shared
.crlf
);
4030 dictAdd(dstset
->ptr
,ele
,NULL
);
4034 dictReleaseIterator(di
);
4037 /* Store the resulting set into the target */
4038 deleteKey(c
->db
,dstkey
);
4039 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4040 incrRefCount(dstkey
);
4044 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%lu\r\n",cardinality
);
4046 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4047 dictSize((dict
*)dstset
->ptr
)));
4053 static void sinterCommand(redisClient
*c
) {
4054 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
4057 static void sinterstoreCommand(redisClient
*c
) {
4058 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
4061 #define REDIS_OP_UNION 0
4062 #define REDIS_OP_DIFF 1
4064 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
4065 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4068 robj
*dstset
= NULL
;
4069 int j
, cardinality
= 0;
4071 for (j
= 0; j
< setsnum
; j
++) {
4075 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4076 lookupKeyRead(c
->db
,setskeys
[j
]);
4081 if (setobj
->type
!= REDIS_SET
) {
4083 addReply(c
,shared
.wrongtypeerr
);
4086 dv
[j
] = setobj
->ptr
;
4089 /* We need a temp set object to store our union. If the dstkey
4090 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4091 * this set object will be the resulting object to set into the target key*/
4092 dstset
= createSetObject();
4094 /* Iterate all the elements of all the sets, add every element a single
4095 * time to the result set */
4096 for (j
= 0; j
< setsnum
; j
++) {
4097 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
4098 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
4100 di
= dictGetIterator(dv
[j
]);
4102 while((de
= dictNext(di
)) != NULL
) {
4105 /* dictAdd will not add the same element multiple times */
4106 ele
= dictGetEntryKey(de
);
4107 if (op
== REDIS_OP_UNION
|| j
== 0) {
4108 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
4112 } else if (op
== REDIS_OP_DIFF
) {
4113 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
4118 dictReleaseIterator(di
);
4120 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
4123 /* Output the content of the resulting set, if not in STORE mode */
4125 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
4126 di
= dictGetIterator(dstset
->ptr
);
4127 while((de
= dictNext(di
)) != NULL
) {
4130 ele
= dictGetEntryKey(de
);
4131 addReplyBulkLen(c
,ele
);
4133 addReply(c
,shared
.crlf
);
4135 dictReleaseIterator(di
);
4137 /* If we have a target key where to store the resulting set
4138 * create this key with the result set inside */
4139 deleteKey(c
->db
,dstkey
);
4140 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4141 incrRefCount(dstkey
);
4146 decrRefCount(dstset
);
4148 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4149 dictSize((dict
*)dstset
->ptr
)));
4155 static void sunionCommand(redisClient
*c
) {
4156 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
4159 static void sunionstoreCommand(redisClient
*c
) {
4160 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
4163 static void sdiffCommand(redisClient
*c
) {
4164 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
4167 static void sdiffstoreCommand(redisClient
*c
) {
4168 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
4171 /* ==================================== ZSets =============================== */
4173 /* ZSETs are ordered sets using two data structures to hold the same elements
4174 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4177 * The elements are added to an hash table mapping Redis objects to scores.
4178 * At the same time the elements are added to a skip list mapping scores
4179 * to Redis objects (so objects are sorted by scores in this "view"). */
4181 /* This skiplist implementation is almost a C translation of the original
4182 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4183 * Alternative to Balanced Trees", modified in three ways:
4184 * a) this implementation allows for repeated values.
4185 * b) the comparison is not just by key (our 'score') but by satellite data.
4186 * c) there is a back pointer, so it's a doubly linked list with the back
4187 * pointers being only at "level 1". This allows to traverse the list
4188 * from tail to head, useful for ZREVRANGE. */
4190 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
4191 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
4193 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
4199 static zskiplist
*zslCreate(void) {
4203 zsl
= zmalloc(sizeof(*zsl
));
4206 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
4207 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
4208 zsl
->header
->forward
[j
] = NULL
;
4209 zsl
->header
->backward
= NULL
;
4214 static void zslFreeNode(zskiplistNode
*node
) {
4215 decrRefCount(node
->obj
);
4216 zfree(node
->forward
);
4220 static void zslFree(zskiplist
*zsl
) {
4221 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
4223 zfree(zsl
->header
->forward
);
4226 next
= node
->forward
[0];
4233 static int zslRandomLevel(void) {
4235 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
4240 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
4241 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4245 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4246 while (x
->forward
[i
] &&
4247 (x
->forward
[i
]->score
< score
||
4248 (x
->forward
[i
]->score
== score
&&
4249 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4253 /* we assume the key is not already inside, since we allow duplicated
4254 * scores, and the re-insertion of score and redis object should never
4255 * happpen since the caller of zslInsert() should test in the hash table
4256 * if the element is already inside or not. */
4257 level
= zslRandomLevel();
4258 if (level
> zsl
->level
) {
4259 for (i
= zsl
->level
; i
< level
; i
++)
4260 update
[i
] = zsl
->header
;
4263 x
= zslCreateNode(level
,score
,obj
);
4264 for (i
= 0; i
< level
; i
++) {
4265 x
->forward
[i
] = update
[i
]->forward
[i
];
4266 update
[i
]->forward
[i
] = x
;
4268 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
4270 x
->forward
[0]->backward
= x
;
4276 /* Delete an element with matching score/object from the skiplist. */
4277 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
4278 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4282 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4283 while (x
->forward
[i
] &&
4284 (x
->forward
[i
]->score
< score
||
4285 (x
->forward
[i
]->score
== score
&&
4286 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4290 /* We may have multiple elements with the same score, what we need
4291 * is to find the element with both the right score and object. */
4293 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
4294 for (i
= 0; i
< zsl
->level
; i
++) {
4295 if (update
[i
]->forward
[i
] != x
) break;
4296 update
[i
]->forward
[i
] = x
->forward
[i
];
4298 if (x
->forward
[0]) {
4299 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4302 zsl
->tail
= x
->backward
;
4305 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4310 return 0; /* not found */
4312 return 0; /* not found */
4315 /* Delete all the elements with score between min and max from the skiplist.
4316 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4317 * Note that this function takes the reference to the hash table view of the
4318 * sorted set, in order to remove the elements from the hash table too. */
4319 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4320 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4321 unsigned long removed
= 0;
4325 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4326 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
4330 /* We may have multiple elements with the same score, what we need
4331 * is to find the element with both the right score and object. */
4333 while (x
&& x
->score
<= max
) {
4334 zskiplistNode
*next
;
4336 for (i
= 0; i
< zsl
->level
; i
++) {
4337 if (update
[i
]->forward
[i
] != x
) break;
4338 update
[i
]->forward
[i
] = x
->forward
[i
];
4340 if (x
->forward
[0]) {
4341 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4344 zsl
->tail
= x
->backward
;
4346 next
= x
->forward
[0];
4347 dictDelete(dict
,x
->obj
);
4349 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4355 return removed
; /* not found */
4358 /* Find the first node having a score equal or greater than the specified one.
4359 * Returns NULL if there is no match. */
4360 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4365 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4366 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
4369 /* We may have multiple elements with the same score, what we need
4370 * is to find the element with both the right score and object. */
4371 return x
->forward
[0];
4374 /* The actual Z-commands implementations */
4376 /* This generic command implements both ZADD and ZINCRBY.
4377 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4378 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4379 static void zaddGenericCommand(redisClient
*c
, robj
*key
, robj
*ele
, double scoreval
, int doincrement
) {
4384 zsetobj
= lookupKeyWrite(c
->db
,key
);
4385 if (zsetobj
== NULL
) {
4386 zsetobj
= createZsetObject();
4387 dictAdd(c
->db
->dict
,key
,zsetobj
);
4390 if (zsetobj
->type
!= REDIS_ZSET
) {
4391 addReply(c
,shared
.wrongtypeerr
);
4397 /* Ok now since we implement both ZADD and ZINCRBY here the code
4398 * needs to handle the two different conditions. It's all about setting
4399 * '*score', that is, the new score to set, to the right value. */
4400 score
= zmalloc(sizeof(double));
4404 /* Read the old score. If the element was not present starts from 0 */
4405 de
= dictFind(zs
->dict
,ele
);
4407 double *oldscore
= dictGetEntryVal(de
);
4408 *score
= *oldscore
+ scoreval
;
4416 /* What follows is a simple remove and re-insert operation that is common
4417 * to both ZADD and ZINCRBY... */
4418 if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) {
4419 /* case 1: New element */
4420 incrRefCount(ele
); /* added to hash */
4421 zslInsert(zs
->zsl
,*score
,ele
);
4422 incrRefCount(ele
); /* added to skiplist */
4425 addReplyDouble(c
,*score
);
4427 addReply(c
,shared
.cone
);
4432 /* case 2: Score update operation */
4433 de
= dictFind(zs
->dict
,ele
);
4434 redisAssert(de
!= NULL
);
4435 oldscore
= dictGetEntryVal(de
);
4436 if (*score
!= *oldscore
) {
4439 /* Remove and insert the element in the skip list with new score */
4440 deleted
= zslDelete(zs
->zsl
,*oldscore
,ele
);
4441 redisAssert(deleted
!= 0);
4442 zslInsert(zs
->zsl
,*score
,ele
);
4444 /* Update the score in the hash table */
4445 dictReplace(zs
->dict
,ele
,score
);
4451 addReplyDouble(c
,*score
);
4453 addReply(c
,shared
.czero
);
4457 static void zaddCommand(redisClient
*c
) {
4460 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4461 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0);
4464 static void zincrbyCommand(redisClient
*c
) {
4467 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4468 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1);
4471 static void zremCommand(redisClient
*c
) {
4475 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4476 if (zsetobj
== NULL
) {
4477 addReply(c
,shared
.czero
);
4483 if (zsetobj
->type
!= REDIS_ZSET
) {
4484 addReply(c
,shared
.wrongtypeerr
);
4488 de
= dictFind(zs
->dict
,c
->argv
[2]);
4490 addReply(c
,shared
.czero
);
4493 /* Delete from the skiplist */
4494 oldscore
= dictGetEntryVal(de
);
4495 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
4496 redisAssert(deleted
!= 0);
4498 /* Delete from the hash table */
4499 dictDelete(zs
->dict
,c
->argv
[2]);
4500 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4502 addReply(c
,shared
.cone
);
4506 static void zremrangebyscoreCommand(redisClient
*c
) {
4507 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4508 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4512 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4513 if (zsetobj
== NULL
) {
4514 addReply(c
,shared
.czero
);
4518 if (zsetobj
->type
!= REDIS_ZSET
) {
4519 addReply(c
,shared
.wrongtypeerr
);
4523 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
4524 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4525 server
.dirty
+= deleted
;
4526 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
4530 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
4532 int start
= atoi(c
->argv
[2]->ptr
);
4533 int end
= atoi(c
->argv
[3]->ptr
);
4536 if (c
->argc
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) {
4538 } else if (c
->argc
>= 5) {
4539 addReply(c
,shared
.syntaxerr
);
4543 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4545 addReply(c
,shared
.nullmultibulk
);
4547 if (o
->type
!= REDIS_ZSET
) {
4548 addReply(c
,shared
.wrongtypeerr
);
4550 zset
*zsetobj
= o
->ptr
;
4551 zskiplist
*zsl
= zsetobj
->zsl
;
4554 int llen
= zsl
->length
;
4558 /* convert negative indexes */
4559 if (start
< 0) start
= llen
+start
;
4560 if (end
< 0) end
= llen
+end
;
4561 if (start
< 0) start
= 0;
4562 if (end
< 0) end
= 0;
4564 /* indexes sanity checks */
4565 if (start
> end
|| start
>= llen
) {
4566 /* Out of range start or start > end result in empty list */
4567 addReply(c
,shared
.emptymultibulk
);
4570 if (end
>= llen
) end
= llen
-1;
4571 rangelen
= (end
-start
)+1;
4573 /* Return the result in form of a multi-bulk reply */
4579 ln
= zsl
->header
->forward
[0];
4581 ln
= ln
->forward
[0];
4584 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",
4585 withscores
? (rangelen
*2) : rangelen
));
4586 for (j
= 0; j
< rangelen
; j
++) {
4588 addReplyBulkLen(c
,ele
);
4590 addReply(c
,shared
.crlf
);
4592 addReplyDouble(c
,ln
->score
);
4593 ln
= reverse
? ln
->backward
: ln
->forward
[0];
4599 static void zrangeCommand(redisClient
*c
) {
4600 zrangeGenericCommand(c
,0);
4603 static void zrevrangeCommand(redisClient
*c
) {
4604 zrangeGenericCommand(c
,1);
4607 static void zrangebyscoreCommand(redisClient
*c
) {
4609 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4610 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4611 int offset
= 0, limit
= -1;
4613 if (c
->argc
!= 4 && c
->argc
!= 7) {
4615 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4617 } else if (c
->argc
== 7 && strcasecmp(c
->argv
[4]->ptr
,"limit")) {
4618 addReply(c
,shared
.syntaxerr
);
4620 } else if (c
->argc
== 7) {
4621 offset
= atoi(c
->argv
[5]->ptr
);
4622 limit
= atoi(c
->argv
[6]->ptr
);
4623 if (offset
< 0) offset
= 0;
4626 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4628 addReply(c
,shared
.nullmultibulk
);
4630 if (o
->type
!= REDIS_ZSET
) {
4631 addReply(c
,shared
.wrongtypeerr
);
4633 zset
*zsetobj
= o
->ptr
;
4634 zskiplist
*zsl
= zsetobj
->zsl
;
4637 unsigned int rangelen
= 0;
4639 /* Get the first node with the score >= min */
4640 ln
= zslFirstWithScore(zsl
,min
);
4642 /* No element matching the speciifed interval */
4643 addReply(c
,shared
.emptymultibulk
);
4647 /* We don't know in advance how many matching elements there
4648 * are in the list, so we push this object that will represent
4649 * the multi-bulk length in the output buffer, and will "fix"
4651 lenobj
= createObject(REDIS_STRING
,NULL
);
4653 decrRefCount(lenobj
);
4655 while(ln
&& ln
->score
<= max
) {
4658 ln
= ln
->forward
[0];
4661 if (limit
== 0) break;
4663 addReplyBulkLen(c
,ele
);
4665 addReply(c
,shared
.crlf
);
4666 ln
= ln
->forward
[0];
4668 if (limit
> 0) limit
--;
4670 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
4675 static void zcardCommand(redisClient
*c
) {
4679 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4681 addReply(c
,shared
.czero
);
4684 if (o
->type
!= REDIS_ZSET
) {
4685 addReply(c
,shared
.wrongtypeerr
);
4688 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",zs
->zsl
->length
));
4693 static void zscoreCommand(redisClient
*c
) {
4697 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4699 addReply(c
,shared
.nullbulk
);
4702 if (o
->type
!= REDIS_ZSET
) {
4703 addReply(c
,shared
.wrongtypeerr
);
4708 de
= dictFind(zs
->dict
,c
->argv
[2]);
4710 addReply(c
,shared
.nullbulk
);
4712 double *score
= dictGetEntryVal(de
);
4714 addReplyDouble(c
,*score
);
4720 /* ========================= Non type-specific commands ==================== */
4722 static void flushdbCommand(redisClient
*c
) {
4723 server
.dirty
+= dictSize(c
->db
->dict
);
4724 dictEmpty(c
->db
->dict
);
4725 dictEmpty(c
->db
->expires
);
4726 addReply(c
,shared
.ok
);
4729 static void flushallCommand(redisClient
*c
) {
4730 server
.dirty
+= emptyDb();
4731 addReply(c
,shared
.ok
);
4732 rdbSave(server
.dbfilename
);
4736 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
4737 redisSortOperation
*so
= zmalloc(sizeof(*so
));
4739 so
->pattern
= pattern
;
4743 /* Return the value associated to the key with a name obtained
4744 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4745 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
4749 int prefixlen
, sublen
, postfixlen
;
4750 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4754 char buf
[REDIS_SORTKEY_MAX
+1];
4757 /* If the pattern is "#" return the substitution object itself in order
4758 * to implement the "SORT ... GET #" feature. */
4759 spat
= pattern
->ptr
;
4760 if (spat
[0] == '#' && spat
[1] == '\0') {
4764 /* The substitution object may be specially encoded. If so we create
4765 * a decoded object on the fly. Otherwise getDecodedObject will just
4766 * increment the ref count, that we'll decrement later. */
4767 subst
= getDecodedObject(subst
);
4770 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
4771 p
= strchr(spat
,'*');
4773 decrRefCount(subst
);
4778 sublen
= sdslen(ssub
);
4779 postfixlen
= sdslen(spat
)-(prefixlen
+1);
4780 memcpy(keyname
.buf
,spat
,prefixlen
);
4781 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
4782 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
4783 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
4784 keyname
.len
= prefixlen
+sublen
+postfixlen
;
4786 initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(long)*2))
4787 decrRefCount(subst
);
4789 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4790 return lookupKeyRead(db
,&keyobj
);
4793 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4794 * the additional parameter is not standard but a BSD-specific we have to
4795 * pass sorting parameters via the global 'server' structure */
4796 static int sortCompare(const void *s1
, const void *s2
) {
4797 const redisSortObject
*so1
= s1
, *so2
= s2
;
4800 if (!server
.sort_alpha
) {
4801 /* Numeric sorting. Here it's trivial as we precomputed scores */
4802 if (so1
->u
.score
> so2
->u
.score
) {
4804 } else if (so1
->u
.score
< so2
->u
.score
) {
4810 /* Alphanumeric sorting */
4811 if (server
.sort_bypattern
) {
4812 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
4813 /* At least one compare object is NULL */
4814 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
4816 else if (so1
->u
.cmpobj
== NULL
)
4821 /* We have both the objects, use strcoll */
4822 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
4825 /* Compare elements directly */
4828 dec1
= getDecodedObject(so1
->obj
);
4829 dec2
= getDecodedObject(so2
->obj
);
4830 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
4835 return server
.sort_desc
? -cmp
: cmp
;
4838 /* The SORT command is the most complex command in Redis. Warning: this code
4839 * is optimized for speed and a bit less for readability */
4840 static void sortCommand(redisClient
*c
) {
4843 int desc
= 0, alpha
= 0;
4844 int limit_start
= 0, limit_count
= -1, start
, end
;
4845 int j
, dontsort
= 0, vectorlen
;
4846 int getop
= 0; /* GET operation counter */
4847 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
4848 redisSortObject
*vector
; /* Resulting vector to sort */
4850 /* Lookup the key to sort. It must be of the right types */
4851 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
4852 if (sortval
== NULL
) {
4853 addReply(c
,shared
.nullmultibulk
);
4856 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
4857 sortval
->type
!= REDIS_ZSET
)
4859 addReply(c
,shared
.wrongtypeerr
);
4863 /* Create a list of operations to perform for every sorted element.
4864 * Operations can be GET/DEL/INCR/DECR */
4865 operations
= listCreate();
4866 listSetFreeMethod(operations
,zfree
);
4869 /* Now we need to protect sortval incrementing its count, in the future
4870 * SORT may have options able to overwrite/delete keys during the sorting
4871 * and the sorted key itself may get destroied */
4872 incrRefCount(sortval
);
4874 /* The SORT command has an SQL-alike syntax, parse it */
4875 while(j
< c
->argc
) {
4876 int leftargs
= c
->argc
-j
-1;
4877 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
4879 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
4881 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
4883 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
4884 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
4885 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
4887 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
4888 storekey
= c
->argv
[j
+1];
4890 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
4891 sortby
= c
->argv
[j
+1];
4892 /* If the BY pattern does not contain '*', i.e. it is constant,
4893 * we don't need to sort nor to lookup the weight keys. */
4894 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
4896 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
4897 listAddNodeTail(operations
,createSortOperation(
4898 REDIS_SORT_GET
,c
->argv
[j
+1]));
4902 decrRefCount(sortval
);
4903 listRelease(operations
);
4904 addReply(c
,shared
.syntaxerr
);
4910 /* Load the sorting vector with all the objects to sort */
4911 switch(sortval
->type
) {
4912 case REDIS_LIST
: vectorlen
= listLength((list
*)sortval
->ptr
); break;
4913 case REDIS_SET
: vectorlen
= dictSize((dict
*)sortval
->ptr
); break;
4914 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
4915 default: vectorlen
= 0; redisAssert(0); /* Avoid GCC warning */
4917 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
4920 if (sortval
->type
== REDIS_LIST
) {
4921 list
*list
= sortval
->ptr
;
4925 while((ln
= listYield(list
))) {
4926 robj
*ele
= ln
->value
;
4927 vector
[j
].obj
= ele
;
4928 vector
[j
].u
.score
= 0;
4929 vector
[j
].u
.cmpobj
= NULL
;
4937 if (sortval
->type
== REDIS_SET
) {
4940 zset
*zs
= sortval
->ptr
;
4944 di
= dictGetIterator(set
);
4945 while((setele
= dictNext(di
)) != NULL
) {
4946 vector
[j
].obj
= dictGetEntryKey(setele
);
4947 vector
[j
].u
.score
= 0;
4948 vector
[j
].u
.cmpobj
= NULL
;
4951 dictReleaseIterator(di
);
4953 redisAssert(j
== vectorlen
);
4955 /* Now it's time to load the right scores in the sorting vector */
4956 if (dontsort
== 0) {
4957 for (j
= 0; j
< vectorlen
; j
++) {
4961 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
4962 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
4964 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
4966 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
4967 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
4969 /* Don't need to decode the object if it's
4970 * integer-encoded (the only encoding supported) so
4971 * far. We can just cast it */
4972 if (byval
->encoding
== REDIS_ENCODING_INT
) {
4973 vector
[j
].u
.score
= (long)byval
->ptr
;
4975 redisAssert(1 != 1);
4980 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
4981 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
4983 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
4984 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
4986 redisAssert(1 != 1);
4993 /* We are ready to sort the vector... perform a bit of sanity check
4994 * on the LIMIT option too. We'll use a partial version of quicksort. */
4995 start
= (limit_start
< 0) ? 0 : limit_start
;
4996 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
4997 if (start
>= vectorlen
) {
4998 start
= vectorlen
-1;
5001 if (end
>= vectorlen
) end
= vectorlen
-1;
5003 if (dontsort
== 0) {
5004 server
.sort_desc
= desc
;
5005 server
.sort_alpha
= alpha
;
5006 server
.sort_bypattern
= sortby
? 1 : 0;
5007 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
5008 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
5010 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
5013 /* Send command output to the output buffer, performing the specified
5014 * GET/DEL/INCR/DECR operations if any. */
5015 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
5016 if (storekey
== NULL
) {
5017 /* STORE option not specified, sent the sorting result to client */
5018 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
5019 for (j
= start
; j
<= end
; j
++) {
5022 addReplyBulkLen(c
,vector
[j
].obj
);
5023 addReply(c
,vector
[j
].obj
);
5024 addReply(c
,shared
.crlf
);
5026 listRewind(operations
);
5027 while((ln
= listYield(operations
))) {
5028 redisSortOperation
*sop
= ln
->value
;
5029 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5032 if (sop
->type
== REDIS_SORT_GET
) {
5033 if (!val
|| val
->type
!= REDIS_STRING
) {
5034 addReply(c
,shared
.nullbulk
);
5036 addReplyBulkLen(c
,val
);
5038 addReply(c
,shared
.crlf
);
5041 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5046 robj
*listObject
= createListObject();
5047 list
*listPtr
= (list
*) listObject
->ptr
;
5049 /* STORE option specified, set the sorting result as a List object */
5050 for (j
= start
; j
<= end
; j
++) {
5053 listAddNodeTail(listPtr
,vector
[j
].obj
);
5054 incrRefCount(vector
[j
].obj
);
5056 listRewind(operations
);
5057 while((ln
= listYield(operations
))) {
5058 redisSortOperation
*sop
= ln
->value
;
5059 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5062 if (sop
->type
== REDIS_SORT_GET
) {
5063 if (!val
|| val
->type
!= REDIS_STRING
) {
5064 listAddNodeTail(listPtr
,createStringObject("",0));
5066 listAddNodeTail(listPtr
,val
);
5070 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5074 if (dictReplace(c
->db
->dict
,storekey
,listObject
)) {
5075 incrRefCount(storekey
);
5077 /* Note: we add 1 because the DB is dirty anyway since even if the
5078 * SORT result is empty a new key is set and maybe the old content
5080 server
.dirty
+= 1+outputlen
;
5081 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",outputlen
));
5085 decrRefCount(sortval
);
5086 listRelease(operations
);
5087 for (j
= 0; j
< vectorlen
; j
++) {
5088 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
5089 decrRefCount(vector
[j
].u
.cmpobj
);
5094 /* Create the string returned by the INFO command. This is decoupled
5095 * by the INFO command itself as we need to report the same information
5096 * on memory corruption problems. */
5097 static sds
genRedisInfoString(void) {
5099 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5102 info
= sdscatprintf(sdsempty(),
5103 "redis_version:%s\r\n"
5105 "multiplexing_api:%s\r\n"
5106 "uptime_in_seconds:%ld\r\n"
5107 "uptime_in_days:%ld\r\n"
5108 "connected_clients:%d\r\n"
5109 "connected_slaves:%d\r\n"
5110 "used_memory:%zu\r\n"
5111 "changes_since_last_save:%lld\r\n"
5112 "bgsave_in_progress:%d\r\n"
5113 "last_save_time:%ld\r\n"
5114 "bgrewriteaof_in_progress:%d\r\n"
5115 "total_connections_received:%lld\r\n"
5116 "total_commands_processed:%lld\r\n"
5119 (sizeof(long) == 8) ? "64" : "32",
5123 listLength(server
.clients
)-listLength(server
.slaves
),
5124 listLength(server
.slaves
),
5127 server
.bgsavechildpid
!= -1,
5129 server
.bgrewritechildpid
!= -1,
5130 server
.stat_numconnections
,
5131 server
.stat_numcommands
,
5132 server
.masterhost
== NULL
? "master" : "slave"
5134 if (server
.masterhost
) {
5135 info
= sdscatprintf(info
,
5136 "master_host:%s\r\n"
5137 "master_port:%d\r\n"
5138 "master_link_status:%s\r\n"
5139 "master_last_io_seconds_ago:%d\r\n"
5142 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
5144 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
5147 for (j
= 0; j
< server
.dbnum
; j
++) {
5148 long long keys
, vkeys
;
5150 keys
= dictSize(server
.db
[j
].dict
);
5151 vkeys
= dictSize(server
.db
[j
].expires
);
5152 if (keys
|| vkeys
) {
5153 info
= sdscatprintf(info
, "db%d:keys=%lld,expires=%lld\r\n",
5160 static void infoCommand(redisClient
*c
) {
5161 sds info
= genRedisInfoString();
5162 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
5163 (unsigned long)sdslen(info
)));
5164 addReplySds(c
,info
);
5165 addReply(c
,shared
.crlf
);
5168 static void monitorCommand(redisClient
*c
) {
5169 /* ignore MONITOR if aleady slave or in monitor mode */
5170 if (c
->flags
& REDIS_SLAVE
) return;
5172 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
5174 listAddNodeTail(server
.monitors
,c
);
5175 addReply(c
,shared
.ok
);
5178 /* ================================= Expire ================================= */
5179 static int removeExpire(redisDb
*db
, robj
*key
) {
5180 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
5187 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
5188 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
5196 /* Return the expire time of the specified key, or -1 if no expire
5197 * is associated with this key (i.e. the key is non volatile) */
5198 static time_t getExpire(redisDb
*db
, robj
*key
) {
5201 /* No expire? return ASAP */
5202 if (dictSize(db
->expires
) == 0 ||
5203 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
5205 return (time_t) dictGetEntryVal(de
);
5208 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
5212 /* No expire? return ASAP */
5213 if (dictSize(db
->expires
) == 0 ||
5214 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5216 /* Lookup the expire */
5217 when
= (time_t) dictGetEntryVal(de
);
5218 if (time(NULL
) <= when
) return 0;
5220 /* Delete the key */
5221 dictDelete(db
->expires
,key
);
5222 return dictDelete(db
->dict
,key
) == DICT_OK
;
5225 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
5228 /* No expire? return ASAP */
5229 if (dictSize(db
->expires
) == 0 ||
5230 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5232 /* Delete the key */
5234 dictDelete(db
->expires
,key
);
5235 return dictDelete(db
->dict
,key
) == DICT_OK
;
5238 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
5241 de
= dictFind(c
->db
->dict
,key
);
5243 addReply(c
,shared
.czero
);
5247 if (deleteKey(c
->db
,key
)) server
.dirty
++;
5248 addReply(c
, shared
.cone
);
5251 time_t when
= time(NULL
)+seconds
;
5252 if (setExpire(c
->db
,key
,when
)) {
5253 addReply(c
,shared
.cone
);
5256 addReply(c
,shared
.czero
);
5262 static void expireCommand(redisClient
*c
) {
5263 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
5266 static void expireatCommand(redisClient
*c
) {
5267 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
5270 static void ttlCommand(redisClient
*c
) {
5274 expire
= getExpire(c
->db
,c
->argv
[1]);
5276 ttl
= (int) (expire
-time(NULL
));
5277 if (ttl
< 0) ttl
= -1;
5279 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
5282 /* =============================== Replication ============================= */
5284 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5285 ssize_t nwritten
, ret
= size
;
5286 time_t start
= time(NULL
);
5290 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
5291 nwritten
= write(fd
,ptr
,size
);
5292 if (nwritten
== -1) return -1;
5296 if ((time(NULL
)-start
) > timeout
) {
5304 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5305 ssize_t nread
, totread
= 0;
5306 time_t start
= time(NULL
);
5310 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
5311 nread
= read(fd
,ptr
,size
);
5312 if (nread
== -1) return -1;
5317 if ((time(NULL
)-start
) > timeout
) {
5325 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5332 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
5335 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
5346 static void syncCommand(redisClient
*c
) {
5347 /* ignore SYNC if aleady slave or in monitor mode */
5348 if (c
->flags
& REDIS_SLAVE
) return;
5350 /* SYNC can't be issued when the server has pending data to send to
5351 * the client about already issued commands. We need a fresh reply
5352 * buffer registering the differences between the BGSAVE and the current
5353 * dataset, so that we can copy to other slaves if needed. */
5354 if (listLength(c
->reply
) != 0) {
5355 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5359 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
5360 /* Here we need to check if there is a background saving operation
5361 * in progress, or if it is required to start one */
5362 if (server
.bgsavechildpid
!= -1) {
5363 /* Ok a background save is in progress. Let's check if it is a good
5364 * one for replication, i.e. if there is another slave that is
5365 * registering differences since the server forked to save */
5369 listRewind(server
.slaves
);
5370 while((ln
= listYield(server
.slaves
))) {
5372 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
5375 /* Perfect, the server is already registering differences for
5376 * another slave. Set the right state, and copy the buffer. */
5377 listRelease(c
->reply
);
5378 c
->reply
= listDup(slave
->reply
);
5379 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5380 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
5382 /* No way, we need to wait for the next BGSAVE in order to
5383 * register differences */
5384 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5385 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
5388 /* Ok we don't have a BGSAVE in progress, let's start one */
5389 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
5390 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5391 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
5392 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
5395 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5398 c
->flags
|= REDIS_SLAVE
;
5400 listAddNodeTail(server
.slaves
,c
);
5404 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
5405 redisClient
*slave
= privdata
;
5407 REDIS_NOTUSED(mask
);
5408 char buf
[REDIS_IOBUF_LEN
];
5409 ssize_t nwritten
, buflen
;
5411 if (slave
->repldboff
== 0) {
5412 /* Write the bulk write count before to transfer the DB. In theory here
5413 * we don't know how much room there is in the output buffer of the
5414 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5415 * operations) will never be smaller than the few bytes we need. */
5418 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5420 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
5428 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
5429 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
5431 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
5432 (buflen
== 0) ? "premature EOF" : strerror(errno
));
5436 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
5437 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
5442 slave
->repldboff
+= nwritten
;
5443 if (slave
->repldboff
== slave
->repldbsize
) {
5444 close(slave
->repldbfd
);
5445 slave
->repldbfd
= -1;
5446 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5447 slave
->replstate
= REDIS_REPL_ONLINE
;
5448 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
5449 sendReplyToClient
, slave
) == AE_ERR
) {
5453 addReplySds(slave
,sdsempty());
5454 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
5458 /* This function is called at the end of every backgrond saving.
5459 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5460 * otherwise REDIS_ERR is passed to the function.
5462 * The goal of this function is to handle slaves waiting for a successful
5463 * background saving in order to perform non-blocking synchronization. */
5464 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
5466 int startbgsave
= 0;
5468 listRewind(server
.slaves
);
5469 while((ln
= listYield(server
.slaves
))) {
5470 redisClient
*slave
= ln
->value
;
5472 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
5474 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5475 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
5476 struct redis_stat buf
;
5478 if (bgsaveerr
!= REDIS_OK
) {
5480 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
5483 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
5484 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
5486 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
5489 slave
->repldboff
= 0;
5490 slave
->repldbsize
= buf
.st_size
;
5491 slave
->replstate
= REDIS_REPL_SEND_BULK
;
5492 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5493 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
5500 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5501 listRewind(server
.slaves
);
5502 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
5503 while((ln
= listYield(server
.slaves
))) {
5504 redisClient
*slave
= ln
->value
;
5506 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
5513 static int syncWithMaster(void) {
5514 char buf
[1024], tmpfile
[256], authcmd
[1024];
5516 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
5520 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
5525 /* AUTH with the master if required. */
5526 if(server
.masterauth
) {
5527 snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
);
5528 if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) {
5530 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",
5534 /* Read the AUTH result. */
5535 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5537 redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s",
5541 if (buf
[0] != '+') {
5543 redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?");
5548 /* Issue the SYNC command */
5549 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
5551 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
5555 /* Read the bulk write count */
5556 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5558 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
5562 if (buf
[0] != '$') {
5564 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5567 dumpsize
= atoi(buf
+1);
5568 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
5569 /* Read the bulk write data on a temp file */
5570 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
5571 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
5574 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
5578 int nread
, nwritten
;
5580 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
5582 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
5588 nwritten
= write(dfd
,buf
,nread
);
5589 if (nwritten
== -1) {
5590 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
5598 if (rename(tmpfile
,server
.dbfilename
) == -1) {
5599 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
5605 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
5606 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
5610 server
.master
= createClient(fd
);
5611 server
.master
->flags
|= REDIS_MASTER
;
5612 server
.master
->authenticated
= 1;
5613 server
.replstate
= REDIS_REPL_CONNECTED
;
5617 static void slaveofCommand(redisClient
*c
) {
5618 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
5619 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
5620 if (server
.masterhost
) {
5621 sdsfree(server
.masterhost
);
5622 server
.masterhost
= NULL
;
5623 if (server
.master
) freeClient(server
.master
);
5624 server
.replstate
= REDIS_REPL_NONE
;
5625 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
5628 sdsfree(server
.masterhost
);
5629 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
5630 server
.masterport
= atoi(c
->argv
[2]->ptr
);
5631 if (server
.master
) freeClient(server
.master
);
5632 server
.replstate
= REDIS_REPL_CONNECT
;
5633 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
5634 server
.masterhost
, server
.masterport
);
5636 addReply(c
,shared
.ok
);
5639 /* ============================ Maxmemory directive ======================== */
5641 /* This function gets called when 'maxmemory' is set on the config file to limit
5642 * the max memory used by the server, and we are out of memory.
5643 * This function will try to, in order:
5645 * - Free objects from the free list
5646 * - Try to remove keys with an EXPIRE set
5648 * It is not possible to free enough memory to reach used-memory < maxmemory
5649 * the server will start refusing commands that will enlarge even more the
5652 static void freeMemoryIfNeeded(void) {
5653 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
5654 if (listLength(server
.objfreelist
)) {
5657 listNode
*head
= listFirst(server
.objfreelist
);
5658 o
= listNodeValue(head
);
5659 listDelNode(server
.objfreelist
,head
);
5662 int j
, k
, freed
= 0;
5664 for (j
= 0; j
< server
.dbnum
; j
++) {
5666 robj
*minkey
= NULL
;
5667 struct dictEntry
*de
;
5669 if (dictSize(server
.db
[j
].expires
)) {
5671 /* From a sample of three keys drop the one nearest to
5672 * the natural expire */
5673 for (k
= 0; k
< 3; k
++) {
5676 de
= dictGetRandomKey(server
.db
[j
].expires
);
5677 t
= (time_t) dictGetEntryVal(de
);
5678 if (minttl
== -1 || t
< minttl
) {
5679 minkey
= dictGetEntryKey(de
);
5683 deleteKey(server
.db
+j
,minkey
);
5686 if (!freed
) return; /* nothing to free... */
5691 /* ============================== Append Only file ========================== */
5693 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
5694 sds buf
= sdsempty();
5700 /* The DB this command was targetting is not the same as the last command
5701 * we appendend. To issue a SELECT command is needed. */
5702 if (dictid
!= server
.appendseldb
) {
5705 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
5706 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
5707 (unsigned long)strlen(seldb
),seldb
);
5708 server
.appendseldb
= dictid
;
5711 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5712 * EXPIREs into EXPIREATs calls */
5713 if (cmd
->proc
== expireCommand
) {
5716 tmpargv
[0] = createStringObject("EXPIREAT",8);
5717 tmpargv
[1] = argv
[1];
5718 incrRefCount(argv
[1]);
5719 when
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10);
5720 tmpargv
[2] = createObject(REDIS_STRING
,
5721 sdscatprintf(sdsempty(),"%ld",when
));
5725 /* Append the actual command */
5726 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
5727 for (j
= 0; j
< argc
; j
++) {
5730 o
= getDecodedObject(o
);
5731 buf
= sdscatprintf(buf
,"$%lu\r\n",(unsigned long)sdslen(o
->ptr
));
5732 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
5733 buf
= sdscatlen(buf
,"\r\n",2);
5737 /* Free the objects from the modified argv for EXPIREAT */
5738 if (cmd
->proc
== expireCommand
) {
5739 for (j
= 0; j
< 3; j
++)
5740 decrRefCount(argv
[j
]);
5743 /* We want to perform a single write. This should be guaranteed atomic
5744 * at least if the filesystem we are writing is a real physical one.
5745 * While this will save us against the server being killed I don't think
5746 * there is much to do about the whole server stopping for power problems
5748 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
5749 if (nwritten
!= (signed)sdslen(buf
)) {
5750 /* Ooops, we are in troubles. The best thing to do for now is
5751 * to simply exit instead to give the illusion that everything is
5752 * working as expected. */
5753 if (nwritten
== -1) {
5754 redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
));
5756 redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
));
5760 /* If a background append only file rewriting is in progress we want to
5761 * accumulate the differences between the child DB and the current one
5762 * in a buffer, so that when the child process will do its work we
5763 * can append the differences to the new append only file. */
5764 if (server
.bgrewritechildpid
!= -1)
5765 server
.bgrewritebuf
= sdscatlen(server
.bgrewritebuf
,buf
,sdslen(buf
));
5769 if (server
.appendfsync
== APPENDFSYNC_ALWAYS
||
5770 (server
.appendfsync
== APPENDFSYNC_EVERYSEC
&&
5771 now
-server
.lastfsync
> 1))
5773 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
5774 server
.lastfsync
= now
;
5778 /* In Redis commands are always executed in the context of a client, so in
5779 * order to load the append only file we need to create a fake client. */
5780 static struct redisClient
*createFakeClient(void) {
5781 struct redisClient
*c
= zmalloc(sizeof(*c
));
5785 c
->querybuf
= sdsempty();
5789 /* We set the fake client as a slave waiting for the synchronization
5790 * so that Redis will not try to send replies to this client. */
5791 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5792 c
->reply
= listCreate();
5793 listSetFreeMethod(c
->reply
,decrRefCount
);
5794 listSetDupMethod(c
->reply
,dupClientReplyValue
);
5798 static void freeFakeClient(struct redisClient
*c
) {
5799 sdsfree(c
->querybuf
);
5800 listRelease(c
->reply
);
5804 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5805 * error (the append only file is zero-length) REDIS_ERR is returned. On
5806 * fatal error an error message is logged and the program exists. */
5807 int loadAppendOnlyFile(char *filename
) {
5808 struct redisClient
*fakeClient
;
5809 FILE *fp
= fopen(filename
,"r");
5810 struct redis_stat sb
;
5812 if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size
== 0)
5816 redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
));
5820 fakeClient
= createFakeClient();
5827 struct redisCommand
*cmd
;
5829 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) {
5835 if (buf
[0] != '*') goto fmterr
;
5837 argv
= zmalloc(sizeof(robj
*)*argc
);
5838 for (j
= 0; j
< argc
; j
++) {
5839 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
;
5840 if (buf
[0] != '$') goto fmterr
;
5841 len
= strtol(buf
+1,NULL
,10);
5842 argsds
= sdsnewlen(NULL
,len
);
5843 if (len
&& fread(argsds
,len
,1,fp
) == 0) goto fmterr
;
5844 argv
[j
] = createObject(REDIS_STRING
,argsds
);
5845 if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */
5848 /* Command lookup */
5849 cmd
= lookupCommand(argv
[0]->ptr
);
5851 redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
);
5854 /* Try object sharing and encoding */
5855 if (server
.shareobjects
) {
5857 for(j
= 1; j
< argc
; j
++)
5858 argv
[j
] = tryObjectSharing(argv
[j
]);
5860 if (cmd
->flags
& REDIS_CMD_BULK
)
5861 tryObjectEncoding(argv
[argc
-1]);
5862 /* Run the command in the context of a fake client */
5863 fakeClient
->argc
= argc
;
5864 fakeClient
->argv
= argv
;
5865 cmd
->proc(fakeClient
);
5866 /* Discard the reply objects list from the fake client */
5867 while(listLength(fakeClient
->reply
))
5868 listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
));
5869 /* Clean up, ready for the next command */
5870 for (j
= 0; j
< argc
; j
++) decrRefCount(argv
[j
]);
5874 freeFakeClient(fakeClient
);
5879 redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file");
5881 redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
));
5885 redisLog(REDIS_WARNING
,"Bad file format reading the append only file");
5889 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
5890 static int fwriteBulk(FILE *fp
, robj
*obj
) {
5892 obj
= getDecodedObject(obj
);
5893 snprintf(buf
,sizeof(buf
),"$%ld\r\n",(long)sdslen(obj
->ptr
));
5894 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) goto err
;
5895 if (sdslen(obj
->ptr
) && fwrite(obj
->ptr
,sdslen(obj
->ptr
),1,fp
) == 0)
5897 if (fwrite("\r\n",2,1,fp
) == 0) goto err
;
5905 /* Write a double value in bulk format $<count>\r\n<payload>\r\n */
5906 static int fwriteBulkDouble(FILE *fp
, double d
) {
5907 char buf
[128], dbuf
[128];
5909 snprintf(dbuf
,sizeof(dbuf
),"%.17g\r\n",d
);
5910 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(dbuf
)-2);
5911 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
5912 if (fwrite(dbuf
,strlen(dbuf
),1,fp
) == 0) return 0;
5916 /* Write a long value in bulk format $<count>\r\n<payload>\r\n */
5917 static int fwriteBulkLong(FILE *fp
, long l
) {
5918 char buf
[128], lbuf
[128];
5920 snprintf(lbuf
,sizeof(lbuf
),"%ld\r\n",l
);
5921 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(lbuf
)-2);
5922 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
5923 if (fwrite(lbuf
,strlen(lbuf
),1,fp
) == 0) return 0;
5927 /* Write a sequence of commands able to fully rebuild the dataset into
5928 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
5929 static int rewriteAppendOnlyFile(char *filename
) {
5930 dictIterator
*di
= NULL
;
5935 time_t now
= time(NULL
);
5937 /* Note that we have to use a different temp name here compared to the
5938 * one used by rewriteAppendOnlyFileBackground() function. */
5939 snprintf(tmpfile
,256,"temp-rewriteaof-%d.aof", (int) getpid());
5940 fp
= fopen(tmpfile
,"w");
5942 redisLog(REDIS_WARNING
, "Failed rewriting the append only file: %s", strerror(errno
));
5945 for (j
= 0; j
< server
.dbnum
; j
++) {
5946 char selectcmd
[] = "*2\r\n$6\r\nSELECT\r\n";
5947 redisDb
*db
= server
.db
+j
;
5949 if (dictSize(d
) == 0) continue;
5950 di
= dictGetIterator(d
);
5956 /* SELECT the new DB */
5957 if (fwrite(selectcmd
,sizeof(selectcmd
)-1,1,fp
) == 0) goto werr
;
5958 if (fwriteBulkLong(fp
,j
) == 0) goto werr
;
5960 /* Iterate this DB writing every entry */
5961 while((de
= dictNext(di
)) != NULL
) {
5962 robj
*key
= dictGetEntryKey(de
);
5963 robj
*o
= dictGetEntryVal(de
);
5964 time_t expiretime
= getExpire(db
,key
);
5966 /* Save the key and associated value */
5967 if (o
->type
== REDIS_STRING
) {
5968 /* Emit a SET command */
5969 char cmd
[]="*3\r\n$3\r\nSET\r\n";
5970 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5972 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5973 if (fwriteBulk(fp
,o
) == 0) goto werr
;
5974 } else if (o
->type
== REDIS_LIST
) {
5975 /* Emit the RPUSHes needed to rebuild the list */
5976 list
*list
= o
->ptr
;
5980 while((ln
= listYield(list
))) {
5981 char cmd
[]="*3\r\n$5\r\nRPUSH\r\n";
5982 robj
*eleobj
= listNodeValue(ln
);
5984 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5985 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5986 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
5988 } else if (o
->type
== REDIS_SET
) {
5989 /* Emit the SADDs needed to rebuild the set */
5991 dictIterator
*di
= dictGetIterator(set
);
5994 while((de
= dictNext(di
)) != NULL
) {
5995 char cmd
[]="*3\r\n$4\r\nSADD\r\n";
5996 robj
*eleobj
= dictGetEntryKey(de
);
5998 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5999 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6000 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6002 dictReleaseIterator(di
);
6003 } else if (o
->type
== REDIS_ZSET
) {
6004 /* Emit the ZADDs needed to rebuild the sorted set */
6006 dictIterator
*di
= dictGetIterator(zs
->dict
);
6009 while((de
= dictNext(di
)) != NULL
) {
6010 char cmd
[]="*4\r\n$4\r\nZADD\r\n";
6011 robj
*eleobj
= dictGetEntryKey(de
);
6012 double *score
= dictGetEntryVal(de
);
6014 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6015 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6016 if (fwriteBulkDouble(fp
,*score
) == 0) goto werr
;
6017 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6019 dictReleaseIterator(di
);
6021 redisAssert(0 != 0);
6023 /* Save the expire time */
6024 if (expiretime
!= -1) {
6025 char cmd
[]="*3\r\n$8\r\nEXPIREAT\r\n";
6026 /* If this key is already expired skip it */
6027 if (expiretime
< now
) continue;
6028 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6029 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6030 if (fwriteBulkLong(fp
,expiretime
) == 0) goto werr
;
6033 dictReleaseIterator(di
);
6036 /* Make sure data will not remain on the OS's output buffers */
6041 /* Use RENAME to make sure the DB file is changed atomically only
6042 * if the generate DB file is ok. */
6043 if (rename(tmpfile
,filename
) == -1) {
6044 redisLog(REDIS_WARNING
,"Error moving temp append only file on the final destination: %s", strerror(errno
));
6048 redisLog(REDIS_NOTICE
,"SYNC append only file rewrite performed");
6054 redisLog(REDIS_WARNING
,"Write error writing append only file on disk: %s", strerror(errno
));
6055 if (di
) dictReleaseIterator(di
);
6059 /* This is how rewriting of the append only file in background works:
6061 * 1) The user calls BGREWRITEAOF
6062 * 2) Redis calls this function, that forks():
6063 * 2a) the child rewrite the append only file in a temp file.
6064 * 2b) the parent accumulates differences in server.bgrewritebuf.
6065 * 3) When the child finished '2a' exists.
6066 * 4) The parent will trap the exit code, if it's OK, will append the
6067 * data accumulated into server.bgrewritebuf into the temp file, and
6068 * finally will rename(2) the temp file in the actual file name.
6069 * The the new file is reopened as the new append only file. Profit!
6071 static int rewriteAppendOnlyFileBackground(void) {
6074 if (server
.bgrewritechildpid
!= -1) return REDIS_ERR
;
6075 if ((childpid
= fork()) == 0) {
6080 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6081 if (rewriteAppendOnlyFile(tmpfile
) == REDIS_OK
) {
6088 if (childpid
== -1) {
6089 redisLog(REDIS_WARNING
,
6090 "Can't rewrite append only file in background: fork: %s",
6094 redisLog(REDIS_NOTICE
,
6095 "Background append only file rewriting started by pid %d",childpid
);
6096 server
.bgrewritechildpid
= childpid
;
6097 /* We set appendseldb to -1 in order to force the next call to the
6098 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6099 * accumulated by the parent into server.bgrewritebuf will start
6100 * with a SELECT statement and it will be safe to merge. */
6101 server
.appendseldb
= -1;
6104 return REDIS_OK
; /* unreached */
6107 static void bgrewriteaofCommand(redisClient
*c
) {
6108 if (server
.bgrewritechildpid
!= -1) {
6109 addReplySds(c
,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6112 if (rewriteAppendOnlyFileBackground() == REDIS_OK
) {
6113 char *status
= "+Background append only file rewriting started\r\n";
6114 addReplySds(c
,sdsnew(status
));
6116 addReply(c
,shared
.err
);
6120 static void aofRemoveTempFile(pid_t childpid
) {
6123 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) childpid
);
6127 /* ================================= Debugging ============================== */
6129 static void debugCommand(redisClient
*c
) {
6130 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
6132 } else if (!strcasecmp(c
->argv
[1]->ptr
,"reload")) {
6133 if (rdbSave(server
.dbfilename
) != REDIS_OK
) {
6134 addReply(c
,shared
.err
);
6138 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6139 addReply(c
,shared
.err
);
6142 redisLog(REDIS_WARNING
,"DB reloaded by DEBUG RELOAD");
6143 addReply(c
,shared
.ok
);
6144 } else if (!strcasecmp(c
->argv
[1]->ptr
,"loadaof")) {
6146 if (loadAppendOnlyFile(server
.appendfilename
) != REDIS_OK
) {
6147 addReply(c
,shared
.err
);
6150 redisLog(REDIS_WARNING
,"Append Only File loaded by DEBUG LOADAOF");
6151 addReply(c
,shared
.ok
);
6152 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
6153 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
6157 addReply(c
,shared
.nokeyerr
);
6160 key
= dictGetEntryKey(de
);
6161 val
= dictGetEntryVal(de
);
6162 addReplySds(c
,sdscatprintf(sdsempty(),
6163 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6164 (void*)key
, key
->refcount
, (void*)val
, val
->refcount
,
6167 addReplySds(c
,sdsnew(
6168 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6172 static void _redisAssert(char *estr
) {
6173 redisLog(REDIS_WARNING
,"=== ASSERTION FAILED ===");
6174 redisLog(REDIS_WARNING
,"==> %s\n",estr
);
6175 #ifdef HAVE_BACKTRACE
6176 redisLog(REDIS_WARNING
,"(forcing SIGSEGV in order to print the stack trace)");
6181 /* =================================== Main! ================================ */
6184 int linuxOvercommitMemoryValue(void) {
6185 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
6189 if (fgets(buf
,64,fp
) == NULL
) {
6198 void linuxOvercommitMemoryWarning(void) {
6199 if (linuxOvercommitMemoryValue() == 0) {
6200 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6203 #endif /* __linux__ */
6205 static void daemonize(void) {
6209 if (fork() != 0) exit(0); /* parent exits */
6210 printf("New pid: %d\n", getpid());
6211 setsid(); /* create a new session */
6213 /* Every output goes to /dev/null. If Redis is daemonized but
6214 * the 'logfile' is set to 'stdout' in the configuration file
6215 * it will not log at all. */
6216 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
6217 dup2(fd
, STDIN_FILENO
);
6218 dup2(fd
, STDOUT_FILENO
);
6219 dup2(fd
, STDERR_FILENO
);
6220 if (fd
> STDERR_FILENO
) close(fd
);
6222 /* Try to write the pid file */
6223 fp
= fopen(server
.pidfile
,"w");
6225 fprintf(fp
,"%d\n",getpid());
6230 int main(int argc
, char **argv
) {
6233 resetServerSaveParams();
6234 loadServerConfig(argv
[1]);
6235 } else if (argc
> 2) {
6236 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
6239 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6241 if (server
.daemonize
) daemonize();
6243 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
6245 linuxOvercommitMemoryWarning();
6247 if (server
.appendonly
) {
6248 if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
)
6249 redisLog(REDIS_NOTICE
,"DB loaded from append only file");
6251 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
6252 redisLog(REDIS_NOTICE
,"DB loaded from disk");
6254 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
6255 acceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
6256 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
6258 aeDeleteEventLoop(server
.el
);
6262 /* ============================= Backtrace support ========================= */
6264 #ifdef HAVE_BACKTRACE
6265 static char *findFuncName(void *pointer
, unsigned long *offset
);
6267 static void *getMcontextEip(ucontext_t
*uc
) {
6268 #if defined(__FreeBSD__)
6269 return (void*) uc
->uc_mcontext
.mc_eip
;
6270 #elif defined(__dietlibc__)
6271 return (void*) uc
->uc_mcontext
.eip
;
6272 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6274 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6276 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6278 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6279 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6280 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6282 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6284 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
6285 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
];
6286 #elif defined(__ia64__) /* Linux IA64 */
6287 return (void*) uc
->uc_mcontext
.sc_ip
;
6293 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
6295 char **messages
= NULL
;
6296 int i
, trace_size
= 0;
6297 unsigned long offset
=0;
6298 ucontext_t
*uc
= (ucontext_t
*) secret
;
6300 REDIS_NOTUSED(info
);
6302 redisLog(REDIS_WARNING
,
6303 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
6304 infostring
= genRedisInfoString();
6305 redisLog(REDIS_WARNING
, "%s",infostring
);
6306 /* It's not safe to sdsfree() the returned string under memory
6307 * corruption conditions. Let it leak as we are going to abort */
6309 trace_size
= backtrace(trace
, 100);
6310 /* overwrite sigaction with caller's address */
6311 if (getMcontextEip(uc
) != NULL
) {
6312 trace
[1] = getMcontextEip(uc
);
6314 messages
= backtrace_symbols(trace
, trace_size
);
6316 for (i
=1; i
<trace_size
; ++i
) {
6317 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
6319 p
= strchr(messages
[i
],'+');
6320 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
6321 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
6323 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
6326 // free(messages); Don't call free() with possibly corrupted memory.
6330 static void setupSigSegvAction(void) {
6331 struct sigaction act
;
6333 sigemptyset (&act
.sa_mask
);
6334 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6335 * is used. Otherwise, sa_handler is used */
6336 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
6337 act
.sa_sigaction
= segvHandler
;
6338 sigaction (SIGSEGV
, &act
, NULL
);
6339 sigaction (SIGBUS
, &act
, NULL
);
6340 sigaction (SIGFPE
, &act
, NULL
);
6341 sigaction (SIGILL
, &act
, NULL
);
6342 sigaction (SIGBUS
, &act
, NULL
);
6346 #include "staticsymbols.h"
6347 /* This function try to convert a pointer into a function name. It's used in
6348 * oreder to provide a backtrace under segmentation fault that's able to
6349 * display functions declared as static (otherwise the backtrace is useless). */
6350 static char *findFuncName(void *pointer
, unsigned long *offset
){
6352 unsigned long off
, minoff
= 0;
6354 /* Try to match against the Symbol with the smallest offset */
6355 for (i
=0; symsTable
[i
].pointer
; i
++) {
6356 unsigned long lp
= (unsigned long) pointer
;
6358 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
6359 off
=lp
-symsTable
[i
].pointer
;
6360 if (ret
< 0 || off
< minoff
) {
6366 if (ret
== -1) return NULL
;
6368 return symsTable
[ret
].name
;
6370 #else /* HAVE_BACKTRACE */
6371 static void setupSigSegvAction(void) {
6373 #endif /* HAVE_BACKTRACE */