2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.07"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
64 #include "solarisfixes.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
95 /* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
107 this flags will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short this commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
113 #define REDIS_STRING 0
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
128 /* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpreter the length:
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
139 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining two bits specify a special encoding for the object
149 * accordingly to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
156 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157 #define REDIS_SLAVE 2 /* This client is a slave server */
158 #define REDIS_MASTER 4 /* This client is a master server */
159 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
161 /* Slave replication state - slave side */
162 #define REDIS_REPL_NONE 0 /* No active replication */
163 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
164 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
166 /* Slave replication state - from the point of view of master
167 * Note that in SEND_BULK and ONLINE state the slave receives new updates
168 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
169 * to start the next background saving in order to send updates to it. */
170 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
171 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
172 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
173 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
175 /* List related stuff */
179 /* Sort operations */
180 #define REDIS_SORT_GET 0
181 #define REDIS_SORT_ASC 1
182 #define REDIS_SORT_DESC 2
183 #define REDIS_SORTKEY_MAX 1024
186 #define REDIS_DEBUG 0
187 #define REDIS_NOTICE 1
188 #define REDIS_WARNING 2
190 /* Anti-warning macro... */
191 #define REDIS_NOTUSED(V) ((void) V)
193 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
194 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
196 /* Append only defines */
197 #define APPENDFSYNC_NO 0
198 #define APPENDFSYNC_ALWAYS 1
199 #define APPENDFSYNC_EVERYSEC 2
201 /*================================= Data types ============================== */
203 /* A redis object, that is a type able to hold a string / list / set */
204 typedef struct redisObject
{
207 unsigned char encoding
;
208 unsigned char notused
[2];
212 typedef struct redisDb
{
218 /* With multiplexing we need to take per-clinet state.
219 * Clients are taken in a liked list. */
220 typedef struct redisClient
{
225 robj
**argv
, **mbargv
;
227 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
228 int multibulk
; /* multi bulk command format active */
231 time_t lastinteraction
; /* time of the last interaction, used for timeout */
232 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
233 int slaveseldb
; /* slave selected db, if this client is a slave */
234 int authenticated
; /* when requirepass is non-NULL */
235 int replstate
; /* replication state if this is a slave */
236 int repldbfd
; /* replication DB file descriptor */
237 long repldboff
; /* replication DB file offset */
238 off_t repldbsize
; /* replication DB file size */
246 /* Global server state structure */
252 unsigned int sharingpoolsize
;
253 long long dirty
; /* changes to DB from the last save */
255 list
*slaves
, *monitors
;
256 char neterr
[ANET_ERR_LEN
];
258 int cronloops
; /* number of times the cron function run */
259 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
260 time_t lastsave
; /* Unix time of last save succeeede */
261 size_t usedmemory
; /* Used memory in megabytes */
262 /* Fields used only for stats */
263 time_t stat_starttime
; /* server start time */
264 long long stat_numcommands
; /* number of processed commands */
265 long long stat_numconnections
; /* number of connections received */
278 pid_t bgsavechildpid
;
279 pid_t bgrewritechildpid
;
280 sds bgrewritebuf
; /* buffer taken by parent during oppend only rewrite */
281 struct saveparam
*saveparams
;
286 char *appendfilename
;
289 /* Replication related */
294 redisClient
*master
; /* client that is master for this slave */
296 unsigned int maxclients
;
297 unsigned long maxmemory
;
298 /* Sort parameters - qsort_r() is only available under BSD so we
299 * have to take this state global, in order to pass it to sortCompare() */
305 typedef void redisCommandProc(redisClient
*c
);
306 struct redisCommand
{
308 redisCommandProc
*proc
;
313 struct redisFunctionSym
{
315 unsigned long pointer
;
318 typedef struct _redisSortObject
{
326 typedef struct _redisSortOperation
{
329 } redisSortOperation
;
331 /* ZSETs use a specialized version of Skiplists */
333 typedef struct zskiplistNode
{
334 struct zskiplistNode
**forward
;
335 struct zskiplistNode
*backward
;
340 typedef struct zskiplist
{
341 struct zskiplistNode
*header
, *tail
;
342 unsigned long length
;
346 typedef struct zset
{
351 /* Our shared "common" objects */
353 struct sharedObjectsStruct
{
354 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
355 *colon
, *nullbulk
, *nullmultibulk
,
356 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
357 *outofrangeerr
, *plus
,
358 *select0
, *select1
, *select2
, *select3
, *select4
,
359 *select5
, *select6
, *select7
, *select8
, *select9
;
362 /* Global vars that are actally used as constants. The following double
363 * values are used for double on-disk serialization, and are initialized
364 * at runtime to avoid strange compiler optimizations. */
366 static double R_Zero
, R_PosInf
, R_NegInf
, R_Nan
;
368 /*================================ Prototypes =============================== */
370 static void freeStringObject(robj
*o
);
371 static void freeListObject(robj
*o
);
372 static void freeSetObject(robj
*o
);
373 static void decrRefCount(void *o
);
374 static robj
*createObject(int type
, void *ptr
);
375 static void freeClient(redisClient
*c
);
376 static int rdbLoad(char *filename
);
377 static void addReply(redisClient
*c
, robj
*obj
);
378 static void addReplySds(redisClient
*c
, sds s
);
379 static void incrRefCount(robj
*o
);
380 static int rdbSaveBackground(char *filename
);
381 static robj
*createStringObject(char *ptr
, size_t len
);
382 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
383 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
384 static int syncWithMaster(void);
385 static robj
*tryObjectSharing(robj
*o
);
386 static int tryObjectEncoding(robj
*o
);
387 static robj
*getDecodedObject(robj
*o
);
388 static int removeExpire(redisDb
*db
, robj
*key
);
389 static int expireIfNeeded(redisDb
*db
, robj
*key
);
390 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
391 static int deleteKey(redisDb
*db
, robj
*key
);
392 static time_t getExpire(redisDb
*db
, robj
*key
);
393 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
394 static void updateSlavesWaitingBgsave(int bgsaveerr
);
395 static void freeMemoryIfNeeded(void);
396 static int processCommand(redisClient
*c
);
397 static void setupSigSegvAction(void);
398 static void rdbRemoveTempFile(pid_t childpid
);
399 static void aofRemoveTempFile(pid_t childpid
);
400 static size_t stringObjectLen(robj
*o
);
401 static void processInputBuffer(redisClient
*c
);
402 static zskiplist
*zslCreate(void);
403 static void zslFree(zskiplist
*zsl
);
404 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
405 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
407 static void authCommand(redisClient
*c
);
408 static void pingCommand(redisClient
*c
);
409 static void echoCommand(redisClient
*c
);
410 static void setCommand(redisClient
*c
);
411 static void setnxCommand(redisClient
*c
);
412 static void getCommand(redisClient
*c
);
413 static void delCommand(redisClient
*c
);
414 static void existsCommand(redisClient
*c
);
415 static void incrCommand(redisClient
*c
);
416 static void decrCommand(redisClient
*c
);
417 static void incrbyCommand(redisClient
*c
);
418 static void decrbyCommand(redisClient
*c
);
419 static void selectCommand(redisClient
*c
);
420 static void randomkeyCommand(redisClient
*c
);
421 static void keysCommand(redisClient
*c
);
422 static void dbsizeCommand(redisClient
*c
);
423 static void lastsaveCommand(redisClient
*c
);
424 static void saveCommand(redisClient
*c
);
425 static void bgsaveCommand(redisClient
*c
);
426 static void bgrewriteaofCommand(redisClient
*c
);
427 static void shutdownCommand(redisClient
*c
);
428 static void moveCommand(redisClient
*c
);
429 static void renameCommand(redisClient
*c
);
430 static void renamenxCommand(redisClient
*c
);
431 static void lpushCommand(redisClient
*c
);
432 static void rpushCommand(redisClient
*c
);
433 static void lpopCommand(redisClient
*c
);
434 static void rpopCommand(redisClient
*c
);
435 static void llenCommand(redisClient
*c
);
436 static void lindexCommand(redisClient
*c
);
437 static void lrangeCommand(redisClient
*c
);
438 static void ltrimCommand(redisClient
*c
);
439 static void typeCommand(redisClient
*c
);
440 static void lsetCommand(redisClient
*c
);
441 static void saddCommand(redisClient
*c
);
442 static void sremCommand(redisClient
*c
);
443 static void smoveCommand(redisClient
*c
);
444 static void sismemberCommand(redisClient
*c
);
445 static void scardCommand(redisClient
*c
);
446 static void spopCommand(redisClient
*c
);
447 static void srandmemberCommand(redisClient
*c
);
448 static void sinterCommand(redisClient
*c
);
449 static void sinterstoreCommand(redisClient
*c
);
450 static void sunionCommand(redisClient
*c
);
451 static void sunionstoreCommand(redisClient
*c
);
452 static void sdiffCommand(redisClient
*c
);
453 static void sdiffstoreCommand(redisClient
*c
);
454 static void syncCommand(redisClient
*c
);
455 static void flushdbCommand(redisClient
*c
);
456 static void flushallCommand(redisClient
*c
);
457 static void sortCommand(redisClient
*c
);
458 static void lremCommand(redisClient
*c
);
459 static void rpoplpushcommand(redisClient
*c
);
460 static void infoCommand(redisClient
*c
);
461 static void mgetCommand(redisClient
*c
);
462 static void monitorCommand(redisClient
*c
);
463 static void expireCommand(redisClient
*c
);
464 static void expireatCommand(redisClient
*c
);
465 static void getsetCommand(redisClient
*c
);
466 static void ttlCommand(redisClient
*c
);
467 static void slaveofCommand(redisClient
*c
);
468 static void debugCommand(redisClient
*c
);
469 static void msetCommand(redisClient
*c
);
470 static void msetnxCommand(redisClient
*c
);
471 static void zaddCommand(redisClient
*c
);
472 static void zincrbyCommand(redisClient
*c
);
473 static void zrangeCommand(redisClient
*c
);
474 static void zrangebyscoreCommand(redisClient
*c
);
475 static void zrevrangeCommand(redisClient
*c
);
476 static void zcardCommand(redisClient
*c
);
477 static void zremCommand(redisClient
*c
);
478 static void zscoreCommand(redisClient
*c
);
479 static void zremrangebyscoreCommand(redisClient
*c
);
481 /*================================= Globals ================================= */
484 static struct redisServer server
; /* server global state */
485 static struct redisCommand cmdTable
[] = {
486 {"get",getCommand
,2,REDIS_CMD_INLINE
},
487 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
488 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
489 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
490 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
491 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
492 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
493 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
494 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
495 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
496 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
497 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
498 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
499 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
500 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
501 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
502 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
503 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
504 {"rpoplpush",rpoplpushcommand
,3,REDIS_CMD_BULK
},
505 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
506 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
507 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
508 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
509 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
510 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
511 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
512 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
513 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
514 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
515 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
516 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
517 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
518 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
519 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
520 {"zincrby",zincrbyCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
521 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
522 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
523 {"zrange",zrangeCommand
,4,REDIS_CMD_INLINE
},
524 {"zrangebyscore",zrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
525 {"zrevrange",zrevrangeCommand
,4,REDIS_CMD_INLINE
},
526 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
527 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
528 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
529 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
530 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
531 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
532 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
533 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
534 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
535 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
536 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
537 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
538 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
539 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
540 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
541 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
542 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
543 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
544 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
545 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
546 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
547 {"bgrewriteaof",bgrewriteaofCommand
,1,REDIS_CMD_INLINE
},
548 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
549 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
550 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
551 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
552 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
553 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
554 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
555 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
556 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
557 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
558 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
559 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
563 /*============================ Utility functions ============================ */
565 /* Glob-style pattern matching. */
566 int stringmatchlen(const char *pattern
, int patternLen
,
567 const char *string
, int stringLen
, int nocase
)
572 while (pattern
[1] == '*') {
577 return 1; /* match */
579 if (stringmatchlen(pattern
+1, patternLen
-1,
580 string
, stringLen
, nocase
))
581 return 1; /* match */
585 return 0; /* no match */
589 return 0; /* no match */
599 not = pattern
[0] == '^';
606 if (pattern
[0] == '\\') {
609 if (pattern
[0] == string
[0])
611 } else if (pattern
[0] == ']') {
613 } else if (patternLen
== 0) {
617 } else if (pattern
[1] == '-' && patternLen
>= 3) {
618 int start
= pattern
[0];
619 int end
= pattern
[2];
627 start
= tolower(start
);
633 if (c
>= start
&& c
<= end
)
637 if (pattern
[0] == string
[0])
640 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
650 return 0; /* no match */
656 if (patternLen
>= 2) {
663 if (pattern
[0] != string
[0])
664 return 0; /* no match */
666 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
667 return 0; /* no match */
675 if (stringLen
== 0) {
676 while(*pattern
== '*') {
683 if (patternLen
== 0 && stringLen
== 0)
688 static void redisLog(int level
, const char *fmt
, ...) {
692 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
696 if (level
>= server
.verbosity
) {
702 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
703 fprintf(fp
,"%s %c ",buf
,c
[level
]);
704 vfprintf(fp
, fmt
, ap
);
710 if (server
.logfile
) fclose(fp
);
713 /*====================== Hash table type implementation ==================== */
715 /* This is an hash table type that uses the SDS dynamic strings libary as
716 * keys and radis objects as values (objects can hold SDS strings,
719 static void dictVanillaFree(void *privdata
, void *val
)
721 DICT_NOTUSED(privdata
);
725 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
729 DICT_NOTUSED(privdata
);
731 l1
= sdslen((sds
)key1
);
732 l2
= sdslen((sds
)key2
);
733 if (l1
!= l2
) return 0;
734 return memcmp(key1
, key2
, l1
) == 0;
737 static void dictRedisObjectDestructor(void *privdata
, void *val
)
739 DICT_NOTUSED(privdata
);
744 static int dictObjKeyCompare(void *privdata
, const void *key1
,
747 const robj
*o1
= key1
, *o2
= key2
;
748 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
751 static unsigned int dictObjHash(const void *key
) {
753 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
756 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
759 robj
*o1
= (robj
*) key1
, *o2
= (robj
*) key2
;
762 o1
= getDecodedObject(o1
);
763 o2
= getDecodedObject(o2
);
764 cmp
= sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
770 static unsigned int dictEncObjHash(const void *key
) {
771 robj
*o
= (robj
*) key
;
773 o
= getDecodedObject(o
);
774 unsigned int hash
= dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
779 static dictType setDictType
= {
780 dictEncObjHash
, /* hash function */
783 dictEncObjKeyCompare
, /* key compare */
784 dictRedisObjectDestructor
, /* key destructor */
785 NULL
/* val destructor */
788 static dictType zsetDictType
= {
789 dictEncObjHash
, /* hash function */
792 dictEncObjKeyCompare
, /* key compare */
793 dictRedisObjectDestructor
, /* key destructor */
794 dictVanillaFree
/* val destructor */
797 static dictType hashDictType
= {
798 dictObjHash
, /* hash function */
801 dictObjKeyCompare
, /* key compare */
802 dictRedisObjectDestructor
, /* key destructor */
803 dictRedisObjectDestructor
/* val destructor */
806 /* ========================= Random utility functions ======================= */
808 /* Redis generally does not try to recover from out of memory conditions
809 * when allocating objects or strings, it is not clear if it will be possible
810 * to report this condition to the client since the networking layer itself
811 * is based on heap allocation for send buffers, so we simply abort.
812 * At least the code will be simpler to read... */
813 static void oom(const char *msg
) {
814 fprintf(stderr
, "%s: Out of memory\n",msg
);
820 /* ====================== Redis server networking stuff ===================== */
821 static void closeTimedoutClients(void) {
824 time_t now
= time(NULL
);
826 listRewind(server
.clients
);
827 while ((ln
= listYield(server
.clients
)) != NULL
) {
828 c
= listNodeValue(ln
);
829 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
830 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
831 (now
- c
->lastinteraction
> server
.maxidletime
)) {
832 redisLog(REDIS_DEBUG
,"Closing idle client");
838 static int htNeedsResize(dict
*dict
) {
839 long long size
, used
;
841 size
= dictSlots(dict
);
842 used
= dictSize(dict
);
843 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
844 (used
*100/size
< REDIS_HT_MINFILL
));
847 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
848 * we resize the hash table to save memory */
849 static void tryResizeHashTables(void) {
852 for (j
= 0; j
< server
.dbnum
; j
++) {
853 if (htNeedsResize(server
.db
[j
].dict
)) {
854 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
855 dictResize(server
.db
[j
].dict
);
856 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
858 if (htNeedsResize(server
.db
[j
].expires
))
859 dictResize(server
.db
[j
].expires
);
863 /* A background saving child (BGSAVE) terminated its work. Handle this. */
864 void backgroundSaveDoneHandler(int statloc
) {
865 int exitcode
= WEXITSTATUS(statloc
);
866 int bysignal
= WIFSIGNALED(statloc
);
868 if (!bysignal
&& exitcode
== 0) {
869 redisLog(REDIS_NOTICE
,
870 "Background saving terminated with success");
872 server
.lastsave
= time(NULL
);
873 } else if (!bysignal
&& exitcode
!= 0) {
874 redisLog(REDIS_WARNING
, "Background saving error");
876 redisLog(REDIS_WARNING
,
877 "Background saving terminated by signal");
878 rdbRemoveTempFile(server
.bgsavechildpid
);
880 server
.bgsavechildpid
= -1;
881 /* Possibly there are slaves waiting for a BGSAVE in order to be served
882 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
883 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
886 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
888 void backgroundRewriteDoneHandler(int statloc
) {
889 int exitcode
= WEXITSTATUS(statloc
);
890 int bysignal
= WIFSIGNALED(statloc
);
892 if (!bysignal
&& exitcode
== 0) {
896 redisLog(REDIS_NOTICE
,
897 "Background append only file rewriting terminated with success");
898 /* Now it's time to flush the differences accumulated by the parent */
899 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) server
.bgrewritechildpid
);
900 fd
= open(tmpfile
,O_WRONLY
|O_APPEND
);
902 redisLog(REDIS_WARNING
, "Not able to open the temp append only file produced by the child: %s", strerror(errno
));
905 /* Flush our data... */
906 if (write(fd
,server
.bgrewritebuf
,sdslen(server
.bgrewritebuf
)) !=
907 (signed) sdslen(server
.bgrewritebuf
)) {
908 redisLog(REDIS_WARNING
, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno
));
912 redisLog(REDIS_WARNING
,"Parent diff flushed into the new append log file with success");
913 /* Now our work is to rename the temp file into the stable file. And
914 * switch the file descriptor used by the server for append only. */
915 if (rename(tmpfile
,server
.appendfilename
) == -1) {
916 redisLog(REDIS_WARNING
,"Can't rename the temp append only file into the stable one: %s", strerror(errno
));
920 /* Mission completed... almost */
921 redisLog(REDIS_NOTICE
,"Append only file successfully rewritten.");
922 if (server
.appendfd
!= -1) {
923 /* If append only is actually enabled... */
924 close(server
.appendfd
);
925 server
.appendfd
= fd
;
927 server
.appendseldb
= -1; /* Make sure it will issue SELECT */
928 redisLog(REDIS_NOTICE
,"The new append only file was selected for future appends.");
930 /* If append only is disabled we just generate a dump in this
931 * format. Why not? */
934 } else if (!bysignal
&& exitcode
!= 0) {
935 redisLog(REDIS_WARNING
, "Background append only file rewriting error");
937 redisLog(REDIS_WARNING
,
938 "Background append only file rewriting terminated by signal");
941 sdsfree(server
.bgrewritebuf
);
942 server
.bgrewritebuf
= sdsempty();
943 aofRemoveTempFile(server
.bgrewritechildpid
);
944 server
.bgrewritechildpid
= -1;
947 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
948 int j
, loops
= server
.cronloops
++;
949 REDIS_NOTUSED(eventLoop
);
951 REDIS_NOTUSED(clientData
);
953 /* Update the global state with the amount of used memory */
954 server
.usedmemory
= zmalloc_used_memory();
956 /* Show some info about non-empty databases */
957 for (j
= 0; j
< server
.dbnum
; j
++) {
958 long long size
, used
, vkeys
;
960 size
= dictSlots(server
.db
[j
].dict
);
961 used
= dictSize(server
.db
[j
].dict
);
962 vkeys
= dictSize(server
.db
[j
].expires
);
963 if (!(loops
% 5) && (used
|| vkeys
)) {
964 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
965 /* dictPrintStats(server.dict); */
969 /* We don't want to resize the hash tables while a bacground saving
970 * is in progress: the saving child is created using fork() that is
971 * implemented with a copy-on-write semantic in most modern systems, so
972 * if we resize the HT while there is the saving child at work actually
973 * a lot of memory movements in the parent will cause a lot of pages
975 if (server
.bgsavechildpid
== -1) tryResizeHashTables();
977 /* Show information about connected clients */
979 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
980 listLength(server
.clients
)-listLength(server
.slaves
),
981 listLength(server
.slaves
),
983 dictSize(server
.sharingpool
));
986 /* Close connections of timedout clients */
987 if (server
.maxidletime
&& !(loops
% 10))
988 closeTimedoutClients();
990 /* Check if a background saving or AOF rewrite in progress terminated */
991 if (server
.bgsavechildpid
!= -1 || server
.bgrewritechildpid
!= -1) {
995 if ((pid
= wait3(&statloc
,WNOHANG
,NULL
)) != 0) {
996 if (pid
== server
.bgsavechildpid
) {
997 backgroundSaveDoneHandler(statloc
);
999 backgroundRewriteDoneHandler(statloc
);
1003 /* If there is not a background saving in progress check if
1004 * we have to save now */
1005 time_t now
= time(NULL
);
1006 for (j
= 0; j
< server
.saveparamslen
; j
++) {
1007 struct saveparam
*sp
= server
.saveparams
+j
;
1009 if (server
.dirty
>= sp
->changes
&&
1010 now
-server
.lastsave
> sp
->seconds
) {
1011 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
1012 sp
->changes
, sp
->seconds
);
1013 rdbSaveBackground(server
.dbfilename
);
1019 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1020 * will use few CPU cycles if there are few expiring keys, otherwise
1021 * it will get more aggressive to avoid that too much memory is used by
1022 * keys that can be removed from the keyspace. */
1023 for (j
= 0; j
< server
.dbnum
; j
++) {
1025 redisDb
*db
= server
.db
+j
;
1027 /* Continue to expire if at the end of the cycle more than 25%
1028 * of the keys were expired. */
1030 int num
= dictSize(db
->expires
);
1031 time_t now
= time(NULL
);
1034 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
1035 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
1040 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
1041 t
= (time_t) dictGetEntryVal(de
);
1043 deleteKey(db
,dictGetEntryKey(de
));
1047 } while (expired
> REDIS_EXPIRELOOKUPS_PER_CRON
/4);
1050 /* Check if we should connect to a MASTER */
1051 if (server
.replstate
== REDIS_REPL_CONNECT
) {
1052 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
1053 if (syncWithMaster() == REDIS_OK
) {
1054 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
1060 static void createSharedObjects(void) {
1061 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
1062 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
1063 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
1064 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
1065 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
1066 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
1067 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
1068 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
1069 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
1071 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
1072 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
1073 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1074 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
1075 "-ERR no such key\r\n"));
1076 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
1077 "-ERR syntax error\r\n"));
1078 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
1079 "-ERR source and destination objects are the same\r\n"));
1080 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
1081 "-ERR index out of range\r\n"));
1082 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
1083 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
1084 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
1085 shared
.select0
= createStringObject("select 0\r\n",10);
1086 shared
.select1
= createStringObject("select 1\r\n",10);
1087 shared
.select2
= createStringObject("select 2\r\n",10);
1088 shared
.select3
= createStringObject("select 3\r\n",10);
1089 shared
.select4
= createStringObject("select 4\r\n",10);
1090 shared
.select5
= createStringObject("select 5\r\n",10);
1091 shared
.select6
= createStringObject("select 6\r\n",10);
1092 shared
.select7
= createStringObject("select 7\r\n",10);
1093 shared
.select8
= createStringObject("select 8\r\n",10);
1094 shared
.select9
= createStringObject("select 9\r\n",10);
1097 static void appendServerSaveParams(time_t seconds
, int changes
) {
1098 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1099 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1100 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1101 server
.saveparamslen
++;
1104 static void resetServerSaveParams() {
1105 zfree(server
.saveparams
);
1106 server
.saveparams
= NULL
;
1107 server
.saveparamslen
= 0;
1110 static void initServerConfig() {
1111 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1112 server
.port
= REDIS_SERVERPORT
;
1113 server
.verbosity
= REDIS_DEBUG
;
1114 server
.maxidletime
= REDIS_MAXIDLETIME
;
1115 server
.saveparams
= NULL
;
1116 server
.logfile
= NULL
; /* NULL = log on standard output */
1117 server
.bindaddr
= NULL
;
1118 server
.glueoutputbuf
= 1;
1119 server
.daemonize
= 0;
1120 server
.appendonly
= 0;
1121 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1122 server
.lastfsync
= time(NULL
);
1123 server
.appendfd
= -1;
1124 server
.appendseldb
= -1; /* Make sure the first time will not match */
1125 server
.pidfile
= "/var/run/redis.pid";
1126 server
.dbfilename
= "dump.rdb";
1127 server
.appendfilename
= "appendonly.aof";
1128 server
.requirepass
= NULL
;
1129 server
.shareobjects
= 0;
1130 server
.sharingpoolsize
= 1024;
1131 server
.maxclients
= 0;
1132 server
.maxmemory
= 0;
1133 resetServerSaveParams();
1135 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1136 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1137 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1138 /* Replication related */
1140 server
.masterauth
= NULL
;
1141 server
.masterhost
= NULL
;
1142 server
.masterport
= 6379;
1143 server
.master
= NULL
;
1144 server
.replstate
= REDIS_REPL_NONE
;
1146 /* Double constants initialization */
1148 R_PosInf
= 1.0/R_Zero
;
1149 R_NegInf
= -1.0/R_Zero
;
1150 R_Nan
= R_Zero
/R_Zero
;
1153 static void initServer() {
1156 signal(SIGHUP
, SIG_IGN
);
1157 signal(SIGPIPE
, SIG_IGN
);
1158 setupSigSegvAction();
1160 server
.clients
= listCreate();
1161 server
.slaves
= listCreate();
1162 server
.monitors
= listCreate();
1163 server
.objfreelist
= listCreate();
1164 createSharedObjects();
1165 server
.el
= aeCreateEventLoop();
1166 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1167 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
1168 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1169 if (server
.fd
== -1) {
1170 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
1173 for (j
= 0; j
< server
.dbnum
; j
++) {
1174 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1175 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1176 server
.db
[j
].id
= j
;
1178 server
.cronloops
= 0;
1179 server
.bgsavechildpid
= -1;
1180 server
.bgrewritechildpid
= -1;
1181 server
.bgrewritebuf
= sdsempty();
1182 server
.lastsave
= time(NULL
);
1184 server
.usedmemory
= 0;
1185 server
.stat_numcommands
= 0;
1186 server
.stat_numconnections
= 0;
1187 server
.stat_starttime
= time(NULL
);
1188 aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
);
1190 if (server
.appendonly
) {
1191 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644);
1192 if (server
.appendfd
== -1) {
1193 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1200 /* Empty the whole database */
1201 static long long emptyDb() {
1203 long long removed
= 0;
1205 for (j
= 0; j
< server
.dbnum
; j
++) {
1206 removed
+= dictSize(server
.db
[j
].dict
);
1207 dictEmpty(server
.db
[j
].dict
);
1208 dictEmpty(server
.db
[j
].expires
);
1213 static int yesnotoi(char *s
) {
1214 if (!strcasecmp(s
,"yes")) return 1;
1215 else if (!strcasecmp(s
,"no")) return 0;
1219 /* I agree, this is a very rudimental way to load a configuration...
1220 will improve later if the config gets more complex */
1221 static void loadServerConfig(char *filename
) {
1223 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1227 if (filename
[0] == '-' && filename
[1] == '\0')
1230 if ((fp
= fopen(filename
,"r")) == NULL
) {
1231 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1236 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1242 line
= sdstrim(line
," \t\r\n");
1244 /* Skip comments and blank lines*/
1245 if (line
[0] == '#' || line
[0] == '\0') {
1250 /* Split into arguments */
1251 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1252 sdstolower(argv
[0]);
1254 /* Execute config directives */
1255 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1256 server
.maxidletime
= atoi(argv
[1]);
1257 if (server
.maxidletime
< 0) {
1258 err
= "Invalid timeout value"; goto loaderr
;
1260 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1261 server
.port
= atoi(argv
[1]);
1262 if (server
.port
< 1 || server
.port
> 65535) {
1263 err
= "Invalid port"; goto loaderr
;
1265 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1266 server
.bindaddr
= zstrdup(argv
[1]);
1267 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1268 int seconds
= atoi(argv
[1]);
1269 int changes
= atoi(argv
[2]);
1270 if (seconds
< 1 || changes
< 0) {
1271 err
= "Invalid save parameters"; goto loaderr
;
1273 appendServerSaveParams(seconds
,changes
);
1274 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1275 if (chdir(argv
[1]) == -1) {
1276 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1277 argv
[1], strerror(errno
));
1280 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1281 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1282 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1283 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1285 err
= "Invalid log level. Must be one of debug, notice, warning";
1288 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1291 server
.logfile
= zstrdup(argv
[1]);
1292 if (!strcasecmp(server
.logfile
,"stdout")) {
1293 zfree(server
.logfile
);
1294 server
.logfile
= NULL
;
1296 if (server
.logfile
) {
1297 /* Test if we are able to open the file. The server will not
1298 * be able to abort just for this problem later... */
1299 logfp
= fopen(server
.logfile
,"a");
1300 if (logfp
== NULL
) {
1301 err
= sdscatprintf(sdsempty(),
1302 "Can't open the log file: %s", strerror(errno
));
1307 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1308 server
.dbnum
= atoi(argv
[1]);
1309 if (server
.dbnum
< 1) {
1310 err
= "Invalid number of databases"; goto loaderr
;
1312 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1313 server
.maxclients
= atoi(argv
[1]);
1314 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1315 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1316 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1317 server
.masterhost
= sdsnew(argv
[1]);
1318 server
.masterport
= atoi(argv
[2]);
1319 server
.replstate
= REDIS_REPL_CONNECT
;
1320 } else if (!strcasecmp(argv
[0],"masterauth") && argc
== 2) {
1321 server
.masterauth
= zstrdup(argv
[1]);
1322 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1323 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1324 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1326 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1327 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1328 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1330 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1331 server
.sharingpoolsize
= atoi(argv
[1]);
1332 if (server
.sharingpoolsize
< 1) {
1333 err
= "invalid object sharing pool size"; goto loaderr
;
1335 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1336 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1337 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1339 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1340 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1341 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1343 } else if (!strcasecmp(argv
[0],"appendfsync") && argc
== 2) {
1344 if (!strcasecmp(argv
[1],"no")) {
1345 server
.appendfsync
= APPENDFSYNC_NO
;
1346 } else if (!strcasecmp(argv
[1],"always")) {
1347 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1348 } else if (!strcasecmp(argv
[1],"everysec")) {
1349 server
.appendfsync
= APPENDFSYNC_EVERYSEC
;
1351 err
= "argument must be 'no', 'always' or 'everysec'";
1354 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1355 server
.requirepass
= zstrdup(argv
[1]);
1356 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1357 server
.pidfile
= zstrdup(argv
[1]);
1358 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1359 server
.dbfilename
= zstrdup(argv
[1]);
1361 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1363 for (j
= 0; j
< argc
; j
++)
1368 if (fp
!= stdin
) fclose(fp
);
1372 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1373 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1374 fprintf(stderr
, ">>> '%s'\n", line
);
1375 fprintf(stderr
, "%s\n", err
);
1379 static void freeClientArgv(redisClient
*c
) {
1382 for (j
= 0; j
< c
->argc
; j
++)
1383 decrRefCount(c
->argv
[j
]);
1384 for (j
= 0; j
< c
->mbargc
; j
++)
1385 decrRefCount(c
->mbargv
[j
]);
1390 static void freeClient(redisClient
*c
) {
1393 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1394 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1395 sdsfree(c
->querybuf
);
1396 listRelease(c
->reply
);
1399 ln
= listSearchKey(server
.clients
,c
);
1401 listDelNode(server
.clients
,ln
);
1402 if (c
->flags
& REDIS_SLAVE
) {
1403 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1405 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1406 ln
= listSearchKey(l
,c
);
1410 if (c
->flags
& REDIS_MASTER
) {
1411 server
.master
= NULL
;
1412 server
.replstate
= REDIS_REPL_CONNECT
;
1419 #define GLUEREPLY_UP_TO (1024)
1420 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1422 char buf
[GLUEREPLY_UP_TO
];
1426 listRewind(c
->reply
);
1427 while((ln
= listYield(c
->reply
))) {
1431 objlen
= sdslen(o
->ptr
);
1432 if (copylen
+ objlen
<= GLUEREPLY_UP_TO
) {
1433 memcpy(buf
+copylen
,o
->ptr
,objlen
);
1435 listDelNode(c
->reply
,ln
);
1437 if (copylen
== 0) return;
1441 /* Now the output buffer is empty, add the new single element */
1442 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,copylen
));
1443 listAddNodeHead(c
->reply
,o
);
1446 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1447 redisClient
*c
= privdata
;
1448 int nwritten
= 0, totwritten
= 0, objlen
;
1451 REDIS_NOTUSED(mask
);
1453 /* Use writev() if we have enough buffers to send */
1454 if (!server
.glueoutputbuf
&&
1455 listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD
&&
1456 !(c
->flags
& REDIS_MASTER
))
1458 sendReplyToClientWritev(el
, fd
, privdata
, mask
);
1462 while(listLength(c
->reply
)) {
1463 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1464 glueReplyBuffersIfNeeded(c
);
1466 o
= listNodeValue(listFirst(c
->reply
));
1467 objlen
= sdslen(o
->ptr
);
1470 listDelNode(c
->reply
,listFirst(c
->reply
));
1474 if (c
->flags
& REDIS_MASTER
) {
1475 /* Don't reply to a master */
1476 nwritten
= objlen
- c
->sentlen
;
1478 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1479 if (nwritten
<= 0) break;
1481 c
->sentlen
+= nwritten
;
1482 totwritten
+= nwritten
;
1483 /* If we fully sent the object on head go to the next one */
1484 if (c
->sentlen
== objlen
) {
1485 listDelNode(c
->reply
,listFirst(c
->reply
));
1488 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1489 * bytes, in a single threaded server it's a good idea to serve
1490 * other clients as well, even if a very large request comes from
1491 * super fast link that is always able to accept data (in real world
1492 * scenario think about 'KEYS *' against the loopback interfae) */
1493 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1495 if (nwritten
== -1) {
1496 if (errno
== EAGAIN
) {
1499 redisLog(REDIS_DEBUG
,
1500 "Error writing to client: %s", strerror(errno
));
1505 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1506 if (listLength(c
->reply
) == 0) {
1508 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1512 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
)
1514 redisClient
*c
= privdata
;
1515 int nwritten
= 0, totwritten
= 0, objlen
, willwrite
;
1517 struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
];
1518 int offset
, ion
= 0;
1520 REDIS_NOTUSED(mask
);
1523 while (listLength(c
->reply
)) {
1524 offset
= c
->sentlen
;
1528 /* fill-in the iov[] array */
1529 for(node
= listFirst(c
->reply
); node
; node
= listNextNode(node
)) {
1530 o
= listNodeValue(node
);
1531 objlen
= sdslen(o
->ptr
);
1533 if (totwritten
+ objlen
- offset
> REDIS_MAX_WRITE_PER_EVENT
)
1536 if(ion
== REDIS_WRITEV_IOVEC_COUNT
)
1537 break; /* no more iovecs */
1539 iov
[ion
].iov_base
= ((char*)o
->ptr
) + offset
;
1540 iov
[ion
].iov_len
= objlen
- offset
;
1541 willwrite
+= objlen
- offset
;
1542 offset
= 0; /* just for the first item */
1549 /* write all collected blocks at once */
1550 if((nwritten
= writev(fd
, iov
, ion
)) < 0) {
1551 if (errno
!= EAGAIN
) {
1552 redisLog(REDIS_DEBUG
,
1553 "Error writing to client: %s", strerror(errno
));
1560 totwritten
+= nwritten
;
1561 offset
= c
->sentlen
;
1563 /* remove written robjs from c->reply */
1564 while (nwritten
&& listLength(c
->reply
)) {
1565 o
= listNodeValue(listFirst(c
->reply
));
1566 objlen
= sdslen(o
->ptr
);
1568 if(nwritten
>= objlen
- offset
) {
1569 listDelNode(c
->reply
, listFirst(c
->reply
));
1570 nwritten
-= objlen
- offset
;
1574 c
->sentlen
+= nwritten
;
1582 c
->lastinteraction
= time(NULL
);
1584 if (listLength(c
->reply
) == 0) {
1586 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1590 static struct redisCommand
*lookupCommand(char *name
) {
1592 while(cmdTable
[j
].name
!= NULL
) {
1593 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1599 /* resetClient prepare the client to process the next command */
1600 static void resetClient(redisClient
*c
) {
1606 /* If this function gets called we already read a whole
1607 * command, argments are in the client argv/argc fields.
1608 * processCommand() execute the command or prepare the
1609 * server for a bulk read from the client.
1611 * If 1 is returned the client is still alive and valid and
1612 * and other operations can be performed by the caller. Otherwise
1613 * if 0 is returned the client was destroied (i.e. after QUIT). */
1614 static int processCommand(redisClient
*c
) {
1615 struct redisCommand
*cmd
;
1618 /* Free some memory if needed (maxmemory setting) */
1619 if (server
.maxmemory
) freeMemoryIfNeeded();
1621 /* Handle the multi bulk command type. This is an alternative protocol
1622 * supported by Redis in order to receive commands that are composed of
1623 * multiple binary-safe "bulk" arguments. The latency of processing is
1624 * a bit higher but this allows things like multi-sets, so if this
1625 * protocol is used only for MSET and similar commands this is a big win. */
1626 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1627 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1628 if (c
->multibulk
<= 0) {
1632 decrRefCount(c
->argv
[c
->argc
-1]);
1636 } else if (c
->multibulk
) {
1637 if (c
->bulklen
== -1) {
1638 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1639 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1643 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1644 decrRefCount(c
->argv
[0]);
1645 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1647 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1652 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1656 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1657 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1661 if (c
->multibulk
== 0) {
1665 /* Here we need to swap the multi-bulk argc/argv with the
1666 * normal argc/argv of the client structure. */
1668 c
->argv
= c
->mbargv
;
1669 c
->mbargv
= auxargv
;
1672 c
->argc
= c
->mbargc
;
1673 c
->mbargc
= auxargc
;
1675 /* We need to set bulklen to something different than -1
1676 * in order for the code below to process the command without
1677 * to try to read the last argument of a bulk command as
1678 * a special argument. */
1680 /* continue below and process the command */
1687 /* -- end of multi bulk commands processing -- */
1689 /* The QUIT command is handled as a special case. Normal command
1690 * procs are unable to close the client connection safely */
1691 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1695 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1697 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1700 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1701 (c
->argc
< -cmd
->arity
)) {
1702 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
1705 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1706 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1709 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1710 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1712 decrRefCount(c
->argv
[c
->argc
-1]);
1713 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1715 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1720 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1721 /* It is possible that the bulk read is already in the
1722 * buffer. Check this condition and handle it accordingly.
1723 * This is just a fast path, alternative to call processInputBuffer().
1724 * It's a good idea since the code is small and this condition
1725 * happens most of the times. */
1726 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1727 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1729 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1734 /* Let's try to share objects on the command arguments vector */
1735 if (server
.shareobjects
) {
1737 for(j
= 1; j
< c
->argc
; j
++)
1738 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1740 /* Let's try to encode the bulk object to save space. */
1741 if (cmd
->flags
& REDIS_CMD_BULK
)
1742 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1744 /* Check if the user is authenticated */
1745 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1746 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1751 /* Exec the command */
1752 dirty
= server
.dirty
;
1754 if (server
.appendonly
&& server
.dirty
-dirty
)
1755 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1756 if (server
.dirty
-dirty
&& listLength(server
.slaves
))
1757 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1758 if (listLength(server
.monitors
))
1759 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1760 server
.stat_numcommands
++;
1762 /* Prepare the client for the next command */
1763 if (c
->flags
& REDIS_CLOSE
) {
1771 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1775 /* (args*2)+1 is enough room for args, spaces, newlines */
1776 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1778 if (argc
<= REDIS_STATIC_ARGS
) {
1781 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1784 for (j
= 0; j
< argc
; j
++) {
1785 if (j
!= 0) outv
[outc
++] = shared
.space
;
1786 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1789 lenobj
= createObject(REDIS_STRING
,
1790 sdscatprintf(sdsempty(),"%d\r\n",
1791 stringObjectLen(argv
[j
])));
1792 lenobj
->refcount
= 0;
1793 outv
[outc
++] = lenobj
;
1795 outv
[outc
++] = argv
[j
];
1797 outv
[outc
++] = shared
.crlf
;
1799 /* Increment all the refcounts at start and decrement at end in order to
1800 * be sure to free objects if there is no slave in a replication state
1801 * able to be feed with commands */
1802 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1804 while((ln
= listYield(slaves
))) {
1805 redisClient
*slave
= ln
->value
;
1807 /* Don't feed slaves that are still waiting for BGSAVE to start */
1808 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1810 /* Feed all the other slaves, MONITORs and so on */
1811 if (slave
->slaveseldb
!= dictid
) {
1815 case 0: selectcmd
= shared
.select0
; break;
1816 case 1: selectcmd
= shared
.select1
; break;
1817 case 2: selectcmd
= shared
.select2
; break;
1818 case 3: selectcmd
= shared
.select3
; break;
1819 case 4: selectcmd
= shared
.select4
; break;
1820 case 5: selectcmd
= shared
.select5
; break;
1821 case 6: selectcmd
= shared
.select6
; break;
1822 case 7: selectcmd
= shared
.select7
; break;
1823 case 8: selectcmd
= shared
.select8
; break;
1824 case 9: selectcmd
= shared
.select9
; break;
1826 selectcmd
= createObject(REDIS_STRING
,
1827 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1828 selectcmd
->refcount
= 0;
1831 addReply(slave
,selectcmd
);
1832 slave
->slaveseldb
= dictid
;
1834 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1836 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1837 if (outv
!= static_outv
) zfree(outv
);
1840 static void processInputBuffer(redisClient
*c
) {
1842 if (c
->bulklen
== -1) {
1843 /* Read the first line of the query */
1844 char *p
= strchr(c
->querybuf
,'\n');
1851 query
= c
->querybuf
;
1852 c
->querybuf
= sdsempty();
1853 querylen
= 1+(p
-(query
));
1854 if (sdslen(query
) > querylen
) {
1855 /* leave data after the first line of the query in the buffer */
1856 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1858 *p
= '\0'; /* remove "\n" */
1859 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1860 sdsupdatelen(query
);
1862 /* Now we can split the query in arguments */
1863 if (sdslen(query
) == 0) {
1864 /* Ignore empty query */
1868 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1871 if (c
->argv
) zfree(c
->argv
);
1872 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1874 for (j
= 0; j
< argc
; j
++) {
1875 if (sdslen(argv
[j
])) {
1876 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1883 /* Execute the command. If the client is still valid
1884 * after processCommand() return and there is something
1885 * on the query buffer try to process the next command. */
1886 if (c
->argc
&& processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1888 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
1889 redisLog(REDIS_DEBUG
, "Client protocol error");
1894 /* Bulk read handling. Note that if we are at this point
1895 the client already sent a command terminated with a newline,
1896 we are reading the bulk data that is actually the last
1897 argument of the command. */
1898 int qbl
= sdslen(c
->querybuf
);
1900 if (c
->bulklen
<= qbl
) {
1901 /* Copy everything but the final CRLF as final argument */
1902 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1904 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1905 /* Process the command. If the client is still valid after
1906 * the processing and there is more data in the buffer
1907 * try to parse it. */
1908 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1914 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1915 redisClient
*c
= (redisClient
*) privdata
;
1916 char buf
[REDIS_IOBUF_LEN
];
1919 REDIS_NOTUSED(mask
);
1921 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1923 if (errno
== EAGAIN
) {
1926 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1930 } else if (nread
== 0) {
1931 redisLog(REDIS_DEBUG
, "Client closed connection");
1936 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1937 c
->lastinteraction
= time(NULL
);
1941 processInputBuffer(c
);
1944 static int selectDb(redisClient
*c
, int id
) {
1945 if (id
< 0 || id
>= server
.dbnum
)
1947 c
->db
= &server
.db
[id
];
1951 static void *dupClientReplyValue(void *o
) {
1952 incrRefCount((robj
*)o
);
1956 static redisClient
*createClient(int fd
) {
1957 redisClient
*c
= zmalloc(sizeof(*c
));
1959 anetNonBlock(NULL
,fd
);
1960 anetTcpNoDelay(NULL
,fd
);
1961 if (!c
) return NULL
;
1964 c
->querybuf
= sdsempty();
1973 c
->lastinteraction
= time(NULL
);
1974 c
->authenticated
= 0;
1975 c
->replstate
= REDIS_REPL_NONE
;
1976 c
->reply
= listCreate();
1977 listSetFreeMethod(c
->reply
,decrRefCount
);
1978 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1979 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1980 readQueryFromClient
, c
) == AE_ERR
) {
1984 listAddNodeTail(server
.clients
,c
);
1988 static void addReply(redisClient
*c
, robj
*obj
) {
1989 if (listLength(c
->reply
) == 0 &&
1990 (c
->replstate
== REDIS_REPL_NONE
||
1991 c
->replstate
== REDIS_REPL_ONLINE
) &&
1992 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
1993 sendReplyToClient
, c
) == AE_ERR
) return;
1994 listAddNodeTail(c
->reply
,getDecodedObject(obj
));
1997 static void addReplySds(redisClient
*c
, sds s
) {
1998 robj
*o
= createObject(REDIS_STRING
,s
);
2003 static void addReplyDouble(redisClient
*c
, double d
) {
2006 snprintf(buf
,sizeof(buf
),"%.17g",d
);
2007 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
2011 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
2014 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
2015 len
= sdslen(obj
->ptr
);
2017 long n
= (long)obj
->ptr
;
2024 while((n
= n
/10) != 0) {
2028 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",len
));
2031 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2036 REDIS_NOTUSED(mask
);
2037 REDIS_NOTUSED(privdata
);
2039 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
2040 if (cfd
== AE_ERR
) {
2041 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
2044 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
2045 if ((c
= createClient(cfd
)) == NULL
) {
2046 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
2047 close(cfd
); /* May be already closed, just ingore errors */
2050 /* If maxclient directive is set and this is one client more... close the
2051 * connection. Note that we create the client instead to check before
2052 * for this condition, since now the socket is already set in nonblocking
2053 * mode and we can send an error for free using the Kernel I/O */
2054 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
2055 char *err
= "-ERR max number of clients reached\r\n";
2057 /* That's a best effort error message, don't check write errors */
2058 if (write(c
->fd
,err
,strlen(err
)) == -1) {
2059 /* Nothing to do, Just to avoid the warning... */
2064 server
.stat_numconnections
++;
2067 /* ======================= Redis objects implementation ===================== */
2069 static robj
*createObject(int type
, void *ptr
) {
2072 if (listLength(server
.objfreelist
)) {
2073 listNode
*head
= listFirst(server
.objfreelist
);
2074 o
= listNodeValue(head
);
2075 listDelNode(server
.objfreelist
,head
);
2077 o
= zmalloc(sizeof(*o
));
2080 o
->encoding
= REDIS_ENCODING_RAW
;
2086 static robj
*createStringObject(char *ptr
, size_t len
) {
2087 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
2090 static robj
*createListObject(void) {
2091 list
*l
= listCreate();
2093 listSetFreeMethod(l
,decrRefCount
);
2094 return createObject(REDIS_LIST
,l
);
2097 static robj
*createSetObject(void) {
2098 dict
*d
= dictCreate(&setDictType
,NULL
);
2099 return createObject(REDIS_SET
,d
);
2102 static robj
*createZsetObject(void) {
2103 zset
*zs
= zmalloc(sizeof(*zs
));
2105 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
2106 zs
->zsl
= zslCreate();
2107 return createObject(REDIS_ZSET
,zs
);
2110 static void freeStringObject(robj
*o
) {
2111 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2116 static void freeListObject(robj
*o
) {
2117 listRelease((list
*) o
->ptr
);
2120 static void freeSetObject(robj
*o
) {
2121 dictRelease((dict
*) o
->ptr
);
2124 static void freeZsetObject(robj
*o
) {
2127 dictRelease(zs
->dict
);
2132 static void freeHashObject(robj
*o
) {
2133 dictRelease((dict
*) o
->ptr
);
2136 static void incrRefCount(robj
*o
) {
2138 #ifdef DEBUG_REFCOUNT
2139 if (o
->type
== REDIS_STRING
)
2140 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
2144 static void decrRefCount(void *obj
) {
2147 #ifdef DEBUG_REFCOUNT
2148 if (o
->type
== REDIS_STRING
)
2149 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
2151 if (--(o
->refcount
) == 0) {
2153 case REDIS_STRING
: freeStringObject(o
); break;
2154 case REDIS_LIST
: freeListObject(o
); break;
2155 case REDIS_SET
: freeSetObject(o
); break;
2156 case REDIS_ZSET
: freeZsetObject(o
); break;
2157 case REDIS_HASH
: freeHashObject(o
); break;
2158 default: assert(0 != 0); break;
2160 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2161 !listAddNodeHead(server
.objfreelist
,o
))
2166 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2167 dictEntry
*de
= dictFind(db
->dict
,key
);
2168 return de
? dictGetEntryVal(de
) : NULL
;
2171 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2172 expireIfNeeded(db
,key
);
2173 return lookupKey(db
,key
);
2176 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2177 deleteIfVolatile(db
,key
);
2178 return lookupKey(db
,key
);
2181 static int deleteKey(redisDb
*db
, robj
*key
) {
2184 /* We need to protect key from destruction: after the first dictDelete()
2185 * it may happen that 'key' is no longer valid if we don't increment
2186 * it's count. This may happen when we get the object reference directly
2187 * from the hash table with dictRandomKey() or dict iterators */
2189 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2190 retval
= dictDelete(db
->dict
,key
);
2193 return retval
== DICT_OK
;
2196 /* Try to share an object against the shared objects pool */
2197 static robj
*tryObjectSharing(robj
*o
) {
2198 struct dictEntry
*de
;
2201 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2203 assert(o
->type
== REDIS_STRING
);
2204 de
= dictFind(server
.sharingpool
,o
);
2206 robj
*shared
= dictGetEntryKey(de
);
2208 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2209 dictGetEntryVal(de
) = (void*) c
;
2210 incrRefCount(shared
);
2214 /* Here we are using a stream algorihtm: Every time an object is
2215 * shared we increment its count, everytime there is a miss we
2216 * recrement the counter of a random object. If this object reaches
2217 * zero we remove the object and put the current object instead. */
2218 if (dictSize(server
.sharingpool
) >=
2219 server
.sharingpoolsize
) {
2220 de
= dictGetRandomKey(server
.sharingpool
);
2222 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2223 dictGetEntryVal(de
) = (void*) c
;
2225 dictDelete(server
.sharingpool
,de
->key
);
2228 c
= 0; /* If the pool is empty we want to add this object */
2233 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2234 assert(retval
== DICT_OK
);
2241 /* Check if the nul-terminated string 's' can be represented by a long
2242 * (that is, is a number that fits into long without any other space or
2243 * character before or after the digits).
2245 * If so, the function returns REDIS_OK and *longval is set to the value
2246 * of the number. Otherwise REDIS_ERR is returned */
2247 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2248 char buf
[32], *endptr
;
2252 value
= strtol(s
, &endptr
, 10);
2253 if (endptr
[0] != '\0') return REDIS_ERR
;
2254 slen
= snprintf(buf
,32,"%ld",value
);
2256 /* If the number converted back into a string is not identical
2257 * then it's not possible to encode the string as integer */
2258 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2259 if (longval
) *longval
= value
;
2263 /* Try to encode a string object in order to save space */
2264 static int tryObjectEncoding(robj
*o
) {
2268 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2269 return REDIS_ERR
; /* Already encoded */
2271 /* It's not save to encode shared objects: shared objects can be shared
2272 * everywhere in the "object space" of Redis. Encoded objects can only
2273 * appear as "values" (and not, for instance, as keys) */
2274 if (o
->refcount
> 1) return REDIS_ERR
;
2276 /* Currently we try to encode only strings */
2277 assert(o
->type
== REDIS_STRING
);
2279 /* Check if we can represent this string as a long integer */
2280 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2282 /* Ok, this object can be encoded */
2283 o
->encoding
= REDIS_ENCODING_INT
;
2285 o
->ptr
= (void*) value
;
2289 /* Get a decoded version of an encoded object (returned as a new object).
2290 * If the object is already raw-encoded just increment the ref count. */
2291 static robj
*getDecodedObject(robj
*o
) {
2294 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2298 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2301 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2302 dec
= createStringObject(buf
,strlen(buf
));
2309 /* Compare two string objects via strcmp() or alike.
2310 * Note that the objects may be integer-encoded. In such a case we
2311 * use snprintf() to get a string representation of the numbers on the stack
2312 * and compare the strings, it's much faster than calling getDecodedObject(). */
2313 static int compareStringObjects(robj
*a
, robj
*b
) {
2314 assert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2315 char bufa
[128], bufb
[128], *astr
, *bstr
;
2318 if (a
== b
) return 0;
2319 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2320 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2326 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2327 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2333 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2336 static size_t stringObjectLen(robj
*o
) {
2337 assert(o
->type
== REDIS_STRING
);
2338 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2339 return sdslen(o
->ptr
);
2343 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2347 /*============================ DB saving/loading ============================ */
2349 static int rdbSaveType(FILE *fp
, unsigned char type
) {
2350 if (fwrite(&type
,1,1,fp
) == 0) return -1;
2354 static int rdbSaveTime(FILE *fp
, time_t t
) {
2355 int32_t t32
= (int32_t) t
;
2356 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
2360 /* check rdbLoadLen() comments for more info */
2361 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2362 unsigned char buf
[2];
2365 /* Save a 6 bit len */
2366 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2367 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2368 } else if (len
< (1<<14)) {
2369 /* Save a 14 bit len */
2370 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2372 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2374 /* Save a 32 bit len */
2375 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2376 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2378 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2383 /* String objects in the form "2391" "-100" without any space and with a
2384 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2385 * encoded as integers to save space */
2386 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2388 char *endptr
, buf
[32];
2390 /* Check if it's possible to encode this value as a number */
2391 value
= strtoll(s
, &endptr
, 10);
2392 if (endptr
[0] != '\0') return 0;
2393 snprintf(buf
,32,"%lld",value
);
2395 /* If the number converted back into a string is not identical
2396 * then it's not possible to encode the string as integer */
2397 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2399 /* Finally check if it fits in our ranges */
2400 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2401 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2402 enc
[1] = value
&0xFF;
2404 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2405 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2406 enc
[1] = value
&0xFF;
2407 enc
[2] = (value
>>8)&0xFF;
2409 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2410 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2411 enc
[1] = value
&0xFF;
2412 enc
[2] = (value
>>8)&0xFF;
2413 enc
[3] = (value
>>16)&0xFF;
2414 enc
[4] = (value
>>24)&0xFF;
2421 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2422 unsigned int comprlen
, outlen
;
2426 /* We require at least four bytes compression for this to be worth it */
2427 outlen
= sdslen(obj
->ptr
)-4;
2428 if (outlen
<= 0) return 0;
2429 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2430 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2431 if (comprlen
== 0) {
2435 /* Data compressed! Let's save it on disk */
2436 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2437 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2438 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2439 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2440 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2449 /* Save a string objet as [len][data] on disk. If the object is a string
2450 * representation of an integer value we try to safe it in a special form */
2451 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2455 len
= sdslen(obj
->ptr
);
2457 /* Try integer encoding */
2459 unsigned char buf
[5];
2460 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2461 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2466 /* Try LZF compression - under 20 bytes it's unable to compress even
2467 * aaaaaaaaaaaaaaaaaa so skip it */
2471 retval
= rdbSaveLzfStringObject(fp
,obj
);
2472 if (retval
== -1) return -1;
2473 if (retval
> 0) return 0;
2474 /* retval == 0 means data can't be compressed, save the old way */
2477 /* Store verbatim */
2478 if (rdbSaveLen(fp
,len
) == -1) return -1;
2479 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2483 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2484 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2487 obj
= getDecodedObject(obj
);
2488 retval
= rdbSaveStringObjectRaw(fp
,obj
);
2493 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
2494 * 8 bit integer specifing the length of the representation.
2495 * This 8 bit integer has special values in order to specify the following
2501 static int rdbSaveDoubleValue(FILE *fp
, double val
) {
2502 unsigned char buf
[128];
2508 } else if (!isfinite(val
)) {
2510 buf
[0] = (val
< 0) ? 255 : 254;
2512 snprintf((char*)buf
+1,sizeof(buf
)-1,"%.17g",val
);
2513 buf
[0] = strlen((char*)buf
+1);
2516 if (fwrite(buf
,len
,1,fp
) == 0) return -1;
2520 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2521 static int rdbSave(char *filename
) {
2522 dictIterator
*di
= NULL
;
2527 time_t now
= time(NULL
);
2529 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2530 fp
= fopen(tmpfile
,"w");
2532 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2535 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2536 for (j
= 0; j
< server
.dbnum
; j
++) {
2537 redisDb
*db
= server
.db
+j
;
2539 if (dictSize(d
) == 0) continue;
2540 di
= dictGetIterator(d
);
2546 /* Write the SELECT DB opcode */
2547 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2548 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2550 /* Iterate this DB writing every entry */
2551 while((de
= dictNext(di
)) != NULL
) {
2552 robj
*key
= dictGetEntryKey(de
);
2553 robj
*o
= dictGetEntryVal(de
);
2554 time_t expiretime
= getExpire(db
,key
);
2556 /* Save the expire time */
2557 if (expiretime
!= -1) {
2558 /* If this key is already expired skip it */
2559 if (expiretime
< now
) continue;
2560 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2561 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2563 /* Save the key and associated value */
2564 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2565 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2566 if (o
->type
== REDIS_STRING
) {
2567 /* Save a string value */
2568 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
2569 } else if (o
->type
== REDIS_LIST
) {
2570 /* Save a list value */
2571 list
*list
= o
->ptr
;
2575 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
2576 while((ln
= listYield(list
))) {
2577 robj
*eleobj
= listNodeValue(ln
);
2579 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2581 } else if (o
->type
== REDIS_SET
) {
2582 /* Save a set value */
2584 dictIterator
*di
= dictGetIterator(set
);
2587 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
2588 while((de
= dictNext(di
)) != NULL
) {
2589 robj
*eleobj
= dictGetEntryKey(de
);
2591 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2593 dictReleaseIterator(di
);
2594 } else if (o
->type
== REDIS_ZSET
) {
2595 /* Save a set value */
2597 dictIterator
*di
= dictGetIterator(zs
->dict
);
2600 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) goto werr
;
2601 while((de
= dictNext(di
)) != NULL
) {
2602 robj
*eleobj
= dictGetEntryKey(de
);
2603 double *score
= dictGetEntryVal(de
);
2605 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2606 if (rdbSaveDoubleValue(fp
,*score
) == -1) goto werr
;
2608 dictReleaseIterator(di
);
2613 dictReleaseIterator(di
);
2616 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2618 /* Make sure data will not remain on the OS's output buffers */
2623 /* Use RENAME to make sure the DB file is changed atomically only
2624 * if the generate DB file is ok. */
2625 if (rename(tmpfile
,filename
) == -1) {
2626 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
2630 redisLog(REDIS_NOTICE
,"DB saved on disk");
2632 server
.lastsave
= time(NULL
);
2638 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2639 if (di
) dictReleaseIterator(di
);
2643 static int rdbSaveBackground(char *filename
) {
2646 if (server
.bgsavechildpid
!= -1) return REDIS_ERR
;
2647 if ((childpid
= fork()) == 0) {
2650 if (rdbSave(filename
) == REDIS_OK
) {
2657 if (childpid
== -1) {
2658 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2662 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2663 server
.bgsavechildpid
= childpid
;
2666 return REDIS_OK
; /* unreached */
2669 static void rdbRemoveTempFile(pid_t childpid
) {
2672 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
2676 static int rdbLoadType(FILE *fp
) {
2678 if (fread(&type
,1,1,fp
) == 0) return -1;
2682 static time_t rdbLoadTime(FILE *fp
) {
2684 if (fread(&t32
,4,1,fp
) == 0) return -1;
2685 return (time_t) t32
;
2688 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2689 * of this file for a description of how this are stored on disk.
2691 * isencoded is set to 1 if the readed length is not actually a length but
2692 * an "encoding type", check the above comments for more info */
2693 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2694 unsigned char buf
[2];
2697 if (isencoded
) *isencoded
= 0;
2699 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2704 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2705 type
= (buf
[0]&0xC0)>>6;
2706 if (type
== REDIS_RDB_6BITLEN
) {
2707 /* Read a 6 bit len */
2709 } else if (type
== REDIS_RDB_ENCVAL
) {
2710 /* Read a 6 bit len encoding type */
2711 if (isencoded
) *isencoded
= 1;
2713 } else if (type
== REDIS_RDB_14BITLEN
) {
2714 /* Read a 14 bit len */
2715 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2716 return ((buf
[0]&0x3F)<<8)|buf
[1];
2718 /* Read a 32 bit len */
2719 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2725 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2726 unsigned char enc
[4];
2729 if (enctype
== REDIS_RDB_ENC_INT8
) {
2730 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2731 val
= (signed char)enc
[0];
2732 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2734 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2735 v
= enc
[0]|(enc
[1]<<8);
2737 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2739 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2740 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2743 val
= 0; /* anti-warning */
2746 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2749 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2750 unsigned int len
, clen
;
2751 unsigned char *c
= NULL
;
2754 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2755 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2756 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2757 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2758 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2759 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2761 return createObject(REDIS_STRING
,val
);
2768 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2773 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2776 case REDIS_RDB_ENC_INT8
:
2777 case REDIS_RDB_ENC_INT16
:
2778 case REDIS_RDB_ENC_INT32
:
2779 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2780 case REDIS_RDB_ENC_LZF
:
2781 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2787 if (len
== REDIS_RDB_LENERR
) return NULL
;
2788 val
= sdsnewlen(NULL
,len
);
2789 if (len
&& fread(val
,len
,1,fp
) == 0) {
2793 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2796 /* For information about double serialization check rdbSaveDoubleValue() */
2797 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
2801 if (fread(&len
,1,1,fp
) == 0) return -1;
2803 case 255: *val
= R_NegInf
; return 0;
2804 case 254: *val
= R_PosInf
; return 0;
2805 case 253: *val
= R_Nan
; return 0;
2807 if (fread(buf
,len
,1,fp
) == 0) return -1;
2808 sscanf(buf
, "%lg", val
);
2813 static int rdbLoad(char *filename
) {
2815 robj
*keyobj
= NULL
;
2817 int type
, retval
, rdbver
;
2818 dict
*d
= server
.db
[0].dict
;
2819 redisDb
*db
= server
.db
+0;
2821 time_t expiretime
= -1, now
= time(NULL
);
2823 fp
= fopen(filename
,"r");
2824 if (!fp
) return REDIS_ERR
;
2825 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2827 if (memcmp(buf
,"REDIS",5) != 0) {
2829 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2832 rdbver
= atoi(buf
+5);
2835 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2842 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2843 if (type
== REDIS_EXPIRETIME
) {
2844 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2845 /* We read the time so we need to read the object type again */
2846 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2848 if (type
== REDIS_EOF
) break;
2849 /* Handle SELECT DB opcode as a special case */
2850 if (type
== REDIS_SELECTDB
) {
2851 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2853 if (dbid
>= (unsigned)server
.dbnum
) {
2854 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2857 db
= server
.db
+dbid
;
2862 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2864 if (type
== REDIS_STRING
) {
2865 /* Read string value */
2866 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2867 tryObjectEncoding(o
);
2868 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2869 /* Read list/set value */
2872 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2874 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2875 /* Load every single element of the list/set */
2879 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2880 tryObjectEncoding(ele
);
2881 if (type
== REDIS_LIST
) {
2882 listAddNodeTail((list
*)o
->ptr
,ele
);
2884 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
2887 } else if (type
== REDIS_ZSET
) {
2888 /* Read list/set value */
2892 if ((zsetlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2894 o
= createZsetObject();
2896 /* Load every single element of the list/set */
2899 double *score
= zmalloc(sizeof(double));
2901 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2902 tryObjectEncoding(ele
);
2903 if (rdbLoadDoubleValue(fp
,score
) == -1) goto eoferr
;
2904 dictAdd(zs
->dict
,ele
,score
);
2905 zslInsert(zs
->zsl
,*score
,ele
);
2906 incrRefCount(ele
); /* added to skiplist */
2911 /* Add the new object in the hash table */
2912 retval
= dictAdd(d
,keyobj
,o
);
2913 if (retval
== DICT_ERR
) {
2914 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2917 /* Set the expire time if needed */
2918 if (expiretime
!= -1) {
2919 setExpire(db
,keyobj
,expiretime
);
2920 /* Delete this key if already expired */
2921 if (expiretime
< now
) deleteKey(db
,keyobj
);
2929 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2930 if (keyobj
) decrRefCount(keyobj
);
2931 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2933 return REDIS_ERR
; /* Just to avoid warning */
2936 /*================================== Commands =============================== */
2938 static void authCommand(redisClient
*c
) {
2939 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2940 c
->authenticated
= 1;
2941 addReply(c
,shared
.ok
);
2943 c
->authenticated
= 0;
2944 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2948 static void pingCommand(redisClient
*c
) {
2949 addReply(c
,shared
.pong
);
2952 static void echoCommand(redisClient
*c
) {
2953 addReplyBulkLen(c
,c
->argv
[1]);
2954 addReply(c
,c
->argv
[1]);
2955 addReply(c
,shared
.crlf
);
2958 /*=================================== Strings =============================== */
2960 static void setGenericCommand(redisClient
*c
, int nx
) {
2963 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2964 if (retval
== DICT_ERR
) {
2966 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2967 incrRefCount(c
->argv
[2]);
2969 addReply(c
,shared
.czero
);
2973 incrRefCount(c
->argv
[1]);
2974 incrRefCount(c
->argv
[2]);
2977 removeExpire(c
->db
,c
->argv
[1]);
2978 addReply(c
, nx
? shared
.cone
: shared
.ok
);
2981 static void setCommand(redisClient
*c
) {
2982 setGenericCommand(c
,0);
2985 static void setnxCommand(redisClient
*c
) {
2986 setGenericCommand(c
,1);
2989 static void getCommand(redisClient
*c
) {
2990 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2993 addReply(c
,shared
.nullbulk
);
2995 if (o
->type
!= REDIS_STRING
) {
2996 addReply(c
,shared
.wrongtypeerr
);
2998 addReplyBulkLen(c
,o
);
3000 addReply(c
,shared
.crlf
);
3005 static void getsetCommand(redisClient
*c
) {
3007 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
3008 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3010 incrRefCount(c
->argv
[1]);
3012 incrRefCount(c
->argv
[2]);
3014 removeExpire(c
->db
,c
->argv
[1]);
3017 static void mgetCommand(redisClient
*c
) {
3020 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
3021 for (j
= 1; j
< c
->argc
; j
++) {
3022 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
3024 addReply(c
,shared
.nullbulk
);
3026 if (o
->type
!= REDIS_STRING
) {
3027 addReply(c
,shared
.nullbulk
);
3029 addReplyBulkLen(c
,o
);
3031 addReply(c
,shared
.crlf
);
3037 static void msetGenericCommand(redisClient
*c
, int nx
) {
3040 if ((c
->argc
% 2) == 0) {
3041 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
3044 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3045 * set nothing at all if at least one already key exists. */
3047 for (j
= 1; j
< c
->argc
; j
+= 2) {
3048 if (dictFind(c
->db
->dict
,c
->argv
[j
]) != NULL
) {
3049 addReply(c
, shared
.czero
);
3055 for (j
= 1; j
< c
->argc
; j
+= 2) {
3058 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3059 if (retval
== DICT_ERR
) {
3060 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3061 incrRefCount(c
->argv
[j
+1]);
3063 incrRefCount(c
->argv
[j
]);
3064 incrRefCount(c
->argv
[j
+1]);
3066 removeExpire(c
->db
,c
->argv
[j
]);
3068 server
.dirty
+= (c
->argc
-1)/2;
3069 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3072 static void msetCommand(redisClient
*c
) {
3073 msetGenericCommand(c
,0);
3076 static void msetnxCommand(redisClient
*c
) {
3077 msetGenericCommand(c
,1);
3080 static void incrDecrCommand(redisClient
*c
, long long incr
) {
3085 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3089 if (o
->type
!= REDIS_STRING
) {
3094 if (o
->encoding
== REDIS_ENCODING_RAW
)
3095 value
= strtoll(o
->ptr
, &eptr
, 10);
3096 else if (o
->encoding
== REDIS_ENCODING_INT
)
3097 value
= (long)o
->ptr
;
3104 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
3105 tryObjectEncoding(o
);
3106 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
3107 if (retval
== DICT_ERR
) {
3108 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
3109 removeExpire(c
->db
,c
->argv
[1]);
3111 incrRefCount(c
->argv
[1]);
3114 addReply(c
,shared
.colon
);
3116 addReply(c
,shared
.crlf
);
3119 static void incrCommand(redisClient
*c
) {
3120 incrDecrCommand(c
,1);
3123 static void decrCommand(redisClient
*c
) {
3124 incrDecrCommand(c
,-1);
3127 static void incrbyCommand(redisClient
*c
) {
3128 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3129 incrDecrCommand(c
,incr
);
3132 static void decrbyCommand(redisClient
*c
) {
3133 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3134 incrDecrCommand(c
,-incr
);
3137 /* ========================= Type agnostic commands ========================= */
3139 static void delCommand(redisClient
*c
) {
3142 for (j
= 1; j
< c
->argc
; j
++) {
3143 if (deleteKey(c
->db
,c
->argv
[j
])) {
3150 addReply(c
,shared
.czero
);
3153 addReply(c
,shared
.cone
);
3156 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
3161 static void existsCommand(redisClient
*c
) {
3162 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
3165 static void selectCommand(redisClient
*c
) {
3166 int id
= atoi(c
->argv
[1]->ptr
);
3168 if (selectDb(c
,id
) == REDIS_ERR
) {
3169 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
3171 addReply(c
,shared
.ok
);
3175 static void randomkeyCommand(redisClient
*c
) {
3179 de
= dictGetRandomKey(c
->db
->dict
);
3180 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
3183 addReply(c
,shared
.plus
);
3184 addReply(c
,shared
.crlf
);
3186 addReply(c
,shared
.plus
);
3187 addReply(c
,dictGetEntryKey(de
));
3188 addReply(c
,shared
.crlf
);
3192 static void keysCommand(redisClient
*c
) {
3195 sds pattern
= c
->argv
[1]->ptr
;
3196 int plen
= sdslen(pattern
);
3197 int numkeys
= 0, keyslen
= 0;
3198 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3200 di
= dictGetIterator(c
->db
->dict
);
3202 decrRefCount(lenobj
);
3203 while((de
= dictNext(di
)) != NULL
) {
3204 robj
*keyobj
= dictGetEntryKey(de
);
3206 sds key
= keyobj
->ptr
;
3207 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3208 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3209 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3211 addReply(c
,shared
.space
);
3214 keyslen
+= sdslen(key
);
3218 dictReleaseIterator(di
);
3219 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3220 addReply(c
,shared
.crlf
);
3223 static void dbsizeCommand(redisClient
*c
) {
3225 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
3228 static void lastsaveCommand(redisClient
*c
) {
3230 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
3233 static void typeCommand(redisClient
*c
) {
3237 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3242 case REDIS_STRING
: type
= "+string"; break;
3243 case REDIS_LIST
: type
= "+list"; break;
3244 case REDIS_SET
: type
= "+set"; break;
3245 case REDIS_ZSET
: type
= "+zset"; break;
3246 default: type
= "unknown"; break;
3249 addReplySds(c
,sdsnew(type
));
3250 addReply(c
,shared
.crlf
);
3253 static void saveCommand(redisClient
*c
) {
3254 if (server
.bgsavechildpid
!= -1) {
3255 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3258 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3259 addReply(c
,shared
.ok
);
3261 addReply(c
,shared
.err
);
3265 static void bgsaveCommand(redisClient
*c
) {
3266 if (server
.bgsavechildpid
!= -1) {
3267 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3270 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3271 addReply(c
,shared
.ok
);
3273 addReply(c
,shared
.err
);
3277 static void shutdownCommand(redisClient
*c
) {
3278 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
3279 /* Kill the saving child if there is a background saving in progress.
3280 We want to avoid race conditions, for instance our saving child may
3281 overwrite the synchronous saving did by SHUTDOWN. */
3282 if (server
.bgsavechildpid
!= -1) {
3283 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3284 kill(server
.bgsavechildpid
,SIGKILL
);
3285 rdbRemoveTempFile(server
.bgsavechildpid
);
3288 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3289 if (server
.daemonize
)
3290 unlink(server
.pidfile
);
3291 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3292 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3295 /* Ooops.. error saving! The best we can do is to continue operating.
3296 * Note that if there was a background saving process, in the next
3297 * cron() Redis will be notified that the background saving aborted,
3298 * handling special stuff like slaves pending for synchronization... */
3299 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3300 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3304 static void renameGenericCommand(redisClient
*c
, int nx
) {
3307 /* To use the same key as src and dst is probably an error */
3308 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3309 addReply(c
,shared
.sameobjecterr
);
3313 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3315 addReply(c
,shared
.nokeyerr
);
3319 deleteIfVolatile(c
->db
,c
->argv
[2]);
3320 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3323 addReply(c
,shared
.czero
);
3326 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3328 incrRefCount(c
->argv
[2]);
3330 deleteKey(c
->db
,c
->argv
[1]);
3332 addReply(c
,nx
? shared
.cone
: shared
.ok
);
3335 static void renameCommand(redisClient
*c
) {
3336 renameGenericCommand(c
,0);
3339 static void renamenxCommand(redisClient
*c
) {
3340 renameGenericCommand(c
,1);
3343 static void moveCommand(redisClient
*c
) {
3348 /* Obtain source and target DB pointers */
3351 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3352 addReply(c
,shared
.outofrangeerr
);
3356 selectDb(c
,srcid
); /* Back to the source DB */
3358 /* If the user is moving using as target the same
3359 * DB as the source DB it is probably an error. */
3361 addReply(c
,shared
.sameobjecterr
);
3365 /* Check if the element exists and get a reference */
3366 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3368 addReply(c
,shared
.czero
);
3372 /* Try to add the element to the target DB */
3373 deleteIfVolatile(dst
,c
->argv
[1]);
3374 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3375 addReply(c
,shared
.czero
);
3378 incrRefCount(c
->argv
[1]);
3381 /* OK! key moved, free the entry in the source DB */
3382 deleteKey(src
,c
->argv
[1]);
3384 addReply(c
,shared
.cone
);
3387 /* =================================== Lists ================================ */
3388 static void pushGenericCommand(redisClient
*c
, int where
) {
3392 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3394 lobj
= createListObject();
3396 if (where
== REDIS_HEAD
) {
3397 listAddNodeHead(list
,c
->argv
[2]);
3399 listAddNodeTail(list
,c
->argv
[2]);
3401 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
3402 incrRefCount(c
->argv
[1]);
3403 incrRefCount(c
->argv
[2]);
3405 if (lobj
->type
!= REDIS_LIST
) {
3406 addReply(c
,shared
.wrongtypeerr
);
3410 if (where
== REDIS_HEAD
) {
3411 listAddNodeHead(list
,c
->argv
[2]);
3413 listAddNodeTail(list
,c
->argv
[2]);
3415 incrRefCount(c
->argv
[2]);
3418 addReply(c
,shared
.ok
);
3421 static void lpushCommand(redisClient
*c
) {
3422 pushGenericCommand(c
,REDIS_HEAD
);
3425 static void rpushCommand(redisClient
*c
) {
3426 pushGenericCommand(c
,REDIS_TAIL
);
3429 static void llenCommand(redisClient
*c
) {
3433 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3435 addReply(c
,shared
.czero
);
3438 if (o
->type
!= REDIS_LIST
) {
3439 addReply(c
,shared
.wrongtypeerr
);
3442 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3447 static void lindexCommand(redisClient
*c
) {
3449 int index
= atoi(c
->argv
[2]->ptr
);
3451 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3453 addReply(c
,shared
.nullbulk
);
3455 if (o
->type
!= REDIS_LIST
) {
3456 addReply(c
,shared
.wrongtypeerr
);
3458 list
*list
= o
->ptr
;
3461 ln
= listIndex(list
, index
);
3463 addReply(c
,shared
.nullbulk
);
3465 robj
*ele
= listNodeValue(ln
);
3466 addReplyBulkLen(c
,ele
);
3468 addReply(c
,shared
.crlf
);
3474 static void lsetCommand(redisClient
*c
) {
3476 int index
= atoi(c
->argv
[2]->ptr
);
3478 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3480 addReply(c
,shared
.nokeyerr
);
3482 if (o
->type
!= REDIS_LIST
) {
3483 addReply(c
,shared
.wrongtypeerr
);
3485 list
*list
= o
->ptr
;
3488 ln
= listIndex(list
, index
);
3490 addReply(c
,shared
.outofrangeerr
);
3492 robj
*ele
= listNodeValue(ln
);
3495 listNodeValue(ln
) = c
->argv
[3];
3496 incrRefCount(c
->argv
[3]);
3497 addReply(c
,shared
.ok
);
3504 static void popGenericCommand(redisClient
*c
, int where
) {
3507 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3509 addReply(c
,shared
.nullbulk
);
3511 if (o
->type
!= REDIS_LIST
) {
3512 addReply(c
,shared
.wrongtypeerr
);
3514 list
*list
= o
->ptr
;
3517 if (where
== REDIS_HEAD
)
3518 ln
= listFirst(list
);
3520 ln
= listLast(list
);
3523 addReply(c
,shared
.nullbulk
);
3525 robj
*ele
= listNodeValue(ln
);
3526 addReplyBulkLen(c
,ele
);
3528 addReply(c
,shared
.crlf
);
3529 listDelNode(list
,ln
);
3536 static void lpopCommand(redisClient
*c
) {
3537 popGenericCommand(c
,REDIS_HEAD
);
3540 static void rpopCommand(redisClient
*c
) {
3541 popGenericCommand(c
,REDIS_TAIL
);
3544 static void lrangeCommand(redisClient
*c
) {
3546 int start
= atoi(c
->argv
[2]->ptr
);
3547 int end
= atoi(c
->argv
[3]->ptr
);
3549 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3551 addReply(c
,shared
.nullmultibulk
);
3553 if (o
->type
!= REDIS_LIST
) {
3554 addReply(c
,shared
.wrongtypeerr
);
3556 list
*list
= o
->ptr
;
3558 int llen
= listLength(list
);
3562 /* convert negative indexes */
3563 if (start
< 0) start
= llen
+start
;
3564 if (end
< 0) end
= llen
+end
;
3565 if (start
< 0) start
= 0;
3566 if (end
< 0) end
= 0;
3568 /* indexes sanity checks */
3569 if (start
> end
|| start
>= llen
) {
3570 /* Out of range start or start > end result in empty list */
3571 addReply(c
,shared
.emptymultibulk
);
3574 if (end
>= llen
) end
= llen
-1;
3575 rangelen
= (end
-start
)+1;
3577 /* Return the result in form of a multi-bulk reply */
3578 ln
= listIndex(list
, start
);
3579 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3580 for (j
= 0; j
< rangelen
; j
++) {
3581 ele
= listNodeValue(ln
);
3582 addReplyBulkLen(c
,ele
);
3584 addReply(c
,shared
.crlf
);
3591 static void ltrimCommand(redisClient
*c
) {
3593 int start
= atoi(c
->argv
[2]->ptr
);
3594 int end
= atoi(c
->argv
[3]->ptr
);
3596 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3598 addReply(c
,shared
.nokeyerr
);
3600 if (o
->type
!= REDIS_LIST
) {
3601 addReply(c
,shared
.wrongtypeerr
);
3603 list
*list
= o
->ptr
;
3605 int llen
= listLength(list
);
3606 int j
, ltrim
, rtrim
;
3608 /* convert negative indexes */
3609 if (start
< 0) start
= llen
+start
;
3610 if (end
< 0) end
= llen
+end
;
3611 if (start
< 0) start
= 0;
3612 if (end
< 0) end
= 0;
3614 /* indexes sanity checks */
3615 if (start
> end
|| start
>= llen
) {
3616 /* Out of range start or start > end result in empty list */
3620 if (end
>= llen
) end
= llen
-1;
3625 /* Remove list elements to perform the trim */
3626 for (j
= 0; j
< ltrim
; j
++) {
3627 ln
= listFirst(list
);
3628 listDelNode(list
,ln
);
3630 for (j
= 0; j
< rtrim
; j
++) {
3631 ln
= listLast(list
);
3632 listDelNode(list
,ln
);
3635 addReply(c
,shared
.ok
);
3640 static void lremCommand(redisClient
*c
) {
3643 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3645 addReply(c
,shared
.czero
);
3647 if (o
->type
!= REDIS_LIST
) {
3648 addReply(c
,shared
.wrongtypeerr
);
3650 list
*list
= o
->ptr
;
3651 listNode
*ln
, *next
;
3652 int toremove
= atoi(c
->argv
[2]->ptr
);
3657 toremove
= -toremove
;
3660 ln
= fromtail
? list
->tail
: list
->head
;
3662 robj
*ele
= listNodeValue(ln
);
3664 next
= fromtail
? ln
->prev
: ln
->next
;
3665 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3666 listDelNode(list
,ln
);
3669 if (toremove
&& removed
== toremove
) break;
3673 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3678 /* This is the semantic of this command:
3679 * RPOPLPUSH srclist dstlist:
3680 * IF LLEN(srclist) > 0
3681 * element = RPOP srclist
3682 * LPUSH dstlist element
3689 * The idea is to be able to get an element from a list in a reliable way
3690 * since the element is not just returned but pushed against another list
3691 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3693 static void rpoplpushcommand(redisClient
*c
) {
3696 sobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3698 addReply(c
,shared
.nullbulk
);
3700 if (sobj
->type
!= REDIS_LIST
) {
3701 addReply(c
,shared
.wrongtypeerr
);
3703 list
*srclist
= sobj
->ptr
;
3704 listNode
*ln
= listLast(srclist
);
3707 addReply(c
,shared
.nullbulk
);
3709 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3710 robj
*ele
= listNodeValue(ln
);
3715 /* Create the list if the key does not exist */
3716 dobj
= createListObject();
3717 dictAdd(c
->db
->dict
,c
->argv
[2],dobj
);
3718 incrRefCount(c
->argv
[2]);
3719 } else if (dobj
->type
!= REDIS_LIST
) {
3720 addReply(c
,shared
.wrongtypeerr
);
3723 /* Add the element to the target list */
3724 dstlist
= dobj
->ptr
;
3725 listAddNodeHead(dstlist
,ele
);
3728 /* Send the element to the client as reply as well */
3729 addReplyBulkLen(c
,ele
);
3731 addReply(c
,shared
.crlf
);
3733 /* Finally remove the element from the source list */
3734 listDelNode(srclist
,ln
);
3742 /* ==================================== Sets ================================ */
3744 static void saddCommand(redisClient
*c
) {
3747 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3749 set
= createSetObject();
3750 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3751 incrRefCount(c
->argv
[1]);
3753 if (set
->type
!= REDIS_SET
) {
3754 addReply(c
,shared
.wrongtypeerr
);
3758 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3759 incrRefCount(c
->argv
[2]);
3761 addReply(c
,shared
.cone
);
3763 addReply(c
,shared
.czero
);
3767 static void sremCommand(redisClient
*c
) {
3770 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3772 addReply(c
,shared
.czero
);
3774 if (set
->type
!= REDIS_SET
) {
3775 addReply(c
,shared
.wrongtypeerr
);
3778 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
3780 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3781 addReply(c
,shared
.cone
);
3783 addReply(c
,shared
.czero
);
3788 static void smoveCommand(redisClient
*c
) {
3789 robj
*srcset
, *dstset
;
3791 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3792 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3794 /* If the source key does not exist return 0, if it's of the wrong type
3796 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
3797 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
3800 /* Error if the destination key is not a set as well */
3801 if (dstset
&& dstset
->type
!= REDIS_SET
) {
3802 addReply(c
,shared
.wrongtypeerr
);
3805 /* Remove the element from the source set */
3806 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
3807 /* Key not found in the src set! return zero */
3808 addReply(c
,shared
.czero
);
3812 /* Add the element to the destination set */
3814 dstset
= createSetObject();
3815 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
3816 incrRefCount(c
->argv
[2]);
3818 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
3819 incrRefCount(c
->argv
[3]);
3820 addReply(c
,shared
.cone
);
3823 static void sismemberCommand(redisClient
*c
) {
3826 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3828 addReply(c
,shared
.czero
);
3830 if (set
->type
!= REDIS_SET
) {
3831 addReply(c
,shared
.wrongtypeerr
);
3834 if (dictFind(set
->ptr
,c
->argv
[2]))
3835 addReply(c
,shared
.cone
);
3837 addReply(c
,shared
.czero
);
3841 static void scardCommand(redisClient
*c
) {
3845 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3847 addReply(c
,shared
.czero
);
3850 if (o
->type
!= REDIS_SET
) {
3851 addReply(c
,shared
.wrongtypeerr
);
3854 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3860 static void spopCommand(redisClient
*c
) {
3864 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3866 addReply(c
,shared
.nullbulk
);
3868 if (set
->type
!= REDIS_SET
) {
3869 addReply(c
,shared
.wrongtypeerr
);
3872 de
= dictGetRandomKey(set
->ptr
);
3874 addReply(c
,shared
.nullbulk
);
3876 robj
*ele
= dictGetEntryKey(de
);
3878 addReplyBulkLen(c
,ele
);
3880 addReply(c
,shared
.crlf
);
3881 dictDelete(set
->ptr
,ele
);
3882 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3888 static void srandmemberCommand(redisClient
*c
) {
3892 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3894 addReply(c
,shared
.nullbulk
);
3896 if (set
->type
!= REDIS_SET
) {
3897 addReply(c
,shared
.wrongtypeerr
);
3900 de
= dictGetRandomKey(set
->ptr
);
3902 addReply(c
,shared
.nullbulk
);
3904 robj
*ele
= dictGetEntryKey(de
);
3906 addReplyBulkLen(c
,ele
);
3908 addReply(c
,shared
.crlf
);
3913 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3914 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3916 return dictSize(*d1
)-dictSize(*d2
);
3919 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
) {
3920 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3923 robj
*lenobj
= NULL
, *dstset
= NULL
;
3924 int j
, cardinality
= 0;
3926 for (j
= 0; j
< setsnum
; j
++) {
3930 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3931 lookupKeyRead(c
->db
,setskeys
[j
]);
3935 deleteKey(c
->db
,dstkey
);
3936 addReply(c
,shared
.ok
);
3938 addReply(c
,shared
.nullmultibulk
);
3942 if (setobj
->type
!= REDIS_SET
) {
3944 addReply(c
,shared
.wrongtypeerr
);
3947 dv
[j
] = setobj
->ptr
;
3949 /* Sort sets from the smallest to largest, this will improve our
3950 * algorithm's performace */
3951 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3953 /* The first thing we should output is the total number of elements...
3954 * since this is a multi-bulk write, but at this stage we don't know
3955 * the intersection set size, so we use a trick, append an empty object
3956 * to the output list and save the pointer to later modify it with the
3959 lenobj
= createObject(REDIS_STRING
,NULL
);
3961 decrRefCount(lenobj
);
3963 /* If we have a target key where to store the resulting set
3964 * create this key with an empty set inside */
3965 dstset
= createSetObject();
3968 /* Iterate all the elements of the first (smallest) set, and test
3969 * the element against all the other sets, if at least one set does
3970 * not include the element it is discarded */
3971 di
= dictGetIterator(dv
[0]);
3973 while((de
= dictNext(di
)) != NULL
) {
3976 for (j
= 1; j
< setsnum
; j
++)
3977 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
3979 continue; /* at least one set does not contain the member */
3980 ele
= dictGetEntryKey(de
);
3982 addReplyBulkLen(c
,ele
);
3984 addReply(c
,shared
.crlf
);
3987 dictAdd(dstset
->ptr
,ele
,NULL
);
3991 dictReleaseIterator(di
);
3994 /* Store the resulting set into the target */
3995 deleteKey(c
->db
,dstkey
);
3996 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3997 incrRefCount(dstkey
);
4001 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",cardinality
);
4003 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
4004 dictSize((dict
*)dstset
->ptr
)));
4010 static void sinterCommand(redisClient
*c
) {
4011 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
4014 static void sinterstoreCommand(redisClient
*c
) {
4015 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
4018 #define REDIS_OP_UNION 0
4019 #define REDIS_OP_DIFF 1
4021 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
4022 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4025 robj
*dstset
= NULL
;
4026 int j
, cardinality
= 0;
4028 for (j
= 0; j
< setsnum
; j
++) {
4032 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4033 lookupKeyRead(c
->db
,setskeys
[j
]);
4038 if (setobj
->type
!= REDIS_SET
) {
4040 addReply(c
,shared
.wrongtypeerr
);
4043 dv
[j
] = setobj
->ptr
;
4046 /* We need a temp set object to store our union. If the dstkey
4047 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4048 * this set object will be the resulting object to set into the target key*/
4049 dstset
= createSetObject();
4051 /* Iterate all the elements of all the sets, add every element a single
4052 * time to the result set */
4053 for (j
= 0; j
< setsnum
; j
++) {
4054 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
4055 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
4057 di
= dictGetIterator(dv
[j
]);
4059 while((de
= dictNext(di
)) != NULL
) {
4062 /* dictAdd will not add the same element multiple times */
4063 ele
= dictGetEntryKey(de
);
4064 if (op
== REDIS_OP_UNION
|| j
== 0) {
4065 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
4069 } else if (op
== REDIS_OP_DIFF
) {
4070 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
4075 dictReleaseIterator(di
);
4077 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
4080 /* Output the content of the resulting set, if not in STORE mode */
4082 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
4083 di
= dictGetIterator(dstset
->ptr
);
4084 while((de
= dictNext(di
)) != NULL
) {
4087 ele
= dictGetEntryKey(de
);
4088 addReplyBulkLen(c
,ele
);
4090 addReply(c
,shared
.crlf
);
4092 dictReleaseIterator(di
);
4094 /* If we have a target key where to store the resulting set
4095 * create this key with the result set inside */
4096 deleteKey(c
->db
,dstkey
);
4097 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4098 incrRefCount(dstkey
);
4103 decrRefCount(dstset
);
4105 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
4106 dictSize((dict
*)dstset
->ptr
)));
4112 static void sunionCommand(redisClient
*c
) {
4113 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
4116 static void sunionstoreCommand(redisClient
*c
) {
4117 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
4120 static void sdiffCommand(redisClient
*c
) {
4121 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
4124 static void sdiffstoreCommand(redisClient
*c
) {
4125 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
4128 /* ==================================== ZSets =============================== */
4130 /* ZSETs are ordered sets using two data structures to hold the same elements
4131 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4134 * The elements are added to an hash table mapping Redis objects to scores.
4135 * At the same time the elements are added to a skip list mapping scores
4136 * to Redis objects (so objects are sorted by scores in this "view"). */
4138 /* This skiplist implementation is almost a C translation of the original
4139 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4140 * Alternative to Balanced Trees", modified in three ways:
4141 * a) this implementation allows for repeated values.
4142 * b) the comparison is not just by key (our 'score') but by satellite data.
4143 * c) there is a back pointer, so it's a doubly linked list with the back
4144 * pointers being only at "level 1". This allows to traverse the list
4145 * from tail to head, useful for ZREVRANGE. */
4147 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
4148 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
4150 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
4156 static zskiplist
*zslCreate(void) {
4160 zsl
= zmalloc(sizeof(*zsl
));
4163 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
4164 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
4165 zsl
->header
->forward
[j
] = NULL
;
4166 zsl
->header
->backward
= NULL
;
4171 static void zslFreeNode(zskiplistNode
*node
) {
4172 decrRefCount(node
->obj
);
4173 zfree(node
->forward
);
4177 static void zslFree(zskiplist
*zsl
) {
4178 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
4180 zfree(zsl
->header
->forward
);
4183 next
= node
->forward
[0];
4190 static int zslRandomLevel(void) {
4192 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
4197 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
4198 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4202 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4203 while (x
->forward
[i
] &&
4204 (x
->forward
[i
]->score
< score
||
4205 (x
->forward
[i
]->score
== score
&&
4206 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4210 /* we assume the key is not already inside, since we allow duplicated
4211 * scores, and the re-insertion of score and redis object should never
4212 * happpen since the caller of zslInsert() should test in the hash table
4213 * if the element is already inside or not. */
4214 level
= zslRandomLevel();
4215 if (level
> zsl
->level
) {
4216 for (i
= zsl
->level
; i
< level
; i
++)
4217 update
[i
] = zsl
->header
;
4220 x
= zslCreateNode(level
,score
,obj
);
4221 for (i
= 0; i
< level
; i
++) {
4222 x
->forward
[i
] = update
[i
]->forward
[i
];
4223 update
[i
]->forward
[i
] = x
;
4225 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
4227 x
->forward
[0]->backward
= x
;
4233 /* Delete an element with matching score/object from the skiplist. */
4234 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
4235 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4239 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4240 while (x
->forward
[i
] &&
4241 (x
->forward
[i
]->score
< score
||
4242 (x
->forward
[i
]->score
== score
&&
4243 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4247 /* We may have multiple elements with the same score, what we need
4248 * is to find the element with both the right score and object. */
4250 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
4251 for (i
= 0; i
< zsl
->level
; i
++) {
4252 if (update
[i
]->forward
[i
] != x
) break;
4253 update
[i
]->forward
[i
] = x
->forward
[i
];
4255 if (x
->forward
[0]) {
4256 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4259 zsl
->tail
= x
->backward
;
4262 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4267 return 0; /* not found */
4269 return 0; /* not found */
4272 /* Delete all the elements with score between min and max from the skiplist.
4273 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4274 * Note that this function takes the reference to the hash table view of the
4275 * sorted set, in order to remove the elements from the hash table too. */
4276 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4277 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4278 unsigned long removed
= 0;
4282 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4283 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
4287 /* We may have multiple elements with the same score, what we need
4288 * is to find the element with both the right score and object. */
4290 while (x
&& x
->score
<= max
) {
4291 zskiplistNode
*next
;
4293 for (i
= 0; i
< zsl
->level
; i
++) {
4294 if (update
[i
]->forward
[i
] != x
) break;
4295 update
[i
]->forward
[i
] = x
->forward
[i
];
4297 if (x
->forward
[0]) {
4298 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4301 zsl
->tail
= x
->backward
;
4303 next
= x
->forward
[0];
4304 dictDelete(dict
,x
->obj
);
4306 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4312 return removed
; /* not found */
4315 /* Find the first node having a score equal or greater than the specified one.
4316 * Returns NULL if there is no match. */
4317 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4322 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4323 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
4326 /* We may have multiple elements with the same score, what we need
4327 * is to find the element with both the right score and object. */
4328 return x
->forward
[0];
4331 /* The actual Z-commands implementations */
4333 /* This generic command implements both ZADD and ZINCRBY.
4334 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4335 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4336 static void zaddGenericCommand(redisClient
*c
, robj
*key
, robj
*ele
, double scoreval
, int doincrement
) {
4341 zsetobj
= lookupKeyWrite(c
->db
,key
);
4342 if (zsetobj
== NULL
) {
4343 zsetobj
= createZsetObject();
4344 dictAdd(c
->db
->dict
,key
,zsetobj
);
4347 if (zsetobj
->type
!= REDIS_ZSET
) {
4348 addReply(c
,shared
.wrongtypeerr
);
4354 /* Ok now since we implement both ZADD and ZINCRBY here the code
4355 * needs to handle the two different conditions. It's all about setting
4356 * '*score', that is, the new score to set, to the right value. */
4357 score
= zmalloc(sizeof(double));
4361 /* Read the old score. If the element was not present starts from 0 */
4362 de
= dictFind(zs
->dict
,ele
);
4364 double *oldscore
= dictGetEntryVal(de
);
4365 *score
= *oldscore
+ scoreval
;
4373 /* What follows is a simple remove and re-insert operation that is common
4374 * to both ZADD and ZINCRBY... */
4375 if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) {
4376 /* case 1: New element */
4377 incrRefCount(ele
); /* added to hash */
4378 zslInsert(zs
->zsl
,*score
,ele
);
4379 incrRefCount(ele
); /* added to skiplist */
4382 addReplyDouble(c
,*score
);
4384 addReply(c
,shared
.cone
);
4389 /* case 2: Score update operation */
4390 de
= dictFind(zs
->dict
,ele
);
4392 oldscore
= dictGetEntryVal(de
);
4393 if (*score
!= *oldscore
) {
4396 /* Remove and insert the element in the skip list with new score */
4397 deleted
= zslDelete(zs
->zsl
,*oldscore
,ele
);
4398 assert(deleted
!= 0);
4399 zslInsert(zs
->zsl
,*score
,ele
);
4401 /* Update the score in the hash table */
4402 dictReplace(zs
->dict
,ele
,score
);
4408 addReplyDouble(c
,*score
);
4410 addReply(c
,shared
.czero
);
4414 static void zaddCommand(redisClient
*c
) {
4417 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4418 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0);
4421 static void zincrbyCommand(redisClient
*c
) {
4424 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4425 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1);
4428 static void zremCommand(redisClient
*c
) {
4432 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4433 if (zsetobj
== NULL
) {
4434 addReply(c
,shared
.czero
);
4440 if (zsetobj
->type
!= REDIS_ZSET
) {
4441 addReply(c
,shared
.wrongtypeerr
);
4445 de
= dictFind(zs
->dict
,c
->argv
[2]);
4447 addReply(c
,shared
.czero
);
4450 /* Delete from the skiplist */
4451 oldscore
= dictGetEntryVal(de
);
4452 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
4453 assert(deleted
!= 0);
4455 /* Delete from the hash table */
4456 dictDelete(zs
->dict
,c
->argv
[2]);
4457 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4459 addReply(c
,shared
.cone
);
4463 static void zremrangebyscoreCommand(redisClient
*c
) {
4464 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4465 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4469 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4470 if (zsetobj
== NULL
) {
4471 addReply(c
,shared
.czero
);
4475 if (zsetobj
->type
!= REDIS_ZSET
) {
4476 addReply(c
,shared
.wrongtypeerr
);
4480 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
4481 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4482 server
.dirty
+= deleted
;
4483 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
4487 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
4489 int start
= atoi(c
->argv
[2]->ptr
);
4490 int end
= atoi(c
->argv
[3]->ptr
);
4492 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4494 addReply(c
,shared
.nullmultibulk
);
4496 if (o
->type
!= REDIS_ZSET
) {
4497 addReply(c
,shared
.wrongtypeerr
);
4499 zset
*zsetobj
= o
->ptr
;
4500 zskiplist
*zsl
= zsetobj
->zsl
;
4503 int llen
= zsl
->length
;
4507 /* convert negative indexes */
4508 if (start
< 0) start
= llen
+start
;
4509 if (end
< 0) end
= llen
+end
;
4510 if (start
< 0) start
= 0;
4511 if (end
< 0) end
= 0;
4513 /* indexes sanity checks */
4514 if (start
> end
|| start
>= llen
) {
4515 /* Out of range start or start > end result in empty list */
4516 addReply(c
,shared
.emptymultibulk
);
4519 if (end
>= llen
) end
= llen
-1;
4520 rangelen
= (end
-start
)+1;
4522 /* Return the result in form of a multi-bulk reply */
4528 ln
= zsl
->header
->forward
[0];
4530 ln
= ln
->forward
[0];
4533 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
4534 for (j
= 0; j
< rangelen
; j
++) {
4536 addReplyBulkLen(c
,ele
);
4538 addReply(c
,shared
.crlf
);
4539 ln
= reverse
? ln
->backward
: ln
->forward
[0];
4545 static void zrangeCommand(redisClient
*c
) {
4546 zrangeGenericCommand(c
,0);
4549 static void zrevrangeCommand(redisClient
*c
) {
4550 zrangeGenericCommand(c
,1);
4553 static void zrangebyscoreCommand(redisClient
*c
) {
4555 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4556 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4558 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4560 addReply(c
,shared
.nullmultibulk
);
4562 if (o
->type
!= REDIS_ZSET
) {
4563 addReply(c
,shared
.wrongtypeerr
);
4565 zset
*zsetobj
= o
->ptr
;
4566 zskiplist
*zsl
= zsetobj
->zsl
;
4569 unsigned int rangelen
= 0;
4571 /* Get the first node with the score >= min */
4572 ln
= zslFirstWithScore(zsl
,min
);
4574 /* No element matching the speciifed interval */
4575 addReply(c
,shared
.emptymultibulk
);
4579 /* We don't know in advance how many matching elements there
4580 * are in the list, so we push this object that will represent
4581 * the multi-bulk length in the output buffer, and will "fix"
4583 lenobj
= createObject(REDIS_STRING
,NULL
);
4586 while(ln
&& ln
->score
<= max
) {
4588 addReplyBulkLen(c
,ele
);
4590 addReply(c
,shared
.crlf
);
4591 ln
= ln
->forward
[0];
4594 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
4599 static void zcardCommand(redisClient
*c
) {
4603 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4605 addReply(c
,shared
.czero
);
4608 if (o
->type
!= REDIS_ZSET
) {
4609 addReply(c
,shared
.wrongtypeerr
);
4612 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",zs
->zsl
->length
));
4617 static void zscoreCommand(redisClient
*c
) {
4621 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4623 addReply(c
,shared
.nullbulk
);
4626 if (o
->type
!= REDIS_ZSET
) {
4627 addReply(c
,shared
.wrongtypeerr
);
4632 de
= dictFind(zs
->dict
,c
->argv
[2]);
4634 addReply(c
,shared
.nullbulk
);
4636 double *score
= dictGetEntryVal(de
);
4638 addReplyDouble(c
,*score
);
4644 /* ========================= Non type-specific commands ==================== */
4646 static void flushdbCommand(redisClient
*c
) {
4647 server
.dirty
+= dictSize(c
->db
->dict
);
4648 dictEmpty(c
->db
->dict
);
4649 dictEmpty(c
->db
->expires
);
4650 addReply(c
,shared
.ok
);
4653 static void flushallCommand(redisClient
*c
) {
4654 server
.dirty
+= emptyDb();
4655 addReply(c
,shared
.ok
);
4656 rdbSave(server
.dbfilename
);
4660 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
4661 redisSortOperation
*so
= zmalloc(sizeof(*so
));
4663 so
->pattern
= pattern
;
4667 /* Return the value associated to the key with a name obtained
4668 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4669 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
4673 int prefixlen
, sublen
, postfixlen
;
4674 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4678 char buf
[REDIS_SORTKEY_MAX
+1];
4681 /* If the pattern is "#" return the substitution object itself in order
4682 * to implement the "SORT ... GET #" feature. */
4683 spat
= pattern
->ptr
;
4684 if (spat
[0] == '#' && spat
[1] == '\0') {
4688 /* The substitution object may be specially encoded. If so we create
4689 * a decoded object on the fly. Otherwise getDecodedObject will just
4690 * increment the ref count, that we'll decrement later. */
4691 subst
= getDecodedObject(subst
);
4694 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
4695 p
= strchr(spat
,'*');
4697 decrRefCount(subst
);
4702 sublen
= sdslen(ssub
);
4703 postfixlen
= sdslen(spat
)-(prefixlen
+1);
4704 memcpy(keyname
.buf
,spat
,prefixlen
);
4705 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
4706 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
4707 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
4708 keyname
.len
= prefixlen
+sublen
+postfixlen
;
4710 keyobj
.refcount
= 1;
4711 keyobj
.type
= REDIS_STRING
;
4712 keyobj
.ptr
= ((char*)&keyname
)+(sizeof(long)*2);
4714 decrRefCount(subst
);
4716 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4717 return lookupKeyRead(db
,&keyobj
);
4720 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4721 * the additional parameter is not standard but a BSD-specific we have to
4722 * pass sorting parameters via the global 'server' structure */
4723 static int sortCompare(const void *s1
, const void *s2
) {
4724 const redisSortObject
*so1
= s1
, *so2
= s2
;
4727 if (!server
.sort_alpha
) {
4728 /* Numeric sorting. Here it's trivial as we precomputed scores */
4729 if (so1
->u
.score
> so2
->u
.score
) {
4731 } else if (so1
->u
.score
< so2
->u
.score
) {
4737 /* Alphanumeric sorting */
4738 if (server
.sort_bypattern
) {
4739 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
4740 /* At least one compare object is NULL */
4741 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
4743 else if (so1
->u
.cmpobj
== NULL
)
4748 /* We have both the objects, use strcoll */
4749 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
4752 /* Compare elements directly */
4755 dec1
= getDecodedObject(so1
->obj
);
4756 dec2
= getDecodedObject(so2
->obj
);
4757 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
4762 return server
.sort_desc
? -cmp
: cmp
;
4765 /* The SORT command is the most complex command in Redis. Warning: this code
4766 * is optimized for speed and a bit less for readability */
4767 static void sortCommand(redisClient
*c
) {
4770 int desc
= 0, alpha
= 0;
4771 int limit_start
= 0, limit_count
= -1, start
, end
;
4772 int j
, dontsort
= 0, vectorlen
;
4773 int getop
= 0; /* GET operation counter */
4774 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
4775 redisSortObject
*vector
; /* Resulting vector to sort */
4777 /* Lookup the key to sort. It must be of the right types */
4778 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
4779 if (sortval
== NULL
) {
4780 addReply(c
,shared
.nokeyerr
);
4783 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
) {
4784 addReply(c
,shared
.wrongtypeerr
);
4788 /* Create a list of operations to perform for every sorted element.
4789 * Operations can be GET/DEL/INCR/DECR */
4790 operations
= listCreate();
4791 listSetFreeMethod(operations
,zfree
);
4794 /* Now we need to protect sortval incrementing its count, in the future
4795 * SORT may have options able to overwrite/delete keys during the sorting
4796 * and the sorted key itself may get destroied */
4797 incrRefCount(sortval
);
4799 /* The SORT command has an SQL-alike syntax, parse it */
4800 while(j
< c
->argc
) {
4801 int leftargs
= c
->argc
-j
-1;
4802 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
4804 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
4806 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
4808 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
4809 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
4810 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
4812 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
4813 storekey
= c
->argv
[j
+1];
4815 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
4816 sortby
= c
->argv
[j
+1];
4817 /* If the BY pattern does not contain '*', i.e. it is constant,
4818 * we don't need to sort nor to lookup the weight keys. */
4819 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
4821 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
4822 listAddNodeTail(operations
,createSortOperation(
4823 REDIS_SORT_GET
,c
->argv
[j
+1]));
4827 decrRefCount(sortval
);
4828 listRelease(operations
);
4829 addReply(c
,shared
.syntaxerr
);
4835 /* Load the sorting vector with all the objects to sort */
4836 vectorlen
= (sortval
->type
== REDIS_LIST
) ?
4837 listLength((list
*)sortval
->ptr
) :
4838 dictSize((dict
*)sortval
->ptr
);
4839 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
4841 if (sortval
->type
== REDIS_LIST
) {
4842 list
*list
= sortval
->ptr
;
4846 while((ln
= listYield(list
))) {
4847 robj
*ele
= ln
->value
;
4848 vector
[j
].obj
= ele
;
4849 vector
[j
].u
.score
= 0;
4850 vector
[j
].u
.cmpobj
= NULL
;
4854 dict
*set
= sortval
->ptr
;
4858 di
= dictGetIterator(set
);
4859 while((setele
= dictNext(di
)) != NULL
) {
4860 vector
[j
].obj
= dictGetEntryKey(setele
);
4861 vector
[j
].u
.score
= 0;
4862 vector
[j
].u
.cmpobj
= NULL
;
4865 dictReleaseIterator(di
);
4867 assert(j
== vectorlen
);
4869 /* Now it's time to load the right scores in the sorting vector */
4870 if (dontsort
== 0) {
4871 for (j
= 0; j
< vectorlen
; j
++) {
4875 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
4876 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
4878 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
4880 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
4881 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
4883 /* Don't need to decode the object if it's
4884 * integer-encoded (the only encoding supported) so
4885 * far. We can just cast it */
4886 if (byval
->encoding
== REDIS_ENCODING_INT
) {
4887 vector
[j
].u
.score
= (long)byval
->ptr
;
4894 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
4895 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
4897 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
4898 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
4907 /* We are ready to sort the vector... perform a bit of sanity check
4908 * on the LIMIT option too. We'll use a partial version of quicksort. */
4909 start
= (limit_start
< 0) ? 0 : limit_start
;
4910 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
4911 if (start
>= vectorlen
) {
4912 start
= vectorlen
-1;
4915 if (end
>= vectorlen
) end
= vectorlen
-1;
4917 if (dontsort
== 0) {
4918 server
.sort_desc
= desc
;
4919 server
.sort_alpha
= alpha
;
4920 server
.sort_bypattern
= sortby
? 1 : 0;
4921 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
4922 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
4924 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
4927 /* Send command output to the output buffer, performing the specified
4928 * GET/DEL/INCR/DECR operations if any. */
4929 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
4930 if (storekey
== NULL
) {
4931 /* STORE option not specified, sent the sorting result to client */
4932 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
4933 for (j
= start
; j
<= end
; j
++) {
4936 addReplyBulkLen(c
,vector
[j
].obj
);
4937 addReply(c
,vector
[j
].obj
);
4938 addReply(c
,shared
.crlf
);
4940 listRewind(operations
);
4941 while((ln
= listYield(operations
))) {
4942 redisSortOperation
*sop
= ln
->value
;
4943 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
4946 if (sop
->type
== REDIS_SORT_GET
) {
4947 if (!val
|| val
->type
!= REDIS_STRING
) {
4948 addReply(c
,shared
.nullbulk
);
4950 addReplyBulkLen(c
,val
);
4952 addReply(c
,shared
.crlf
);
4955 assert(sop
->type
== REDIS_SORT_GET
); /* always fails */
4960 robj
*listObject
= createListObject();
4961 list
*listPtr
= (list
*) listObject
->ptr
;
4963 /* STORE option specified, set the sorting result as a List object */
4964 for (j
= start
; j
<= end
; j
++) {
4967 listAddNodeTail(listPtr
,vector
[j
].obj
);
4968 incrRefCount(vector
[j
].obj
);
4970 listRewind(operations
);
4971 while((ln
= listYield(operations
))) {
4972 redisSortOperation
*sop
= ln
->value
;
4973 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
4976 if (sop
->type
== REDIS_SORT_GET
) {
4977 if (!val
|| val
->type
!= REDIS_STRING
) {
4978 listAddNodeTail(listPtr
,createStringObject("",0));
4980 listAddNodeTail(listPtr
,val
);
4984 assert(sop
->type
== REDIS_SORT_GET
); /* always fails */
4988 if (dictReplace(c
->db
->dict
,storekey
,listObject
)) {
4989 incrRefCount(storekey
);
4991 /* Note: we add 1 because the DB is dirty anyway since even if the
4992 * SORT result is empty a new key is set and maybe the old content
4994 server
.dirty
+= 1+outputlen
;
4995 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",outputlen
));
4999 decrRefCount(sortval
);
5000 listRelease(operations
);
5001 for (j
= 0; j
< vectorlen
; j
++) {
5002 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
5003 decrRefCount(vector
[j
].u
.cmpobj
);
5008 static void infoCommand(redisClient
*c
) {
5010 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5013 info
= sdscatprintf(sdsempty(),
5014 "redis_version:%s\r\n"
5016 "uptime_in_seconds:%d\r\n"
5017 "uptime_in_days:%d\r\n"
5018 "connected_clients:%d\r\n"
5019 "connected_slaves:%d\r\n"
5020 "used_memory:%zu\r\n"
5021 "changes_since_last_save:%lld\r\n"
5022 "bgsave_in_progress:%d\r\n"
5023 "last_save_time:%d\r\n"
5024 "total_connections_received:%lld\r\n"
5025 "total_commands_processed:%lld\r\n"
5028 (sizeof(long) == 8) ? "64" : "32",
5031 listLength(server
.clients
)-listLength(server
.slaves
),
5032 listLength(server
.slaves
),
5035 server
.bgsavechildpid
!= -1,
5037 server
.stat_numconnections
,
5038 server
.stat_numcommands
,
5039 server
.masterhost
== NULL
? "master" : "slave"
5041 if (server
.masterhost
) {
5042 info
= sdscatprintf(info
,
5043 "master_host:%s\r\n"
5044 "master_port:%d\r\n"
5045 "master_link_status:%s\r\n"
5046 "master_last_io_seconds_ago:%d\r\n"
5049 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
5051 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
5054 for (j
= 0; j
< server
.dbnum
; j
++) {
5055 long long keys
, vkeys
;
5057 keys
= dictSize(server
.db
[j
].dict
);
5058 vkeys
= dictSize(server
.db
[j
].expires
);
5059 if (keys
|| vkeys
) {
5060 info
= sdscatprintf(info
, "db%d:keys=%lld,expires=%lld\r\n",
5064 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info
)));
5065 addReplySds(c
,info
);
5066 addReply(c
,shared
.crlf
);
5069 static void monitorCommand(redisClient
*c
) {
5070 /* ignore MONITOR if aleady slave or in monitor mode */
5071 if (c
->flags
& REDIS_SLAVE
) return;
5073 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
5075 listAddNodeTail(server
.monitors
,c
);
5076 addReply(c
,shared
.ok
);
5079 /* ================================= Expire ================================= */
5080 static int removeExpire(redisDb
*db
, robj
*key
) {
5081 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
5088 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
5089 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
5097 /* Return the expire time of the specified key, or -1 if no expire
5098 * is associated with this key (i.e. the key is non volatile) */
5099 static time_t getExpire(redisDb
*db
, robj
*key
) {
5102 /* No expire? return ASAP */
5103 if (dictSize(db
->expires
) == 0 ||
5104 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
5106 return (time_t) dictGetEntryVal(de
);
5109 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
5113 /* No expire? return ASAP */
5114 if (dictSize(db
->expires
) == 0 ||
5115 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5117 /* Lookup the expire */
5118 when
= (time_t) dictGetEntryVal(de
);
5119 if (time(NULL
) <= when
) return 0;
5121 /* Delete the key */
5122 dictDelete(db
->expires
,key
);
5123 return dictDelete(db
->dict
,key
) == DICT_OK
;
5126 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
5129 /* No expire? return ASAP */
5130 if (dictSize(db
->expires
) == 0 ||
5131 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5133 /* Delete the key */
5135 dictDelete(db
->expires
,key
);
5136 return dictDelete(db
->dict
,key
) == DICT_OK
;
5139 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
5142 de
= dictFind(c
->db
->dict
,key
);
5144 addReply(c
,shared
.czero
);
5148 if (deleteKey(c
->db
,key
)) server
.dirty
++;
5149 addReply(c
, shared
.cone
);
5152 time_t when
= time(NULL
)+seconds
;
5153 if (setExpire(c
->db
,key
,when
)) {
5154 addReply(c
,shared
.cone
);
5157 addReply(c
,shared
.czero
);
5163 static void expireCommand(redisClient
*c
) {
5164 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
5167 static void expireatCommand(redisClient
*c
) {
5168 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
5171 static void ttlCommand(redisClient
*c
) {
5175 expire
= getExpire(c
->db
,c
->argv
[1]);
5177 ttl
= (int) (expire
-time(NULL
));
5178 if (ttl
< 0) ttl
= -1;
5180 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
5183 /* =============================== Replication ============================= */
5185 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5186 ssize_t nwritten
, ret
= size
;
5187 time_t start
= time(NULL
);
5191 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
5192 nwritten
= write(fd
,ptr
,size
);
5193 if (nwritten
== -1) return -1;
5197 if ((time(NULL
)-start
) > timeout
) {
5205 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5206 ssize_t nread
, totread
= 0;
5207 time_t start
= time(NULL
);
5211 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
5212 nread
= read(fd
,ptr
,size
);
5213 if (nread
== -1) return -1;
5218 if ((time(NULL
)-start
) > timeout
) {
5226 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5233 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
5236 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
5247 static void syncCommand(redisClient
*c
) {
5248 /* ignore SYNC if aleady slave or in monitor mode */
5249 if (c
->flags
& REDIS_SLAVE
) return;
5251 /* SYNC can't be issued when the server has pending data to send to
5252 * the client about already issued commands. We need a fresh reply
5253 * buffer registering the differences between the BGSAVE and the current
5254 * dataset, so that we can copy to other slaves if needed. */
5255 if (listLength(c
->reply
) != 0) {
5256 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5260 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
5261 /* Here we need to check if there is a background saving operation
5262 * in progress, or if it is required to start one */
5263 if (server
.bgsavechildpid
!= -1) {
5264 /* Ok a background save is in progress. Let's check if it is a good
5265 * one for replication, i.e. if there is another slave that is
5266 * registering differences since the server forked to save */
5270 listRewind(server
.slaves
);
5271 while((ln
= listYield(server
.slaves
))) {
5273 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
5276 /* Perfect, the server is already registering differences for
5277 * another slave. Set the right state, and copy the buffer. */
5278 listRelease(c
->reply
);
5279 c
->reply
= listDup(slave
->reply
);
5280 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5281 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
5283 /* No way, we need to wait for the next BGSAVE in order to
5284 * register differences */
5285 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5286 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
5289 /* Ok we don't have a BGSAVE in progress, let's start one */
5290 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
5291 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5292 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
5293 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
5296 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5299 c
->flags
|= REDIS_SLAVE
;
5301 listAddNodeTail(server
.slaves
,c
);
5305 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
5306 redisClient
*slave
= privdata
;
5308 REDIS_NOTUSED(mask
);
5309 char buf
[REDIS_IOBUF_LEN
];
5310 ssize_t nwritten
, buflen
;
5312 if (slave
->repldboff
== 0) {
5313 /* Write the bulk write count before to transfer the DB. In theory here
5314 * we don't know how much room there is in the output buffer of the
5315 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5316 * operations) will never be smaller than the few bytes we need. */
5319 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5321 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
5329 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
5330 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
5332 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
5333 (buflen
== 0) ? "premature EOF" : strerror(errno
));
5337 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
5338 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
5343 slave
->repldboff
+= nwritten
;
5344 if (slave
->repldboff
== slave
->repldbsize
) {
5345 close(slave
->repldbfd
);
5346 slave
->repldbfd
= -1;
5347 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5348 slave
->replstate
= REDIS_REPL_ONLINE
;
5349 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
5350 sendReplyToClient
, slave
) == AE_ERR
) {
5354 addReplySds(slave
,sdsempty());
5355 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
5359 /* This function is called at the end of every backgrond saving.
5360 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5361 * otherwise REDIS_ERR is passed to the function.
5363 * The goal of this function is to handle slaves waiting for a successful
5364 * background saving in order to perform non-blocking synchronization. */
5365 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
5367 int startbgsave
= 0;
5369 listRewind(server
.slaves
);
5370 while((ln
= listYield(server
.slaves
))) {
5371 redisClient
*slave
= ln
->value
;
5373 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
5375 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5376 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
5377 struct redis_stat buf
;
5379 if (bgsaveerr
!= REDIS_OK
) {
5381 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
5384 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
5385 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
5387 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
5390 slave
->repldboff
= 0;
5391 slave
->repldbsize
= buf
.st_size
;
5392 slave
->replstate
= REDIS_REPL_SEND_BULK
;
5393 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5394 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
5401 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5402 listRewind(server
.slaves
);
5403 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
5404 while((ln
= listYield(server
.slaves
))) {
5405 redisClient
*slave
= ln
->value
;
5407 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
5414 static int syncWithMaster(void) {
5415 char buf
[1024], tmpfile
[256], authcmd
[1024];
5417 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
5421 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
5426 /* AUTH with the master if required. */
5427 if(server
.masterauth
) {
5428 snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
);
5429 if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) {
5431 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",
5435 /* Read the AUTH result. */
5436 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5438 redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s",
5442 if (buf
[0] != '+') {
5444 redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?");
5449 /* Issue the SYNC command */
5450 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
5452 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
5456 /* Read the bulk write count */
5457 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5459 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
5463 if (buf
[0] != '$') {
5465 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5468 dumpsize
= atoi(buf
+1);
5469 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
5470 /* Read the bulk write data on a temp file */
5471 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
5472 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
5475 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
5479 int nread
, nwritten
;
5481 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
5483 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
5489 nwritten
= write(dfd
,buf
,nread
);
5490 if (nwritten
== -1) {
5491 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
5499 if (rename(tmpfile
,server
.dbfilename
) == -1) {
5500 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
5506 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
5507 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
5511 server
.master
= createClient(fd
);
5512 server
.master
->flags
|= REDIS_MASTER
;
5513 server
.replstate
= REDIS_REPL_CONNECTED
;
5517 static void slaveofCommand(redisClient
*c
) {
5518 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
5519 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
5520 if (server
.masterhost
) {
5521 sdsfree(server
.masterhost
);
5522 server
.masterhost
= NULL
;
5523 if (server
.master
) freeClient(server
.master
);
5524 server
.replstate
= REDIS_REPL_NONE
;
5525 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
5528 sdsfree(server
.masterhost
);
5529 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
5530 server
.masterport
= atoi(c
->argv
[2]->ptr
);
5531 if (server
.master
) freeClient(server
.master
);
5532 server
.replstate
= REDIS_REPL_CONNECT
;
5533 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
5534 server
.masterhost
, server
.masterport
);
5536 addReply(c
,shared
.ok
);
5539 /* ============================ Maxmemory directive ======================== */
5541 /* This function gets called when 'maxmemory' is set on the config file to limit
5542 * the max memory used by the server, and we are out of memory.
5543 * This function will try to, in order:
5545 * - Free objects from the free list
5546 * - Try to remove keys with an EXPIRE set
5548 * It is not possible to free enough memory to reach used-memory < maxmemory
5549 * the server will start refusing commands that will enlarge even more the
5552 static void freeMemoryIfNeeded(void) {
5553 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
5554 if (listLength(server
.objfreelist
)) {
5557 listNode
*head
= listFirst(server
.objfreelist
);
5558 o
= listNodeValue(head
);
5559 listDelNode(server
.objfreelist
,head
);
5562 int j
, k
, freed
= 0;
5564 for (j
= 0; j
< server
.dbnum
; j
++) {
5566 robj
*minkey
= NULL
;
5567 struct dictEntry
*de
;
5569 if (dictSize(server
.db
[j
].expires
)) {
5571 /* From a sample of three keys drop the one nearest to
5572 * the natural expire */
5573 for (k
= 0; k
< 3; k
++) {
5576 de
= dictGetRandomKey(server
.db
[j
].expires
);
5577 t
= (time_t) dictGetEntryVal(de
);
5578 if (minttl
== -1 || t
< minttl
) {
5579 minkey
= dictGetEntryKey(de
);
5583 deleteKey(server
.db
+j
,minkey
);
5586 if (!freed
) return; /* nothing to free... */
5591 /* ============================== Append Only file ========================== */
5593 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
5594 sds buf
= sdsempty();
5600 /* The DB this command was targetting is not the same as the last command
5601 * we appendend. To issue a SELECT command is needed. */
5602 if (dictid
!= server
.appendseldb
) {
5605 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
5606 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
5607 strlen(seldb
),seldb
);
5608 server
.appendseldb
= dictid
;
5611 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5612 * EXPIREs into EXPIREATs calls */
5613 if (cmd
->proc
== expireCommand
) {
5616 tmpargv
[0] = createStringObject("EXPIREAT",8);
5617 tmpargv
[1] = argv
[1];
5618 incrRefCount(argv
[1]);
5619 when
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10);
5620 tmpargv
[2] = createObject(REDIS_STRING
,
5621 sdscatprintf(sdsempty(),"%ld",when
));
5625 /* Append the actual command */
5626 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
5627 for (j
= 0; j
< argc
; j
++) {
5630 o
= getDecodedObject(o
);
5631 buf
= sdscatprintf(buf
,"$%d\r\n",sdslen(o
->ptr
));
5632 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
5633 buf
= sdscatlen(buf
,"\r\n",2);
5637 /* Free the objects from the modified argv for EXPIREAT */
5638 if (cmd
->proc
== expireCommand
) {
5639 for (j
= 0; j
< 3; j
++)
5640 decrRefCount(argv
[j
]);
5643 /* We want to perform a single write. This should be guaranteed atomic
5644 * at least if the filesystem we are writing is a real physical one.
5645 * While this will save us against the server being killed I don't think
5646 * there is much to do about the whole server stopping for power problems
5648 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
5649 if (nwritten
!= (signed)sdslen(buf
)) {
5650 /* Ooops, we are in troubles. The best thing to do for now is
5651 * to simply exit instead to give the illusion that everything is
5652 * working as expected. */
5653 if (nwritten
== -1) {
5654 redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
));
5656 redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
));
5660 /* If a background append only file rewriting is in progress we want to
5661 * accumulate the differences between the child DB and the current one
5662 * in a buffer, so that when the child process will do its work we
5663 * can append the differences to the new append only file. */
5664 if (server
.bgrewritechildpid
!= -1)
5665 server
.bgrewritebuf
= sdscatlen(server
.bgrewritebuf
,buf
,sdslen(buf
));
5669 if (server
.appendfsync
== APPENDFSYNC_ALWAYS
||
5670 (server
.appendfsync
== APPENDFSYNC_EVERYSEC
&&
5671 now
-server
.lastfsync
> 1))
5673 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
5674 server
.lastfsync
= now
;
5678 /* In Redis commands are always executed in the context of a client, so in
5679 * order to load the append only file we need to create a fake client. */
5680 static struct redisClient
*createFakeClient(void) {
5681 struct redisClient
*c
= zmalloc(sizeof(*c
));
5685 c
->querybuf
= sdsempty();
5689 /* We set the fake client as a slave waiting for the synchronization
5690 * so that Redis will not try to send replies to this client. */
5691 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5692 c
->reply
= listCreate();
5693 listSetFreeMethod(c
->reply
,decrRefCount
);
5694 listSetDupMethod(c
->reply
,dupClientReplyValue
);
5698 static void freeFakeClient(struct redisClient
*c
) {
5699 sdsfree(c
->querybuf
);
5700 listRelease(c
->reply
);
5704 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5705 * error (the append only file is zero-length) REDIS_ERR is returned. On
5706 * fatal error an error message is logged and the program exists. */
5707 int loadAppendOnlyFile(char *filename
) {
5708 struct redisClient
*fakeClient
;
5709 FILE *fp
= fopen(filename
,"r");
5710 struct redis_stat sb
;
5712 if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size
== 0)
5716 redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
));
5720 fakeClient
= createFakeClient();
5727 struct redisCommand
*cmd
;
5729 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) {
5735 if (buf
[0] != '*') goto fmterr
;
5737 argv
= zmalloc(sizeof(robj
*)*argc
);
5738 for (j
= 0; j
< argc
; j
++) {
5739 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
;
5740 if (buf
[0] != '$') goto fmterr
;
5741 len
= strtol(buf
+1,NULL
,10);
5742 argsds
= sdsnewlen(NULL
,len
);
5743 if (fread(argsds
,len
,1,fp
) == 0) goto fmterr
;
5744 argv
[j
] = createObject(REDIS_STRING
,argsds
);
5745 if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */
5748 /* Command lookup */
5749 cmd
= lookupCommand(argv
[0]->ptr
);
5751 redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
);
5754 /* Try object sharing and encoding */
5755 if (server
.shareobjects
) {
5757 for(j
= 1; j
< argc
; j
++)
5758 argv
[j
] = tryObjectSharing(argv
[j
]);
5760 if (cmd
->flags
& REDIS_CMD_BULK
)
5761 tryObjectEncoding(argv
[argc
-1]);
5762 /* Run the command in the context of a fake client */
5763 fakeClient
->argc
= argc
;
5764 fakeClient
->argv
= argv
;
5765 cmd
->proc(fakeClient
);
5766 /* Discard the reply objects list from the fake client */
5767 while(listLength(fakeClient
->reply
))
5768 listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
));
5769 /* Clean up, ready for the next command */
5770 for (j
= 0; j
< argc
; j
++) decrRefCount(argv
[j
]);
5774 freeFakeClient(fakeClient
);
5779 redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file");
5781 redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
));
5785 redisLog(REDIS_WARNING
,"Bad file format reading the append only file");
5789 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
5790 static int fwriteBulk(FILE *fp
, robj
*obj
) {
5792 obj
= getDecodedObject(obj
);
5793 snprintf(buf
,sizeof(buf
),"$%ld\r\n",(long)sdslen(obj
->ptr
));
5794 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) goto err
;
5795 if (fwrite(obj
->ptr
,sdslen(obj
->ptr
),1,fp
) == 0) goto err
;
5796 if (fwrite("\r\n",2,1,fp
) == 0) goto err
;
5804 /* Write a double value in bulk format $<count>\r\n<payload>\r\n */
5805 static int fwriteBulkDouble(FILE *fp
, double d
) {
5806 char buf
[128], dbuf
[128];
5808 snprintf(dbuf
,sizeof(dbuf
),"%.17g\r\n",d
);
5809 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(dbuf
)-2);
5810 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
5811 if (fwrite(dbuf
,strlen(dbuf
),1,fp
) == 0) return 0;
5815 /* Write a long value in bulk format $<count>\r\n<payload>\r\n */
5816 static int fwriteBulkLong(FILE *fp
, long l
) {
5817 char buf
[128], lbuf
[128];
5819 snprintf(lbuf
,sizeof(lbuf
),"%ld\r\n",l
);
5820 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(lbuf
)-2);
5821 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
5822 if (fwrite(lbuf
,strlen(lbuf
),1,fp
) == 0) return 0;
5826 /* Write a sequence of commands able to fully rebuild the dataset into
5827 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
5828 static int rewriteAppendOnlyFile(char *filename
) {
5829 dictIterator
*di
= NULL
;
5834 time_t now
= time(NULL
);
5836 /* Note that we have to use a different temp name here compared to the
5837 * one used by rewriteAppendOnlyFileBackground() function. */
5838 snprintf(tmpfile
,256,"temp-rewriteaof-%d.aof", (int) getpid());
5839 fp
= fopen(tmpfile
,"w");
5841 redisLog(REDIS_WARNING
, "Failed rewriting the append only file: %s", strerror(errno
));
5844 for (j
= 0; j
< server
.dbnum
; j
++) {
5845 char selectcmd
[] = "*2\r\n$6\r\nSELECT\r\n";
5846 redisDb
*db
= server
.db
+j
;
5848 if (dictSize(d
) == 0) continue;
5849 di
= dictGetIterator(d
);
5855 /* SELECT the new DB */
5856 if (fwrite(selectcmd
,sizeof(selectcmd
)-1,1,fp
) == 0) goto werr
;
5857 if (fwriteBulkLong(fp
,j
) == 0) goto werr
;
5859 /* Iterate this DB writing every entry */
5860 while((de
= dictNext(di
)) != NULL
) {
5861 robj
*key
= dictGetEntryKey(de
);
5862 robj
*o
= dictGetEntryVal(de
);
5863 time_t expiretime
= getExpire(db
,key
);
5865 /* Save the key and associated value */
5866 if (o
->type
== REDIS_STRING
) {
5867 /* Emit a SET command */
5868 char cmd
[]="*3\r\n$3\r\nSET\r\n";
5869 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5871 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5872 if (fwriteBulk(fp
,o
) == 0) goto werr
;
5873 } else if (o
->type
== REDIS_LIST
) {
5874 /* Emit the RPUSHes needed to rebuild the list */
5875 list
*list
= o
->ptr
;
5879 while((ln
= listYield(list
))) {
5880 char cmd
[]="*3\r\n$5\r\nRPUSH\r\n";
5881 robj
*eleobj
= listNodeValue(ln
);
5883 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5884 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5885 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
5887 } else if (o
->type
== REDIS_SET
) {
5888 /* Emit the SADDs needed to rebuild the set */
5890 dictIterator
*di
= dictGetIterator(set
);
5893 while((de
= dictNext(di
)) != NULL
) {
5894 char cmd
[]="*3\r\n$4\r\nSADD\r\n";
5895 robj
*eleobj
= dictGetEntryKey(de
);
5897 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5898 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5899 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
5901 dictReleaseIterator(di
);
5902 } else if (o
->type
== REDIS_ZSET
) {
5903 /* Emit the ZADDs needed to rebuild the sorted set */
5905 dictIterator
*di
= dictGetIterator(zs
->dict
);
5908 while((de
= dictNext(di
)) != NULL
) {
5909 char cmd
[]="*4\r\n$4\r\nZADD\r\n";
5910 robj
*eleobj
= dictGetEntryKey(de
);
5911 double *score
= dictGetEntryVal(de
);
5913 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5914 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5915 if (fwriteBulkDouble(fp
,*score
) == 0) goto werr
;
5916 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
5918 dictReleaseIterator(di
);
5922 /* Save the expire time */
5923 if (expiretime
!= -1) {
5924 char cmd
[]="*3\r\n$6\r\nEXPIRE\r\n";
5925 /* If this key is already expired skip it */
5926 if (expiretime
< now
) continue;
5927 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5928 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5929 if (fwriteBulkLong(fp
,expiretime
) == 0) goto werr
;
5932 dictReleaseIterator(di
);
5935 /* Make sure data will not remain on the OS's output buffers */
5940 /* Use RENAME to make sure the DB file is changed atomically only
5941 * if the generate DB file is ok. */
5942 if (rename(tmpfile
,filename
) == -1) {
5943 redisLog(REDIS_WARNING
,"Error moving temp append only file on the final destination: %s", strerror(errno
));
5947 redisLog(REDIS_NOTICE
,"SYNC append only file rewrite performed");
5953 redisLog(REDIS_WARNING
,"Write error writing append only fileon disk: %s", strerror(errno
));
5954 if (di
) dictReleaseIterator(di
);
5958 /* This is how rewriting of the append only file in background works:
5960 * 1) The user calls BGREWRITEAOF
5961 * 2) Redis calls this function, that forks():
5962 * 2a) the child rewrite the append only file in a temp file.
5963 * 2b) the parent accumulates differences in server.bgrewritebuf.
5964 * 3) When the child finished '2a' exists.
5965 * 4) The parent will trap the exit code, if it's OK, will append the
5966 * data accumulated into server.bgrewritebuf into the temp file, and
5967 * finally will rename(2) the temp file in the actual file name.
5968 * The the new file is reopened as the new append only file. Profit!
5970 static int rewriteAppendOnlyFileBackground(void) {
5973 if (server
.bgrewritechildpid
!= -1) return REDIS_ERR
;
5974 if ((childpid
= fork()) == 0) {
5979 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
5980 if (rewriteAppendOnlyFile(tmpfile
) == REDIS_OK
) {
5987 if (childpid
== -1) {
5988 redisLog(REDIS_WARNING
,
5989 "Can't rewrite append only file in background: fork: %s",
5993 redisLog(REDIS_NOTICE
,
5994 "Background append only file rewriting started by pid %d",childpid
);
5995 server
.bgrewritechildpid
= childpid
;
5996 /* We set appendseldb to -1 in order to force the next call to the
5997 * feedAppendOnlyFile() to issue a SELECT command, so the differences
5998 * accumulated by the parent into server.bgrewritebuf will start
5999 * with a SELECT statement and it will be safe to merge. */
6000 server
.appendseldb
= -1;
6003 return REDIS_OK
; /* unreached */
6006 static void bgrewriteaofCommand(redisClient
*c
) {
6007 if (server
.bgrewritechildpid
!= -1) {
6008 addReplySds(c
,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6011 if (rewriteAppendOnlyFileBackground() == REDIS_OK
) {
6012 addReply(c
,shared
.ok
);
6014 addReply(c
,shared
.err
);
6018 static void aofRemoveTempFile(pid_t childpid
) {
6021 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) childpid
);
6025 /* ================================= Debugging ============================== */
6027 static void debugCommand(redisClient
*c
) {
6028 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
6030 } else if (!strcasecmp(c
->argv
[1]->ptr
,"reload")) {
6031 if (rdbSave(server
.dbfilename
) != REDIS_OK
) {
6032 addReply(c
,shared
.err
);
6036 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6037 addReply(c
,shared
.err
);
6040 redisLog(REDIS_WARNING
,"DB reloaded by DEBUG RELOAD");
6041 addReply(c
,shared
.ok
);
6042 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
6043 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
6047 addReply(c
,shared
.nokeyerr
);
6050 key
= dictGetEntryKey(de
);
6051 val
= dictGetEntryVal(de
);
6052 addReplySds(c
,sdscatprintf(sdsempty(),
6053 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6054 key
, key
->refcount
, val
, val
->refcount
, val
->encoding
));
6056 addReplySds(c
,sdsnew(
6057 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6061 /* =================================== Main! ================================ */
6064 int linuxOvercommitMemoryValue(void) {
6065 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
6069 if (fgets(buf
,64,fp
) == NULL
) {
6078 void linuxOvercommitMemoryWarning(void) {
6079 if (linuxOvercommitMemoryValue() == 0) {
6080 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6083 #endif /* __linux__ */
6085 static void daemonize(void) {
6089 if (fork() != 0) exit(0); /* parent exits */
6090 setsid(); /* create a new session */
6092 /* Every output goes to /dev/null. If Redis is daemonized but
6093 * the 'logfile' is set to 'stdout' in the configuration file
6094 * it will not log at all. */
6095 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
6096 dup2(fd
, STDIN_FILENO
);
6097 dup2(fd
, STDOUT_FILENO
);
6098 dup2(fd
, STDERR_FILENO
);
6099 if (fd
> STDERR_FILENO
) close(fd
);
6101 /* Try to write the pid file */
6102 fp
= fopen(server
.pidfile
,"w");
6104 fprintf(fp
,"%d\n",getpid());
6109 int main(int argc
, char **argv
) {
6112 resetServerSaveParams();
6113 loadServerConfig(argv
[1]);
6114 } else if (argc
> 2) {
6115 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
6118 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6121 if (server
.daemonize
) daemonize();
6122 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
6124 linuxOvercommitMemoryWarning();
6126 if (server
.appendonly
) {
6127 if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
)
6128 redisLog(REDIS_NOTICE
,"DB loaded from append only file");
6130 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
6131 redisLog(REDIS_NOTICE
,"DB loaded from disk");
6133 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
6134 acceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
6135 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
6137 aeDeleteEventLoop(server
.el
);
6141 /* ============================= Backtrace support ========================= */
6143 #ifdef HAVE_BACKTRACE
6144 static char *findFuncName(void *pointer
, unsigned long *offset
);
6146 static void *getMcontextEip(ucontext_t
*uc
) {
6147 #if defined(__FreeBSD__)
6148 return (void*) uc
->uc_mcontext
.mc_eip
;
6149 #elif defined(__dietlibc__)
6150 return (void*) uc
->uc_mcontext
.eip
;
6151 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6152 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6153 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6154 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6155 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6157 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6159 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
6160 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
];
6161 #elif defined(__ia64__) /* Linux IA64 */
6162 return (void*) uc
->uc_mcontext
.sc_ip
;
6168 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
6170 char **messages
= NULL
;
6171 int i
, trace_size
= 0;
6172 unsigned long offset
=0;
6173 time_t uptime
= time(NULL
)-server
.stat_starttime
;
6174 ucontext_t
*uc
= (ucontext_t
*) secret
;
6175 REDIS_NOTUSED(info
);
6177 redisLog(REDIS_WARNING
,
6178 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
6179 redisLog(REDIS_WARNING
, "%s", sdscatprintf(sdsempty(),
6180 "redis_version:%s; "
6181 "uptime_in_seconds:%d; "
6182 "connected_clients:%d; "
6183 "connected_slaves:%d; "
6185 "changes_since_last_save:%lld; "
6186 "bgsave_in_progress:%d; "
6187 "last_save_time:%d; "
6188 "total_connections_received:%lld; "
6189 "total_commands_processed:%lld; "
6193 listLength(server
.clients
)-listLength(server
.slaves
),
6194 listLength(server
.slaves
),
6197 server
.bgsavechildpid
!= -1,
6199 server
.stat_numconnections
,
6200 server
.stat_numcommands
,
6201 server
.masterhost
== NULL
? "master" : "slave"
6204 trace_size
= backtrace(trace
, 100);
6205 /* overwrite sigaction with caller's address */
6206 if (getMcontextEip(uc
) != NULL
) {
6207 trace
[1] = getMcontextEip(uc
);
6209 messages
= backtrace_symbols(trace
, trace_size
);
6211 for (i
=1; i
<trace_size
; ++i
) {
6212 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
6214 p
= strchr(messages
[i
],'+');
6215 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
6216 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
6218 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
6225 static void setupSigSegvAction(void) {
6226 struct sigaction act
;
6228 sigemptyset (&act
.sa_mask
);
6229 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6230 * is used. Otherwise, sa_handler is used */
6231 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
6232 act
.sa_sigaction
= segvHandler
;
6233 sigaction (SIGSEGV
, &act
, NULL
);
6234 sigaction (SIGBUS
, &act
, NULL
);
6235 sigaction (SIGFPE
, &act
, NULL
);
6236 sigaction (SIGILL
, &act
, NULL
);
6237 sigaction (SIGBUS
, &act
, NULL
);
6241 #include "staticsymbols.h"
6242 /* This function try to convert a pointer into a function name. It's used in
6243 * oreder to provide a backtrace under segmentation fault that's able to
6244 * display functions declared as static (otherwise the backtrace is useless). */
6245 static char *findFuncName(void *pointer
, unsigned long *offset
){
6247 unsigned long off
, minoff
= 0;
6249 /* Try to match against the Symbol with the smallest offset */
6250 for (i
=0; symsTable
[i
].pointer
; i
++) {
6251 unsigned long lp
= (unsigned long) pointer
;
6253 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
6254 off
=lp
-symsTable
[i
].pointer
;
6255 if (ret
< 0 || off
< minoff
) {
6261 if (ret
== -1) return NULL
;
6263 return symsTable
[ret
].name
;
6265 #else /* HAVE_BACKTRACE */
6266 static void setupSigSegvAction(void) {
6268 #endif /* HAVE_BACKTRACE */