2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.1.94"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
64 #include "solarisfixes.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
95 /* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
107 this flag will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short these commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
113 #define REDIS_STRING 0
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
128 /* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpret the length:
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
139 * Lengths up to 63 are stored using a single byte, most DB keys, and many
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining six bits specify a special encoding for the object
149 * accordingly to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
156 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157 #define REDIS_SLAVE 2 /* This client is a slave server */
158 #define REDIS_MASTER 4 /* This client is a master server */
159 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
161 /* Slave replication state - slave side */
162 #define REDIS_REPL_NONE 0 /* No active replication */
163 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
164 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
166 /* Slave replication state - from the point of view of master
167 * Note that in SEND_BULK and ONLINE state the slave receives new updates
168 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
169 * to start the next background saving in order to send updates to it. */
170 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
171 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
172 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
173 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
175 /* List related stuff */
179 /* Sort operations */
180 #define REDIS_SORT_GET 0
181 #define REDIS_SORT_ASC 1
182 #define REDIS_SORT_DESC 2
183 #define REDIS_SORTKEY_MAX 1024
186 #define REDIS_DEBUG 0
187 #define REDIS_NOTICE 1
188 #define REDIS_WARNING 2
190 /* Anti-warning macro... */
191 #define REDIS_NOTUSED(V) ((void) V)
193 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
194 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
196 /* Append only defines */
197 #define APPENDFSYNC_NO 0
198 #define APPENDFSYNC_ALWAYS 1
199 #define APPENDFSYNC_EVERYSEC 2
201 /* We can print the stacktrace, so our assert is defined this way: */
202 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
203 static void _redisAssert(char *estr
);
205 /*================================= Data types ============================== */
207 /* A redis object, that is a type able to hold a string / list / set */
208 typedef struct redisObject
{
211 unsigned char encoding
;
212 unsigned char notused
[2];
216 /* Macro used to initialize a Redis object allocated on the stack.
217 * Note that this macro is kept near the structure definition to make sure
218 * we'll update it when the structure is changed, to avoid bugs like
219 * bug #85 introduced exactly in this way. */
220 #define initStaticStringObject(_var,_ptr) do { \
222 _var.type = REDIS_STRING; \
223 _var.encoding = REDIS_ENCODING_RAW; \
227 typedef struct redisDb
{
233 /* With multiplexing we need to take per-client state.
234 * Clients are taken in a linked list. */
235 typedef struct redisClient
{
240 robj
**argv
, **mbargv
;
242 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
243 int multibulk
; /* multi bulk command format active */
246 time_t lastinteraction
; /* time of the last interaction, used for timeout */
247 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
248 int slaveseldb
; /* slave selected db, if this client is a slave */
249 int authenticated
; /* when requirepass is non-NULL */
250 int replstate
; /* replication state if this is a slave */
251 int repldbfd
; /* replication DB file descriptor */
252 long repldboff
; /* replication DB file offset */
253 off_t repldbsize
; /* replication DB file size */
261 /* Global server state structure */
267 unsigned int sharingpoolsize
;
268 long long dirty
; /* changes to DB from the last save */
270 list
*slaves
, *monitors
;
271 char neterr
[ANET_ERR_LEN
];
273 int cronloops
; /* number of times the cron function run */
274 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
275 time_t lastsave
; /* Unix time of last successful save */
276 size_t usedmemory
; /* Used memory in bytes */
277 /* Fields used only for stats */
278 time_t stat_starttime
; /* server start time */
279 long long stat_numcommands
; /* number of processed commands */
280 long long stat_numconnections
; /* number of connections received */
293 pid_t bgsavechildpid
;
294 pid_t bgrewritechildpid
;
295 sds bgrewritebuf
; /* buffer taken by parent during append only rewrite */
296 struct saveparam
*saveparams
;
301 char *appendfilename
;
305 /* Replication related */
310 redisClient
*master
; /* client that is master for this slave */
312 unsigned int maxclients
;
313 unsigned long maxmemory
;
314 /* Sort parameters - qsort_r() is only available under BSD so we
315 * have to take this state global, in order to pass it to sortCompare() */
321 typedef void redisCommandProc(redisClient
*c
);
322 struct redisCommand
{
324 redisCommandProc
*proc
;
329 struct redisFunctionSym
{
331 unsigned long pointer
;
334 typedef struct _redisSortObject
{
342 typedef struct _redisSortOperation
{
345 } redisSortOperation
;
347 /* ZSETs use a specialized version of Skiplists */
349 typedef struct zskiplistNode
{
350 struct zskiplistNode
**forward
;
351 struct zskiplistNode
*backward
;
356 typedef struct zskiplist
{
357 struct zskiplistNode
*header
, *tail
;
358 unsigned long length
;
362 typedef struct zset
{
367 /* Our shared "common" objects */
369 struct sharedObjectsStruct
{
370 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
371 *colon
, *nullbulk
, *nullmultibulk
,
372 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
373 *outofrangeerr
, *plus
,
374 *select0
, *select1
, *select2
, *select3
, *select4
,
375 *select5
, *select6
, *select7
, *select8
, *select9
;
378 /* Global vars that are actually used as constants. The following double
379 * values are used for double on-disk serialization, and are initialized
380 * at runtime to avoid strange compiler optimizations. */
382 static double R_Zero
, R_PosInf
, R_NegInf
, R_Nan
;
384 /*================================ Prototypes =============================== */
386 static void freeStringObject(robj
*o
);
387 static void freeListObject(robj
*o
);
388 static void freeSetObject(robj
*o
);
389 static void decrRefCount(void *o
);
390 static robj
*createObject(int type
, void *ptr
);
391 static void freeClient(redisClient
*c
);
392 static int rdbLoad(char *filename
);
393 static void addReply(redisClient
*c
, robj
*obj
);
394 static void addReplySds(redisClient
*c
, sds s
);
395 static void incrRefCount(robj
*o
);
396 static int rdbSaveBackground(char *filename
);
397 static robj
*createStringObject(char *ptr
, size_t len
);
398 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
399 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
400 static int syncWithMaster(void);
401 static robj
*tryObjectSharing(robj
*o
);
402 static int tryObjectEncoding(robj
*o
);
403 static robj
*getDecodedObject(robj
*o
);
404 static int removeExpire(redisDb
*db
, robj
*key
);
405 static int expireIfNeeded(redisDb
*db
, robj
*key
);
406 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
407 static int deleteKey(redisDb
*db
, robj
*key
);
408 static time_t getExpire(redisDb
*db
, robj
*key
);
409 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
410 static void updateSlavesWaitingBgsave(int bgsaveerr
);
411 static void freeMemoryIfNeeded(void);
412 static int processCommand(redisClient
*c
);
413 static void setupSigSegvAction(void);
414 static void rdbRemoveTempFile(pid_t childpid
);
415 static void aofRemoveTempFile(pid_t childpid
);
416 static size_t stringObjectLen(robj
*o
);
417 static void processInputBuffer(redisClient
*c
);
418 static zskiplist
*zslCreate(void);
419 static void zslFree(zskiplist
*zsl
);
420 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
421 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
423 static void authCommand(redisClient
*c
);
424 static void pingCommand(redisClient
*c
);
425 static void echoCommand(redisClient
*c
);
426 static void setCommand(redisClient
*c
);
427 static void setnxCommand(redisClient
*c
);
428 static void getCommand(redisClient
*c
);
429 static void delCommand(redisClient
*c
);
430 static void existsCommand(redisClient
*c
);
431 static void incrCommand(redisClient
*c
);
432 static void decrCommand(redisClient
*c
);
433 static void incrbyCommand(redisClient
*c
);
434 static void decrbyCommand(redisClient
*c
);
435 static void selectCommand(redisClient
*c
);
436 static void randomkeyCommand(redisClient
*c
);
437 static void keysCommand(redisClient
*c
);
438 static void dbsizeCommand(redisClient
*c
);
439 static void lastsaveCommand(redisClient
*c
);
440 static void saveCommand(redisClient
*c
);
441 static void bgsaveCommand(redisClient
*c
);
442 static void bgrewriteaofCommand(redisClient
*c
);
443 static void shutdownCommand(redisClient
*c
);
444 static void moveCommand(redisClient
*c
);
445 static void renameCommand(redisClient
*c
);
446 static void renamenxCommand(redisClient
*c
);
447 static void lpushCommand(redisClient
*c
);
448 static void rpushCommand(redisClient
*c
);
449 static void lpopCommand(redisClient
*c
);
450 static void rpopCommand(redisClient
*c
);
451 static void llenCommand(redisClient
*c
);
452 static void lindexCommand(redisClient
*c
);
453 static void lrangeCommand(redisClient
*c
);
454 static void ltrimCommand(redisClient
*c
);
455 static void typeCommand(redisClient
*c
);
456 static void lsetCommand(redisClient
*c
);
457 static void saddCommand(redisClient
*c
);
458 static void sremCommand(redisClient
*c
);
459 static void smoveCommand(redisClient
*c
);
460 static void sismemberCommand(redisClient
*c
);
461 static void scardCommand(redisClient
*c
);
462 static void spopCommand(redisClient
*c
);
463 static void srandmemberCommand(redisClient
*c
);
464 static void sinterCommand(redisClient
*c
);
465 static void sinterstoreCommand(redisClient
*c
);
466 static void sunionCommand(redisClient
*c
);
467 static void sunionstoreCommand(redisClient
*c
);
468 static void sdiffCommand(redisClient
*c
);
469 static void sdiffstoreCommand(redisClient
*c
);
470 static void syncCommand(redisClient
*c
);
471 static void flushdbCommand(redisClient
*c
);
472 static void flushallCommand(redisClient
*c
);
473 static void sortCommand(redisClient
*c
);
474 static void lremCommand(redisClient
*c
);
475 static void rpoplpushcommand(redisClient
*c
);
476 static void infoCommand(redisClient
*c
);
477 static void mgetCommand(redisClient
*c
);
478 static void monitorCommand(redisClient
*c
);
479 static void expireCommand(redisClient
*c
);
480 static void expireatCommand(redisClient
*c
);
481 static void getsetCommand(redisClient
*c
);
482 static void ttlCommand(redisClient
*c
);
483 static void slaveofCommand(redisClient
*c
);
484 static void debugCommand(redisClient
*c
);
485 static void msetCommand(redisClient
*c
);
486 static void msetnxCommand(redisClient
*c
);
487 static void zaddCommand(redisClient
*c
);
488 static void zincrbyCommand(redisClient
*c
);
489 static void zrangeCommand(redisClient
*c
);
490 static void zrangebyscoreCommand(redisClient
*c
);
491 static void zrevrangeCommand(redisClient
*c
);
492 static void zcardCommand(redisClient
*c
);
493 static void zremCommand(redisClient
*c
);
494 static void zscoreCommand(redisClient
*c
);
495 static void zremrangebyscoreCommand(redisClient
*c
);
497 /*================================= Globals ================================= */
500 static struct redisServer server
; /* server global state */
501 static struct redisCommand cmdTable
[] = {
502 {"get",getCommand
,2,REDIS_CMD_INLINE
},
503 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
504 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
505 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
506 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
507 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
508 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
509 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
510 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
511 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
512 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
513 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
514 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
515 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
516 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
517 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
518 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
519 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
520 {"rpoplpush",rpoplpushcommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
521 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
522 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
523 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
524 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
525 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
526 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
527 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
528 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
529 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
530 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
531 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
532 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
533 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
534 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
535 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
536 {"zincrby",zincrbyCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
537 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
538 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
539 {"zrange",zrangeCommand
,-4,REDIS_CMD_INLINE
},
540 {"zrangebyscore",zrangebyscoreCommand
,-4,REDIS_CMD_INLINE
},
541 {"zrevrange",zrevrangeCommand
,-4,REDIS_CMD_INLINE
},
542 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
543 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
544 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
545 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
546 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
547 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
548 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
549 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
550 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
551 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
552 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
553 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
554 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
555 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
556 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
557 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
558 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
559 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
560 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
561 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
562 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
563 {"bgrewriteaof",bgrewriteaofCommand
,1,REDIS_CMD_INLINE
},
564 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
565 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
566 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
567 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
568 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
569 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
570 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
571 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
572 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
573 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
574 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
575 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
579 /*============================ Utility functions ============================ */
581 /* Glob-style pattern matching. */
582 int stringmatchlen(const char *pattern
, int patternLen
,
583 const char *string
, int stringLen
, int nocase
)
588 while (pattern
[1] == '*') {
593 return 1; /* match */
595 if (stringmatchlen(pattern
+1, patternLen
-1,
596 string
, stringLen
, nocase
))
597 return 1; /* match */
601 return 0; /* no match */
605 return 0; /* no match */
615 not = pattern
[0] == '^';
622 if (pattern
[0] == '\\') {
625 if (pattern
[0] == string
[0])
627 } else if (pattern
[0] == ']') {
629 } else if (patternLen
== 0) {
633 } else if (pattern
[1] == '-' && patternLen
>= 3) {
634 int start
= pattern
[0];
635 int end
= pattern
[2];
643 start
= tolower(start
);
649 if (c
>= start
&& c
<= end
)
653 if (pattern
[0] == string
[0])
656 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
666 return 0; /* no match */
672 if (patternLen
>= 2) {
679 if (pattern
[0] != string
[0])
680 return 0; /* no match */
682 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
683 return 0; /* no match */
691 if (stringLen
== 0) {
692 while(*pattern
== '*') {
699 if (patternLen
== 0 && stringLen
== 0)
704 static void redisLog(int level
, const char *fmt
, ...) {
708 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
712 if (level
>= server
.verbosity
) {
718 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
719 fprintf(fp
,"%s %c ",buf
,c
[level
]);
720 vfprintf(fp
, fmt
, ap
);
726 if (server
.logfile
) fclose(fp
);
729 /*====================== Hash table type implementation ==================== */
731 /* This is a hash table type that uses the SDS dynamic strings library as
732 * keys and redis objects as values (objects can hold SDS strings,
735 static void dictVanillaFree(void *privdata
, void *val
)
737 DICT_NOTUSED(privdata
);
741 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
745 DICT_NOTUSED(privdata
);
747 l1
= sdslen((sds
)key1
);
748 l2
= sdslen((sds
)key2
);
749 if (l1
!= l2
) return 0;
750 return memcmp(key1
, key2
, l1
) == 0;
753 static void dictRedisObjectDestructor(void *privdata
, void *val
)
755 DICT_NOTUSED(privdata
);
760 static int dictObjKeyCompare(void *privdata
, const void *key1
,
763 const robj
*o1
= key1
, *o2
= key2
;
764 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
767 static unsigned int dictObjHash(const void *key
) {
769 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
772 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
775 robj
*o1
= (robj
*) key1
, *o2
= (robj
*) key2
;
778 o1
= getDecodedObject(o1
);
779 o2
= getDecodedObject(o2
);
780 cmp
= sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
786 static unsigned int dictEncObjHash(const void *key
) {
787 robj
*o
= (robj
*) key
;
789 o
= getDecodedObject(o
);
790 unsigned int hash
= dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
795 static dictType setDictType
= {
796 dictEncObjHash
, /* hash function */
799 dictEncObjKeyCompare
, /* key compare */
800 dictRedisObjectDestructor
, /* key destructor */
801 NULL
/* val destructor */
804 static dictType zsetDictType
= {
805 dictEncObjHash
, /* hash function */
808 dictEncObjKeyCompare
, /* key compare */
809 dictRedisObjectDestructor
, /* key destructor */
810 dictVanillaFree
/* val destructor of malloc(sizeof(double)) */
813 static dictType hashDictType
= {
814 dictObjHash
, /* hash function */
817 dictObjKeyCompare
, /* key compare */
818 dictRedisObjectDestructor
, /* key destructor */
819 dictRedisObjectDestructor
/* val destructor */
822 /* ========================= Random utility functions ======================= */
824 /* Redis generally does not try to recover from out of memory conditions
825 * when allocating objects or strings, it is not clear if it will be possible
826 * to report this condition to the client since the networking layer itself
827 * is based on heap allocation for send buffers, so we simply abort.
828 * At least the code will be simpler to read... */
829 static void oom(const char *msg
) {
830 redisLog(REDIS_WARNING
, "%s: Out of memory\n",msg
);
835 /* ====================== Redis server networking stuff ===================== */
836 static void closeTimedoutClients(void) {
839 time_t now
= time(NULL
);
841 listRewind(server
.clients
);
842 while ((ln
= listYield(server
.clients
)) != NULL
) {
843 c
= listNodeValue(ln
);
844 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
845 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
846 (now
- c
->lastinteraction
> server
.maxidletime
)) {
847 redisLog(REDIS_DEBUG
,"Closing idle client");
853 static int htNeedsResize(dict
*dict
) {
854 long long size
, used
;
856 size
= dictSlots(dict
);
857 used
= dictSize(dict
);
858 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
859 (used
*100/size
< REDIS_HT_MINFILL
));
862 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
863 * we resize the hash table to save memory */
864 static void tryResizeHashTables(void) {
867 for (j
= 0; j
< server
.dbnum
; j
++) {
868 if (htNeedsResize(server
.db
[j
].dict
)) {
869 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
870 dictResize(server
.db
[j
].dict
);
871 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
873 if (htNeedsResize(server
.db
[j
].expires
))
874 dictResize(server
.db
[j
].expires
);
878 /* A background saving child (BGSAVE) terminated its work. Handle this. */
879 void backgroundSaveDoneHandler(int statloc
) {
880 int exitcode
= WEXITSTATUS(statloc
);
881 int bysignal
= WIFSIGNALED(statloc
);
883 if (!bysignal
&& exitcode
== 0) {
884 redisLog(REDIS_NOTICE
,
885 "Background saving terminated with success");
887 server
.lastsave
= time(NULL
);
888 } else if (!bysignal
&& exitcode
!= 0) {
889 redisLog(REDIS_WARNING
, "Background saving error");
891 redisLog(REDIS_WARNING
,
892 "Background saving terminated by signal");
893 rdbRemoveTempFile(server
.bgsavechildpid
);
895 server
.bgsavechildpid
= -1;
896 /* Possibly there are slaves waiting for a BGSAVE in order to be served
897 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
898 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
901 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
903 void backgroundRewriteDoneHandler(int statloc
) {
904 int exitcode
= WEXITSTATUS(statloc
);
905 int bysignal
= WIFSIGNALED(statloc
);
907 if (!bysignal
&& exitcode
== 0) {
911 redisLog(REDIS_NOTICE
,
912 "Background append only file rewriting terminated with success");
913 /* Now it's time to flush the differences accumulated by the parent */
914 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) server
.bgrewritechildpid
);
915 fd
= open(tmpfile
,O_WRONLY
|O_APPEND
);
917 redisLog(REDIS_WARNING
, "Not able to open the temp append only file produced by the child: %s", strerror(errno
));
920 /* Flush our data... */
921 if (write(fd
,server
.bgrewritebuf
,sdslen(server
.bgrewritebuf
)) !=
922 (signed) sdslen(server
.bgrewritebuf
)) {
923 redisLog(REDIS_WARNING
, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno
));
927 redisLog(REDIS_NOTICE
,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server
.bgrewritebuf
));
928 /* Now our work is to rename the temp file into the stable file. And
929 * switch the file descriptor used by the server for append only. */
930 if (rename(tmpfile
,server
.appendfilename
) == -1) {
931 redisLog(REDIS_WARNING
,"Can't rename the temp append only file into the stable one: %s", strerror(errno
));
935 /* Mission completed... almost */
936 redisLog(REDIS_NOTICE
,"Append only file successfully rewritten.");
937 if (server
.appendfd
!= -1) {
938 /* If append only is actually enabled... */
939 close(server
.appendfd
);
940 server
.appendfd
= fd
;
942 server
.appendseldb
= -1; /* Make sure it will issue SELECT */
943 redisLog(REDIS_NOTICE
,"The new append only file was selected for future appends.");
945 /* If append only is disabled we just generate a dump in this
946 * format. Why not? */
949 } else if (!bysignal
&& exitcode
!= 0) {
950 redisLog(REDIS_WARNING
, "Background append only file rewriting error");
952 redisLog(REDIS_WARNING
,
953 "Background append only file rewriting terminated by signal");
956 sdsfree(server
.bgrewritebuf
);
957 server
.bgrewritebuf
= sdsempty();
958 aofRemoveTempFile(server
.bgrewritechildpid
);
959 server
.bgrewritechildpid
= -1;
962 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
963 int j
, loops
= server
.cronloops
++;
964 REDIS_NOTUSED(eventLoop
);
966 REDIS_NOTUSED(clientData
);
968 /* Update the global state with the amount of used memory */
969 server
.usedmemory
= zmalloc_used_memory();
971 /* Show some info about non-empty databases */
972 for (j
= 0; j
< server
.dbnum
; j
++) {
973 long long size
, used
, vkeys
;
975 size
= dictSlots(server
.db
[j
].dict
);
976 used
= dictSize(server
.db
[j
].dict
);
977 vkeys
= dictSize(server
.db
[j
].expires
);
978 if (!(loops
% 5) && (used
|| vkeys
)) {
979 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
980 /* dictPrintStats(server.dict); */
984 /* We don't want to resize the hash tables while a background saving
985 * is in progress: the saving child is created using fork() that is
986 * implemented with a copy-on-write semantic in most modern systems, so
987 * if we resize the HT while there is the saving child at work actually
988 * a lot of memory movements in the parent will cause a lot of pages
990 if (server
.bgsavechildpid
== -1) tryResizeHashTables();
992 /* Show information about connected clients */
994 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
995 listLength(server
.clients
)-listLength(server
.slaves
),
996 listLength(server
.slaves
),
998 dictSize(server
.sharingpool
));
1001 /* Close connections of timedout clients */
1002 if (server
.maxidletime
&& !(loops
% 10))
1003 closeTimedoutClients();
1005 /* Check if a background saving or AOF rewrite in progress terminated */
1006 if (server
.bgsavechildpid
!= -1 || server
.bgrewritechildpid
!= -1) {
1010 if ((pid
= wait3(&statloc
,WNOHANG
,NULL
)) != 0) {
1011 if (pid
== server
.bgsavechildpid
) {
1012 backgroundSaveDoneHandler(statloc
);
1014 backgroundRewriteDoneHandler(statloc
);
1018 /* If there is not a background saving in progress check if
1019 * we have to save now */
1020 time_t now
= time(NULL
);
1021 for (j
= 0; j
< server
.saveparamslen
; j
++) {
1022 struct saveparam
*sp
= server
.saveparams
+j
;
1024 if (server
.dirty
>= sp
->changes
&&
1025 now
-server
.lastsave
> sp
->seconds
) {
1026 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
1027 sp
->changes
, sp
->seconds
);
1028 rdbSaveBackground(server
.dbfilename
);
1034 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1035 * will use few CPU cycles if there are few expiring keys, otherwise
1036 * it will get more aggressive to avoid that too much memory is used by
1037 * keys that can be removed from the keyspace. */
1038 for (j
= 0; j
< server
.dbnum
; j
++) {
1040 redisDb
*db
= server
.db
+j
;
1042 /* Continue to expire if at the end of the cycle more than 25%
1043 * of the keys were expired. */
1045 int num
= dictSize(db
->expires
);
1046 time_t now
= time(NULL
);
1049 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
1050 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
1055 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
1056 t
= (time_t) dictGetEntryVal(de
);
1058 deleteKey(db
,dictGetEntryKey(de
));
1062 } while (expired
> REDIS_EXPIRELOOKUPS_PER_CRON
/4);
1065 /* Check if we should connect to a MASTER */
1066 if (server
.replstate
== REDIS_REPL_CONNECT
) {
1067 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
1068 if (syncWithMaster() == REDIS_OK
) {
1069 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
1075 static void createSharedObjects(void) {
1076 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
1077 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
1078 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
1079 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
1080 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
1081 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
1082 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
1083 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
1084 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
1085 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
1086 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
1087 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1088 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
1089 "-ERR no such key\r\n"));
1090 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
1091 "-ERR syntax error\r\n"));
1092 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
1093 "-ERR source and destination objects are the same\r\n"));
1094 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
1095 "-ERR index out of range\r\n"));
1096 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
1097 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
1098 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
1099 shared
.select0
= createStringObject("select 0\r\n",10);
1100 shared
.select1
= createStringObject("select 1\r\n",10);
1101 shared
.select2
= createStringObject("select 2\r\n",10);
1102 shared
.select3
= createStringObject("select 3\r\n",10);
1103 shared
.select4
= createStringObject("select 4\r\n",10);
1104 shared
.select5
= createStringObject("select 5\r\n",10);
1105 shared
.select6
= createStringObject("select 6\r\n",10);
1106 shared
.select7
= createStringObject("select 7\r\n",10);
1107 shared
.select8
= createStringObject("select 8\r\n",10);
1108 shared
.select9
= createStringObject("select 9\r\n",10);
1111 static void appendServerSaveParams(time_t seconds
, int changes
) {
1112 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1113 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1114 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1115 server
.saveparamslen
++;
1118 static void resetServerSaveParams() {
1119 zfree(server
.saveparams
);
1120 server
.saveparams
= NULL
;
1121 server
.saveparamslen
= 0;
1124 static void initServerConfig() {
1125 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1126 server
.port
= REDIS_SERVERPORT
;
1127 server
.verbosity
= REDIS_DEBUG
;
1128 server
.maxidletime
= REDIS_MAXIDLETIME
;
1129 server
.saveparams
= NULL
;
1130 server
.logfile
= NULL
; /* NULL = log on standard output */
1131 server
.bindaddr
= NULL
;
1132 server
.glueoutputbuf
= 1;
1133 server
.daemonize
= 0;
1134 server
.appendonly
= 0;
1135 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1136 server
.lastfsync
= time(NULL
);
1137 server
.appendfd
= -1;
1138 server
.appendseldb
= -1; /* Make sure the first time will not match */
1139 server
.pidfile
= "/var/run/redis.pid";
1140 server
.dbfilename
= "dump.rdb";
1141 server
.appendfilename
= "appendonly.aof";
1142 server
.requirepass
= NULL
;
1143 server
.shareobjects
= 0;
1144 server
.rdbcompression
= 1;
1145 server
.sharingpoolsize
= 1024;
1146 server
.maxclients
= 0;
1147 server
.maxmemory
= 0;
1148 resetServerSaveParams();
1150 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1151 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1152 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1153 /* Replication related */
1155 server
.masterauth
= NULL
;
1156 server
.masterhost
= NULL
;
1157 server
.masterport
= 6379;
1158 server
.master
= NULL
;
1159 server
.replstate
= REDIS_REPL_NONE
;
1161 /* Double constants initialization */
1163 R_PosInf
= 1.0/R_Zero
;
1164 R_NegInf
= -1.0/R_Zero
;
1165 R_Nan
= R_Zero
/R_Zero
;
1168 static void initServer() {
1171 signal(SIGHUP
, SIG_IGN
);
1172 signal(SIGPIPE
, SIG_IGN
);
1173 setupSigSegvAction();
1175 server
.clients
= listCreate();
1176 server
.slaves
= listCreate();
1177 server
.monitors
= listCreate();
1178 server
.objfreelist
= listCreate();
1179 createSharedObjects();
1180 server
.el
= aeCreateEventLoop();
1181 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1182 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
1183 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1184 if (server
.fd
== -1) {
1185 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
1188 for (j
= 0; j
< server
.dbnum
; j
++) {
1189 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1190 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1191 server
.db
[j
].id
= j
;
1193 server
.cronloops
= 0;
1194 server
.bgsavechildpid
= -1;
1195 server
.bgrewritechildpid
= -1;
1196 server
.bgrewritebuf
= sdsempty();
1197 server
.lastsave
= time(NULL
);
1199 server
.usedmemory
= 0;
1200 server
.stat_numcommands
= 0;
1201 server
.stat_numconnections
= 0;
1202 server
.stat_starttime
= time(NULL
);
1203 aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
);
1205 if (server
.appendonly
) {
1206 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644);
1207 if (server
.appendfd
== -1) {
1208 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1215 /* Empty the whole database */
1216 static long long emptyDb() {
1218 long long removed
= 0;
1220 for (j
= 0; j
< server
.dbnum
; j
++) {
1221 removed
+= dictSize(server
.db
[j
].dict
);
1222 dictEmpty(server
.db
[j
].dict
);
1223 dictEmpty(server
.db
[j
].expires
);
1228 static int yesnotoi(char *s
) {
1229 if (!strcasecmp(s
,"yes")) return 1;
1230 else if (!strcasecmp(s
,"no")) return 0;
1234 /* I agree, this is a very rudimentary way to load a configuration...
1235 will improve later if the config gets more complex */
1236 static void loadServerConfig(char *filename
) {
1238 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1242 if (filename
[0] == '-' && filename
[1] == '\0')
1245 if ((fp
= fopen(filename
,"r")) == NULL
) {
1246 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1251 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1257 line
= sdstrim(line
," \t\r\n");
1259 /* Skip comments and blank lines */
1260 if (line
[0] == '#' || line
[0] == '\0') {
1265 /* Split into arguments */
1266 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1267 sdstolower(argv
[0]);
1269 /* Execute config directives */
1270 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1271 server
.maxidletime
= atoi(argv
[1]);
1272 if (server
.maxidletime
< 0) {
1273 err
= "Invalid timeout value"; goto loaderr
;
1275 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1276 server
.port
= atoi(argv
[1]);
1277 if (server
.port
< 1 || server
.port
> 65535) {
1278 err
= "Invalid port"; goto loaderr
;
1280 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1281 server
.bindaddr
= zstrdup(argv
[1]);
1282 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1283 int seconds
= atoi(argv
[1]);
1284 int changes
= atoi(argv
[2]);
1285 if (seconds
< 1 || changes
< 0) {
1286 err
= "Invalid save parameters"; goto loaderr
;
1288 appendServerSaveParams(seconds
,changes
);
1289 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1290 if (chdir(argv
[1]) == -1) {
1291 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1292 argv
[1], strerror(errno
));
1295 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1296 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1297 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1298 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1300 err
= "Invalid log level. Must be one of debug, notice, warning";
1303 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1306 server
.logfile
= zstrdup(argv
[1]);
1307 if (!strcasecmp(server
.logfile
,"stdout")) {
1308 zfree(server
.logfile
);
1309 server
.logfile
= NULL
;
1311 if (server
.logfile
) {
1312 /* Test if we are able to open the file. The server will not
1313 * be able to abort just for this problem later... */
1314 logfp
= fopen(server
.logfile
,"a");
1315 if (logfp
== NULL
) {
1316 err
= sdscatprintf(sdsempty(),
1317 "Can't open the log file: %s", strerror(errno
));
1322 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1323 server
.dbnum
= atoi(argv
[1]);
1324 if (server
.dbnum
< 1) {
1325 err
= "Invalid number of databases"; goto loaderr
;
1327 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1328 server
.maxclients
= atoi(argv
[1]);
1329 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1330 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1331 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1332 server
.masterhost
= sdsnew(argv
[1]);
1333 server
.masterport
= atoi(argv
[2]);
1334 server
.replstate
= REDIS_REPL_CONNECT
;
1335 } else if (!strcasecmp(argv
[0],"masterauth") && argc
== 2) {
1336 server
.masterauth
= zstrdup(argv
[1]);
1337 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1338 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1339 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1341 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1342 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1343 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1345 } else if (!strcasecmp(argv
[0],"rdbcompression") && argc
== 2) {
1346 if ((server
.rdbcompression
= yesnotoi(argv
[1])) == -1) {
1347 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1349 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1350 server
.sharingpoolsize
= atoi(argv
[1]);
1351 if (server
.sharingpoolsize
< 1) {
1352 err
= "invalid object sharing pool size"; goto loaderr
;
1354 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1355 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1356 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1358 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1359 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1360 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1362 } else if (!strcasecmp(argv
[0],"appendfsync") && argc
== 2) {
1363 if (!strcasecmp(argv
[1],"no")) {
1364 server
.appendfsync
= APPENDFSYNC_NO
;
1365 } else if (!strcasecmp(argv
[1],"always")) {
1366 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1367 } else if (!strcasecmp(argv
[1],"everysec")) {
1368 server
.appendfsync
= APPENDFSYNC_EVERYSEC
;
1370 err
= "argument must be 'no', 'always' or 'everysec'";
1373 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1374 server
.requirepass
= zstrdup(argv
[1]);
1375 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1376 server
.pidfile
= zstrdup(argv
[1]);
1377 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1378 server
.dbfilename
= zstrdup(argv
[1]);
1380 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1382 for (j
= 0; j
< argc
; j
++)
1387 if (fp
!= stdin
) fclose(fp
);
1391 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1392 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1393 fprintf(stderr
, ">>> '%s'\n", line
);
1394 fprintf(stderr
, "%s\n", err
);
1398 static void freeClientArgv(redisClient
*c
) {
1401 for (j
= 0; j
< c
->argc
; j
++)
1402 decrRefCount(c
->argv
[j
]);
1403 for (j
= 0; j
< c
->mbargc
; j
++)
1404 decrRefCount(c
->mbargv
[j
]);
1409 static void freeClient(redisClient
*c
) {
1412 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1413 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1414 sdsfree(c
->querybuf
);
1415 listRelease(c
->reply
);
1418 ln
= listSearchKey(server
.clients
,c
);
1419 redisAssert(ln
!= NULL
);
1420 listDelNode(server
.clients
,ln
);
1421 if (c
->flags
& REDIS_SLAVE
) {
1422 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1424 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1425 ln
= listSearchKey(l
,c
);
1426 redisAssert(ln
!= NULL
);
1429 if (c
->flags
& REDIS_MASTER
) {
1430 server
.master
= NULL
;
1431 server
.replstate
= REDIS_REPL_CONNECT
;
1438 #define GLUEREPLY_UP_TO (1024)
1439 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1441 char buf
[GLUEREPLY_UP_TO
];
1445 listRewind(c
->reply
);
1446 while((ln
= listYield(c
->reply
))) {
1450 objlen
= sdslen(o
->ptr
);
1451 if (copylen
+ objlen
<= GLUEREPLY_UP_TO
) {
1452 memcpy(buf
+copylen
,o
->ptr
,objlen
);
1454 listDelNode(c
->reply
,ln
);
1456 if (copylen
== 0) return;
1460 /* Now the output buffer is empty, add the new single element */
1461 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,copylen
));
1462 listAddNodeHead(c
->reply
,o
);
1465 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1466 redisClient
*c
= privdata
;
1467 int nwritten
= 0, totwritten
= 0, objlen
;
1470 REDIS_NOTUSED(mask
);
1472 /* Use writev() if we have enough buffers to send */
1473 if (!server
.glueoutputbuf
&&
1474 listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD
&&
1475 !(c
->flags
& REDIS_MASTER
))
1477 sendReplyToClientWritev(el
, fd
, privdata
, mask
);
1481 while(listLength(c
->reply
)) {
1482 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1483 glueReplyBuffersIfNeeded(c
);
1485 o
= listNodeValue(listFirst(c
->reply
));
1486 objlen
= sdslen(o
->ptr
);
1489 listDelNode(c
->reply
,listFirst(c
->reply
));
1493 if (c
->flags
& REDIS_MASTER
) {
1494 /* Don't reply to a master */
1495 nwritten
= objlen
- c
->sentlen
;
1497 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1498 if (nwritten
<= 0) break;
1500 c
->sentlen
+= nwritten
;
1501 totwritten
+= nwritten
;
1502 /* If we fully sent the object on head go to the next one */
1503 if (c
->sentlen
== objlen
) {
1504 listDelNode(c
->reply
,listFirst(c
->reply
));
1507 /* Note that we avoid sending more than REDIS_MAX_WRITE_PER_EVENT
1508 * bytes: in a single threaded server it's a good idea to serve
1509 * other clients as well, even if a very large request comes from a
1510 * super fast link that is always able to accept data (in a real world
1511 * scenario think about 'KEYS *' against the loopback interface) */
1512 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1514 if (nwritten
== -1) {
1515 if (errno
== EAGAIN
) {
1518 redisLog(REDIS_DEBUG
,
1519 "Error writing to client: %s", strerror(errno
));
1524 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1525 if (listLength(c
->reply
) == 0) {
1527 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1531 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
)
1533 redisClient
*c
= privdata
;
1534 int nwritten
= 0, totwritten
= 0, objlen
, willwrite
;
1536 struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
];
1537 int offset
, ion
= 0;
1539 REDIS_NOTUSED(mask
);
1542 while (listLength(c
->reply
)) {
1543 offset
= c
->sentlen
;
1547 /* fill-in the iov[] array */
1548 for(node
= listFirst(c
->reply
); node
; node
= listNextNode(node
)) {
1549 o
= listNodeValue(node
);
1550 objlen
= sdslen(o
->ptr
);
1552 if (totwritten
+ objlen
- offset
> REDIS_MAX_WRITE_PER_EVENT
)
1555 if(ion
== REDIS_WRITEV_IOVEC_COUNT
)
1556 break; /* no more iovecs */
1558 iov
[ion
].iov_base
= ((char*)o
->ptr
) + offset
;
1559 iov
[ion
].iov_len
= objlen
- offset
;
1560 willwrite
+= objlen
- offset
;
1561 offset
= 0; /* just for the first item */
1568 /* write all collected blocks at once */
1569 if((nwritten
= writev(fd
, iov
, ion
)) < 0) {
1570 if (errno
!= EAGAIN
) {
1571 redisLog(REDIS_DEBUG
,
1572 "Error writing to client: %s", strerror(errno
));
1579 totwritten
+= nwritten
;
1580 offset
= c
->sentlen
;
1582 /* remove written robjs from c->reply */
1583 while (nwritten
&& listLength(c
->reply
)) {
1584 o
= listNodeValue(listFirst(c
->reply
));
1585 objlen
= sdslen(o
->ptr
);
1587 if(nwritten
>= objlen
- offset
) {
1588 listDelNode(c
->reply
, listFirst(c
->reply
));
1589 nwritten
-= objlen
- offset
;
1593 c
->sentlen
+= nwritten
;
1601 c
->lastinteraction
= time(NULL
);
1603 if (listLength(c
->reply
) == 0) {
1605 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1609 static struct redisCommand
*lookupCommand(char *name
) {
1611 while(cmdTable
[j
].name
!= NULL
) {
1612 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1618 /* resetClient prepares the client to process the next command */
1619 static void resetClient(redisClient
*c
) {
1625 /* If this function gets called we already read a whole
1626 * command, arguments are in the client argv/argc fields.
1627 * processCommand() executes the command or prepares the
1628 * server for a bulk read from the client.
1630 * If 1 is returned the client is still alive and valid
1631 * and other operations can be performed by the caller. Otherwise
1632 * if 0 is returned the client was destroyed (i.e. after QUIT). */
1633 static int processCommand(redisClient
*c
) {
1634 struct redisCommand
*cmd
;
1637 /* Free some memory if needed (maxmemory setting) */
1638 if (server
.maxmemory
) freeMemoryIfNeeded();
1640 /* Handle the multi bulk command type. This is an alternative protocol
1641 * supported by Redis in order to receive commands that are composed of
1642 * multiple binary-safe "bulk" arguments. The latency of processing is
1643 * a bit higher but this allows things like multi-sets, so if this
1644 * protocol is used only for MSET and similar commands this is a big win. */
1645 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1646 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1647 if (c
->multibulk
<= 0) {
1651 decrRefCount(c
->argv
[c
->argc
-1]);
1655 } else if (c
->multibulk
) {
1656 if (c
->bulklen
== -1) {
1657 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1658 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1662 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1663 decrRefCount(c
->argv
[0]);
1664 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1666 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1671 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1675 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1676 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1680 if (c
->multibulk
== 0) {
1684 /* Here we need to swap the multi-bulk argc/argv with the
1685 * normal argc/argv of the client structure. */
1687 c
->argv
= c
->mbargv
;
1688 c
->mbargv
= auxargv
;
1691 c
->argc
= c
->mbargc
;
1692 c
->mbargc
= auxargc
;
1694 /* We need to set bulklen to something different than -1
1695 * in order for the code below to process the command without
1696 * trying to read the last argument of a bulk command as
1697 * a special argument. */
1699 /* continue below and process the command */
1706 /* -- end of multi bulk commands processing -- */
1708 /* The QUIT command is handled as a special case. Normal command
1709 * procs are unable to close the client connection safely */
1710 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1714 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1717 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1718 (char*)c
->argv
[0]->ptr
));
1721 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1722 (c
->argc
< -cmd
->arity
)) {
1724 sdscatprintf(sdsempty(),
1725 "-ERR wrong number of arguments for '%s' command\r\n",
1729 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1730 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1733 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1734 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1736 decrRefCount(c
->argv
[c
->argc
-1]);
1737 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1739 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1744 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1745 /* It is possible that the bulk read is already in the
1746 * buffer. Check this condition and handle it accordingly.
1747 * This is just a fast path, an alternative to calling processInputBuffer().
1748 * It's a good idea since the code is small and this condition
1749 * happens most of the time. */
1750 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1751 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1753 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1758 /* Let's try to share objects on the command arguments vector */
1759 if (server
.shareobjects
) {
1761 for(j
= 1; j
< c
->argc
; j
++)
1762 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1764 /* Let's try to encode the bulk object to save space. */
1765 if (cmd
->flags
& REDIS_CMD_BULK
)
1766 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1768 /* Check if the user is authenticated */
1769 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1770 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1775 /* Exec the command */
1776 dirty
= server
.dirty
;
1778 if (server
.appendonly
&& server
.dirty
-dirty
)
1779 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1780 if (server
.dirty
-dirty
&& listLength(server
.slaves
))
1781 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1782 if (listLength(server
.monitors
))
1783 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1784 server
.stat_numcommands
++;
1786 /* Prepare the client for the next command */
1787 if (c
->flags
& REDIS_CLOSE
) {
1795 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1799 /* (args*2)+1 is enough room for args, spaces, newlines */
1800 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1802 if (argc
<= REDIS_STATIC_ARGS
) {
1805 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1808 for (j
= 0; j
< argc
; j
++) {
1809 if (j
!= 0) outv
[outc
++] = shared
.space
;
1810 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1813 lenobj
= createObject(REDIS_STRING
,
1814 sdscatprintf(sdsempty(),"%lu\r\n",
1815 (unsigned long) stringObjectLen(argv
[j
])));
1816 lenobj
->refcount
= 0;
1817 outv
[outc
++] = lenobj
;
1819 outv
[outc
++] = argv
[j
];
1821 outv
[outc
++] = shared
.crlf
;
1823 /* Increment all the refcounts at start and decrement at end in order to
1824 * be sure to free objects if there is no slave in a replication state
1825 * able to be fed with commands */
1826 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1828 while((ln
= listYield(slaves
))) {
1829 redisClient
*slave
= ln
->value
;
1831 /* Don't feed slaves that are still waiting for BGSAVE to start */
1832 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1834 /* Feed all the other slaves, MONITORs and so on */
1835 if (slave
->slaveseldb
!= dictid
) {
1839 case 0: selectcmd
= shared
.select0
; break;
1840 case 1: selectcmd
= shared
.select1
; break;
1841 case 2: selectcmd
= shared
.select2
; break;
1842 case 3: selectcmd
= shared
.select3
; break;
1843 case 4: selectcmd
= shared
.select4
; break;
1844 case 5: selectcmd
= shared
.select5
; break;
1845 case 6: selectcmd
= shared
.select6
; break;
1846 case 7: selectcmd
= shared
.select7
; break;
1847 case 8: selectcmd
= shared
.select8
; break;
1848 case 9: selectcmd
= shared
.select9
; break;
1850 selectcmd
= createObject(REDIS_STRING
,
1851 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1852 selectcmd
->refcount
= 0;
1855 addReply(slave
,selectcmd
);
1856 slave
->slaveseldb
= dictid
;
1858 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1860 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1861 if (outv
!= static_outv
) zfree(outv
);
1864 static void processInputBuffer(redisClient
*c
) {
1866 if (c
->bulklen
== -1) {
1867 /* Read the first line of the query */
1868 char *p
= strchr(c
->querybuf
,'\n');
1875 query
= c
->querybuf
;
1876 c
->querybuf
= sdsempty();
1877 querylen
= 1+(p
-(query
));
1878 if (sdslen(query
) > querylen
) {
1879 /* leave data after the first line of the query in the buffer */
1880 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1882 *p
= '\0'; /* remove "\n" */
1883 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1884 sdsupdatelen(query
);
1886 /* Now we can split the query in arguments */
1887 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1890 if (c
->argv
) zfree(c
->argv
);
1891 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1893 for (j
= 0; j
< argc
; j
++) {
1894 if (sdslen(argv
[j
])) {
1895 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1903 /* Execute the command. If the client is still valid
1904 * after processCommand() return and there is something
1905 * on the query buffer try to process the next command. */
1906 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1908 /* Nothing to process, argc == 0. Just process the query
1909 * buffer if it's not empty or return to the caller */
1910 if (sdslen(c
->querybuf
)) goto again
;
1913 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
1914 redisLog(REDIS_DEBUG
, "Client protocol error");
1919 /* Bulk read handling. Note that if we are at this point
1920 the client already sent a command terminated with a newline,
1921 we are reading the bulk data that is actually the last
1922 argument of the command. */
1923 int qbl
= sdslen(c
->querybuf
);
1925 if (c
->bulklen
<= qbl
) {
1926 /* Copy everything but the final CRLF as final argument */
1927 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1929 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1930 /* Process the command. If the client is still valid after
1931 * the processing and there is more data in the buffer
1932 * try to parse it. */
1933 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1939 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1940 redisClient
*c
= (redisClient
*) privdata
;
1941 char buf
[REDIS_IOBUF_LEN
];
1944 REDIS_NOTUSED(mask
);
1946 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1948 if (errno
== EAGAIN
) {
1951 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1955 } else if (nread
== 0) {
1956 redisLog(REDIS_DEBUG
, "Client closed connection");
1961 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1962 c
->lastinteraction
= time(NULL
);
1966 processInputBuffer(c
);
1969 static int selectDb(redisClient
*c
, int id
) {
1970 if (id
< 0 || id
>= server
.dbnum
)
1972 c
->db
= &server
.db
[id
];
1976 static void *dupClientReplyValue(void *o
) {
1977 incrRefCount((robj
*)o
);
1981 static redisClient
*createClient(int fd
) {
1982 redisClient
*c
= zmalloc(sizeof(*c
));
1984 anetNonBlock(NULL
,fd
);
1985 anetTcpNoDelay(NULL
,fd
);
1986 if (!c
) return NULL
;
1989 c
->querybuf
= sdsempty();
1998 c
->lastinteraction
= time(NULL
);
1999 c
->authenticated
= 0;
2000 c
->replstate
= REDIS_REPL_NONE
;
2001 c
->reply
= listCreate();
2002 listSetFreeMethod(c
->reply
,decrRefCount
);
2003 listSetDupMethod(c
->reply
,dupClientReplyValue
);
2004 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
2005 readQueryFromClient
, c
) == AE_ERR
) {
2009 listAddNodeTail(server
.clients
,c
);
2013 static void addReply(redisClient
*c
, robj
*obj
) {
2014 if (listLength(c
->reply
) == 0 &&
2015 (c
->replstate
== REDIS_REPL_NONE
||
2016 c
->replstate
== REDIS_REPL_ONLINE
) &&
2017 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
2018 sendReplyToClient
, c
) == AE_ERR
) return;
2019 listAddNodeTail(c
->reply
,getDecodedObject(obj
));
2022 static void addReplySds(redisClient
*c
, sds s
) {
2023 robj
*o
= createObject(REDIS_STRING
,s
);
2028 static void addReplyDouble(redisClient
*c
, double d
) {
2031 snprintf(buf
,sizeof(buf
),"%.17g",d
);
2032 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2033 (unsigned long) strlen(buf
),buf
));
2036 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
2039 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
2040 len
= sdslen(obj
->ptr
);
2042 long n
= (long)obj
->ptr
;
2044 /* Compute how many bytes will take this integer as a radix 10 string */
2050 while((n
= n
/10) != 0) {
2054 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len
));
2057 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2062 REDIS_NOTUSED(mask
);
2063 REDIS_NOTUSED(privdata
);
2065 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
2066 if (cfd
== AE_ERR
) {
2067 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
2070 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
2071 if ((c
= createClient(cfd
)) == NULL
) {
2072 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
2073 close(cfd
); /* May be already closed, just ingore errors */
2076 /* If maxclient directive is set and this is one client more... close the
2077 * connection. Note that we create the client instead of checking
2078 * for this condition beforehand, since now the socket is already set in nonblocking
2079 * mode and we can send an error for free using the Kernel I/O */
2080 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
2081 char *err
= "-ERR max number of clients reached\r\n";
2083 /* That's a best effort error message, don't check write errors */
2084 if (write(c
->fd
,err
,strlen(err
)) == -1) {
2085 /* Nothing to do, Just to avoid the warning... */
2090 server
.stat_numconnections
++;
2093 /* ======================= Redis objects implementation ===================== */
2095 static robj
*createObject(int type
, void *ptr
) {
2098 if (listLength(server
.objfreelist
)) {
2099 listNode
*head
= listFirst(server
.objfreelist
);
2100 o
= listNodeValue(head
);
2101 listDelNode(server
.objfreelist
,head
);
2103 o
= zmalloc(sizeof(*o
));
2106 o
->encoding
= REDIS_ENCODING_RAW
;
2112 static robj
*createStringObject(char *ptr
, size_t len
) {
2113 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
2116 static robj
*createListObject(void) {
2117 list
*l
= listCreate();
2119 listSetFreeMethod(l
,decrRefCount
);
2120 return createObject(REDIS_LIST
,l
);
2123 static robj
*createSetObject(void) {
2124 dict
*d
= dictCreate(&setDictType
,NULL
);
2125 return createObject(REDIS_SET
,d
);
2128 static robj
*createZsetObject(void) {
2129 zset
*zs
= zmalloc(sizeof(*zs
));
2131 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
2132 zs
->zsl
= zslCreate();
2133 return createObject(REDIS_ZSET
,zs
);
2136 static void freeStringObject(robj
*o
) {
2137 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2142 static void freeListObject(robj
*o
) {
2143 listRelease((list
*) o
->ptr
);
2146 static void freeSetObject(robj
*o
) {
2147 dictRelease((dict
*) o
->ptr
);
2150 static void freeZsetObject(robj
*o
) {
2153 dictRelease(zs
->dict
);
2158 static void freeHashObject(robj
*o
) {
2159 dictRelease((dict
*) o
->ptr
);
2162 static void incrRefCount(robj
*o
) {
2164 #ifdef DEBUG_REFCOUNT
2165 if (o
->type
== REDIS_STRING
)
2166 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
2170 static void decrRefCount(void *obj
) {
2173 #ifdef DEBUG_REFCOUNT
2174 if (o
->type
== REDIS_STRING
)
2175 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
2177 if (--(o
->refcount
) == 0) {
2179 case REDIS_STRING
: freeStringObject(o
); break;
2180 case REDIS_LIST
: freeListObject(o
); break;
2181 case REDIS_SET
: freeSetObject(o
); break;
2182 case REDIS_ZSET
: freeZsetObject(o
); break;
2183 case REDIS_HASH
: freeHashObject(o
); break;
2184 default: redisAssert(0 != 0); break;
2186 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2187 !listAddNodeHead(server
.objfreelist
,o
))
2192 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2193 dictEntry
*de
= dictFind(db
->dict
,key
);
2194 return de
? dictGetEntryVal(de
) : NULL
;
2197 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2198 expireIfNeeded(db
,key
);
2199 return lookupKey(db
,key
);
2202 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2203 deleteIfVolatile(db
,key
);
2204 return lookupKey(db
,key
);
2207 static int deleteKey(redisDb
*db
, robj
*key
) {
/* We need to protect key from destruction: after the first dictDelete()
 * it may happen that 'key' is no longer valid if we don't increment
 * its reference count. This may happen when we get the object reference
 * directly from the hash table with dictRandomKey() or dict iterators */
2215 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2216 retval
= dictDelete(db
->dict
,key
);
2219 return retval
== DICT_OK
;
2222 /* Try to share an object against the shared objects pool */
2223 static robj
*tryObjectSharing(robj
*o
) {
2224 struct dictEntry
*de
;
2227 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2229 redisAssert(o
->type
== REDIS_STRING
);
2230 de
= dictFind(server
.sharingpool
,o
);
2232 robj
*shared
= dictGetEntryKey(de
);
2234 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2235 dictGetEntryVal(de
) = (void*) c
;
2236 incrRefCount(shared
);
/* Here we are using a stream algorithm: every time an object is
 * shared we increment its count, every time there is a miss we
 * decrement the counter of a random object. If this object reaches
 * zero we remove the object and put the current object instead. */
2244 if (dictSize(server
.sharingpool
) >=
2245 server
.sharingpoolsize
) {
2246 de
= dictGetRandomKey(server
.sharingpool
);
2247 redisAssert(de
!= NULL
);
2248 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2249 dictGetEntryVal(de
) = (void*) c
;
2251 dictDelete(server
.sharingpool
,de
->key
);
2254 c
= 0; /* If the pool is empty we want to add this object */
2259 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2260 redisAssert(retval
== DICT_OK
);
2267 /* Check if the nul-terminated string 's' can be represented by a long
2268 * (that is, is a number that fits into long without any other space or
2269 * character before or after the digits).
2271 * If so, the function returns REDIS_OK and *longval is set to the value
2272 * of the number. Otherwise REDIS_ERR is returned */
2273 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2274 char buf
[32], *endptr
;
2278 value
= strtol(s
, &endptr
, 10);
2279 if (endptr
[0] != '\0') return REDIS_ERR
;
2280 slen
= snprintf(buf
,32,"%ld",value
);
2282 /* If the number converted back into a string is not identical
2283 * then it's not possible to encode the string as integer */
2284 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2285 if (longval
) *longval
= value
;
2289 /* Try to encode a string object in order to save space */
2290 static int tryObjectEncoding(robj
*o
) {
2294 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2295 return REDIS_ERR
; /* Already encoded */
/* It's not safe to encode shared objects: shared objects can be shared
 * everywhere in the "object space" of Redis. Encoded objects can only
 * appear as "values" (and not, for instance, as keys) */
2300 if (o
->refcount
> 1) return REDIS_ERR
;
2302 /* Currently we try to encode only strings */
2303 redisAssert(o
->type
== REDIS_STRING
);
2305 /* Check if we can represent this string as a long integer */
2306 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2308 /* Ok, this object can be encoded */
2309 o
->encoding
= REDIS_ENCODING_INT
;
2311 o
->ptr
= (void*) value
;
2315 /* Get a decoded version of an encoded object (returned as a new object).
2316 * If the object is already raw-encoded just increment the ref count. */
2317 static robj
*getDecodedObject(robj
*o
) {
2320 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2324 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2327 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2328 dec
= createStringObject(buf
,strlen(buf
));
2331 redisAssert(1 != 1);
/* Compare two string objects via strcmp() or alike.
 * Note that the objects may be integer-encoded. In such a case we
 * use snprintf() to get a string representation of the numbers on the stack
 * and compare the strings; this is much faster than calling getDecodedObject().
 *
 * Important note: if objects are not integer encoded, but binary-safe strings,
 * sdscmp() from sds.c will apply memcmp() so this function can be considered
 * binary safe. */
2343 static int compareStringObjects(robj
*a
, robj
*b
) {
2344 redisAssert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2345 char bufa
[128], bufb
[128], *astr
, *bstr
;
2348 if (a
== b
) return 0;
2349 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2350 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2356 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2357 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2363 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2366 static size_t stringObjectLen(robj
*o
) {
2367 redisAssert(o
->type
== REDIS_STRING
);
2368 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2369 return sdslen(o
->ptr
);
2373 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2377 /*============================ DB saving/loading ============================ */
/* Write the single-byte 'type' opcode to 'fp'.
 *
 * Returns 0 on success, -1 if the byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    if (fwrite(&type,1,1,fp) == 0) return -1;
    return 0;
}
/* Write the time 't' to 'fp' as a 4-byte signed integer.
 *
 * The value is narrowed to int32_t before being written, so times that do
 * not fit in 32 bits are truncated, and the bytes are emitted exactly as they
 * sit in memory (host byte order).
 *
 * Returns 0 on success, -1 if the value could not be written. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    if (fwrite(&t32,4,1,fp) == 0) return -1;
    return 0;
}
2390 /* check rdbLoadLen() comments for more info */
2391 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2392 unsigned char buf
[2];
2395 /* Save a 6 bit len */
2396 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2397 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2398 } else if (len
< (1<<14)) {
2399 /* Save a 14 bit len */
2400 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2402 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2404 /* Save a 32 bit len */
2405 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2406 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2408 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2413 /* String objects in the form "2391" "-100" without any space and with a
2414 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2415 * encoded as integers to save space */
2416 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2418 char *endptr
, buf
[32];
2420 /* Check if it's possible to encode this value as a number */
2421 value
= strtoll(s
, &endptr
, 10);
2422 if (endptr
[0] != '\0') return 0;
2423 snprintf(buf
,32,"%lld",value
);
2425 /* If the number converted back into a string is not identical
2426 * then it's not possible to encode the string as integer */
2427 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2429 /* Finally check if it fits in our ranges */
2430 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2431 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2432 enc
[1] = value
&0xFF;
2434 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2435 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2436 enc
[1] = value
&0xFF;
2437 enc
[2] = (value
>>8)&0xFF;
2439 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2440 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2441 enc
[1] = value
&0xFF;
2442 enc
[2] = (value
>>8)&0xFF;
2443 enc
[3] = (value
>>16)&0xFF;
2444 enc
[4] = (value
>>24)&0xFF;
2451 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2452 unsigned int comprlen
, outlen
;
2456 /* We require at least four bytes compression for this to be worth it */
2457 outlen
= sdslen(obj
->ptr
)-4;
2458 if (outlen
<= 0) return 0;
2459 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2460 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2461 if (comprlen
== 0) {
2465 /* Data compressed! Let's save it on disk */
2466 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2467 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2468 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2469 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2470 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
/* Save a string object as [len][data] on disk. If the object is a string
 * representation of an integer value we try to save it in a special form */
2481 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2485 len
= sdslen(obj
->ptr
);
2487 /* Try integer encoding */
2489 unsigned char buf
[5];
2490 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2491 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2496 /* Try LZF compression - under 20 bytes it's unable to compress even
2497 * aaaaaaaaaaaaaaaaaa so skip it */
2498 if (server
.rdbcompression
&& len
> 20) {
2501 retval
= rdbSaveLzfStringObject(fp
,obj
);
2502 if (retval
== -1) return -1;
2503 if (retval
> 0) return 0;
2504 /* retval == 0 means data can't be compressed, save the old way */
2507 /* Store verbatim */
2508 if (rdbSaveLen(fp
,len
) == -1) return -1;
2509 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2513 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2514 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2517 obj
= getDecodedObject(obj
);
2518 retval
= rdbSaveStringObjectRaw(fp
,obj
);
/* Save a double value. Doubles are saved as strings prefixed by an unsigned
 * 8 bit integer specifying the length of the representation.
 * This 8 bit integer has special values in order to specify the following
 * conditions:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 *
 * Finite values are printed with "%.17g", so the record is the length byte
 * followed by that many characters (no terminator).
 *
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len;

    if (isnan(val)) {
        buf[0] = 253;
        len = 1;
    } else if (!isfinite(val)) {
        len = 1;
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        /* buf[0] is reserved for the length prefix, text starts at buf+1 */
        snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen((char*)buf+1);
        len = buf[0]+1;
    }
    if (fwrite(buf,len,1,fp) == 0) return -1;
    return 0;
}
2550 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2551 static int rdbSave(char *filename
) {
2552 dictIterator
*di
= NULL
;
2557 time_t now
= time(NULL
);
2559 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2560 fp
= fopen(tmpfile
,"w");
2562 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2565 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2566 for (j
= 0; j
< server
.dbnum
; j
++) {
2567 redisDb
*db
= server
.db
+j
;
2569 if (dictSize(d
) == 0) continue;
2570 di
= dictGetIterator(d
);
2576 /* Write the SELECT DB opcode */
2577 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2578 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2580 /* Iterate this DB writing every entry */
2581 while((de
= dictNext(di
)) != NULL
) {
2582 robj
*key
= dictGetEntryKey(de
);
2583 robj
*o
= dictGetEntryVal(de
);
2584 time_t expiretime
= getExpire(db
,key
);
2586 /* Save the expire time */
2587 if (expiretime
!= -1) {
2588 /* If this key is already expired skip it */
2589 if (expiretime
< now
) continue;
2590 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2591 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2593 /* Save the key and associated value */
2594 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2595 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2596 if (o
->type
== REDIS_STRING
) {
2597 /* Save a string value */
2598 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
2599 } else if (o
->type
== REDIS_LIST
) {
2600 /* Save a list value */
2601 list
*list
= o
->ptr
;
2605 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
2606 while((ln
= listYield(list
))) {
2607 robj
*eleobj
= listNodeValue(ln
);
2609 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2611 } else if (o
->type
== REDIS_SET
) {
2612 /* Save a set value */
2614 dictIterator
*di
= dictGetIterator(set
);
2617 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
2618 while((de
= dictNext(di
)) != NULL
) {
2619 robj
*eleobj
= dictGetEntryKey(de
);
2621 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2623 dictReleaseIterator(di
);
2624 } else if (o
->type
== REDIS_ZSET
) {
/* Save a sorted set value */
2627 dictIterator
*di
= dictGetIterator(zs
->dict
);
2630 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) goto werr
;
2631 while((de
= dictNext(di
)) != NULL
) {
2632 robj
*eleobj
= dictGetEntryKey(de
);
2633 double *score
= dictGetEntryVal(de
);
2635 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2636 if (rdbSaveDoubleValue(fp
,*score
) == -1) goto werr
;
2638 dictReleaseIterator(di
);
2640 redisAssert(0 != 0);
2643 dictReleaseIterator(di
);
2646 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2648 /* Make sure data will not remain on the OS's output buffers */
2653 /* Use RENAME to make sure the DB file is changed atomically only
2654 * if the generate DB file is ok. */
2655 if (rename(tmpfile
,filename
) == -1) {
2656 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
2660 redisLog(REDIS_NOTICE
,"DB saved on disk");
2662 server
.lastsave
= time(NULL
);
2668 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2669 if (di
) dictReleaseIterator(di
);
2673 static int rdbSaveBackground(char *filename
) {
2676 if (server
.bgsavechildpid
!= -1) return REDIS_ERR
;
2677 if ((childpid
= fork()) == 0) {
2680 if (rdbSave(filename
) == REDIS_OK
) {
2687 if (childpid
== -1) {
2688 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2692 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2693 server
.bgsavechildpid
= childpid
;
2696 return REDIS_OK
; /* unreached */
2699 static void rdbRemoveTempFile(pid_t childpid
) {
2702 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
/* Read a single-byte type opcode from 'fp'.
 *
 * Returns the byte as a non-negative int (0..255), or -1 if nothing could
 * be read. The byte is held in an unsigned char so that a stored 0xFF is
 * never confused with the -1 error value. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    if (fread(&type,1,1,fp) == 0) return -1;
    return type;
}
/* Read a time value from 'fp', stored as a 4-byte signed integer in the
 * same raw layout rdbSaveTime() writes.
 *
 * Returns the value widened to time_t, or -1 if four bytes could not be
 * read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t t32;

    if (fread(&t32,4,1,fp) == 0) return -1;
    return (time_t) t32;
}
/* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
 * of this file for a description of how these are stored on disk.
 *
 * isencoded is set to 1 if the value read is not actually a length but
 * an "encoding type", check the above comments for more info */
2723 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2724 unsigned char buf
[2];
2727 if (isencoded
) *isencoded
= 0;
2729 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2734 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2735 type
= (buf
[0]&0xC0)>>6;
2736 if (type
== REDIS_RDB_6BITLEN
) {
2737 /* Read a 6 bit len */
2739 } else if (type
== REDIS_RDB_ENCVAL
) {
2740 /* Read a 6 bit len encoding type */
2741 if (isencoded
) *isencoded
= 1;
2743 } else if (type
== REDIS_RDB_14BITLEN
) {
2744 /* Read a 14 bit len */
2745 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2746 return ((buf
[0]&0x3F)<<8)|buf
[1];
2748 /* Read a 32 bit len */
2749 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2755 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2756 unsigned char enc
[4];
2759 if (enctype
== REDIS_RDB_ENC_INT8
) {
2760 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2761 val
= (signed char)enc
[0];
2762 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2764 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2765 v
= enc
[0]|(enc
[1]<<8);
2767 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2769 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2770 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2773 val
= 0; /* anti-warning */
2776 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2779 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2780 unsigned int len
, clen
;
2781 unsigned char *c
= NULL
;
2784 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2785 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2786 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2787 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2788 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2789 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2791 return createObject(REDIS_STRING
,val
);
2798 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2803 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2806 case REDIS_RDB_ENC_INT8
:
2807 case REDIS_RDB_ENC_INT16
:
2808 case REDIS_RDB_ENC_INT32
:
2809 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2810 case REDIS_RDB_ENC_LZF
:
2811 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2817 if (len
== REDIS_RDB_LENERR
) return NULL
;
2818 val
= sdsnewlen(NULL
,len
);
2819 if (len
&& fread(val
,len
,1,fp
) == 0) {
2823 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2826 /* For information about double serialization check rdbSaveDoubleValue() */
2827 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
2831 if (fread(&len
,1,1,fp
) == 0) return -1;
2833 case 255: *val
= R_NegInf
; return 0;
2834 case 254: *val
= R_PosInf
; return 0;
2835 case 253: *val
= R_Nan
; return 0;
2837 if (fread(buf
,len
,1,fp
) == 0) return -1;
2839 sscanf(buf
, "%lg", val
);
2844 static int rdbLoad(char *filename
) {
2846 robj
*keyobj
= NULL
;
2848 int type
, retval
, rdbver
;
2849 dict
*d
= server
.db
[0].dict
;
2850 redisDb
*db
= server
.db
+0;
2852 time_t expiretime
= -1, now
= time(NULL
);
2854 fp
= fopen(filename
,"r");
2855 if (!fp
) return REDIS_ERR
;
2856 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2858 if (memcmp(buf
,"REDIS",5) != 0) {
2860 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2863 rdbver
= atoi(buf
+5);
2866 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2873 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2874 if (type
== REDIS_EXPIRETIME
) {
2875 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2876 /* We read the time so we need to read the object type again */
2877 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2879 if (type
== REDIS_EOF
) break;
2880 /* Handle SELECT DB opcode as a special case */
2881 if (type
== REDIS_SELECTDB
) {
2882 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2884 if (dbid
>= (unsigned)server
.dbnum
) {
2885 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2888 db
= server
.db
+dbid
;
2893 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2895 if (type
== REDIS_STRING
) {
2896 /* Read string value */
2897 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2898 tryObjectEncoding(o
);
2899 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2900 /* Read list/set value */
2903 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2905 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2906 /* Load every single element of the list/set */
2910 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2911 tryObjectEncoding(ele
);
2912 if (type
== REDIS_LIST
) {
2913 listAddNodeTail((list
*)o
->ptr
,ele
);
2915 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
2918 } else if (type
== REDIS_ZSET
) {
/* Read sorted set value */
2923 if ((zsetlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2925 o
= createZsetObject();
/* Load every single element of the sorted set */
2930 double *score
= zmalloc(sizeof(double));
2932 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2933 tryObjectEncoding(ele
);
2934 if (rdbLoadDoubleValue(fp
,score
) == -1) goto eoferr
;
2935 dictAdd(zs
->dict
,ele
,score
);
2936 zslInsert(zs
->zsl
,*score
,ele
);
2937 incrRefCount(ele
); /* added to skiplist */
2940 redisAssert(0 != 0);
2942 /* Add the new object in the hash table */
2943 retval
= dictAdd(d
,keyobj
,o
);
2944 if (retval
== DICT_ERR
) {
2945 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2948 /* Set the expire time if needed */
2949 if (expiretime
!= -1) {
2950 setExpire(db
,keyobj
,expiretime
);
2951 /* Delete this key if already expired */
2952 if (expiretime
< now
) deleteKey(db
,keyobj
);
2960 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2961 if (keyobj
) decrRefCount(keyobj
);
2962 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2964 return REDIS_ERR
; /* Just to avoid warning */
2967 /*================================== Commands =============================== */
2969 static void authCommand(redisClient
*c
) {
2970 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2971 c
->authenticated
= 1;
2972 addReply(c
,shared
.ok
);
2974 c
->authenticated
= 0;
2975 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2979 static void pingCommand(redisClient
*c
) {
2980 addReply(c
,shared
.pong
);
2983 static void echoCommand(redisClient
*c
) {
2984 addReplyBulkLen(c
,c
->argv
[1]);
2985 addReply(c
,c
->argv
[1]);
2986 addReply(c
,shared
.crlf
);
2989 /*=================================== Strings =============================== */
2991 static void setGenericCommand(redisClient
*c
, int nx
) {
2994 if (nx
) deleteIfVolatile(c
->db
,c
->argv
[1]);
2995 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2996 if (retval
== DICT_ERR
) {
2998 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2999 incrRefCount(c
->argv
[2]);
3001 addReply(c
,shared
.czero
);
3005 incrRefCount(c
->argv
[1]);
3006 incrRefCount(c
->argv
[2]);
3009 removeExpire(c
->db
,c
->argv
[1]);
3010 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3013 static void setCommand(redisClient
*c
) {
3014 setGenericCommand(c
,0);
3017 static void setnxCommand(redisClient
*c
) {
3018 setGenericCommand(c
,1);
3021 static void getCommand(redisClient
*c
) {
3022 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3025 addReply(c
,shared
.nullbulk
);
3027 if (o
->type
!= REDIS_STRING
) {
3028 addReply(c
,shared
.wrongtypeerr
);
3030 addReplyBulkLen(c
,o
);
3032 addReply(c
,shared
.crlf
);
3037 static void getsetCommand(redisClient
*c
) {
3039 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
3040 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3042 incrRefCount(c
->argv
[1]);
3044 incrRefCount(c
->argv
[2]);
3046 removeExpire(c
->db
,c
->argv
[1]);
3049 static void mgetCommand(redisClient
*c
) {
3052 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
3053 for (j
= 1; j
< c
->argc
; j
++) {
3054 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
3056 addReply(c
,shared
.nullbulk
);
3058 if (o
->type
!= REDIS_STRING
) {
3059 addReply(c
,shared
.nullbulk
);
3061 addReplyBulkLen(c
,o
);
3063 addReply(c
,shared
.crlf
);
3069 static void msetGenericCommand(redisClient
*c
, int nx
) {
3070 int j
, busykeys
= 0;
3072 if ((c
->argc
% 2) == 0) {
3073 addReplySds(c
,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
/* Handle the NX flag. The MSETNX semantic is to return zero and set
 * nothing at all if at least one key already exists. */
3079 for (j
= 1; j
< c
->argc
; j
+= 2) {
3080 if (lookupKeyWrite(c
->db
,c
->argv
[j
]) != NULL
) {
3086 addReply(c
, shared
.czero
);
3090 for (j
= 1; j
< c
->argc
; j
+= 2) {
3093 tryObjectEncoding(c
->argv
[j
+1]);
3094 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3095 if (retval
== DICT_ERR
) {
3096 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3097 incrRefCount(c
->argv
[j
+1]);
3099 incrRefCount(c
->argv
[j
]);
3100 incrRefCount(c
->argv
[j
+1]);
3102 removeExpire(c
->db
,c
->argv
[j
]);
3104 server
.dirty
+= (c
->argc
-1)/2;
3105 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3108 static void msetCommand(redisClient
*c
) {
3109 msetGenericCommand(c
,0);
3112 static void msetnxCommand(redisClient
*c
) {
3113 msetGenericCommand(c
,1);
3116 static void incrDecrCommand(redisClient
*c
, long long incr
) {
3121 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3125 if (o
->type
!= REDIS_STRING
) {
3130 if (o
->encoding
== REDIS_ENCODING_RAW
)
3131 value
= strtoll(o
->ptr
, &eptr
, 10);
3132 else if (o
->encoding
== REDIS_ENCODING_INT
)
3133 value
= (long)o
->ptr
;
3135 redisAssert(1 != 1);
3140 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
3141 tryObjectEncoding(o
);
3142 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
3143 if (retval
== DICT_ERR
) {
3144 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
3145 removeExpire(c
->db
,c
->argv
[1]);
3147 incrRefCount(c
->argv
[1]);
3150 addReply(c
,shared
.colon
);
3152 addReply(c
,shared
.crlf
);
3155 static void incrCommand(redisClient
*c
) {
3156 incrDecrCommand(c
,1);
3159 static void decrCommand(redisClient
*c
) {
3160 incrDecrCommand(c
,-1);
3163 static void incrbyCommand(redisClient
*c
) {
3164 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3165 incrDecrCommand(c
,incr
);
3168 static void decrbyCommand(redisClient
*c
) {
3169 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3170 incrDecrCommand(c
,-incr
);
3173 /* ========================= Type agnostic commands ========================= */
3175 static void delCommand(redisClient
*c
) {
3178 for (j
= 1; j
< c
->argc
; j
++) {
3179 if (deleteKey(c
->db
,c
->argv
[j
])) {
3186 addReply(c
,shared
.czero
);
3189 addReply(c
,shared
.cone
);
3192 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
3197 static void existsCommand(redisClient
*c
) {
3198 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
3201 static void selectCommand(redisClient
*c
) {
3202 int id
= atoi(c
->argv
[1]->ptr
);
3204 if (selectDb(c
,id
) == REDIS_ERR
) {
3205 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
3207 addReply(c
,shared
.ok
);
3211 static void randomkeyCommand(redisClient
*c
) {
3215 de
= dictGetRandomKey(c
->db
->dict
);
3216 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
3219 addReply(c
,shared
.plus
);
3220 addReply(c
,shared
.crlf
);
3222 addReply(c
,shared
.plus
);
3223 addReply(c
,dictGetEntryKey(de
));
3224 addReply(c
,shared
.crlf
);
3228 static void keysCommand(redisClient
*c
) {
3231 sds pattern
= c
->argv
[1]->ptr
;
3232 int plen
= sdslen(pattern
);
3233 unsigned long numkeys
= 0, keyslen
= 0;
3234 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3236 di
= dictGetIterator(c
->db
->dict
);
3238 decrRefCount(lenobj
);
3239 while((de
= dictNext(di
)) != NULL
) {
3240 robj
*keyobj
= dictGetEntryKey(de
);
3242 sds key
= keyobj
->ptr
;
3243 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3244 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3245 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3247 addReply(c
,shared
.space
);
3250 keyslen
+= sdslen(key
);
3254 dictReleaseIterator(di
);
3255 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3256 addReply(c
,shared
.crlf
);
3259 static void dbsizeCommand(redisClient
*c
) {
3261 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
3264 static void lastsaveCommand(redisClient
*c
) {
3266 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
3269 static void typeCommand(redisClient
*c
) {
3273 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3278 case REDIS_STRING
: type
= "+string"; break;
3279 case REDIS_LIST
: type
= "+list"; break;
3280 case REDIS_SET
: type
= "+set"; break;
3281 case REDIS_ZSET
: type
= "+zset"; break;
3282 default: type
= "unknown"; break;
3285 addReplySds(c
,sdsnew(type
));
3286 addReply(c
,shared
.crlf
);
3289 static void saveCommand(redisClient
*c
) {
3290 if (server
.bgsavechildpid
!= -1) {
3291 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3294 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3295 addReply(c
,shared
.ok
);
3297 addReply(c
,shared
.err
);
3301 static void bgsaveCommand(redisClient
*c
) {
3302 if (server
.bgsavechildpid
!= -1) {
3303 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3306 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3307 char *status
= "+Background saving started\r\n";
3308 addReplySds(c
,sdsnew(status
));
3310 addReply(c
,shared
.err
);
3314 static void shutdownCommand(redisClient
*c
) {
3315 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
/* Kill the saving child if there is a background saving in progress.
   We want to avoid race conditions, for instance our saving child may
   overwrite the synchronous saving done by SHUTDOWN. */
3319 if (server
.bgsavechildpid
!= -1) {
3320 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3321 kill(server
.bgsavechildpid
,SIGKILL
);
3322 rdbRemoveTempFile(server
.bgsavechildpid
);
3324 if (server
.appendonly
) {
3325 /* Append only file: fsync() the AOF and exit */
3326 fsync(server
.appendfd
);
3329 /* Snapshotting. Perform a SYNC SAVE and exit */
3330 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3331 if (server
.daemonize
)
3332 unlink(server
.pidfile
);
3333 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3334 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3337 /* Ooops.. error saving! The best we can do is to continue operating.
3338 * Note that if there was a background saving process, in the next
3339 * cron() Redis will be notified that the background saving aborted,
3340 * handling special stuff like slaves pending for synchronization... */
3341 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3342 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3347 static void renameGenericCommand(redisClient
*c
, int nx
) {
3350 /* To use the same key as src and dst is probably an error */
3351 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3352 addReply(c
,shared
.sameobjecterr
);
3356 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3358 addReply(c
,shared
.nokeyerr
);
3362 deleteIfVolatile(c
->db
,c
->argv
[2]);
3363 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3366 addReply(c
,shared
.czero
);
3369 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3371 incrRefCount(c
->argv
[2]);
3373 deleteKey(c
->db
,c
->argv
[1]);
3375 addReply(c
,nx
? shared
.cone
: shared
.ok
);
3378 static void renameCommand(redisClient
*c
) {
3379 renameGenericCommand(c
,0);
3382 static void renamenxCommand(redisClient
*c
) {
3383 renameGenericCommand(c
,1);
3386 static void moveCommand(redisClient
*c
) {
3391 /* Obtain source and target DB pointers */
3394 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3395 addReply(c
,shared
.outofrangeerr
);
3399 selectDb(c
,srcid
); /* Back to the source DB */
3401 /* If the user is moving using as target the same
3402 * DB as the source DB it is probably an error. */
3404 addReply(c
,shared
.sameobjecterr
);
3408 /* Check if the element exists and get a reference */
3409 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3411 addReply(c
,shared
.czero
);
3415 /* Try to add the element to the target DB */
3416 deleteIfVolatile(dst
,c
->argv
[1]);
3417 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3418 addReply(c
,shared
.czero
);
3421 incrRefCount(c
->argv
[1]);
3424 /* OK! key moved, free the entry in the source DB */
3425 deleteKey(src
,c
->argv
[1]);
3427 addReply(c
,shared
.cone
);
3430 /* =================================== Lists ================================ */
3431 static void pushGenericCommand(redisClient
*c
, int where
) {
3435 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3437 lobj
= createListObject();
3439 if (where
== REDIS_HEAD
) {
3440 listAddNodeHead(list
,c
->argv
[2]);
3442 listAddNodeTail(list
,c
->argv
[2]);
3444 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
3445 incrRefCount(c
->argv
[1]);
3446 incrRefCount(c
->argv
[2]);
3448 if (lobj
->type
!= REDIS_LIST
) {
3449 addReply(c
,shared
.wrongtypeerr
);
3453 if (where
== REDIS_HEAD
) {
3454 listAddNodeHead(list
,c
->argv
[2]);
3456 listAddNodeTail(list
,c
->argv
[2]);
3458 incrRefCount(c
->argv
[2]);
3461 addReply(c
,shared
.ok
);
3464 static void lpushCommand(redisClient
*c
) {
3465 pushGenericCommand(c
,REDIS_HEAD
);
3468 static void rpushCommand(redisClient
*c
) {
3469 pushGenericCommand(c
,REDIS_TAIL
);
3472 static void llenCommand(redisClient
*c
) {
3476 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3478 addReply(c
,shared
.czero
);
3481 if (o
->type
!= REDIS_LIST
) {
3482 addReply(c
,shared
.wrongtypeerr
);
3485 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3490 static void lindexCommand(redisClient
*c
) {
3492 int index
= atoi(c
->argv
[2]->ptr
);
3494 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3496 addReply(c
,shared
.nullbulk
);
3498 if (o
->type
!= REDIS_LIST
) {
3499 addReply(c
,shared
.wrongtypeerr
);
3501 list
*list
= o
->ptr
;
3504 ln
= listIndex(list
, index
);
3506 addReply(c
,shared
.nullbulk
);
3508 robj
*ele
= listNodeValue(ln
);
3509 addReplyBulkLen(c
,ele
);
3511 addReply(c
,shared
.crlf
);
3517 static void lsetCommand(redisClient
*c
) {
3519 int index
= atoi(c
->argv
[2]->ptr
);
3521 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3523 addReply(c
,shared
.nokeyerr
);
3525 if (o
->type
!= REDIS_LIST
) {
3526 addReply(c
,shared
.wrongtypeerr
);
3528 list
*list
= o
->ptr
;
3531 ln
= listIndex(list
, index
);
3533 addReply(c
,shared
.outofrangeerr
);
3535 robj
*ele
= listNodeValue(ln
);
3538 listNodeValue(ln
) = c
->argv
[3];
3539 incrRefCount(c
->argv
[3]);
3540 addReply(c
,shared
.ok
);
3547 static void popGenericCommand(redisClient
*c
, int where
) {
3550 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3552 addReply(c
,shared
.nullbulk
);
3554 if (o
->type
!= REDIS_LIST
) {
3555 addReply(c
,shared
.wrongtypeerr
);
3557 list
*list
= o
->ptr
;
3560 if (where
== REDIS_HEAD
)
3561 ln
= listFirst(list
);
3563 ln
= listLast(list
);
3566 addReply(c
,shared
.nullbulk
);
3568 robj
*ele
= listNodeValue(ln
);
3569 addReplyBulkLen(c
,ele
);
3571 addReply(c
,shared
.crlf
);
3572 listDelNode(list
,ln
);
3579 static void lpopCommand(redisClient
*c
) {
3580 popGenericCommand(c
,REDIS_HEAD
);
3583 static void rpopCommand(redisClient
*c
) {
3584 popGenericCommand(c
,REDIS_TAIL
);
3587 static void lrangeCommand(redisClient
*c
) {
3589 int start
= atoi(c
->argv
[2]->ptr
);
3590 int end
= atoi(c
->argv
[3]->ptr
);
3592 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3594 addReply(c
,shared
.nullmultibulk
);
3596 if (o
->type
!= REDIS_LIST
) {
3597 addReply(c
,shared
.wrongtypeerr
);
3599 list
*list
= o
->ptr
;
3601 int llen
= listLength(list
);
3605 /* convert negative indexes */
3606 if (start
< 0) start
= llen
+start
;
3607 if (end
< 0) end
= llen
+end
;
3608 if (start
< 0) start
= 0;
3609 if (end
< 0) end
= 0;
3611 /* indexes sanity checks */
3612 if (start
> end
|| start
>= llen
) {
3613 /* Out of range start or start > end result in empty list */
3614 addReply(c
,shared
.emptymultibulk
);
3617 if (end
>= llen
) end
= llen
-1;
3618 rangelen
= (end
-start
)+1;
3620 /* Return the result in form of a multi-bulk reply */
3621 ln
= listIndex(list
, start
);
3622 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3623 for (j
= 0; j
< rangelen
; j
++) {
3624 ele
= listNodeValue(ln
);
3625 addReplyBulkLen(c
,ele
);
3627 addReply(c
,shared
.crlf
);
3634 static void ltrimCommand(redisClient
*c
) {
3636 int start
= atoi(c
->argv
[2]->ptr
);
3637 int end
= atoi(c
->argv
[3]->ptr
);
3639 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3641 addReply(c
,shared
.ok
);
3643 if (o
->type
!= REDIS_LIST
) {
3644 addReply(c
,shared
.wrongtypeerr
);
3646 list
*list
= o
->ptr
;
3648 int llen
= listLength(list
);
3649 int j
, ltrim
, rtrim
;
3651 /* convert negative indexes */
3652 if (start
< 0) start
= llen
+start
;
3653 if (end
< 0) end
= llen
+end
;
3654 if (start
< 0) start
= 0;
3655 if (end
< 0) end
= 0;
3657 /* indexes sanity checks */
3658 if (start
> end
|| start
>= llen
) {
3659 /* Out of range start or start > end result in empty list */
3663 if (end
>= llen
) end
= llen
-1;
3668 /* Remove list elements to perform the trim */
3669 for (j
= 0; j
< ltrim
; j
++) {
3670 ln
= listFirst(list
);
3671 listDelNode(list
,ln
);
3673 for (j
= 0; j
< rtrim
; j
++) {
3674 ln
= listLast(list
);
3675 listDelNode(list
,ln
);
3678 addReply(c
,shared
.ok
);
3683 static void lremCommand(redisClient
*c
) {
3686 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3688 addReply(c
,shared
.czero
);
3690 if (o
->type
!= REDIS_LIST
) {
3691 addReply(c
,shared
.wrongtypeerr
);
3693 list
*list
= o
->ptr
;
3694 listNode
*ln
, *next
;
3695 int toremove
= atoi(c
->argv
[2]->ptr
);
3700 toremove
= -toremove
;
3703 ln
= fromtail
? list
->tail
: list
->head
;
3705 robj
*ele
= listNodeValue(ln
);
3707 next
= fromtail
? ln
->prev
: ln
->next
;
3708 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3709 listDelNode(list
,ln
);
3712 if (toremove
&& removed
== toremove
) break;
3716 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3721 /* This is the semantic of this command:
3722 * RPOPLPUSH srclist dstlist:
3723 * IF LLEN(srclist) > 0
3724 * element = RPOP srclist
3725 * LPUSH dstlist element
3732 * The idea is to be able to get an element from a list in a reliable way
3733 * since the element is not just returned but pushed against another list
3734 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3736 static void rpoplpushcommand(redisClient
*c
) {
3739 sobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3741 addReply(c
,shared
.nullbulk
);
3743 if (sobj
->type
!= REDIS_LIST
) {
3744 addReply(c
,shared
.wrongtypeerr
);
3746 list
*srclist
= sobj
->ptr
;
3747 listNode
*ln
= listLast(srclist
);
3750 addReply(c
,shared
.nullbulk
);
3752 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3753 robj
*ele
= listNodeValue(ln
);
3758 /* Create the list if the key does not exist */
3759 dobj
= createListObject();
3760 dictAdd(c
->db
->dict
,c
->argv
[2],dobj
);
3761 incrRefCount(c
->argv
[2]);
3762 } else if (dobj
->type
!= REDIS_LIST
) {
3763 addReply(c
,shared
.wrongtypeerr
);
3766 /* Add the element to the target list */
3767 dstlist
= dobj
->ptr
;
3768 listAddNodeHead(dstlist
,ele
);
3771 /* Send the element to the client as reply as well */
3772 addReplyBulkLen(c
,ele
);
3774 addReply(c
,shared
.crlf
);
3776 /* Finally remove the element from the source list */
3777 listDelNode(srclist
,ln
);
3785 /* ==================================== Sets ================================ */
3787 static void saddCommand(redisClient
*c
) {
3790 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3792 set
= createSetObject();
3793 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3794 incrRefCount(c
->argv
[1]);
3796 if (set
->type
!= REDIS_SET
) {
3797 addReply(c
,shared
.wrongtypeerr
);
3801 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3802 incrRefCount(c
->argv
[2]);
3804 addReply(c
,shared
.cone
);
3806 addReply(c
,shared
.czero
);
3810 static void sremCommand(redisClient
*c
) {
3813 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3815 addReply(c
,shared
.czero
);
3817 if (set
->type
!= REDIS_SET
) {
3818 addReply(c
,shared
.wrongtypeerr
);
3821 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
3823 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3824 addReply(c
,shared
.cone
);
3826 addReply(c
,shared
.czero
);
3831 static void smoveCommand(redisClient
*c
) {
3832 robj
*srcset
, *dstset
;
3834 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3835 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3837 /* If the source key does not exist return 0, if it's of the wrong type
3839 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
3840 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
3843 /* Error if the destination key is not a set as well */
3844 if (dstset
&& dstset
->type
!= REDIS_SET
) {
3845 addReply(c
,shared
.wrongtypeerr
);
3848 /* Remove the element from the source set */
3849 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
3850 /* Key not found in the src set! return zero */
3851 addReply(c
,shared
.czero
);
3855 /* Add the element to the destination set */
3857 dstset
= createSetObject();
3858 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
3859 incrRefCount(c
->argv
[2]);
3861 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
3862 incrRefCount(c
->argv
[3]);
3863 addReply(c
,shared
.cone
);
3866 static void sismemberCommand(redisClient
*c
) {
3869 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3871 addReply(c
,shared
.czero
);
3873 if (set
->type
!= REDIS_SET
) {
3874 addReply(c
,shared
.wrongtypeerr
);
3877 if (dictFind(set
->ptr
,c
->argv
[2]))
3878 addReply(c
,shared
.cone
);
3880 addReply(c
,shared
.czero
);
3884 static void scardCommand(redisClient
*c
) {
3888 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3890 addReply(c
,shared
.czero
);
3893 if (o
->type
!= REDIS_SET
) {
3894 addReply(c
,shared
.wrongtypeerr
);
3897 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
3903 static void spopCommand(redisClient
*c
) {
3907 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3909 addReply(c
,shared
.nullbulk
);
3911 if (set
->type
!= REDIS_SET
) {
3912 addReply(c
,shared
.wrongtypeerr
);
3915 de
= dictGetRandomKey(set
->ptr
);
3917 addReply(c
,shared
.nullbulk
);
3919 robj
*ele
= dictGetEntryKey(de
);
3921 addReplyBulkLen(c
,ele
);
3923 addReply(c
,shared
.crlf
);
3924 dictDelete(set
->ptr
,ele
);
3925 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3931 static void srandmemberCommand(redisClient
*c
) {
3935 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3937 addReply(c
,shared
.nullbulk
);
3939 if (set
->type
!= REDIS_SET
) {
3940 addReply(c
,shared
.wrongtypeerr
);
3943 de
= dictGetRandomKey(set
->ptr
);
3945 addReply(c
,shared
.nullbulk
);
3947 robj
*ele
= dictGetEntryKey(de
);
3949 addReplyBulkLen(c
,ele
);
3951 addReply(c
,shared
.crlf
);
3956 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3957 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3959 return dictSize(*d1
)-dictSize(*d2
);
3962 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, unsigned long setsnum
, robj
*dstkey
) {
3963 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3966 robj
*lenobj
= NULL
, *dstset
= NULL
;
3967 unsigned long j
, cardinality
= 0;
3969 for (j
= 0; j
< setsnum
; j
++) {
3973 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3974 lookupKeyRead(c
->db
,setskeys
[j
]);
3978 if (deleteKey(c
->db
,dstkey
))
3980 addReply(c
,shared
.czero
);
3982 addReply(c
,shared
.nullmultibulk
);
3986 if (setobj
->type
!= REDIS_SET
) {
3988 addReply(c
,shared
.wrongtypeerr
);
3991 dv
[j
] = setobj
->ptr
;
3993 /* Sort sets from the smallest to largest, this will improve our
3994 * algorithm's performace */
3995 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3997 /* The first thing we should output is the total number of elements...
3998 * since this is a multi-bulk write, but at this stage we don't know
3999 * the intersection set size, so we use a trick, append an empty object
4000 * to the output list and save the pointer to later modify it with the
4003 lenobj
= createObject(REDIS_STRING
,NULL
);
4005 decrRefCount(lenobj
);
4007 /* If we have a target key where to store the resulting set
4008 * create this key with an empty set inside */
4009 dstset
= createSetObject();
4012 /* Iterate all the elements of the first (smallest) set, and test
4013 * the element against all the other sets, if at least one set does
4014 * not include the element it is discarded */
4015 di
= dictGetIterator(dv
[0]);
4017 while((de
= dictNext(di
)) != NULL
) {
4020 for (j
= 1; j
< setsnum
; j
++)
4021 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
4023 continue; /* at least one set does not contain the member */
4024 ele
= dictGetEntryKey(de
);
4026 addReplyBulkLen(c
,ele
);
4028 addReply(c
,shared
.crlf
);
4031 dictAdd(dstset
->ptr
,ele
,NULL
);
4035 dictReleaseIterator(di
);
4038 /* Store the resulting set into the target */
4039 deleteKey(c
->db
,dstkey
);
4040 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4041 incrRefCount(dstkey
);
4045 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%lu\r\n",cardinality
);
4047 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4048 dictSize((dict
*)dstset
->ptr
)));
4054 static void sinterCommand(redisClient
*c
) {
4055 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
4058 static void sinterstoreCommand(redisClient
*c
) {
4059 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
4062 #define REDIS_OP_UNION 0
4063 #define REDIS_OP_DIFF 1
4065 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
4066 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4069 robj
*dstset
= NULL
;
4070 int j
, cardinality
= 0;
4072 for (j
= 0; j
< setsnum
; j
++) {
4076 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4077 lookupKeyRead(c
->db
,setskeys
[j
]);
4082 if (setobj
->type
!= REDIS_SET
) {
4084 addReply(c
,shared
.wrongtypeerr
);
4087 dv
[j
] = setobj
->ptr
;
4090 /* We need a temp set object to store our union. If the dstkey
4091 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4092 * this set object will be the resulting object to set into the target key*/
4093 dstset
= createSetObject();
4095 /* Iterate all the elements of all the sets, add every element a single
4096 * time to the result set */
4097 for (j
= 0; j
< setsnum
; j
++) {
4098 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
4099 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
4101 di
= dictGetIterator(dv
[j
]);
4103 while((de
= dictNext(di
)) != NULL
) {
4106 /* dictAdd will not add the same element multiple times */
4107 ele
= dictGetEntryKey(de
);
4108 if (op
== REDIS_OP_UNION
|| j
== 0) {
4109 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
4113 } else if (op
== REDIS_OP_DIFF
) {
4114 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
4119 dictReleaseIterator(di
);
4121 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
4124 /* Output the content of the resulting set, if not in STORE mode */
4126 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
4127 di
= dictGetIterator(dstset
->ptr
);
4128 while((de
= dictNext(di
)) != NULL
) {
4131 ele
= dictGetEntryKey(de
);
4132 addReplyBulkLen(c
,ele
);
4134 addReply(c
,shared
.crlf
);
4136 dictReleaseIterator(di
);
4138 /* If we have a target key where to store the resulting set
4139 * create this key with the result set inside */
4140 deleteKey(c
->db
,dstkey
);
4141 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4142 incrRefCount(dstkey
);
4147 decrRefCount(dstset
);
4149 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4150 dictSize((dict
*)dstset
->ptr
)));
4156 static void sunionCommand(redisClient
*c
) {
4157 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
4160 static void sunionstoreCommand(redisClient
*c
) {
4161 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
4164 static void sdiffCommand(redisClient
*c
) {
4165 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
4168 static void sdiffstoreCommand(redisClient
*c
) {
4169 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
4172 /* ==================================== ZSets =============================== */
4174 /* ZSETs are ordered sets using two data structures to hold the same elements
4175 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4178 * The elements are added to an hash table mapping Redis objects to scores.
4179 * At the same time the elements are added to a skip list mapping scores
4180 * to Redis objects (so objects are sorted by scores in this "view"). */
4182 /* This skiplist implementation is almost a C translation of the original
4183 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4184 * Alternative to Balanced Trees", modified in three ways:
4185 * a) this implementation allows for repeated values.
4186 * b) the comparison is not just by key (our 'score') but by satellite data.
4187 * c) there is a back pointer, so it's a doubly linked list with the back
4188 * pointers being only at "level 1". This allows to traverse the list
4189 * from tail to head, useful for ZREVRANGE. */
4191 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
4192 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
4194 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
4200 static zskiplist
*zslCreate(void) {
4204 zsl
= zmalloc(sizeof(*zsl
));
4207 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
4208 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
4209 zsl
->header
->forward
[j
] = NULL
;
4210 zsl
->header
->backward
= NULL
;
4215 static void zslFreeNode(zskiplistNode
*node
) {
4216 decrRefCount(node
->obj
);
4217 zfree(node
->forward
);
4221 static void zslFree(zskiplist
*zsl
) {
4222 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
4224 zfree(zsl
->header
->forward
);
4227 next
= node
->forward
[0];
4234 static int zslRandomLevel(void) {
4236 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
4241 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
4242 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4246 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4247 while (x
->forward
[i
] &&
4248 (x
->forward
[i
]->score
< score
||
4249 (x
->forward
[i
]->score
== score
&&
4250 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4254 /* we assume the key is not already inside, since we allow duplicated
4255 * scores, and the re-insertion of score and redis object should never
4256 * happen since the caller of zslInsert() should test in the hash table
4257 * if the element is already inside or not. */
4258 level
= zslRandomLevel();
4259 if (level
> zsl
->level
) {
4260 for (i
= zsl
->level
; i
< level
; i
++)
4261 update
[i
] = zsl
->header
;
4264 x
= zslCreateNode(level
,score
,obj
);
4265 for (i
= 0; i
< level
; i
++) {
4266 x
->forward
[i
] = update
[i
]->forward
[i
];
4267 update
[i
]->forward
[i
] = x
;
4269 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
4271 x
->forward
[0]->backward
= x
;
4277 /* Delete an element with matching score/object from the skiplist. */
4278 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
4279 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4283 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4284 while (x
->forward
[i
] &&
4285 (x
->forward
[i
]->score
< score
||
4286 (x
->forward
[i
]->score
== score
&&
4287 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4291 /* We may have multiple elements with the same score, what we need
4292 * is to find the element with both the right score and object. */
4294 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
4295 for (i
= 0; i
< zsl
->level
; i
++) {
4296 if (update
[i
]->forward
[i
] != x
) break;
4297 update
[i
]->forward
[i
] = x
->forward
[i
];
4299 if (x
->forward
[0]) {
4300 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4303 zsl
->tail
= x
->backward
;
4306 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4311 return 0; /* not found */
4313 return 0; /* not found */
4316 /* Delete all the elements with score between min and max from the skiplist.
4317 * Min and max are inclusive, so a score >= min && score <= max is deleted.
4318 * Note that this function takes the reference to the hash table view of the
4319 * sorted set, in order to remove the elements from the hash table too. */
4320 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4321 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4322 unsigned long removed
= 0;
4326 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4327 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
4331 /* We may have multiple elements with the same score, what we need
4332 * is to find the element with both the right score and object. */
4334 while (x
&& x
->score
<= max
) {
4335 zskiplistNode
*next
;
4337 for (i
= 0; i
< zsl
->level
; i
++) {
4338 if (update
[i
]->forward
[i
] != x
) break;
4339 update
[i
]->forward
[i
] = x
->forward
[i
];
4341 if (x
->forward
[0]) {
4342 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4345 zsl
->tail
= x
->backward
;
4347 next
= x
->forward
[0];
4348 dictDelete(dict
,x
->obj
);
4350 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4356 return removed
; /* not found */
4359 /* Find the first node having a score equal or greater than the specified one.
4360 * Returns NULL if there is no match. */
4361 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4366 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4367 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
4370 /* x is now the last node with a score lower than 'score', so its
4371 * successor is the first node with score >= 'score' (NULL if none). */
4372 return x
->forward
[0];
4375 /* The actual Z-commands implementations */
4377 /* This generic command implements both ZADD and ZINCRBY.
4378 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4379 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4380 static void zaddGenericCommand(redisClient
*c
, robj
*key
, robj
*ele
, double scoreval
, int doincrement
) {
4385 zsetobj
= lookupKeyWrite(c
->db
,key
);
4386 if (zsetobj
== NULL
) {
4387 zsetobj
= createZsetObject();
4388 dictAdd(c
->db
->dict
,key
,zsetobj
);
4391 if (zsetobj
->type
!= REDIS_ZSET
) {
4392 addReply(c
,shared
.wrongtypeerr
);
4398 /* Ok now since we implement both ZADD and ZINCRBY here the code
4399 * needs to handle the two different conditions. It's all about setting
4400 * '*score', that is, the new score to set, to the right value. */
4401 score
= zmalloc(sizeof(double));
4405 /* Read the old score. If the element was not present starts from 0 */
4406 de
= dictFind(zs
->dict
,ele
);
4408 double *oldscore
= dictGetEntryVal(de
);
4409 *score
= *oldscore
+ scoreval
;
4417 /* What follows is a simple remove and re-insert operation that is common
4418 * to both ZADD and ZINCRBY... */
4419 if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) {
4420 /* case 1: New element */
4421 incrRefCount(ele
); /* added to hash */
4422 zslInsert(zs
->zsl
,*score
,ele
);
4423 incrRefCount(ele
); /* added to skiplist */
4426 addReplyDouble(c
,*score
);
4428 addReply(c
,shared
.cone
);
4433 /* case 2: Score update operation */
4434 de
= dictFind(zs
->dict
,ele
);
4435 redisAssert(de
!= NULL
);
4436 oldscore
= dictGetEntryVal(de
);
4437 if (*score
!= *oldscore
) {
4440 /* Remove and insert the element in the skip list with new score */
4441 deleted
= zslDelete(zs
->zsl
,*oldscore
,ele
);
4442 redisAssert(deleted
!= 0);
4443 zslInsert(zs
->zsl
,*score
,ele
);
4445 /* Update the score in the hash table */
4446 dictReplace(zs
->dict
,ele
,score
);
4452 addReplyDouble(c
,*score
);
4454 addReply(c
,shared
.czero
);
4458 static void zaddCommand(redisClient
*c
) {
4461 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4462 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0);
4465 static void zincrbyCommand(redisClient
*c
) {
4468 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4469 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1);
4472 static void zremCommand(redisClient
*c
) {
4476 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4477 if (zsetobj
== NULL
) {
4478 addReply(c
,shared
.czero
);
4484 if (zsetobj
->type
!= REDIS_ZSET
) {
4485 addReply(c
,shared
.wrongtypeerr
);
4489 de
= dictFind(zs
->dict
,c
->argv
[2]);
4491 addReply(c
,shared
.czero
);
4494 /* Delete from the skiplist */
4495 oldscore
= dictGetEntryVal(de
);
4496 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
4497 redisAssert(deleted
!= 0);
4499 /* Delete from the hash table */
4500 dictDelete(zs
->dict
,c
->argv
[2]);
4501 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4503 addReply(c
,shared
.cone
);
4507 static void zremrangebyscoreCommand(redisClient
*c
) {
4508 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4509 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4513 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4514 if (zsetobj
== NULL
) {
4515 addReply(c
,shared
.czero
);
4519 if (zsetobj
->type
!= REDIS_ZSET
) {
4520 addReply(c
,shared
.wrongtypeerr
);
4524 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
4525 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4526 server
.dirty
+= deleted
;
4527 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
4531 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
4533 int start
= atoi(c
->argv
[2]->ptr
);
4534 int end
= atoi(c
->argv
[3]->ptr
);
4537 if (c
->argc
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) {
4539 } else if (c
->argc
>= 5) {
4540 addReply(c
,shared
.syntaxerr
);
4544 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4546 addReply(c
,shared
.nullmultibulk
);
4548 if (o
->type
!= REDIS_ZSET
) {
4549 addReply(c
,shared
.wrongtypeerr
);
4551 zset
*zsetobj
= o
->ptr
;
4552 zskiplist
*zsl
= zsetobj
->zsl
;
4555 int llen
= zsl
->length
;
4559 /* convert negative indexes */
4560 if (start
< 0) start
= llen
+start
;
4561 if (end
< 0) end
= llen
+end
;
4562 if (start
< 0) start
= 0;
4563 if (end
< 0) end
= 0;
4565 /* indexes sanity checks */
4566 if (start
> end
|| start
>= llen
) {
4567 /* Out of range start or start > end result in empty list */
4568 addReply(c
,shared
.emptymultibulk
);
4571 if (end
>= llen
) end
= llen
-1;
4572 rangelen
= (end
-start
)+1;
4574 /* Return the result in form of a multi-bulk reply */
4580 ln
= zsl
->header
->forward
[0];
4582 ln
= ln
->forward
[0];
4585 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",
4586 withscores
? (rangelen
*2) : rangelen
));
4587 for (j
= 0; j
< rangelen
; j
++) {
4589 addReplyBulkLen(c
,ele
);
4591 addReply(c
,shared
.crlf
);
4593 addReplyDouble(c
,ln
->score
);
4594 ln
= reverse
? ln
->backward
: ln
->forward
[0];
4600 static void zrangeCommand(redisClient
*c
) {
4601 zrangeGenericCommand(c
,0);
4604 static void zrevrangeCommand(redisClient
*c
) {
4605 zrangeGenericCommand(c
,1);
4608 static void zrangebyscoreCommand(redisClient
*c
) {
4610 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4611 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4612 int offset
= 0, limit
= -1;
4614 if (c
->argc
!= 4 && c
->argc
!= 7) {
4616 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4618 } else if (c
->argc
== 7 && strcasecmp(c
->argv
[4]->ptr
,"limit")) {
4619 addReply(c
,shared
.syntaxerr
);
4621 } else if (c
->argc
== 7) {
4622 offset
= atoi(c
->argv
[5]->ptr
);
4623 limit
= atoi(c
->argv
[6]->ptr
);
4624 if (offset
< 0) offset
= 0;
4627 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4629 addReply(c
,shared
.nullmultibulk
);
4631 if (o
->type
!= REDIS_ZSET
) {
4632 addReply(c
,shared
.wrongtypeerr
);
4634 zset
*zsetobj
= o
->ptr
;
4635 zskiplist
*zsl
= zsetobj
->zsl
;
4638 unsigned int rangelen
= 0;
4640 /* Get the first node with the score >= min */
4641 ln
= zslFirstWithScore(zsl
,min
);
4643 /* No element matching the specified interval */
4644 addReply(c
,shared
.emptymultibulk
);
4648 /* We don't know in advance how many matching elements there
4649 * are in the list, so we push this object that will represent
4650 * the multi-bulk length in the output buffer, and will "fix"
4652 lenobj
= createObject(REDIS_STRING
,NULL
);
4654 decrRefCount(lenobj
);
4656 while(ln
&& ln
->score
<= max
) {
4659 ln
= ln
->forward
[0];
4662 if (limit
== 0) break;
4664 addReplyBulkLen(c
,ele
);
4666 addReply(c
,shared
.crlf
);
4667 ln
= ln
->forward
[0];
4669 if (limit
> 0) limit
--;
4671 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
4676 static void zcardCommand(redisClient
*c
) {
4680 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4682 addReply(c
,shared
.czero
);
4685 if (o
->type
!= REDIS_ZSET
) {
4686 addReply(c
,shared
.wrongtypeerr
);
4689 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",zs
->zsl
->length
));
4694 static void zscoreCommand(redisClient
*c
) {
4698 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4700 addReply(c
,shared
.nullbulk
);
4703 if (o
->type
!= REDIS_ZSET
) {
4704 addReply(c
,shared
.wrongtypeerr
);
4709 de
= dictFind(zs
->dict
,c
->argv
[2]);
4711 addReply(c
,shared
.nullbulk
);
4713 double *score
= dictGetEntryVal(de
);
4715 addReplyDouble(c
,*score
);
4721 /* ========================= Non type-specific commands ==================== */
4723 static void flushdbCommand(redisClient
*c
) {
4724 server
.dirty
+= dictSize(c
->db
->dict
);
4725 dictEmpty(c
->db
->dict
);
4726 dictEmpty(c
->db
->expires
);
4727 addReply(c
,shared
.ok
);
4730 static void flushallCommand(redisClient
*c
) {
4731 server
.dirty
+= emptyDb();
4732 addReply(c
,shared
.ok
);
4733 rdbSave(server
.dbfilename
);
4737 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
4738 redisSortOperation
*so
= zmalloc(sizeof(*so
));
4740 so
->pattern
= pattern
;
4744 /* Return the value associated to the key with a name obtained
4745 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4746 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
4750 int prefixlen
, sublen
, postfixlen
;
4751 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4755 char buf
[REDIS_SORTKEY_MAX
+1];
4758 /* If the pattern is "#" return the substitution object itself in order
4759 * to implement the "SORT ... GET #" feature. */
4760 spat
= pattern
->ptr
;
4761 if (spat
[0] == '#' && spat
[1] == '\0') {
4765 /* The substitution object may be specially encoded. If so we create
4766 * a decoded object on the fly. Otherwise getDecodedObject will just
4767 * increment the ref count, that we'll decrement later. */
4768 subst
= getDecodedObject(subst
);
4771 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
4772 p
= strchr(spat
,'*');
4774 decrRefCount(subst
);
4779 sublen
= sdslen(ssub
);
4780 postfixlen
= sdslen(spat
)-(prefixlen
+1);
4781 memcpy(keyname
.buf
,spat
,prefixlen
);
4782 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
4783 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
4784 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
4785 keyname
.len
= prefixlen
+sublen
+postfixlen
;
4787 initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(long)*2))
4788 decrRefCount(subst
);
4790 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4791 return lookupKeyRead(db
,&keyobj
);
4794 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4795 * the additional parameter is not standard but BSD-specific, we have to
4796 * pass sorting parameters via the global 'server' structure */
4797 static int sortCompare(const void *s1
, const void *s2
) {
4798 const redisSortObject
*so1
= s1
, *so2
= s2
;
4801 if (!server
.sort_alpha
) {
4802 /* Numeric sorting. Here it's trivial as we precomputed scores */
4803 if (so1
->u
.score
> so2
->u
.score
) {
4805 } else if (so1
->u
.score
< so2
->u
.score
) {
4811 /* Alphanumeric sorting */
4812 if (server
.sort_bypattern
) {
4813 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
4814 /* At least one compare object is NULL */
4815 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
4817 else if (so1
->u
.cmpobj
== NULL
)
4822 /* We have both the objects, use strcoll */
4823 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
4826 /* Compare elements directly */
4829 dec1
= getDecodedObject(so1
->obj
);
4830 dec2
= getDecodedObject(so2
->obj
);
4831 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
4836 return server
.sort_desc
? -cmp
: cmp
;
4839 /* The SORT command is the most complex command in Redis. Warning: this code
4840 * is optimized for speed and a bit less for readability */
4841 static void sortCommand(redisClient
*c
) {
4844 int desc
= 0, alpha
= 0;
4845 int limit_start
= 0, limit_count
= -1, start
, end
;
4846 int j
, dontsort
= 0, vectorlen
;
4847 int getop
= 0; /* GET operation counter */
4848 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
4849 redisSortObject
*vector
; /* Resulting vector to sort */
4851 /* Lookup the key to sort. It must be of the right types */
4852 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
4853 if (sortval
== NULL
) {
4854 addReply(c
,shared
.nullmultibulk
);
4857 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
4858 sortval
->type
!= REDIS_ZSET
)
4860 addReply(c
,shared
.wrongtypeerr
);
4864 /* Create a list of operations to perform for every sorted element.
4865 * Operations can be GET/DEL/INCR/DECR */
4866 operations
= listCreate();
4867 listSetFreeMethod(operations
,zfree
);
4870 /* Now we need to protect sortval incrementing its count, in the future
4871 * SORT may have options able to overwrite/delete keys during the sorting
4872 * and the sorted key itself may get destroied */
4873 incrRefCount(sortval
);
4875 /* The SORT command has an SQL-alike syntax, parse it */
4876 while(j
< c
->argc
) {
4877 int leftargs
= c
->argc
-j
-1;
4878 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
4880 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
4882 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
4884 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
4885 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
4886 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
4888 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
4889 storekey
= c
->argv
[j
+1];
4891 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
4892 sortby
= c
->argv
[j
+1];
4893 /* If the BY pattern does not contain '*', i.e. it is constant,
4894 * we don't need to sort nor to lookup the weight keys. */
4895 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
4897 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
4898 listAddNodeTail(operations
,createSortOperation(
4899 REDIS_SORT_GET
,c
->argv
[j
+1]));
4903 decrRefCount(sortval
);
4904 listRelease(operations
);
4905 addReply(c
,shared
.syntaxerr
);
4911 /* Load the sorting vector with all the objects to sort */
4912 switch(sortval
->type
) {
4913 case REDIS_LIST
: vectorlen
= listLength((list
*)sortval
->ptr
); break;
4914 case REDIS_SET
: vectorlen
= dictSize((dict
*)sortval
->ptr
); break;
4915 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
4916 default: vectorlen
= 0; redisAssert(0); /* Avoid GCC warning */
4918 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
4921 if (sortval
->type
== REDIS_LIST
) {
4922 list
*list
= sortval
->ptr
;
4926 while((ln
= listYield(list
))) {
4927 robj
*ele
= ln
->value
;
4928 vector
[j
].obj
= ele
;
4929 vector
[j
].u
.score
= 0;
4930 vector
[j
].u
.cmpobj
= NULL
;
4938 if (sortval
->type
== REDIS_SET
) {
4941 zset
*zs
= sortval
->ptr
;
4945 di
= dictGetIterator(set
);
4946 while((setele
= dictNext(di
)) != NULL
) {
4947 vector
[j
].obj
= dictGetEntryKey(setele
);
4948 vector
[j
].u
.score
= 0;
4949 vector
[j
].u
.cmpobj
= NULL
;
4952 dictReleaseIterator(di
);
4954 redisAssert(j
== vectorlen
);
4956 /* Now it's time to load the right scores in the sorting vector */
4957 if (dontsort
== 0) {
4958 for (j
= 0; j
< vectorlen
; j
++) {
4962 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
4963 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
4965 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
4967 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
4968 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
4970 /* Don't need to decode the object if it's
4971 * integer-encoded (the only encoding supported) so
4972 * far. We can just cast it */
4973 if (byval
->encoding
== REDIS_ENCODING_INT
) {
4974 vector
[j
].u
.score
= (long)byval
->ptr
;
4976 redisAssert(1 != 1);
4981 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
4982 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
4984 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
4985 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
4987 redisAssert(1 != 1);
4994 /* We are ready to sort the vector... perform a bit of sanity check
4995 * on the LIMIT option too. We'll use a partial version of quicksort. */
4996 start
= (limit_start
< 0) ? 0 : limit_start
;
4997 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
4998 if (start
>= vectorlen
) {
4999 start
= vectorlen
-1;
5002 if (end
>= vectorlen
) end
= vectorlen
-1;
5004 if (dontsort
== 0) {
5005 server
.sort_desc
= desc
;
5006 server
.sort_alpha
= alpha
;
5007 server
.sort_bypattern
= sortby
? 1 : 0;
5008 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
5009 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
5011 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
5014 /* Send command output to the output buffer, performing the specified
5015 * GET/DEL/INCR/DECR operations if any. */
5016 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
5017 if (storekey
== NULL
) {
5018 /* STORE option not specified, sent the sorting result to client */
5019 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
5020 for (j
= start
; j
<= end
; j
++) {
5023 addReplyBulkLen(c
,vector
[j
].obj
);
5024 addReply(c
,vector
[j
].obj
);
5025 addReply(c
,shared
.crlf
);
5027 listRewind(operations
);
5028 while((ln
= listYield(operations
))) {
5029 redisSortOperation
*sop
= ln
->value
;
5030 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5033 if (sop
->type
== REDIS_SORT_GET
) {
5034 if (!val
|| val
->type
!= REDIS_STRING
) {
5035 addReply(c
,shared
.nullbulk
);
5037 addReplyBulkLen(c
,val
);
5039 addReply(c
,shared
.crlf
);
5042 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5047 robj
*listObject
= createListObject();
5048 list
*listPtr
= (list
*) listObject
->ptr
;
5050 /* STORE option specified, set the sorting result as a List object */
5051 for (j
= start
; j
<= end
; j
++) {
5054 listAddNodeTail(listPtr
,vector
[j
].obj
);
5055 incrRefCount(vector
[j
].obj
);
5057 listRewind(operations
);
5058 while((ln
= listYield(operations
))) {
5059 redisSortOperation
*sop
= ln
->value
;
5060 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5063 if (sop
->type
== REDIS_SORT_GET
) {
5064 if (!val
|| val
->type
!= REDIS_STRING
) {
5065 listAddNodeTail(listPtr
,createStringObject("",0));
5067 listAddNodeTail(listPtr
,val
);
5071 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5075 if (dictReplace(c
->db
->dict
,storekey
,listObject
)) {
5076 incrRefCount(storekey
);
5078 /* Note: we add 1 because the DB is dirty anyway since even if the
5079 * SORT result is empty a new key is set and maybe the old content
5081 server
.dirty
+= 1+outputlen
;
5082 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",outputlen
));
5086 decrRefCount(sortval
);
5087 listRelease(operations
);
5088 for (j
= 0; j
< vectorlen
; j
++) {
5089 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
5090 decrRefCount(vector
[j
].u
.cmpobj
);
5095 /* Create the string returned by the INFO command. This is decoupled
5096 * by the INFO command itself as we need to report the same information
5097 * on memory corruption problems. */
/* The result is an sds string of "name:value\r\n" lines built with
 * sdscatprintf(); infoCommand() sends it with addReplySds().
 * NOTE(review): only part of the sdscatprintf() argument list is visible
 * here, so the pairing of every conversion with its argument (for instance
 * "%ld" with the time_t 'uptime') should be confirmed against the full
 * source. */
5098 static sds
genRedisInfoString(void) {
5100 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5103 info
= sdscatprintf(sdsempty(),
5104 "redis_version:%s\r\n"
5106 "multiplexing_api:%s\r\n"
5107 "uptime_in_seconds:%ld\r\n"
5108 "uptime_in_days:%ld\r\n"
5109 "connected_clients:%d\r\n"
5110 "connected_slaves:%d\r\n"
5111 "used_memory:%zu\r\n"
5112 "changes_since_last_save:%lld\r\n"
5113 "bgsave_in_progress:%d\r\n"
5114 "last_save_time:%ld\r\n"
5115 "bgrewriteaof_in_progress:%d\r\n"
5116 "total_connections_received:%lld\r\n"
5117 "total_commands_processed:%lld\r\n"
5120 (sizeof(long) == 8) ? "64" : "32",
/* Slaves are also present in server.clients, so they are subtracted to get
 * the count of ordinary clients. */
5124 listLength(server
.clients
)-listLength(server
.slaves
),
5125 listLength(server
.slaves
),
/* The two "in_progress" flags are 1 while the respective child pid is set
 * (that is, different from -1). */
5128 server
.bgsavechildpid
!= -1,
5130 server
.bgrewritechildpid
!= -1,
5131 server
.stat_numconnections
,
5132 server
.stat_numcommands
,
5133 server
.masterhost
== NULL
? "master" : "slave"
/* The replication section is appended only when a master host is set. */
5135 if (server
.masterhost
) {
5136 info
= sdscatprintf(info
,
5137 "master_host:%s\r\n"
5138 "master_port:%d\r\n"
5139 "master_link_status:%s\r\n"
5140 "master_last_io_seconds_ago:%d\r\n"
5143 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
/* -1 is reported when there is no master client object. */
5145 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
/* One "dbN" line for every database that has keys or pending expires. */
5148 for (j
= 0; j
< server
.dbnum
; j
++) {
5149 long long keys
, vkeys
;
5151 keys
= dictSize(server
.db
[j
].dict
);
5152 vkeys
= dictSize(server
.db
[j
].expires
);
5153 if (keys
|| vkeys
) {
5154 info
= sdscatprintf(info
, "db%d:keys=%lld,expires=%lld\r\n",
5161 static void infoCommand(redisClient
*c
) {
5162 sds info
= genRedisInfoString();
5163 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
5164 (unsigned long)sdslen(info
)));
5165 addReplySds(c
,info
);
5166 addReply(c
,shared
.crlf
);
/* MONITOR: flag the client as both REDIS_SLAVE and REDIS_MONITOR, append it
 * to server.monitors and reply with shared.ok. A client that already has the
 * REDIS_SLAVE flag gets no reply and no state change. */
5169 static void monitorCommand(redisClient
*c
) {
5170 /* ignore MONITOR if already slave or in monitor mode */
5171 if (c
->flags
& REDIS_SLAVE
) return;
5173 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
5175 listAddNodeTail(server
.monitors
,c
);
5176 addReply(c
,shared
.ok
);
5179 /* ================================= Expire ================================= */
5180 static int removeExpire(redisDb
*db
, robj
*key
) {
5181 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
/* Register 'when' as the expire time of 'key' by adding the pair to
 * db->expires; the time_t is stored directly in the entry's value pointer.
 * dictAdd() returning DICT_ERR selects the failure branch.
 * NOTE(review): the return statements are not visible here; the caller in
 * expireGenericCommand() treats a non-zero result as "timeout set". */
5188 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
5189 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
5197 /* Return the expire time of the specified key, or -1 if no expire
5198 * is associated with this key (i.e. the key is non volatile) */
5199 static time_t getExpire(redisDb
*db
, robj
*key
) {
5202 /* No expire? return ASAP */
5203 if (dictSize(db
->expires
) == 0 ||
5204 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
5206 return (time_t) dictGetEntryVal(de
);
5209 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
5213 /* No expire? return ASAP */
5214 if (dictSize(db
->expires
) == 0 ||
5215 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5217 /* Lookup the expire */
5218 when
= (time_t) dictGetEntryVal(de
);
5219 if (time(NULL
) <= when
) return 0;
5221 /* Delete the key */
5222 dictDelete(db
->expires
,key
);
5223 return dictDelete(db
->dict
,key
) == DICT_OK
;
/* Delete 'key' from 'db' when it has an entry in db->expires, regardless of
 * whether that timeout has been reached. Returns 0 when the key has no
 * timeout; otherwise the timeout entry is removed and the result tells
 * whether the key was deleted from db->dict. */
5226 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
5229 /* No expire? return ASAP */
5230 if (dictSize(db
->expires
) == 0 ||
5231 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5233 /* Delete the key */
5235 dictDelete(db
->expires
,key
);
5236 return dictDelete(db
->dict
,key
) == DICT_OK
;
/* Shared implementation of EXPIRE and EXPIREAT: 'seconds' is a timeout
 * relative to the current time (expireatCommand() converts its absolute
 * argument before calling). The replies visible below are shared.czero and
 * shared.cone.
 * NOTE(review): the conditions selecting each branch are not visible here;
 * presumably czero is sent when the key is missing and the delete branch
 * handles a timeout already in the past. Confirm against the full source. */
5239 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
5242 de
= dictFind(c
->db
->dict
,key
);
5244 addReply(c
,shared
.czero
);
/* Branch that removes the key right away instead of setting a timeout. */
5248 if (deleteKey(c
->db
,key
)) server
.dirty
++;
5249 addReply(c
, shared
.cone
);
/* Otherwise the absolute expire time is computed and registered. */
5252 time_t when
= time(NULL
)+seconds
;
5253 if (setExpire(c
->db
,key
,when
)) {
5254 addReply(c
,shared
.cone
);
5257 addReply(c
,shared
.czero
);
5263 static void expireCommand(redisClient
*c
) {
5264 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
5267 static void expireatCommand(redisClient
*c
) {
5268 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
/* TTL key: reply with an integer (":<n>\r\n") derived from the expire time
 * returned by getExpire() for argv[1]. The remaining time is the expire time
 * minus the current time, and a negative result is reported as -1.
 * NOTE(review): the declaration and initial value of 'ttl' are not visible
 * here; presumably -1 is also what is sent when the key has no timeout. */
5271 static void ttlCommand(redisClient
*c
) {
5275 expire
= getExpire(c
->db
,c
->argv
[1]);
5277 ttl
= (int) (expire
-time(NULL
));
5278 if (ttl
< 0) ttl
= -1;
5280 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
5283 /* =============================== Replication ============================= */
/* Blocking write helper used by the replication code: write 'size' bytes
 * from 'ptr' to 'fd', giving up after about 'timeout' seconds.
 *
 * Each round waits for the descriptor to become writable with aeWait() and
 * then calls write(); a write() error returns -1 immediately. The elapsed
 * wall-clock time since the call started is compared with 'timeout'.
 * NOTE(review): the loop header, the pointer/size bookkeeping and the final
 * return are not visible here; 'ret' is initialised to the requested size,
 * so presumably that is the success value. */
5285 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5286 ssize_t nwritten
, ret
= size
;
5287 time_t start
= time(NULL
);
5291 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
5292 nwritten
= write(fd
,ptr
,size
);
5293 if (nwritten
== -1) return -1;
5297 if ((time(NULL
)-start
) > timeout
) {
5305 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5306 ssize_t nread
, totread
= 0;
5307 time_t start
= time(NULL
);
5311 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
5312 nread
= read(fd
,ptr
,size
);
5313 if (nread
== -1) return -1;
5318 if ((time(NULL
)-start
) > timeout
) {
/* Read a line from 'fd' one byte at a time with syncRead(), storing it in
 * the 'size' bytes buffer at 'ptr' as a NUL terminated string. The trailing
 * "\n" (and a "\r" right before it, if any) is not stored.
 *
 * At most size-1 bytes are stored: the remaining room is tracked on every
 * byte so that an over-long line from the peer cannot write past the end of
 * the buffer; in that case the truncated line read so far is returned.
 *
 * Returns the number of bytes stored, or -1 if syncRead() fails or 'size'
 * is not positive (errno is set to EINVAL in the latter case). */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) {
        errno = EINVAL;
        return -1;
    }
    size--; /* Reserve room for the terminator. */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        size--;
    }
    return nread;
}
/* SYNC: register the client as a replication slave.
 *
 * The client is refused when it already has the REDIS_SLAVE flag or when
 * its reply list is not empty. Otherwise its replication state depends on
 * server.bgsavechildpid:
 *   - a background save is running and another slave is already in state
 *     REDIS_REPL_WAIT_BGSAVE_END: that slave's reply list is duplicated and
 *     the client takes the same state;
 *   - a background save is running but no such slave exists: the client
 *     waits for the next one (REDIS_REPL_WAIT_BGSAVE_START);
 *   - no background save is running: one is started with
 *     rdbSaveBackground() and the client waits for its end.
 * Finally the client is flagged REDIS_SLAVE and appended to server.slaves. */
5347 static void syncCommand(redisClient
*c
) {
5348 /* ignore SYNC if already slave or in monitor mode */
5349 if (c
->flags
& REDIS_SLAVE
) return;
5351 /* SYNC can't be issued when the server has pending data to send to
5352 * the client about already issued commands. We need a fresh reply
5353 * buffer registering the differences between the BGSAVE and the current
5354 * dataset, so that we can copy to other slaves if needed. */
5355 if (listLength(c
->reply
) != 0) {
5356 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5360 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
5361 /* Here we need to check if there is a background saving operation
5362 * in progress, or if it is required to start one */
5363 if (server
.bgsavechildpid
!= -1) {
5364 /* Ok a background save is in progress. Let's check if it is a good
5365 * one for replication, i.e. if there is another slave that is
5366 * registering differences since the server forked to save */
5370 listRewind(server
.slaves
);
5371 while((ln
= listYield(server
.slaves
))) {
5373 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
5376 /* Perfect, the server is already registering differences for
5377 * another slave. Set the right state, and copy the buffer. */
5378 listRelease(c
->reply
);
5379 c
->reply
= listDup(slave
->reply
);
5380 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5381 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
5383 /* No way, we need to wait for the next BGSAVE in order to
5384 * register differences */
5385 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5386 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
5389 /* Ok we don't have a BGSAVE in progress, let's start one */
5390 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
5391 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5392 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
/* NOTE(review): the error reply below misspells "Unable" as "Unalbe". It is
 * sent to clients, so fixing it changes protocol-visible text. */
5393 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
5396 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5399 c
->flags
|= REDIS_SLAVE
;
5401 listAddNodeTail(server
.slaves
,c
);
/* Writable-event handler that streams the dump file to a slave.
 *
 * 'privdata' is the slave client. On the first call (slave->repldboff is 0)
 * a "$<count>\r\n" header is written to 'fd'. Every call then seeks the dump
 * descriptor to slave->repldboff, reads up to REDIS_IOBUF_LEN bytes, writes
 * them to 'fd' and advances the offset by the number of bytes written. When
 * the offset reaches slave->repldbsize the dump descriptor is closed, the
 * slave becomes REDIS_REPL_ONLINE and the writable handler is replaced by
 * sendReplyToClient.
 * NOTE(review): the error handling after the failed read/write checks is
 * not visible here. */
5405 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
5406 redisClient
*slave
= privdata
;
5408 REDIS_NOTUSED(mask
);
5409 char buf
[REDIS_IOBUF_LEN
];
5410 ssize_t nwritten
, buflen
;
5412 if (slave
->repldboff
== 0) {
5413 /* Write the bulk write count before to transfer the DB. In theory here
5414 * we don't know how much room there is in the output buffer of the
5415 * socket, but in practice SO_SNDLOWAT (the minimum count for output
5416 * operations) will never be smaller than the few bytes we need. */
/* NOTE(review): "%lld" is a signed conversion but the argument is cast to
 * unsigned long long; "%llu" would match the cast. */
5419 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5421 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
/* Always re-seek: the offset only advances by what was actually written. */
5429 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
5430 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
5432 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
5433 (buflen
== 0) ? "premature EOF" : strerror(errno
));
5437 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
5438 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
5443 slave
->repldboff
+= nwritten
;
/* Whole dump sent: switch the slave to the normal reply path. */
5444 if (slave
->repldboff
== slave
->repldbsize
) {
5445 close(slave
->repldbfd
);
5446 slave
->repldbfd
= -1;
5447 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5448 slave
->replstate
= REDIS_REPL_ONLINE
;
5449 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
5450 sendReplyToClient
, slave
) == AE_ERR
) {
5454 addReplySds(slave
,sdsempty());
5455 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
5459 /* This function is called at the end of every background saving.
5460 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5461 * otherwise REDIS_ERR is passed to the function.
5463 * The goal of this function is to handle slaves waiting for a successful
5464 * background saving in order to perform non-blocking synchronization. */
/* Per-slave handling, as visible below:
 *   - REDIS_REPL_WAIT_BGSAVE_START: moved to REDIS_REPL_WAIT_BGSAVE_END;
 *   - REDIS_REPL_WAIT_BGSAVE_END: on success the dump file is opened and
 *     stat'ed, the transfer offset/size are initialised, the state becomes
 *     REDIS_REPL_SEND_BULK and sendBulkToSlave is installed as the writable
 *     handler for the slave socket.
 * A new rdbSaveBackground() is attempted afterwards (presumably only when
 * 'startbgsave' was set; the guarding condition is not visible here). */
5465 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
5467 int startbgsave
= 0;
5469 listRewind(server
.slaves
);
5470 while((ln
= listYield(server
.slaves
))) {
5471 redisClient
*slave
= ln
->value
;
5473 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
5475 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5476 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
5477 struct redis_stat buf
;
5479 if (bgsaveerr
!= REDIS_OK
) {
5481 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
5484 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
5485 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
5487 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
5490 slave
->repldboff
= 0;
5491 slave
->repldbsize
= buf
.st_size
;
5492 slave
->replstate
= REDIS_REPL_SEND_BULK
;
5493 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5494 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
5501 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5502 listRewind(server
.slaves
);
5503 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
5504 while((ln
= listYield(server
.slaves
))) {
5505 redisClient
*slave
= ln
->value
;
/* NOTE(review): the first loop above has already moved every slave that was
 * in WAIT_BGSAVE_START to WAIT_BGSAVE_END, so this test looks like it can
 * no longer match the slaves that were waiting for this save. Verify. */
5507 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
5514 static int syncWithMaster(void) {
5515 char buf
[1024], tmpfile
[256], authcmd
[1024];
5517 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
5521 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
5526 /* AUTH with the master if required. */
5527 if(server
.masterauth
) {
5528 snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
);
5529 if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) {
5531 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",
5535 /* Read the AUTH result. */
5536 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5538 redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s",
5542 if (buf
[0] != '+') {
5544 redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?");
5549 /* Issue the SYNC command */
5550 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
5552 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
5556 /* Read the bulk write count */
5557 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5559 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
5563 if (buf
[0] != '$') {
5565 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5568 dumpsize
= atoi(buf
+1);
5569 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
5570 /* Read the bulk write data on a temp file */
5571 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
5572 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
5575 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
5579 int nread
, nwritten
;
5581 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
5583 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
5589 nwritten
= write(dfd
,buf
,nread
);
5590 if (nwritten
== -1) {
5591 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
5599 if (rename(tmpfile
,server
.dbfilename
) == -1) {
5600 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
5606 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
5607 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
5611 server
.master
= createClient(fd
);
5612 server
.master
->flags
|= REDIS_MASTER
;
5613 server
.master
->authenticated
= 1;
5614 server
.replstate
= REDIS_REPL_CONNECTED
;
5618 static void slaveofCommand(redisClient
*c
) {
5619 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
5620 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
5621 if (server
.masterhost
) {
5622 sdsfree(server
.masterhost
);
5623 server
.masterhost
= NULL
;
5624 if (server
.master
) freeClient(server
.master
);
5625 server
.replstate
= REDIS_REPL_NONE
;
5626 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
5629 sdsfree(server
.masterhost
);
5630 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
5631 server
.masterport
= atoi(c
->argv
[2]->ptr
);
5632 if (server
.master
) freeClient(server
.master
);
5633 server
.replstate
= REDIS_REPL_CONNECT
;
5634 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
5635 server
.masterhost
, server
.masterport
);
5637 addReply(c
,shared
.ok
);
5640 /* ============================ Maxmemory directive ======================== */
5642 /* This function gets called when 'maxmemory' is set on the config file to limit
5643 * the max memory used by the server, and we are out of memory.
5644 * This function will try to, in order:
5646 * - Free objects from the free list
5647 * - Try to remove keys with an EXPIRE set
5649 * It is not possible to free enough memory to reach used-memory < maxmemory
5650 * the server will start refusing commands that will enlarge even more the
5653 static void freeMemoryIfNeeded(void) {
5654 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
5655 if (listLength(server
.objfreelist
)) {
5658 listNode
*head
= listFirst(server
.objfreelist
);
5659 o
= listNodeValue(head
);
5660 listDelNode(server
.objfreelist
,head
);
5663 int j
, k
, freed
= 0;
5665 for (j
= 0; j
< server
.dbnum
; j
++) {
5667 robj
*minkey
= NULL
;
5668 struct dictEntry
*de
;
5670 if (dictSize(server
.db
[j
].expires
)) {
5672 /* From a sample of three keys drop the one nearest to
5673 * the natural expire */
5674 for (k
= 0; k
< 3; k
++) {
5677 de
= dictGetRandomKey(server
.db
[j
].expires
);
5678 t
= (time_t) dictGetEntryVal(de
);
5679 if (minttl
== -1 || t
< minttl
) {
5680 minkey
= dictGetEntryKey(de
);
5684 deleteKey(server
.db
+j
,minkey
);
5687 if (!freed
) return; /* nothing to free... */
5692 /* ============================== Append Only file ========================== */
5694 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
5695 sds buf
= sdsempty();
5701 /* The DB this command was targetting is not the same as the last command
5702 * we appendend. To issue a SELECT command is needed. */
5703 if (dictid
!= server
.appendseldb
) {
5706 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
5707 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
5708 (unsigned long)strlen(seldb
),seldb
);
5709 server
.appendseldb
= dictid
;
5712 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5713 * EXPIREs into EXPIREATs calls */
5714 if (cmd
->proc
== expireCommand
) {
5717 tmpargv
[0] = createStringObject("EXPIREAT",8);
5718 tmpargv
[1] = argv
[1];
5719 incrRefCount(argv
[1]);
5720 when
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10);
5721 tmpargv
[2] = createObject(REDIS_STRING
,
5722 sdscatprintf(sdsempty(),"%ld",when
));
5726 /* Append the actual command */
5727 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
5728 for (j
= 0; j
< argc
; j
++) {
5731 o
= getDecodedObject(o
);
5732 buf
= sdscatprintf(buf
,"$%lu\r\n",(unsigned long)sdslen(o
->ptr
));
5733 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
5734 buf
= sdscatlen(buf
,"\r\n",2);
5738 /* Free the objects from the modified argv for EXPIREAT */
5739 if (cmd
->proc
== expireCommand
) {
5740 for (j
= 0; j
< 3; j
++)
5741 decrRefCount(argv
[j
]);
5744 /* We want to perform a single write. This should be guaranteed atomic
5745 * at least if the filesystem we are writing is a real physical one.
5746 * While this will save us against the server being killed I don't think
5747 * there is much to do about the whole server stopping for power problems
5749 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
5750 if (nwritten
!= (signed)sdslen(buf
)) {
5751 /* Ooops, we are in troubles. The best thing to do for now is
5752 * to simply exit instead to give the illusion that everything is
5753 * working as expected. */
5754 if (nwritten
== -1) {
5755 redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
));
5757 redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
));
5761 /* If a background append only file rewriting is in progress we want to
5762 * accumulate the differences between the child DB and the current one
5763 * in a buffer, so that when the child process will do its work we
5764 * can append the differences to the new append only file. */
5765 if (server
.bgrewritechildpid
!= -1)
5766 server
.bgrewritebuf
= sdscatlen(server
.bgrewritebuf
,buf
,sdslen(buf
));
5770 if (server
.appendfsync
== APPENDFSYNC_ALWAYS
||
5771 (server
.appendfsync
== APPENDFSYNC_EVERYSEC
&&
5772 now
-server
.lastfsync
> 1))
5774 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
5775 server
.lastfsync
= now
;
5779 /* In Redis commands are always executed in the context of a client, so in
5780 * order to load the append only file we need to create a fake client. */
/* The client is allocated with zmalloc() and is released with
 * freeFakeClient(). The fields set below are the query buffer (an empty
 * sds), the replication state and the reply list with its free/dup methods.
 * NOTE(review): other field initialisations and the return statement are
 * not visible here. */
5781 static struct redisClient
*createFakeClient(void) {
5782 struct redisClient
*c
= zmalloc(sizeof(*c
));
5786 c
->querybuf
= sdsempty();
5790 /* We set the fake client as a slave waiting for the synchronization
5791 * so that Redis will not try to send replies to this client. */
5792 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5793 c
->reply
= listCreate();
5794 listSetFreeMethod(c
->reply
,decrRefCount
);
5795 listSetDupMethod(c
->reply
,dupClientReplyValue
);
5799 static void freeFakeClient(struct redisClient
*c
) {
5800 sdsfree(c
->querybuf
);
5801 listRelease(c
->reply
);
5805 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5806 * error (the append only file is zero-length) REDIS_ERR is returned. On
5807 * fatal error an error message is logged and the program exists. */
5808 int loadAppendOnlyFile(char *filename
) {
5809 struct redisClient
*fakeClient
;
5810 FILE *fp
= fopen(filename
,"r");
5811 struct redis_stat sb
;
5813 if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size
== 0)
5817 redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
));
5821 fakeClient
= createFakeClient();
5828 struct redisCommand
*cmd
;
5830 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) {
5836 if (buf
[0] != '*') goto fmterr
;
5838 argv
= zmalloc(sizeof(robj
*)*argc
);
5839 for (j
= 0; j
< argc
; j
++) {
5840 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
;
5841 if (buf
[0] != '$') goto fmterr
;
5842 len
= strtol(buf
+1,NULL
,10);
5843 argsds
= sdsnewlen(NULL
,len
);
5844 if (len
&& fread(argsds
,len
,1,fp
) == 0) goto fmterr
;
5845 argv
[j
] = createObject(REDIS_STRING
,argsds
);
5846 if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */
5849 /* Command lookup */
5850 cmd
= lookupCommand(argv
[0]->ptr
);
5852 redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
);
5855 /* Try object sharing and encoding */
5856 if (server
.shareobjects
) {
5858 for(j
= 1; j
< argc
; j
++)
5859 argv
[j
] = tryObjectSharing(argv
[j
]);
5861 if (cmd
->flags
& REDIS_CMD_BULK
)
5862 tryObjectEncoding(argv
[argc
-1]);
5863 /* Run the command in the context of a fake client */
5864 fakeClient
->argc
= argc
;
5865 fakeClient
->argv
= argv
;
5866 cmd
->proc(fakeClient
);
5867 /* Discard the reply objects list from the fake client */
5868 while(listLength(fakeClient
->reply
))
5869 listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
));
5870 /* Clean up, ready for the next command */
5871 for (j
= 0; j
< argc
; j
++) decrRefCount(argv
[j
]);
5875 freeFakeClient(fakeClient
);
5880 redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file");
5882 redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
));
5886 redisLog(REDIS_WARNING
,"Bad file format reading the append only file");
5890 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
/* 'obj' is first replaced by its decoded form (getDecodedObject), so that
 * obj->ptr can be used as an sds string. Three writes follow: the "$<len>"
 * header, the payload (skipped when empty) and the trailing CRLF; any
 * fwrite() reporting 0 items jumps to the 'err' label.
 * NOTE(review): the success/error tails are not visible here; presumably
 * both release the decoded object and return 1 / 0 like the sibling
 * fwriteBulkDouble() and fwriteBulkLong(). */
5891 static int fwriteBulk(FILE *fp
, robj
*obj
) {
5893 obj
= getDecodedObject(obj
);
5894 snprintf(buf
,sizeof(buf
),"$%ld\r\n",(long)sdslen(obj
->ptr
));
5895 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) goto err
;
5896 if (sdslen(obj
->ptr
) && fwrite(obj
->ptr
,sdslen(obj
->ptr
),1,fp
) == 0)
5898 if (fwrite("\r\n",2,1,fp
) == 0) goto err
;
/* Write a double value in bulk format $<count>\r\n<payload>\r\n
 *
 * The payload is the "%.17g" rendering of 'd' and <count> is its length
 * without the trailing CRLF. The lengths come from snprintf() itself, so the
 * buffers are not scanned again and a formatting failure is detected.
 *
 * Returns 1 on success, 0 if formatting or any fwrite() fails. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char header[128], payload[128];
    int hlen, plen;

    plen = snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    if (plen < 2 || (size_t)plen >= sizeof(payload)) return 0;
    hlen = snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)(plen-2));
    if (hlen < 0 || (size_t)hlen >= sizeof(header)) return 0;
    if (fwrite(header,(size_t)hlen,1,fp) == 0) return 0;
    if (fwrite(payload,(size_t)plen,1,fp) == 0) return 0;
    return 1;
}
/* Write a long value in bulk format $<count>\r\n<payload>\r\n
 *
 * The payload is the decimal rendering of 'l' and <count> is its length
 * without the trailing CRLF. The lengths come from snprintf() itself, so the
 * buffers are not scanned again and a formatting failure is detected.
 *
 * Returns 1 on success, 0 if formatting or any fwrite() fails. */
static int fwriteBulkLong(FILE *fp, long l) {
    char header[128], payload[128];
    int hlen, plen;

    plen = snprintf(payload,sizeof(payload),"%ld\r\n",l);
    if (plen < 2 || (size_t)plen >= sizeof(payload)) return 0;
    hlen = snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)(plen-2));
    if (hlen < 0 || (size_t)hlen >= sizeof(header)) return 0;
    if (fwrite(header,(size_t)hlen,1,fp) == 0) return 0;
    if (fwrite(payload,(size_t)plen,1,fp) == 0) return 0;
    return 1;
}
5928 /* Write a sequence of commands able to fully rebuild the dataset into
5929 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
/* Dump the dataset as a stream of commands into a temp file named after the
 * current pid, then rename(2) that file onto 'filename'.
 *
 * Commands emitted per value type, as visible below:
 *   REDIS_STRING -> SET key value
 *   REDIS_LIST   -> one RPUSH key element per list node
 *   REDIS_SET    -> one SADD key member per set entry
 *   REDIS_ZSET   -> one ZADD key score member per sorted set entry
 * followed by EXPIREAT key time when the key's expire time is not -1.
 * Every database that holds keys is introduced by a SELECT command.
 *
 * NOTE(review): this listing is lossy. Several original lines (local
 * declarations, closing braces, the statements under some comments and the
 * error labels) are not visible, so the notes below only describe what is
 * shown. The return statements are among the missing lines; the caller
 * visible in this file compares the result against REDIS_OK. */
5930 static int rewriteAppendOnlyFile(char *filename
) {
/* Starts as NULL so the error path at the bottom can tell whether an
 * iterator still needs releasing. */
5931 dictIterator
*di
= NULL
;
/* Sampled once; compared against each key's expire time further down. */
5936 time_t now
= time(NULL
);
5938 /* Note that we have to use a different temp name here compared to the
5939 * one used by rewriteAppendOnlyFileBackground() function. */
5940 snprintf(tmpfile
,256,"temp-rewriteaof-%d.aof", (int) getpid());
5941 fp
= fopen(tmpfile
,"w");
/* The condition guarding this log line is not visible here; presumably it
 * is the fopen() failure check -- TODO confirm. */
5943 redisLog(REDIS_WARNING
, "Failed rewriting the append only file: %s", strerror(errno
));
/* One pass per database, index 0 up to server.dbnum-1. */
5946 for (j
= 0; j
< server
.dbnum
; j
++) {
/* Multi bulk header announcing 2 arguments, followed by the 6 byte SELECT
 * argument. The database index is appended as the second argument by the
 * fwriteBulkLong() call below. */
5947 char selectcmd
[] = "*2\r\n$6\r\nSELECT\r\n";
5948 redisDb
*db
= server
.db
+j
;
/* Nothing is emitted for an empty database. 'd' is assigned on a line that
 * is not visible here (presumably the dict of 'db' -- TODO confirm). */
5950 if (dictSize(d
) == 0) continue;
5951 di
= dictGetIterator(d
);
5957 /* SELECT the new DB */
/* sizeof()-1 leaves out the terminating NUL of the array. */
5958 if (fwrite(selectcmd
,sizeof(selectcmd
)-1,1,fp
) == 0) goto werr
;
5959 if (fwriteBulkLong(fp
,j
) == 0) goto werr
;
5961 /* Iterate this DB writing every entry */
5962 while((de
= dictNext(di
)) != NULL
) {
5963 robj
*key
= dictGetEntryKey(de
);
5964 robj
*o
= dictGetEntryVal(de
);
/* Compared against -1 below to decide whether EXPIREAT is needed. */
5965 time_t expiretime
= getExpire(db
,key
);
5967 /* Save the key and associated value */
5968 if (o
->type
== REDIS_STRING
) {
5969 /* Emit a SET command */
5970 char cmd
[]="*3\r\n$3\r\nSET\r\n";
5971 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5973 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5974 if (fwriteBulk(fp
,o
) == 0) goto werr
;
5975 } else if (o
->type
== REDIS_LIST
) {
5976 /* Emit the RPUSHes needed to rebuild the list */
5977 list
*list
= o
->ptr
;
/* One complete RPUSH command (header, key, element) per node yielded. */
5981 while((ln
= listYield(list
))) {
5982 char cmd
[]="*3\r\n$5\r\nRPUSH\r\n";
5983 robj
*eleobj
= listNodeValue(ln
);
5985 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5986 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5987 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
5989 } else if (o
->type
== REDIS_SET
) {
5990 /* Emit the SADDs needed to rebuild the set */
/* This 'di' is a new declaration local to the branch; it shadows the
 * function-level iterator until the branch ends. */
5992 dictIterator
*di
= dictGetIterator(set
);
5995 while((de
= dictNext(di
)) != NULL
) {
5996 char cmd
[]="*3\r\n$4\r\nSADD\r\n";
/* Set members are stored as dictionary keys. */
5997 robj
*eleobj
= dictGetEntryKey(de
);
5999 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6000 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6001 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6003 dictReleaseIterator(di
);
6004 } else if (o
->type
== REDIS_ZSET
) {
6005 /* Emit the ZADDs needed to rebuild the sorted set */
/* As in the set branch, this 'di' shadows the function-level iterator. */
6007 dictIterator
*di
= dictGetIterator(zs
->dict
);
6010 while((de
= dictNext(di
)) != NULL
) {
/* ZADD takes 3 arguments (key, score, member), hence the *4 header. */
6011 char cmd
[]="*4\r\n$4\r\nZADD\r\n";
6012 robj
*eleobj
= dictGetEntryKey(de
);
/* The dictionary value is read as a pointer to the member's score. */
6013 double *score
= dictGetEntryVal(de
);
6015 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6016 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6017 if (fwriteBulkDouble(fp
,*score
) == 0) goto werr
;
6018 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6020 dictReleaseIterator(di
);
/* Always-false assertion; presumably the body of the final 'else' for an
 * unknown value type (the 'else' line itself is not visible). */
6022 redisAssert(0 != 0);
6024 /* Save the expire time */
6025 if (expiretime
!= -1) {
6026 char cmd
[]="*3\r\n$8\r\nEXPIREAT\r\n";
6027 /* If this key is already expired skip it */
/* NOTE(review): this check runs after the commands rebuilding the value
 * were already written above, so an already expired key is still emitted,
 * only without its EXPIREAT. Confirm whether that is intended. */
6028 if (expiretime
< now
) continue;
6029 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6030 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6031 if (fwriteBulkLong(fp
,expiretime
) == 0) goto werr
;
/* Done with the iterator of the current database. */
6034 dictReleaseIterator(di
);
6037 /* Make sure data will not remain on the OS's output buffers */
6042 /* Use RENAME to make sure the DB file is changed atomically only
6043 * if the generated file is ok. */
6044 if (rename(tmpfile
,filename
) == -1) {
6045 redisLog(REDIS_WARNING
,"Error moving temp append only file on the final destination: %s", strerror(errno
));
6049 redisLog(REDIS_NOTICE
,"SYNC append only file rewrite performed");
/* Error path, presumably the target of the 'goto werr' jumps above (the
 * label line itself is not visible): log the errno text and release the
 * function-level iterator if one is set. */
6055 redisLog(REDIS_WARNING
,"Write error writing append only file on disk: %s", strerror(errno
));
6056 if (di
) dictReleaseIterator(di
);
6060 /* This is how rewriting of the append only file in background works:
6062 * 1) The user calls BGREWRITEAOF
6063 * 2) Redis calls this function, that forks():
6064 * 2a) the child rewrites the append only file in a temp file.
6065 * 2b) the parent accumulates differences in server.bgrewritebuf.
6066 * 3) When the child has finished '2a' it exits.
6067 * 4) The parent will trap the exit code, if it's OK, will append the
6068 * data accumulated into server.bgrewritebuf into the temp file, and
6069 * finally will rename(2) the temp file in the actual file name.
6070 * Then the new file is reopened as the new append only file. Profit!
 */
/* Start a background rewrite of the append only file.
 *
 * Returns REDIS_ERR right away when server.bgrewritechildpid is not -1,
 * i.e. a rewrite child is already recorded. Otherwise fork()s: the child
 * runs rewriteAppendOnlyFile() on a temp file named after its own pid,
 * the parent records the child pid and resets server.appendseldb.
 *
 * NOTE(review): this listing is lossy; the local declarations, the child's
 * exit calls, the 'else' keywords and some return statements are not
 * visible, so only the shown statements are described. */
6072 static int rewriteAppendOnlyFileBackground(void) {
/* Only one rewrite child at a time. */
6075 if (server
.bgrewritechildpid
!= -1) return REDIS_ERR
;
/* fork() returns 0 in the child. */
6076 if ((childpid
= fork()) == 0) {
/* Child: the temp name embeds the child's pid. aofRemoveTempFile() builds
 * the same name from a child pid. */
6081 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6082 if (rewriteAppendOnlyFile(tmpfile
) == REDIS_OK
) {
/* Parent: -1 means fork() itself failed. */
6089 if (childpid
== -1) {
6090 redisLog(REDIS_WARNING
,
6091 "Can't rewrite append only file in background: fork: %s",
6095 redisLog(REDIS_NOTICE
,
6096 "Background append only file rewriting started by pid %d",childpid
);
/* Remember the child; the check at the top of this function and the one in
 * bgrewriteaofCommand() read this field. */
6097 server
.bgrewritechildpid
= childpid
;
6098 /* We set appendseldb to -1 in order to force the next call to the
6099 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6100 * accumulated by the parent into server.bgrewritebuf will start
6101 * with a SELECT statement and it will be safe to merge. */
6102 server
.appendseldb
= -1;
6105 return REDIS_OK
; /* unreached */
/* BGREWRITEAOF command handler.
 *
 * Replies with an -ERR line when server.bgrewritechildpid is not -1 (a
 * rewrite is already in progress), with a +status line when
 * rewriteAppendOnlyFileBackground() returns REDIS_OK, and with shared.err
 * in the remaining visible reply (presumably the 'else' of that check; the
 * 'else' and 'return' lines are not visible in this listing). */
6108 static void bgrewriteaofCommand(redisClient
*c
) {
6109 if (server
.bgrewritechildpid
!= -1) {
6110 addReplySds(c
,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6113 if (rewriteAppendOnlyFileBackground() == REDIS_OK
) {
6114 char *status
= "+Background append only file rewriting started\r\n";
6115 addReplySds(c
,sdsnew(status
));
6117 addReply(c
,shared
.err
);
/* Build the name of the temp file used by the background rewrite child
 * 'childpid'. The format string is the same one used in
 * rewriteAppendOnlyFileBackground().
 *
 * NOTE(review): what is done with the name is on lines not visible in this
 * listing; judging by the function name it is presumably removed -- TODO
 * confirm. */
6121 static void aofRemoveTempFile(pid_t childpid
) {
6124 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) childpid
);
6128 /* ================================= Debugging ============================== */
/* DEBUG command handler. The subcommand is argv[1], compared without
 * regard to case:
 *
 *   SEGFAULT     - body not visible in this listing.
 *   RELOAD       - rdbSave() then rdbLoad() on server.dbfilename; replies
 *                  shared.err if either step fails, shared.ok otherwise.
 *   LOADAOF      - loadAppendOnlyFile() on server.appendfilename; replies
 *                  shared.err on failure, shared.ok otherwise.
 *   OBJECT <key> - only with exactly 3 arguments; replies with the
 *                  addresses and refcounts of the key and value objects
 *                  plus the value encoding, or shared.nokeyerr.
 *
 * Anything else gets the syntax error reply at the bottom.
 *
 * NOTE(review): this listing is lossy; some statements (returns, closing
 * braces, the steps between save and load) are not visible. */
6130 static void debugCommand(redisClient
*c
) {
6131 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
6133 } else if (!strcasecmp(c
->argv
[1]->ptr
,"reload")) {
/* Save first; the load below reads the same file name. */
6134 if (rdbSave(server
.dbfilename
) != REDIS_OK
) {
6135 addReply(c
,shared
.err
);
6139 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6140 addReply(c
,shared
.err
);
6143 redisLog(REDIS_WARNING
,"DB reloaded by DEBUG RELOAD");
6144 addReply(c
,shared
.ok
);
6145 } else if (!strcasecmp(c
->argv
[1]->ptr
,"loadaof")) {
6147 if (loadAppendOnlyFile(server
.appendfilename
) != REDIS_OK
) {
6148 addReply(c
,shared
.err
);
6151 redisLog(REDIS_WARNING
,"Append Only File loaded by DEBUG LOADAOF");
6152 addReply(c
,shared
.ok
);
6153 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
/* Look the key up in the dictionary of the client's selected database. */
6154 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
/* Presumably sent when the lookup above found nothing; the guarding
 * condition is not visible here. */
6158 addReply(c
,shared
.nokeyerr
);
6161 key
= dictGetEntryKey(de
);
6162 val
= dictGetEntryVal(de
);
6163 addReplySds(c
,sdscatprintf(sdsempty(),
6164 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6165 (void*)key
, key
->refcount
, (void*)val
, val
->refcount
,
/* NOTE(review): the hint below does not list LOADAOF although that
 * subcommand is handled above. */
6168 addReplySds(c
,sdsnew(
6169 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
/* Report a failed assertion: logs a banner and then 'estr' (the text
 * describing the failed expression, as passed by the caller) at
 * REDIS_WARNING level. When HAVE_BACKTRACE is defined it also logs that a
 * SIGSEGV is about to be forced to obtain a stack trace.
 *
 * NOTE(review): the statement that actually forces the signal and the
 * closing of the #ifdef are not visible in this listing. */
6173 static void _redisAssert(char *estr
) {
6174 redisLog(REDIS_WARNING
,"=== ASSERTION FAILED ===");
6175 redisLog(REDIS_WARNING
,"==> %s\n",estr
);
6176 #ifdef HAVE_BACKTRACE
6177 redisLog(REDIS_WARNING
,"(forcing SIGSEGV in order to print the stack trace)");
6182 /* =================================== Main! ================================ */
/* Read the Linux vm.overcommit_memory setting from
 * /proc/sys/vm/overcommit_memory.
 *
 * NOTE(review): only the fopen() and the fgets() call (at most 63
 * characters of one line) are visible in this listing; the error handling
 * and the conversion of the text into the returned int are on lines that
 * are not shown. The caller in this file compares the result against 0. */
6185 int linuxOvercommitMemoryValue(void) {
6186 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
6190 if (fgets(buf
,64,fp
) == NULL
) {
/* Log a REDIS_WARNING explaining how to change vm.overcommit_memory when
 * linuxOvercommitMemoryValue() reports 0. Any other value logs nothing. */
6199 void linuxOvercommitMemoryWarning(void) {
6200 if (linuxOvercommitMemoryValue() == 0) {
6201 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6204 #endif /* __linux__ */
/* Detach the server from the terminal: fork and let the parent exit, start
 * a new session, point stdin/stdout/stderr at /dev/null and write the pid
 * to server.pidfile.
 *
 * NOTE(review): this listing is lossy; the declarations of 'fd' and 'fp',
 * the check on the fopen() result and the closing of the pid file are not
 * visible. */
6206 static void daemonize(void) {
/* NOTE(review): fork() also returns a non-zero value (-1) on failure, in
 * which case this line makes the only process exit with status 0. */
6210 if (fork() != 0) exit(0); /* parent exits */
/* Printed before the standard streams are redirected below. */
6211 printf("New pid: %d\n", getpid());
6212 setsid(); /* create a new session */
6214 /* Every output goes to /dev/null. If Redis is daemonized but
6215 * the 'logfile' is set to 'stdout' in the configuration file
6216 * it will not log at all. */
6217 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
6218 dup2(fd
, STDIN_FILENO
);
6219 dup2(fd
, STDOUT_FILENO
);
6220 dup2(fd
, STDERR_FILENO
);
/* Close the extra descriptor unless it is itself one of the three standard
 * descriptors. */
6221 if (fd
> STDERR_FILENO
) close(fd
);
6223 /* Try to write the pid file */
6224 fp
= fopen(server
.pidfile
,"w");
6226 fprintf(fp
,"%d\n",getpid());
/* Server entry point. Visible steps, in order:
 *   - with a config file argument: reset the save parameters and load the
 *     configuration from argv[1]; with more than 2 arguments print the
 *     usage line; otherwise warn that the default config is used;
 *   - daemonize when server.daemonize is set;
 *   - log the version banner and run the overcommit check;
 *   - load the dataset, from the append only file when server.appendonly
 *     is set, from the RDB file in the other visible branch;
 *   - register acceptHandler for readable events on server.fd;
 *   - delete the event loop.
 *
 * NOTE(review): this listing is lossy; the initialization calls, the first
 * 'if' on argc, the 'else' keywords, the event loop run and the return
 * statement are on lines that are not visible. */
6231 int main(int argc
, char **argv
) {
6234 resetServerSaveParams();
6235 loadServerConfig(argv
[1]);
6236 } else if (argc
> 2) {
6237 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
6240 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6242 if (server
.daemonize
) daemonize();
/* REDIS_VERSION is a string literal macro, concatenated at compile time. */
6244 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
6246 linuxOvercommitMemoryWarning();
6248 if (server
.appendonly
) {
6249 if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
)
6250 redisLog(REDIS_NOTICE
,"DB loaded from append only file");
6252 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
6253 redisLog(REDIS_NOTICE
,"DB loaded from disk");
/* Failing to register the accept handler is reported through oom(). */
6255 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
6256 acceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
6257 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
6259 aeDeleteEventLoop(server
.el
);
6263 /* ============================= Backtrace support ========================= */
6265 #ifdef HAVE_BACKTRACE
6266 static char *findFuncName(void *pointer
, unsigned long *offset
);
/* Return, as a void pointer, the instruction pointer saved in the signal
 * context 'uc'. The field that holds it differs per platform, so one
 * return statement is selected by the preprocessor.
 *
 * NOTE(review): this listing is lossy; the inner #if/#else/#endif lines of
 * the first Apple branch, the #else/#endif lines of the second one and the
 * final fallback branch are not visible. */
6268 static void *getMcontextEip(ucontext_t
*uc
) {
6269 #if defined(__FreeBSD__)
6270 return (void*) uc
->uc_mcontext
.mc_eip
;
6271 #elif defined(__dietlibc__)
6272 return (void*) uc
->uc_mcontext
.eip
;
6273 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
/* Two alternatives: the 64 bit register (__rip) and the 32 bit one
 * (__eip); the condition choosing between them is not visible here. */
6275 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6277 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6279 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6280 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6281 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6283 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
/* NOTE(review): compilers usually predefine the lowercase __x86_64__, so
 * the uppercase spelling tested below may never match; and REG_EIP is the
 * 32 bit register index, a 64 bit build would presumably need REG_RIP.
 * Verify against the target toolchain. */
6285 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
6286 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
];
6287 #elif defined(__ia64__) /* Linux IA64 */
6288 return (void*) uc
->uc_mcontext
.sc_ip
;
/* Handler installed by setupSigSegvAction(). It logs the signal number and
 * the Redis version, the output of genRedisInfoString(), and then one line
 * per captured stack frame.
 *
 * sig:    number of the signal being handled (logged).
 * info:   unused.
 * secret: treated as the ucontext_t of the interrupted code; used to read
 *         the saved instruction pointer.
 *
 * NOTE(review): this listing is lossy; the declarations of 'trace' and
 * 'infostring', the 'else' of the per-frame logging and the final
 * statements of the handler are not visible. */
6294 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
6296 char **messages
= NULL
;
6297 int i
, trace_size
= 0;
6298 unsigned long offset
=0;
6299 ucontext_t
*uc
= (ucontext_t
*) secret
;
6301 REDIS_NOTUSED(info
);
6303 redisLog(REDIS_WARNING
,
6304 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
6305 infostring
= genRedisInfoString();
6306 redisLog(REDIS_WARNING
, "%s",infostring
);
6307 /* It's not safe to sdsfree() the returned string under memory
6308 * corruption conditions. Let it leak as we are going to abort */
/* Capture at most 100 return addresses. */
6310 trace_size
= backtrace(trace
, 100);
6311 /* overwrite sigaction with caller's address */
6312 if (getMcontextEip(uc
) != NULL
) {
6313 trace
[1] = getMcontextEip(uc
);
/* NOTE(review): the result is indexed below without a NULL check; confirm
 * whether backtrace_symbols() can fail here. */
6315 messages
= backtrace_symbols(trace
, trace_size
);
/* Frame 0 is skipped: the loop starts at index 1. */
6317 for (i
=1; i
<trace_size
; ++i
) {
/* Resolve the address against the static symbol table as well. */
6318 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
/* Locate the '+' in the symbolized text; the number after it is parsed
 * below and compared with the offset found by findFuncName(). */
6320 p
= strchr(messages
[i
],'+');
/* Use the backtrace_symbols() text when no static symbol matched or when
 * its offset is the smaller one. */
6321 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
6322 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
/* Presumably the 'else' branch: log the name from the static table.
 * NOTE(review): the last "%d" is paired with an unsigned int argument;
 * "%u" would match the cast. */
6324 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
6327 // free(messages); Don't call free() with possibly corrupted memory.
6331 static void setupSigSegvAction(void) {
6332 struct sigaction act
;
6334 sigemptyset (&act
.sa_mask
);
6335 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6336 * is used. Otherwise, sa_handler is used */
6337 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
6338 act
.sa_sigaction
= segvHandler
;
6339 sigaction (SIGSEGV
, &act
, NULL
);
6340 sigaction (SIGBUS
, &act
, NULL
);
6341 sigaction (SIGFPE
, &act
, NULL
);
6342 sigaction (SIGILL
, &act
, NULL
);
6343 sigaction (SIGBUS
, &act
, NULL
);
6347 #include "staticsymbols.h"
6348 /* This function tries to convert a pointer into a function name. It's used in
6349 * order to provide a backtrace under segmentation fault that's able to
6350 * display functions declared as static (otherwise the backtrace is useless). */
/* Look 'pointer' up in symsTable and return the name of the entry chosen
 * by the scan below, or NULL when 'ret' is still -1 after the scan.
 *
 * pointer: address to resolve.
 * offset:  output parameter; the statement that stores into it is not
 *          visible in this listing (presumably it receives the distance
 *          from the start of the symbol -- TODO confirm).
 *
 * NOTE(review): this listing is lossy; the declarations of 'i' and 'ret'
 * and the body of the innermost 'if' are not visible. */
6351 static char *findFuncName(void *pointer
, unsigned long *offset
){
6353 unsigned long off
, minoff
= 0;
6355 /* Try to match against the Symbol with the smallest offset */
/* The table is scanned until an entry whose pointer field is zero. */
6356 for (i
=0; symsTable
[i
].pointer
; i
++) {
6357 unsigned long lp
= (unsigned long) pointer
;
/* Only symbols that start at or before the address are candidates; the
 * all-ones address is never matched. */
6359 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
6360 off
=lp
-symsTable
[i
].pointer
;
/* Taken while 'ret' is still negative or when this symbol is closer than
 * 'minoff'. */
6361 if (ret
< 0 || off
< minoff
) {
6367 if (ret
== -1) return NULL
;
6369 return symsTable
[ret
].name
;
6371 #else /* HAVE_BACKTRACE */
/* Variant of setupSigSegvAction() compiled when HAVE_BACKTRACE is not
 * defined (this is the #else branch). Its body is not visible in this
 * listing; presumably it does nothing -- TODO confirm. */
6372 static void setupSigSegvAction(void) {
6374 #endif /* HAVE_BACKTRACE */