2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.1.90"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
64 #include "solarisfixes.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
95 /* If more than REDIS_WRITEV_THRESHOLD write packets are pending, use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
107 this flag will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short, these commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
113 #define REDIS_STRING 0
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
128 /* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpret the length:
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
139 * Lengths up to 63 are stored using a single byte, most DB keys, and many
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining six bits specify a special encoding for the object
149 * according to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
156 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157 #define REDIS_SLAVE 2 /* This client is a slave server */
158 #define REDIS_MASTER 4 /* This client is a master server */
159 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
161 /* Slave replication state - slave side */
162 #define REDIS_REPL_NONE 0 /* No active replication */
163 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
164 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
166 /* Slave replication state - from the point of view of master
167 * Note that in SEND_BULK and ONLINE state the slave receives new updates
168 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
169 * to start the next background saving in order to send updates to it. */
170 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
171 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
172 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
173 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
175 /* List related stuff */
179 /* Sort operations */
180 #define REDIS_SORT_GET 0
181 #define REDIS_SORT_ASC 1
182 #define REDIS_SORT_DESC 2
183 #define REDIS_SORTKEY_MAX 1024
186 #define REDIS_DEBUG 0
187 #define REDIS_NOTICE 1
188 #define REDIS_WARNING 2
190 /* Anti-warning macro... */
191 #define REDIS_NOTUSED(V) ((void) V)
193 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
194 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
196 /* Append only defines */
197 #define APPENDFSYNC_NO 0
198 #define APPENDFSYNC_ALWAYS 1
199 #define APPENDFSYNC_EVERYSEC 2
201 /* We can print the stacktrace, so our assert is defined this way: */
/* redisAssert(_e) evaluates to (void)0 when _e is true; otherwise it passes
 * the stringified expression to _redisAssert() and then calls exit(1). */
202 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
203 static void _redisAssert(char *estr
);
205 /*================================= Data types ============================== */
207 /* A redis object, that is a type able to hold a string / list / set */
208 typedef struct redisObject
{
211 unsigned char encoding
;
212 unsigned char notused
[2];
216 /* Macro used to initialize a Redis object allocated on the stack.
217 * Note that this macro is taken near the structure definition to make sure
218 * we'll update it when the structure is changed, to avoid bugs like
219 * bug #85 introduced exactly in this way. */
220 #define initStaticStringObject(_var,_ptr) do { \
222 _var.type = REDIS_STRING; \
223 _var.encoding = REDIS_ENCODING_RAW; \
227 typedef struct redisDb
{
233 /* With multiplexing we need to keep per-client state.
234 * Clients are kept in a linked list. */
235 typedef struct redisClient
{
240 robj
**argv
, **mbargv
;
242 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
243 int multibulk
; /* multi bulk command format active */
246 time_t lastinteraction
; /* time of the last interaction, used for timeout */
247 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
248 int slaveseldb
; /* slave selected db, if this client is a slave */
249 int authenticated
; /* when requirepass is non-NULL */
250 int replstate
; /* replication state if this is a slave */
251 int repldbfd
; /* replication DB file descriptor */
252 long repldboff
; /* replication DB file offset */
253 off_t repldbsize
; /* replication DB file size */
261 /* Global server state structure */
267 unsigned int sharingpoolsize
;
268 long long dirty
; /* changes to DB from the last save */
270 list
*slaves
, *monitors
;
271 char neterr
[ANET_ERR_LEN
];
273 int cronloops
; /* number of times the cron function run */
274 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
275 time_t lastsave
; /* Unix time of last save succeeede */
276 size_t usedmemory
; /* Used memory in megabytes */
277 /* Fields used only for stats */
278 time_t stat_starttime
; /* server start time */
279 long long stat_numcommands
; /* number of processed commands */
280 long long stat_numconnections
; /* number of connections received */
293 pid_t bgsavechildpid
;
294 pid_t bgrewritechildpid
;
295 sds bgrewritebuf
; /* buffer taken by parent during oppend only rewrite */
296 struct saveparam
*saveparams
;
301 char *appendfilename
;
304 /* Replication related */
309 redisClient
*master
; /* client that is master for this slave */
311 unsigned int maxclients
;
312 unsigned long maxmemory
;
313 /* Sort parameters - qsort_r() is only available under BSD so we
314 * have to take this state global, in order to pass it to sortCompare() */
320 typedef void redisCommandProc(redisClient
*c
);
321 struct redisCommand
{
323 redisCommandProc
*proc
;
328 struct redisFunctionSym
{
330 unsigned long pointer
;
333 typedef struct _redisSortObject
{
341 typedef struct _redisSortOperation
{
344 } redisSortOperation
;
346 /* ZSETs use a specialized version of Skiplists */
348 typedef struct zskiplistNode
{
349 struct zskiplistNode
**forward
;
350 struct zskiplistNode
*backward
;
355 typedef struct zskiplist
{
356 struct zskiplistNode
*header
, *tail
;
357 unsigned long length
;
361 typedef struct zset
{
366 /* Our shared "common" objects */
368 struct sharedObjectsStruct
{
369 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
370 *colon
, *nullbulk
, *nullmultibulk
,
371 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
372 *outofrangeerr
, *plus
,
373 *select0
, *select1
, *select2
, *select3
, *select4
,
374 *select5
, *select6
, *select7
, *select8
, *select9
;
377 /* Global vars that are actually used as constants. The following double
378 * values are used for double on-disk serialization, and are initialized
379 * at runtime to avoid strange compiler optimizations. */
381 static double R_Zero
, R_PosInf
, R_NegInf
, R_Nan
;
383 /*================================ Prototypes =============================== */
385 static void freeStringObject(robj
*o
);
386 static void freeListObject(robj
*o
);
387 static void freeSetObject(robj
*o
);
388 static void decrRefCount(void *o
);
389 static robj
*createObject(int type
, void *ptr
);
390 static void freeClient(redisClient
*c
);
391 static int rdbLoad(char *filename
);
392 static void addReply(redisClient
*c
, robj
*obj
);
393 static void addReplySds(redisClient
*c
, sds s
);
394 static void incrRefCount(robj
*o
);
395 static int rdbSaveBackground(char *filename
);
396 static robj
*createStringObject(char *ptr
, size_t len
);
397 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
398 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
399 static int syncWithMaster(void);
400 static robj
*tryObjectSharing(robj
*o
);
401 static int tryObjectEncoding(robj
*o
);
402 static robj
*getDecodedObject(robj
*o
);
403 static int removeExpire(redisDb
*db
, robj
*key
);
404 static int expireIfNeeded(redisDb
*db
, robj
*key
);
405 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
406 static int deleteKey(redisDb
*db
, robj
*key
);
407 static time_t getExpire(redisDb
*db
, robj
*key
);
408 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
409 static void updateSlavesWaitingBgsave(int bgsaveerr
);
410 static void freeMemoryIfNeeded(void);
411 static int processCommand(redisClient
*c
);
412 static void setupSigSegvAction(void);
413 static void rdbRemoveTempFile(pid_t childpid
);
414 static void aofRemoveTempFile(pid_t childpid
);
415 static size_t stringObjectLen(robj
*o
);
416 static void processInputBuffer(redisClient
*c
);
417 static zskiplist
*zslCreate(void);
418 static void zslFree(zskiplist
*zsl
);
419 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
420 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
422 static void authCommand(redisClient
*c
);
423 static void pingCommand(redisClient
*c
);
424 static void echoCommand(redisClient
*c
);
425 static void setCommand(redisClient
*c
);
426 static void setnxCommand(redisClient
*c
);
427 static void getCommand(redisClient
*c
);
428 static void delCommand(redisClient
*c
);
429 static void existsCommand(redisClient
*c
);
430 static void incrCommand(redisClient
*c
);
431 static void decrCommand(redisClient
*c
);
432 static void incrbyCommand(redisClient
*c
);
433 static void decrbyCommand(redisClient
*c
);
434 static void selectCommand(redisClient
*c
);
435 static void randomkeyCommand(redisClient
*c
);
436 static void keysCommand(redisClient
*c
);
437 static void dbsizeCommand(redisClient
*c
);
438 static void lastsaveCommand(redisClient
*c
);
439 static void saveCommand(redisClient
*c
);
440 static void bgsaveCommand(redisClient
*c
);
441 static void bgrewriteaofCommand(redisClient
*c
);
442 static void shutdownCommand(redisClient
*c
);
443 static void moveCommand(redisClient
*c
);
444 static void renameCommand(redisClient
*c
);
445 static void renamenxCommand(redisClient
*c
);
446 static void lpushCommand(redisClient
*c
);
447 static void rpushCommand(redisClient
*c
);
448 static void lpopCommand(redisClient
*c
);
449 static void rpopCommand(redisClient
*c
);
450 static void llenCommand(redisClient
*c
);
451 static void lindexCommand(redisClient
*c
);
452 static void lrangeCommand(redisClient
*c
);
453 static void ltrimCommand(redisClient
*c
);
454 static void typeCommand(redisClient
*c
);
455 static void lsetCommand(redisClient
*c
);
456 static void saddCommand(redisClient
*c
);
457 static void sremCommand(redisClient
*c
);
458 static void smoveCommand(redisClient
*c
);
459 static void sismemberCommand(redisClient
*c
);
460 static void scardCommand(redisClient
*c
);
461 static void spopCommand(redisClient
*c
);
462 static void srandmemberCommand(redisClient
*c
);
463 static void sinterCommand(redisClient
*c
);
464 static void sinterstoreCommand(redisClient
*c
);
465 static void sunionCommand(redisClient
*c
);
466 static void sunionstoreCommand(redisClient
*c
);
467 static void sdiffCommand(redisClient
*c
);
468 static void sdiffstoreCommand(redisClient
*c
);
469 static void syncCommand(redisClient
*c
);
470 static void flushdbCommand(redisClient
*c
);
471 static void flushallCommand(redisClient
*c
);
472 static void sortCommand(redisClient
*c
);
473 static void lremCommand(redisClient
*c
);
474 static void rpoplpushcommand(redisClient
*c
);
475 static void infoCommand(redisClient
*c
);
476 static void mgetCommand(redisClient
*c
);
477 static void monitorCommand(redisClient
*c
);
478 static void expireCommand(redisClient
*c
);
479 static void expireatCommand(redisClient
*c
);
480 static void getsetCommand(redisClient
*c
);
481 static void ttlCommand(redisClient
*c
);
482 static void slaveofCommand(redisClient
*c
);
483 static void debugCommand(redisClient
*c
);
484 static void msetCommand(redisClient
*c
);
485 static void msetnxCommand(redisClient
*c
);
486 static void zaddCommand(redisClient
*c
);
487 static void zincrbyCommand(redisClient
*c
);
488 static void zrangeCommand(redisClient
*c
);
489 static void zrangebyscoreCommand(redisClient
*c
);
490 static void zrevrangeCommand(redisClient
*c
);
491 static void zcardCommand(redisClient
*c
);
492 static void zremCommand(redisClient
*c
);
493 static void zscoreCommand(redisClient
*c
);
494 static void zremrangebyscoreCommand(redisClient
*c
);
496 /*================================= Globals ================================= */
499 static struct redisServer server
; /* server global state */
500 static struct redisCommand cmdTable
[] = {
501 {"get",getCommand
,2,REDIS_CMD_INLINE
},
502 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
503 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
504 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
505 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
506 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
507 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
508 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
509 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
510 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
511 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
512 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
513 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
514 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
515 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
516 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
517 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
518 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
519 {"rpoplpush",rpoplpushcommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
520 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
521 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
522 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
523 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
524 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
525 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
526 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
527 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
528 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
529 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
530 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
531 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
532 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
533 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
534 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
535 {"zincrby",zincrbyCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
536 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
537 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
538 {"zrange",zrangeCommand
,4,REDIS_CMD_INLINE
},
539 {"zrangebyscore",zrangebyscoreCommand
,-4,REDIS_CMD_INLINE
},
540 {"zrevrange",zrevrangeCommand
,4,REDIS_CMD_INLINE
},
541 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
542 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
543 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
544 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
545 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
546 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
547 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
548 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
549 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
550 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
551 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
552 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
553 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
554 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
555 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
556 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
557 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
558 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
559 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
560 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
561 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
562 {"bgrewriteaof",bgrewriteaofCommand
,1,REDIS_CMD_INLINE
},
563 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
564 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
565 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
566 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
567 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
568 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
569 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
570 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
571 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
572 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
573 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
574 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
578 /*============================ Utility functions ============================ */
580 /* Glob-style pattern matching. */
581 int stringmatchlen(const char *pattern
, int patternLen
,
582 const char *string
, int stringLen
, int nocase
)
587 while (pattern
[1] == '*') {
592 return 1; /* match */
594 if (stringmatchlen(pattern
+1, patternLen
-1,
595 string
, stringLen
, nocase
))
596 return 1; /* match */
600 return 0; /* no match */
604 return 0; /* no match */
614 not = pattern
[0] == '^';
621 if (pattern
[0] == '\\') {
624 if (pattern
[0] == string
[0])
626 } else if (pattern
[0] == ']') {
628 } else if (patternLen
== 0) {
632 } else if (pattern
[1] == '-' && patternLen
>= 3) {
633 int start
= pattern
[0];
634 int end
= pattern
[2];
642 start
= tolower(start
);
648 if (c
>= start
&& c
<= end
)
652 if (pattern
[0] == string
[0])
655 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
665 return 0; /* no match */
671 if (patternLen
>= 2) {
678 if (pattern
[0] != string
[0])
679 return 0; /* no match */
681 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
682 return 0; /* no match */
690 if (stringLen
== 0) {
691 while(*pattern
== '*') {
698 if (patternLen
== 0 && stringLen
== 0)
703 static void redisLog(int level
, const char *fmt
, ...) {
707 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
711 if (level
>= server
.verbosity
) {
717 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
718 fprintf(fp
,"%s %c ",buf
,c
[level
]);
719 vfprintf(fp
, fmt
, ap
);
725 if (server
.logfile
) fclose(fp
);
728 /*====================== Hash table type implementation ==================== */
730 /* This is a hash table type that uses the SDS dynamic strings library as
731 * keys and redis objects as values (objects can hold SDS strings,
734 static void dictVanillaFree(void *privdata
, void *val
)
736 DICT_NOTUSED(privdata
);
740 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
744 DICT_NOTUSED(privdata
);
746 l1
= sdslen((sds
)key1
);
747 l2
= sdslen((sds
)key2
);
748 if (l1
!= l2
) return 0;
749 return memcmp(key1
, key2
, l1
) == 0;
752 static void dictRedisObjectDestructor(void *privdata
, void *val
)
754 DICT_NOTUSED(privdata
);
759 static int dictObjKeyCompare(void *privdata
, const void *key1
,
762 const robj
*o1
= key1
, *o2
= key2
;
763 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
766 static unsigned int dictObjHash(const void *key
) {
768 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
771 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
774 robj
*o1
= (robj
*) key1
, *o2
= (robj
*) key2
;
777 o1
= getDecodedObject(o1
);
778 o2
= getDecodedObject(o2
);
779 cmp
= sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
785 static unsigned int dictEncObjHash(const void *key
) {
786 robj
*o
= (robj
*) key
;
788 o
= getDecodedObject(o
);
789 unsigned int hash
= dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
794 static dictType setDictType
= {
795 dictEncObjHash
, /* hash function */
798 dictEncObjKeyCompare
, /* key compare */
799 dictRedisObjectDestructor
, /* key destructor */
800 NULL
/* val destructor */
803 static dictType zsetDictType
= {
804 dictEncObjHash
, /* hash function */
807 dictEncObjKeyCompare
, /* key compare */
808 dictRedisObjectDestructor
, /* key destructor */
809 dictVanillaFree
/* val destructor of malloc(sizeof(double)) */
812 static dictType hashDictType
= {
813 dictObjHash
, /* hash function */
816 dictObjKeyCompare
, /* key compare */
817 dictRedisObjectDestructor
, /* key destructor */
818 dictRedisObjectDestructor
/* val destructor */
821 /* ========================= Random utility functions ======================= */
823 /* Redis generally does not try to recover from out of memory conditions
824 * when allocating objects or strings, it is not clear if it will be possible
825 * to report this condition to the client since the networking layer itself
826 * is based on heap allocation for send buffers, so we simply abort.
827 * At least the code will be simpler to read... */
828 static void oom(const char *msg
) {
829 fprintf(stderr
, "%s: Out of memory\n",msg
);
835 /* ====================== Redis server networking stuff ===================== */
836 static void closeTimedoutClients(void) {
839 time_t now
= time(NULL
);
841 listRewind(server
.clients
);
842 while ((ln
= listYield(server
.clients
)) != NULL
) {
843 c
= listNodeValue(ln
);
844 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
845 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
846 (now
- c
->lastinteraction
> server
.maxidletime
)) {
847 redisLog(REDIS_DEBUG
,"Closing idle client");
853 static int htNeedsResize(dict
*dict
) {
854 long long size
, used
;
856 size
= dictSlots(dict
);
857 used
= dictSize(dict
);
858 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
859 (used
*100/size
< REDIS_HT_MINFILL
));
862 /* If the percentage of used slots in the HT drops below REDIS_HT_MINFILL
863 * we resize the hash table to save memory */
864 static void tryResizeHashTables(void) {
867 for (j
= 0; j
< server
.dbnum
; j
++) {
868 if (htNeedsResize(server
.db
[j
].dict
)) {
869 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
870 dictResize(server
.db
[j
].dict
);
871 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
873 if (htNeedsResize(server
.db
[j
].expires
))
874 dictResize(server
.db
[j
].expires
);
878 /* A background saving child (BGSAVE) terminated its work. Handle this. */
879 void backgroundSaveDoneHandler(int statloc
) {
880 int exitcode
= WEXITSTATUS(statloc
);
881 int bysignal
= WIFSIGNALED(statloc
);
883 if (!bysignal
&& exitcode
== 0) {
884 redisLog(REDIS_NOTICE
,
885 "Background saving terminated with success");
887 server
.lastsave
= time(NULL
);
888 } else if (!bysignal
&& exitcode
!= 0) {
889 redisLog(REDIS_WARNING
, "Background saving error");
891 redisLog(REDIS_WARNING
,
892 "Background saving terminated by signal");
893 rdbRemoveTempFile(server
.bgsavechildpid
);
895 server
.bgsavechildpid
= -1;
896 /* Possibly there are slaves waiting for a BGSAVE in order to be served
897 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
898 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
901 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
903 void backgroundRewriteDoneHandler(int statloc
) {
904 int exitcode
= WEXITSTATUS(statloc
);
905 int bysignal
= WIFSIGNALED(statloc
);
907 if (!bysignal
&& exitcode
== 0) {
911 redisLog(REDIS_NOTICE
,
912 "Background append only file rewriting terminated with success");
913 /* Now it's time to flush the differences accumulated by the parent */
914 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) server
.bgrewritechildpid
);
915 fd
= open(tmpfile
,O_WRONLY
|O_APPEND
);
917 redisLog(REDIS_WARNING
, "Not able to open the temp append only file produced by the child: %s", strerror(errno
));
920 /* Flush our data... */
921 if (write(fd
,server
.bgrewritebuf
,sdslen(server
.bgrewritebuf
)) !=
922 (signed) sdslen(server
.bgrewritebuf
)) {
923 redisLog(REDIS_WARNING
, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno
));
927 redisLog(REDIS_WARNING
,"Parent diff flushed into the new append log file with success");
928 /* Now our work is to rename the temp file into the stable file. And
929 * switch the file descriptor used by the server for append only. */
930 if (rename(tmpfile
,server
.appendfilename
) == -1) {
931 redisLog(REDIS_WARNING
,"Can't rename the temp append only file into the stable one: %s", strerror(errno
));
935 /* Mission completed... almost */
936 redisLog(REDIS_NOTICE
,"Append only file successfully rewritten.");
937 if (server
.appendfd
!= -1) {
938 /* If append only is actually enabled... */
939 close(server
.appendfd
);
940 server
.appendfd
= fd
;
942 server
.appendseldb
= -1; /* Make sure it will issue SELECT */
943 redisLog(REDIS_NOTICE
,"The new append only file was selected for future appends.");
945 /* If append only is disabled we just generate a dump in this
946 * format. Why not? */
949 } else if (!bysignal
&& exitcode
!= 0) {
950 redisLog(REDIS_WARNING
, "Background append only file rewriting error");
952 redisLog(REDIS_WARNING
,
953 "Background append only file rewriting terminated by signal");
956 sdsfree(server
.bgrewritebuf
);
957 server
.bgrewritebuf
= sdsempty();
958 aofRemoveTempFile(server
.bgrewritechildpid
);
959 server
.bgrewritechildpid
= -1;
962 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
963 int j
, loops
= server
.cronloops
++;
964 REDIS_NOTUSED(eventLoop
);
966 REDIS_NOTUSED(clientData
);
968 /* Update the global state with the amount of used memory */
969 server
.usedmemory
= zmalloc_used_memory();
971 /* Show some info about non-empty databases */
972 for (j
= 0; j
< server
.dbnum
; j
++) {
973 long long size
, used
, vkeys
;
975 size
= dictSlots(server
.db
[j
].dict
);
976 used
= dictSize(server
.db
[j
].dict
);
977 vkeys
= dictSize(server
.db
[j
].expires
);
978 if (!(loops
% 5) && (used
|| vkeys
)) {
979 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
980 /* dictPrintStats(server.dict); */
984 /* We don't want to resize the hash tables while a bacground saving
985 * is in progress: the saving child is created using fork() that is
986 * implemented with a copy-on-write semantic in most modern systems, so
987 * if we resize the HT while there is the saving child at work actually
988 * a lot of memory movements in the parent will cause a lot of pages
990 if (server
.bgsavechildpid
== -1) tryResizeHashTables();
992 /* Show information about connected clients */
994 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
995 listLength(server
.clients
)-listLength(server
.slaves
),
996 listLength(server
.slaves
),
998 dictSize(server
.sharingpool
));
1001 /* Close connections of timedout clients */
1002 if (server
.maxidletime
&& !(loops
% 10))
1003 closeTimedoutClients();
1005 /* Check if a background saving or AOF rewrite in progress terminated */
1006 if (server
.bgsavechildpid
!= -1 || server
.bgrewritechildpid
!= -1) {
1010 if ((pid
= wait3(&statloc
,WNOHANG
,NULL
)) != 0) {
1011 if (pid
== server
.bgsavechildpid
) {
1012 backgroundSaveDoneHandler(statloc
);
1014 backgroundRewriteDoneHandler(statloc
);
1018 /* If there is not a background saving in progress check if
1019 * we have to save now */
1020 time_t now
= time(NULL
);
1021 for (j
= 0; j
< server
.saveparamslen
; j
++) {
1022 struct saveparam
*sp
= server
.saveparams
+j
;
1024 if (server
.dirty
>= sp
->changes
&&
1025 now
-server
.lastsave
> sp
->seconds
) {
1026 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
1027 sp
->changes
, sp
->seconds
);
1028 rdbSaveBackground(server
.dbfilename
);
1034 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1035 * will use few CPU cycles if there are few expiring keys, otherwise
1036 * it will get more aggressive to avoid that too much memory is used by
1037 * keys that can be removed from the keyspace. */
1038 for (j
= 0; j
< server
.dbnum
; j
++) {
1040 redisDb
*db
= server
.db
+j
;
1042 /* Continue to expire if at the end of the cycle more than 25%
1043 * of the keys were expired. */
1045 int num
= dictSize(db
->expires
);
1046 time_t now
= time(NULL
);
1049 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
1050 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
1055 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
1056 t
= (time_t) dictGetEntryVal(de
);
1058 deleteKey(db
,dictGetEntryKey(de
));
1062 } while (expired
> REDIS_EXPIRELOOKUPS_PER_CRON
/4);
1065 /* Check if we should connect to a MASTER */
1066 if (server
.replstate
== REDIS_REPL_CONNECT
) {
1067 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
1068 if (syncWithMaster() == REDIS_OK
) {
1069 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
1075 static void createSharedObjects(void) {
1076 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
1077 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
1078 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
1079 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
1080 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
1081 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
1082 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
1083 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
1084 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
1086 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
1087 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
1088 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1089 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
1090 "-ERR no such key\r\n"));
1091 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
1092 "-ERR syntax error\r\n"));
1093 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
1094 "-ERR source and destination objects are the same\r\n"));
1095 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
1096 "-ERR index out of range\r\n"));
1097 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
1098 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
1099 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
1100 shared
.select0
= createStringObject("select 0\r\n",10);
1101 shared
.select1
= createStringObject("select 1\r\n",10);
1102 shared
.select2
= createStringObject("select 2\r\n",10);
1103 shared
.select3
= createStringObject("select 3\r\n",10);
1104 shared
.select4
= createStringObject("select 4\r\n",10);
1105 shared
.select5
= createStringObject("select 5\r\n",10);
1106 shared
.select6
= createStringObject("select 6\r\n",10);
1107 shared
.select7
= createStringObject("select 7\r\n",10);
1108 shared
.select8
= createStringObject("select 8\r\n",10);
1109 shared
.select9
= createStringObject("select 9\r\n",10);
1112 static void appendServerSaveParams(time_t seconds
, int changes
) {
1113 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1114 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1115 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1116 server
.saveparamslen
++;
1119 static void resetServerSaveParams() {
1120 zfree(server
.saveparams
);
1121 server
.saveparams
= NULL
;
1122 server
.saveparamslen
= 0;
1125 static void initServerConfig() {
1126 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1127 server
.port
= REDIS_SERVERPORT
;
1128 server
.verbosity
= REDIS_DEBUG
;
1129 server
.maxidletime
= REDIS_MAXIDLETIME
;
1130 server
.saveparams
= NULL
;
1131 server
.logfile
= NULL
; /* NULL = log on standard output */
1132 server
.bindaddr
= NULL
;
1133 server
.glueoutputbuf
= 1;
1134 server
.daemonize
= 0;
1135 server
.appendonly
= 0;
1136 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1137 server
.lastfsync
= time(NULL
);
1138 server
.appendfd
= -1;
1139 server
.appendseldb
= -1; /* Make sure the first time will not match */
1140 server
.pidfile
= "/var/run/redis.pid";
1141 server
.dbfilename
= "dump.rdb";
1142 server
.appendfilename
= "appendonly.aof";
1143 server
.requirepass
= NULL
;
1144 server
.shareobjects
= 0;
1145 server
.sharingpoolsize
= 1024;
1146 server
.maxclients
= 0;
1147 server
.maxmemory
= 0;
1148 resetServerSaveParams();
1150 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1151 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1152 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1153 /* Replication related */
1155 server
.masterauth
= NULL
;
1156 server
.masterhost
= NULL
;
1157 server
.masterport
= 6379;
1158 server
.master
= NULL
;
1159 server
.replstate
= REDIS_REPL_NONE
;
1161 /* Double constants initialization */
1163 R_PosInf
= 1.0/R_Zero
;
1164 R_NegInf
= -1.0/R_Zero
;
1165 R_Nan
= R_Zero
/R_Zero
;
/* initServer: build the runtime state held in the global 'server' structure:
 * signal dispositions, the client/slave/monitor lists, the shared objects,
 * the event loop, the per-database dictionaries, the listening socket, the
 * counters, the serverCron time event and, when server.appendonly is set,
 * the append only file descriptor.
 * NOTE(review): some statements of this function are not visible here (the
 * declaration of 'j', the handling after a failed anetTcpServer(), and the
 * tail of the last redisLog() call). */
1168 static void initServer() {
/* SIGHUP and SIGPIPE are ignored; then setupSigSegvAction() is called. */
1171 signal(SIGHUP
, SIG_IGN
);
1172 signal(SIGPIPE
, SIG_IGN
);
1173 setupSigSegvAction();
/* Lists of connected clients, slaves, monitors, plus the free list that
 * createObject()/decrRefCount() use to recycle object structures. */
1175 server
.clients
= listCreate();
1176 server
.slaves
= listCreate();
1177 server
.monitors
= listCreate();
1178 server
.objfreelist
= listCreate();
1179 createSharedObjects();
1180 server
.el
= aeCreateEventLoop();
/* One redisDb structure per configured database. */
1181 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1182 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
/* Open the listening TCP socket; a failure (-1) is logged as a warning. */
1183 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1184 if (server
.fd
== -1) {
1185 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
/* Create the key space and the expires dictionary of every database and
 * store the database index in its 'id' field. */
1188 for (j
= 0; j
< server
.dbnum
; j
++) {
1189 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1190 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1191 server
.db
[j
].id
= j
;
/* No background child is running at startup: both pids are set to -1. */
1193 server
.cronloops
= 0;
1194 server
.bgsavechildpid
= -1;
1195 server
.bgrewritechildpid
= -1;
1196 server
.bgrewritebuf
= sdsempty();
1197 server
.lastsave
= time(NULL
);
/* Statistics start from zero; the start time is the current time. */
1199 server
.usedmemory
= 0;
1200 server
.stat_numcommands
= 0;
1201 server
.stat_numconnections
= 0;
1202 server
.stat_starttime
= time(NULL
);
/* Register serverCron as a time event on the event loop. */
1203 aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
);
/* With appendonly enabled, open (creating it if needed, mode 0644) the
 * append only file for appending; a failure is logged as a warning. */
1205 if (server
.appendonly
) {
1206 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644);
1207 if (server
.appendfd
== -1) {
1208 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1215 /* Empty the whole database */
1216 static long long emptyDb() {
1218 long long removed
= 0;
1220 for (j
= 0; j
< server
.dbnum
; j
++) {
1221 removed
+= dictSize(server
.db
[j
].dict
);
1222 dictEmpty(server
.db
[j
].dict
);
1223 dictEmpty(server
.db
[j
].expires
);
/* Convert a "yes"/"no" configuration argument (case insensitive) into an
 * integer.
 *
 * Returns 1 for "yes", 0 for "no" and -1 for any other string, so that
 * callers can detect an invalid argument by comparing the result with -1. */
static int yesnotoi(char *s) {
    if (!strcasecmp(s,"yes")) return 1;
    else if (!strcasecmp(s,"no")) return 0;
    else return -1;
}
1234 /* I agree, this is a very rudimentary way to load a configuration...
1235 will improve later if the config gets more complex */
/* loadServerConfig: read the configuration file 'filename' line by line and
 * apply every directive to the global 'server' structure. Each line is
 * trimmed, comment ('#') and blank lines are skipped, the rest is split on
 * single spaces and dispatched on the first word. Directives that fail
 * validation set 'err' and jump to the 'loaderr' label, whose visible part
 * prints the line number, the offending line and the message on stderr.
 * NOTE(review): several statements (variable declarations, the special
 * handling of "-", the loaderr label itself and the exit paths) are not
 * visible in this chunk. */
1236 static void loadServerConfig(char *filename
) {
1238 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
/* A file name of exactly "-" is treated specially (presumably standard
 * input, since 'fp' is compared with stdin before fclose() - confirm). */
1242 if (filename
[0] == '-' && filename
[1] == '\0')
1245 if ((fp
= fopen(filename
,"r")) == NULL
) {
1246 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1251 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
/* Strip leading/trailing spaces, tabs and line terminators. */
1257 line
= sdstrim(line
," \t\r\n");
1259 /* Skip comments and blank lines*/
1260 if (line
[0] == '#' || line
[0] == '\0') {
1265 /* Split into arguments */
1266 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1267 sdstolower(argv
[0]);
1269 /* Execute config directives */
/* timeout <seconds>: negative values are rejected. */
1270 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1271 server
.maxidletime
= atoi(argv
[1]);
1272 if (server
.maxidletime
< 0) {
1273 err
= "Invalid timeout value"; goto loaderr
;
/* port <n>: must be in the range 1..65535. */
1275 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1276 server
.port
= atoi(argv
[1]);
1277 if (server
.port
< 1 || server
.port
> 65535) {
1278 err
= "Invalid port"; goto loaderr
;
1280 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1281 server
.bindaddr
= zstrdup(argv
[1]);
/* save <seconds> <changes>: seconds must be >= 1, changes >= 0. */
1282 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1283 int seconds
= atoi(argv
[1]);
1284 int changes
= atoi(argv
[2]);
1285 if (seconds
< 1 || changes
< 0) {
1286 err
= "Invalid save parameters"; goto loaderr
;
1288 appendServerSaveParams(seconds
,changes
);
/* dir <path>: change the working directory; a failure is logged. */
1289 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1290 if (chdir(argv
[1]) == -1) {
1291 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1292 argv
[1], strerror(errno
));
/* loglevel debug|notice|warning */
1295 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1296 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1297 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1298 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1300 err
= "Invalid log level. Must be one of debug, notice, warning";
/* logfile <path>|stdout: "stdout" is stored as NULL. */
1303 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1306 server
.logfile
= zstrdup(argv
[1]);
1307 if (!strcasecmp(server
.logfile
,"stdout")) {
1308 zfree(server
.logfile
);
1309 server
.logfile
= NULL
;
1311 if (server
.logfile
) {
1312 /* Test if we are able to open the file. The server will not
1313 * be able to abort just for this problem later... */
1314 logfp
= fopen(server
.logfile
,"a");
1315 if (logfp
== NULL
) {
1316 err
= sdscatprintf(sdsempty(),
1317 "Can't open the log file: %s", strerror(errno
));
/* databases <n>: at least one database is required. */
1322 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1323 server
.dbnum
= atoi(argv
[1]);
1324 if (server
.dbnum
< 1) {
1325 err
= "Invalid number of databases"; goto loaderr
;
1327 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1328 server
.maxclients
= atoi(argv
[1]);
1329 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1330 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
/* slaveof <host> <port>: also switches replstate to REDIS_REPL_CONNECT. */
1331 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1332 server
.masterhost
= sdsnew(argv
[1]);
1333 server
.masterport
= atoi(argv
[2]);
1334 server
.replstate
= REDIS_REPL_CONNECT
;
1335 } else if (!strcasecmp(argv
[0],"masterauth") && argc
== 2) {
1336 server
.masterauth
= zstrdup(argv
[1]);
/* The following boolean directives store the yesnotoi() result and treat
 * -1 as an invalid argument. */
1337 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1338 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1339 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1341 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1342 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1343 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1345 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1346 server
.sharingpoolsize
= atoi(argv
[1]);
1347 if (server
.sharingpoolsize
< 1) {
1348 err
= "invalid object sharing pool size"; goto loaderr
;
1350 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1351 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1352 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1354 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1355 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1356 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
/* appendfsync no|always|everysec */
1358 } else if (!strcasecmp(argv
[0],"appendfsync") && argc
== 2) {
1359 if (!strcasecmp(argv
[1],"no")) {
1360 server
.appendfsync
= APPENDFSYNC_NO
;
1361 } else if (!strcasecmp(argv
[1],"always")) {
1362 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1363 } else if (!strcasecmp(argv
[1],"everysec")) {
1364 server
.appendfsync
= APPENDFSYNC_EVERYSEC
;
1366 err
= "argument must be 'no', 'always' or 'everysec'";
1369 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1370 server
.requirepass
= zstrdup(argv
[1]);
1371 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1372 server
.pidfile
= zstrdup(argv
[1]);
1373 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1374 server
.dbfilename
= zstrdup(argv
[1]);
/* Anything else (unknown name or wrong argument count) is an error. */
1376 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1378 for (j
= 0; j
< argc
; j
++)
/* The stream is closed unless it is stdin. */
1383 if (fp
!= stdin
) fclose(fp
);
/* Error report: line number, offending line and message go to stderr. */
1387 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1388 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1389 fprintf(stderr
, ">>> '%s'\n", line
);
1390 fprintf(stderr
, "%s\n", err
);
1394 static void freeClientArgv(redisClient
*c
) {
1397 for (j
= 0; j
< c
->argc
; j
++)
1398 decrRefCount(c
->argv
[j
]);
1399 for (j
= 0; j
< c
->mbargc
; j
++)
1400 decrRefCount(c
->mbargv
[j
]);
/* freeClient: tear down a client. The visible steps are: unregister its
 * readable/writable file events, free the query buffer and the reply list,
 * unlink it from server.clients and, for slaves/monitors, from the matching
 * list. When the client is the master link, server.master is cleared and the
 * replication state goes back to REDIS_REPL_CONNECT.
 * NOTE(review): several statements (declarations, the bodies of some 'if'
 * blocks and the final releases) are not visible in this chunk. */
1405 static void freeClient(redisClient
*c
) {
1408 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1409 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1410 sdsfree(c
->querybuf
);
1411 listRelease(c
->reply
);
/* The client must be present in the global clients list. */
1414 ln
= listSearchKey(server
.clients
,c
);
1415 redisAssert(ln
!= NULL
);
1416 listDelNode(server
.clients
,ln
);
1417 if (c
->flags
& REDIS_SLAVE
) {
1418 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
/* Monitors live in server.monitors, every other slave in server.slaves. */
1420 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1421 ln
= listSearchKey(l
,c
);
1422 redisAssert(ln
!= NULL
);
/* Losing the master link schedules a new connection attempt. */
1425 if (c
->flags
& REDIS_MASTER
) {
1426 server
.master
= NULL
;
1427 server
.replstate
= REDIS_REPL_CONNECT
;
/* Size in bytes of the stack buffer used to glue reply objects together. */
1434 #define GLUEREPLY_UP_TO (1024)
/* glueReplyBuffersIfNeeded: walk the client reply list copying the content
 * of the objects into a stack buffer of GLUEREPLY_UP_TO bytes as long as
 * they fit, deleting the visited nodes, and finally push one new string
 * object holding the copied bytes at the head of the list. Nothing is added
 * when no byte was copied.
 * NOTE(review): the declarations, the update of 'copylen' and the exit
 * condition of the loop are not visible in this chunk. */
1435 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1437 char buf
[GLUEREPLY_UP_TO
];
1441 listRewind(c
->reply
);
1442 while((ln
= listYield(c
->reply
))) {
1446 objlen
= sdslen(o
->ptr
);
/* Copy only if the object still fits in the remaining buffer space. */
1447 if (copylen
+ objlen
<= GLUEREPLY_UP_TO
) {
1448 memcpy(buf
+copylen
,o
->ptr
,objlen
);
1450 listDelNode(c
->reply
,ln
);
1452 if (copylen
== 0) return;
1456 /* Now the output buffer is empty, add the new single element */
1457 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,copylen
));
1458 listAddNodeHead(c
->reply
,o
);
/* sendReplyToClient: writable-event handler. It sends the objects queued in
 * c->reply to the socket 'fd', tracking in c->sentlen how much of the head
 * object was already written, and stops once more than
 * REDIS_MAX_WRITE_PER_EVENT bytes were sent in this call. When the reply
 * list becomes empty the writable event is removed. If glueoutputbuf is off
 * and the list is longer than REDIS_WRITEV_THRESHOLD (and the client is not
 * the master) the work is delegated to sendReplyToClientWritev().
 * NOTE(review): some statements (declarations, returns, the error branch
 * bodies) are not visible in this chunk. */
1461 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1462 redisClient
*c
= privdata
;
1463 int nwritten
= 0, totwritten
= 0, objlen
;
1466 REDIS_NOTUSED(mask
);
1468 /* Use writev() if we have enough buffers to send */
1469 if (!server
.glueoutputbuf
&&
1470 listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD
&&
1471 !(c
->flags
& REDIS_MASTER
))
1473 sendReplyToClientWritev(el
, fd
, privdata
, mask
);
1477 while(listLength(c
->reply
)) {
/* With glueoutputbuf enabled, small queued objects are merged first. */
1478 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1479 glueReplyBuffersIfNeeded(c
);
1481 o
= listNodeValue(listFirst(c
->reply
));
1482 objlen
= sdslen(o
->ptr
);
1485 listDelNode(c
->reply
,listFirst(c
->reply
));
1489 if (c
->flags
& REDIS_MASTER
) {
1490 /* Don't reply to a master */
1491 nwritten
= objlen
- c
->sentlen
;
/* Regular client: write the not yet sent part of the head object. */
1493 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1494 if (nwritten
<= 0) break;
1496 c
->sentlen
+= nwritten
;
1497 totwritten
+= nwritten
;
1498 /* If we fully sent the object on head go to the next one */
1499 if (c
->sentlen
== objlen
) {
1500 listDelNode(c
->reply
,listFirst(c
->reply
));
1503 /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT
1504 * bytes, in a single threaded server it's a good idea to serve
1505 * other clients as well, even if a very large request comes from
1506 * super fast link that is always able to accept data (in real world
1507 * scenario think about 'KEYS *' against the loopback interface) */
1508 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
/* A write error other than EAGAIN is logged at debug level. */
1510 if (nwritten
== -1) {
1511 if (errno
== EAGAIN
) {
1514 redisLog(REDIS_DEBUG
,
1515 "Error writing to client: %s", strerror(errno
));
/* Any byte written counts as client activity. */
1520 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1521 if (listLength(c
->reply
) == 0) {
1523 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
/* sendReplyToClientWritev: variant of the reply writer that gathers up to
 * REDIS_WRITEV_IOVEC_COUNT queued objects into an iovec array and sends them
 * with a single writev() call per iteration. After each write the fully
 * sent objects are removed from c->reply and c->sentlen records how much of
 * the new head object was already sent. When the reply list becomes empty
 * the writable event is removed.
 * NOTE(review): some statements (declarations, resets of 'ion'/'willwrite',
 * break/return paths) are not visible in this chunk. */
1527 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
)
1529 redisClient
*c
= privdata
;
1530 int nwritten
= 0, totwritten
= 0, objlen
, willwrite
;
1532 struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
];
1533 int offset
, ion
= 0;
1535 REDIS_NOTUSED(mask
);
1538 while (listLength(c
->reply
)) {
/* The head object may be partially sent already: start from sentlen. */
1539 offset
= c
->sentlen
;
1543 /* fill-in the iov[] array */
1544 for(node
= listFirst(c
->reply
); node
; node
= listNextNode(node
)) {
1545 o
= listNodeValue(node
);
1546 objlen
= sdslen(o
->ptr
);
/* Per-event byte budget check against REDIS_MAX_WRITE_PER_EVENT. */
1548 if (totwritten
+ objlen
- offset
> REDIS_MAX_WRITE_PER_EVENT
)
1551 if(ion
== REDIS_WRITEV_IOVEC_COUNT
)
1552 break; /* no more iovecs */
1554 iov
[ion
].iov_base
= ((char*)o
->ptr
) + offset
;
1555 iov
[ion
].iov_len
= objlen
- offset
;
1556 willwrite
+= objlen
- offset
;
1557 offset
= 0; /* just for the first item */
1564 /* write all collected blocks at once */
1565 if((nwritten
= writev(fd
, iov
, ion
)) < 0) {
1566 if (errno
!= EAGAIN
) {
1567 redisLog(REDIS_DEBUG
,
1568 "Error writing to client: %s", strerror(errno
));
1575 totwritten
+= nwritten
;
1576 offset
= c
->sentlen
;
1578 /* remove written robjs from c->reply */
1579 while (nwritten
&& listLength(c
->reply
)) {
1580 o
= listNodeValue(listFirst(c
->reply
));
1581 objlen
= sdslen(o
->ptr
);
/* The head object was sent completely: drop it and account its bytes. */
1583 if(nwritten
>= objlen
- offset
) {
1584 listDelNode(c
->reply
, listFirst(c
->reply
));
1585 nwritten
-= objlen
- offset
;
1589 c
->sentlen
+= nwritten
;
1597 c
->lastinteraction
= time(NULL
);
1599 if (listLength(c
->reply
) == 0) {
1601 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1605 static struct redisCommand
*lookupCommand(char *name
) {
1607 while(cmdTable
[j
].name
!= NULL
) {
1608 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1614 /* resetClient prepares the client to process the next command.
 * NOTE(review): the body of this function is not visible in this chunk. */
1615 static void resetClient(redisClient
*c
) {
1621 /* If this function gets called we already read a whole
1622 * command, arguments are in the client argv/argc fields.
1623 * processCommand() executes the command or prepares the
1624 * server for a bulk read from the client.
1626 * If 1 is returned the client is still alive and valid and
1627 * other operations can be performed by the caller. Otherwise
1628 * if 0 is returned the client was destroyed (i.e. after QUIT). */
/* NOTE(review): many statements of this function (returns, resetClient()
 * calls, the command invocation itself, closing braces) are not visible in
 * this chunk; the added comments describe only the visible code. */
1629 static int processCommand(redisClient
*c
) {
1630 struct redisCommand
*cmd
;
1633 /* Free some memory if needed (maxmemory setting) */
1634 if (server
.maxmemory
) freeMemoryIfNeeded();
1636 /* Handle the multi bulk command type. This is an alternative protocol
1637 * supported by Redis in order to receive commands that are composed of
1638 * multiple binary-safe "bulk" arguments. The latency of processing is
1639 * a bit higher but this allows things like multi-sets, so if this
1640 * protocol is used only for MSET and similar commands this is a big win. */
/* A first argument starting with '*' carries the number of bulk arguments
 * that will follow. */
1641 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1642 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1643 if (c
->multibulk
<= 0) {
1647 decrRefCount(c
->argv
[c
->argc
-1]);
1651 } else if (c
->multibulk
) {
/* bulklen == -1: the "$<len>" header of the next argument is expected. */
1652 if (c
->bulklen
== -1) {
1653 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1654 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1658 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1659 decrRefCount(c
->argv
[0]);
/* Accepted bulk lengths are 0 .. 1024*1024*1024 bytes. */
1660 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1662 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1667 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
/* Otherwise argv[0] is a complete bulk argument: append it to mbargv. */
1671 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1672 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1676 if (c
->multibulk
== 0) {
1680 /* Here we need to swap the multi-bulk argc/argv with the
1681 * normal argc/argv of the client structure. */
1683 c
->argv
= c
->mbargv
;
1684 c
->mbargv
= auxargv
;
1687 c
->argc
= c
->mbargc
;
1688 c
->mbargc
= auxargc
;
1690 /* We need to set bulklen to something different than -1
1691 * in order for the code below to process the command without
1692 * to try to read the last argument of a bulk command as
1693 * a special argument. */
1695 /* continue below and process the command */
1702 /* -- end of multi bulk commands processing -- */
1704 /* The QUIT command is handled as a special case. Normal command
1705 * procs are unable to close the client connection safely */
1706 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1710 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1712 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
/* Arity check: a positive arity must match argc exactly, a negative one
 * means "at least -arity arguments". */
1715 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1716 (c
->argc
< -cmd
->arity
)) {
1717 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
/* Commands flagged REDIS_CMD_DENYOOM are refused while over maxmemory. */
1720 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1721 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
/* Bulk command whose payload was not read yet: the last argument is the
 * payload length. */
1724 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1725 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1727 decrRefCount(c
->argv
[c
->argc
-1]);
1728 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1730 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1735 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1736 /* It is possible that the bulk read is already in the
1737 * buffer. Check this condition and handle it accordingly.
1738 * This is just a fast path, alternative to call processInputBuffer().
1739 * It's a good idea since the code is small and this condition
1740 * happens most of the times. */
1741 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1742 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1744 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1749 /* Let's try to share objects on the command arguments vector */
1750 if (server
.shareobjects
) {
1752 for(j
= 1; j
< c
->argc
; j
++)
1753 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1755 /* Let's try to encode the bulk object to save space. */
1756 if (cmd
->flags
& REDIS_CMD_BULK
)
1757 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1759 /* Check if the user is authenticated */
1760 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1761 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1766 /* Exec the command */
/* server.dirty is sampled here so that the difference computed below tells
 * whether the command modified the data set. */
1767 dirty
= server
.dirty
;
/* Propagate modifying commands to the append only file and to the slaves;
 * monitors receive every command. */
1769 if (server
.appendonly
&& server
.dirty
-dirty
)
1770 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1771 if (server
.dirty
-dirty
&& listLength(server
.slaves
))
1772 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1773 if (listLength(server
.monitors
))
1774 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1775 server
.stat_numcommands
++;
1777 /* Prepare the client for the next command */
1778 if (c
->flags
& REDIS_CLOSE
) {
/* replicationFeedSlaves: serialize the command 'argv'/'argc' as a list of
 * objects (arguments separated by shared.space, terminated by shared.crlf;
 * for REDIS_CMD_BULK commands the last argument is preceded by its length)
 * and queue it on every client of the 'slaves' list, emitting a "select"
 * command first when the slave is on a different database than 'dictid'.
 * NOTE(review): some statements (declarations, listRewind, the switch
 * header, closing braces) are not visible in this chunk. */
1786 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1790 /* (args*2)+1 is enough room for args, spaces, newlines */
1791 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
/* Commands with more than REDIS_STATIC_ARGS arguments use a heap vector. */
1793 if (argc
<= REDIS_STATIC_ARGS
) {
1796 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1799 for (j
= 0; j
< argc
; j
++) {
1800 if (j
!= 0) outv
[outc
++] = shared
.space
;
1801 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
/* The temporary length object starts with refcount 0; the incrRefCount
 * and decrRefCount passes below take care of releasing it. */
1804 lenobj
= createObject(REDIS_STRING
,
1805 sdscatprintf(sdsempty(),"%lu\r\n",
1806 stringObjectLen(argv
[j
])));
1807 lenobj
->refcount
= 0;
1808 outv
[outc
++] = lenobj
;
1810 outv
[outc
++] = argv
[j
];
1812 outv
[outc
++] = shared
.crlf
;
1814 /* Increment all the refcounts at start and decrement at end in order to
1815 * be sure to free objects if there is no slave in a replication state
1816 * able to be feed with commands */
1817 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1819 while((ln
= listYield(slaves
))) {
1820 redisClient
*slave
= ln
->value
;
1822 /* Don't feed slaves that are still waiting for BGSAVE to start */
1823 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1825 /* Feed all the other slaves, MONITORs and so on */
1826 if (slave
->slaveseldb
!= dictid
) {
/* Databases 0..9 reuse the preallocated shared select commands. */
1830 case 0: selectcmd
= shared
.select0
; break;
1831 case 1: selectcmd
= shared
.select1
; break;
1832 case 2: selectcmd
= shared
.select2
; break;
1833 case 3: selectcmd
= shared
.select3
; break;
1834 case 4: selectcmd
= shared
.select4
; break;
1835 case 5: selectcmd
= shared
.select5
; break;
1836 case 6: selectcmd
= shared
.select6
; break;
1837 case 7: selectcmd
= shared
.select7
; break;
1838 case 8: selectcmd
= shared
.select8
; break;
1839 case 9: selectcmd
= shared
.select9
; break;
/* Any other database index gets a freshly built select command. */
1841 selectcmd
= createObject(REDIS_STRING
,
1842 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1843 selectcmd
->refcount
= 0;
1846 addReply(slave
,selectcmd
);
1847 slave
->slaveseldb
= dictid
;
1849 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1851 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1852 if (outv
!= static_outv
) zfree(outv
);
/* processInputBuffer: parse what is accumulated in c->querybuf. With
 * bulklen == -1 it extracts the first newline terminated line, splits it on
 * spaces into c->argv and runs processCommand(); otherwise it waits until
 * the buffer holds the whole bulk payload, stores it as the last argument
 * and runs processCommand(). In both cases parsing restarts (goto again) if
 * the client is still valid and the buffer is not empty.
 * NOTE(review): some statements (the 'again' label, declarations, else
 * branches, frees) are not visible in this chunk. */
1855 static void processInputBuffer(redisClient
*c
) {
1857 if (c
->bulklen
== -1) {
1858 /* Read the first line of the query */
1859 char *p
= strchr(c
->querybuf
,'\n');
/* The old buffer becomes 'query'; a new empty buffer replaces it. */
1866 query
= c
->querybuf
;
1867 c
->querybuf
= sdsempty();
1868 querylen
= 1+(p
-(query
));
1869 if (sdslen(query
) > querylen
) {
1870 /* leave data after the first line of the query in the buffer */
1871 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1873 *p
= '\0'; /* remove "\n" */
1874 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1875 sdsupdatelen(query
);
1877 /* Now we can split the query in arguments */
1878 if (sdslen(query
) == 0) {
1879 /* Ignore empty query */
1883 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1886 if (c
->argv
) zfree(c
->argv
);
1887 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
/* Only non-empty tokens become arguments. */
1889 for (j
= 0; j
< argc
; j
++) {
1890 if (sdslen(argv
[j
])) {
1891 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1898 /* Execute the command. If the client is still valid
1899 * after processCommand() return and there is something
1900 * on the query buffer try to process the next command. */
1901 if (c
->argc
&& processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
/* A buffer of REDIS_REQUEST_MAX_SIZE bytes or more that still has no
 * complete line is logged as a protocol error. */
1903 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
1904 redisLog(REDIS_DEBUG
, "Client protocol error");
1909 /* Bulk read handling. Note that if we are at this point
1910 the client already sent a command terminated with a newline,
1911 we are reading the bulk data that is actually the last
1912 argument of the command. */
1913 int qbl
= sdslen(c
->querybuf
);
1915 if (c
->bulklen
<= qbl
) {
1916 /* Copy everything but the final CRLF as final argument */
1917 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1919 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1920 /* Process the command. If the client is still valid after
1921 * the processing and there is more data in the buffer
1922 * try to parse it. */
1923 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
/* readQueryFromClient: readable-event handler. It reads up to
 * REDIS_IOBUF_LEN bytes from 'fd', appends them to c->querybuf, refreshes
 * c->lastinteraction and hands the buffer to processInputBuffer(). Read
 * errors and a closed connection (read() returning 0) are logged at debug
 * level.
 * NOTE(review): some statements (declarations, the nread == -1 test, the
 * client release and returns) are not visible in this chunk. */
1929 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1930 redisClient
*c
= (redisClient
*) privdata
;
1931 char buf
[REDIS_IOBUF_LEN
];
1934 REDIS_NOTUSED(mask
);
1936 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1938 if (errno
== EAGAIN
) {
1941 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1945 } else if (nread
== 0) {
1946 redisLog(REDIS_DEBUG
, "Client closed connection");
1951 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1952 c
->lastinteraction
= time(NULL
);
1956 processInputBuffer(c
);
/* selectDb: make the client operate on database number 'id'. The index is
 * validated against the range 0 .. server.dbnum-1 before c->db is pointed
 * at the selected database.
 * NOTE(review): the return statements are not visible in this chunk, so the
 * returned values are not documented here. */
1959 static int selectDb(redisClient
*c
, int id
) {
1960 if (id
< 0 || id
>= server
.dbnum
)
1962 c
->db
= &server
.db
[id
];
1966 static void *dupClientReplyValue(void *o
) {
1967 incrRefCount((robj
*)o
);
/* createClient: allocate and initialize the state of a new client bound to
 * the socket 'fd'. The socket is set non blocking with TCP_NODELAY (through
 * the anet helpers), the reply list is created with decrRefCount as free
 * method and dupClientReplyValue as dup method, a readable event is
 * registered with readQueryFromClient as handler, and the client is
 * appended to server.clients. NULL is returned when the allocation fails.
 * NOTE(review): several field initializations, the handling of a failed
 * aeCreateFileEvent() and the final return are not visible in this chunk. */
1971 static redisClient
*createClient(int fd
) {
1972 redisClient
*c
= zmalloc(sizeof(*c
));
1974 anetNonBlock(NULL
,fd
);
1975 anetTcpNoDelay(NULL
,fd
);
1976 if (!c
) return NULL
;
1979 c
->querybuf
= sdsempty();
1988 c
->lastinteraction
= time(NULL
);
1989 c
->authenticated
= 0;
1990 c
->replstate
= REDIS_REPL_NONE
;
1991 c
->reply
= listCreate();
1992 listSetFreeMethod(c
->reply
,decrRefCount
);
1993 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1994 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1995 readQueryFromClient
, c
) == AE_ERR
) {
1999 listAddNodeTail(server
.clients
,c
);
2003 static void addReply(redisClient
*c
, robj
*obj
) {
2004 if (listLength(c
->reply
) == 0 &&
2005 (c
->replstate
== REDIS_REPL_NONE
||
2006 c
->replstate
== REDIS_REPL_ONLINE
) &&
2007 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
2008 sendReplyToClient
, c
) == AE_ERR
) return;
2009 listAddNodeTail(c
->reply
,getDecodedObject(obj
));
/* addReplySds: reply helper taking an sds string. The string is wrapped
 * into a new REDIS_STRING object.
 * NOTE(review): the statements that queue and release the object are not
 * visible in this chunk. */
2012 static void addReplySds(redisClient
*c
, sds s
) {
2013 robj
*o
= createObject(REDIS_STRING
,s
);
/* addReplyDouble: format the double 'd' with "%.17g" into a local buffer
 * and send it to the client as a bulk reply ("$<len>\r\n<text>\r\n").
 * NOTE(review): the declaration of 'buf' and the arguments of the
 * sdscatprintf() call are not visible in this chunk. */
2018 static void addReplyDouble(redisClient
*c
, double d
) {
2021 snprintf(buf
,sizeof(buf
),"%.17g",d
);
2022 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
/* addReplyBulkLen: send the "$<len>\r\n" header of a bulk reply for 'obj'.
 * For REDIS_ENCODING_RAW objects the length is the sds length of the
 * string; otherwise obj->ptr is read as a long and the length is computed
 * by repeatedly dividing that number by 10.
 * NOTE(review): the declaration of 'len', the sign handling and the loop
 * body are not visible in this chunk. */
2026 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
2029 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
2030 len
= sdslen(obj
->ptr
);
/* Non raw encoding: the pointer field itself stores the integer value. */
2032 long n
= (long)obj
->ptr
;
2039 while((n
= n
/10) != 0) {
2043 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",len
));
/* acceptHandler: readable-event handler of the listening socket. It accepts
 * one connection, creates the client state for it and, when the maxclients
 * limit is configured and exceeded, writes an error message directly to the
 * new socket. The connections counter is incremented for accepted clients.
 * NOTE(review): declarations, returns and the release of the rejected
 * client are not visible in this chunk. */
2046 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2051 REDIS_NOTUSED(mask
);
2052 REDIS_NOTUSED(privdata
);
2054 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
2055 if (cfd
== AE_ERR
) {
2056 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
2059 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
2060 if ((c
= createClient(cfd
)) == NULL
) {
2061 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
2062 close(cfd
); /* May be already closed, just ignore errors */
2065 /* If maxclient directive is set and this is one client more... close the
2066 * connection. Note that we create the client instead to check before
2067 * for this condition, since now the socket is already set in nonblocking
2068 * mode and we can send an error for free using the Kernel I/O */
2069 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
2070 char *err
= "-ERR max number of clients reached\r\n";
2072 /* That's a best effort error message, don't check write errors */
2073 if (write(c
->fd
,err
,strlen(err
)) == -1) {
2074 /* Nothing to do, Just to avoid the warning... */
2079 server
.stat_numconnections
++;
2082 /* ======================= Redis objects implementation ===================== */
/* createObject: return an object structure of the given 'type' wrapping
 * 'ptr'. A structure is recycled from server.objfreelist when the list is
 * not empty, otherwise a new one is allocated with zmalloc(). The encoding
 * is set to REDIS_ENCODING_RAW.
 * NOTE(review): the assignments of the other fields and the return
 * statement are not visible in this chunk. */
2084 static robj
*createObject(int type
, void *ptr
) {
2087 if (listLength(server
.objfreelist
)) {
/* Reuse the structure stored in the first node of the free list. */
2088 listNode
*head
= listFirst(server
.objfreelist
);
2089 o
= listNodeValue(head
);
2090 listDelNode(server
.objfreelist
,head
);
2092 o
= zmalloc(sizeof(*o
));
2095 o
->encoding
= REDIS_ENCODING_RAW
;
2101 static robj
*createStringObject(char *ptr
, size_t len
) {
2102 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
2105 static robj
*createListObject(void) {
2106 list
*l
= listCreate();
2108 listSetFreeMethod(l
,decrRefCount
);
2109 return createObject(REDIS_LIST
,l
);
2112 static robj
*createSetObject(void) {
2113 dict
*d
= dictCreate(&setDictType
,NULL
);
2114 return createObject(REDIS_SET
,d
);
2117 static robj
*createZsetObject(void) {
2118 zset
*zs
= zmalloc(sizeof(*zs
));
2120 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
2121 zs
->zsl
= zslCreate();
2122 return createObject(REDIS_ZSET
,zs
);
/* freeStringObject: release the payload of a string object. Work is done
 * only for objects with REDIS_ENCODING_RAW encoding.
 * NOTE(review): the body of the 'if' is not visible in this chunk. */
2125 static void freeStringObject(robj
*o
) {
2126 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2131 static void freeListObject(robj
*o
) {
2132 listRelease((list
*) o
->ptr
);
2135 static void freeSetObject(robj
*o
) {
2136 dictRelease((dict
*) o
->ptr
);
/* freeZsetObject: release the payload of a sorted set object; the visible
 * code releases the dictionary of the zset structure.
 * NOTE(review): the declaration of 'zs' and any further release are not
 * visible in this chunk. */
2139 static void freeZsetObject(robj
*o
) {
2142 dictRelease(zs
->dict
);
2147 static void freeHashObject(robj
*o
) {
2148 dictRelease((dict
*) o
->ptr
);
/* incrRefCount: take a new reference to the object 'o'. When
 * DEBUG_REFCOUNT is defined the counter of string objects is also printed
 * on standard output.
 * NOTE(review): the increment itself and the closing #endif are not visible
 * in this chunk. */
2151 static void incrRefCount(robj
*o
) {
2153 #ifdef DEBUG_REFCOUNT
2154 if (o
->type
== REDIS_STRING
)
2155 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
2159 static void decrRefCount(void *obj
) {
2162 #ifdef DEBUG_REFCOUNT
2163 if (o
->type
== REDIS_STRING
)
2164 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
2166 if (--(o
->refcount
) == 0) {
2168 case REDIS_STRING
: freeStringObject(o
); break;
2169 case REDIS_LIST
: freeListObject(o
); break;
2170 case REDIS_SET
: freeSetObject(o
); break;
2171 case REDIS_ZSET
: freeZsetObject(o
); break;
2172 case REDIS_HASH
: freeHashObject(o
); break;
2173 default: redisAssert(0 != 0); break;
2175 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2176 !listAddNodeHead(server
.objfreelist
,o
))
2181 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2182 dictEntry
*de
= dictFind(db
->dict
,key
);
2183 return de
? dictGetEntryVal(de
) : NULL
;
2186 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2187 expireIfNeeded(db
,key
);
2188 return lookupKey(db
,key
);
2191 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2192 deleteIfVolatile(db
,key
);
2193 return lookupKey(db
,key
);
2196 static int deleteKey(redisDb
*db
, robj
*key
) {
2199 /* We need to protect key from destruction: after the first dictDelete()
2200 * it may happen that 'key' is no longer valid if we don't increment
2201 * it's count. This may happen when we get the object reference directly
2202 * from the hash table with dictRandomKey() or dict iterators */
2204 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2205 retval
= dictDelete(db
->dict
,key
);
2208 return retval
== DICT_OK
;
2211 /* Try to share an object against the shared objects pool */
2212 static robj
*tryObjectSharing(robj
*o
) {
2213 struct dictEntry
*de
;
2216 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2218 redisAssert(o
->type
== REDIS_STRING
);
2219 de
= dictFind(server
.sharingpool
,o
);
2221 robj
*shared
= dictGetEntryKey(de
);
2223 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2224 dictGetEntryVal(de
) = (void*) c
;
2225 incrRefCount(shared
);
2229 /* Here we are using a stream algorithm: every time an object is
2230 * shared we increment its count, every time there is a miss we
2231 * decrement the counter of a random object. If this object reaches
2232 * zero we remove the object and put the current object instead. */
2233 if (dictSize(server
.sharingpool
) >=
2234 server
.sharingpoolsize
) {
2235 de
= dictGetRandomKey(server
.sharingpool
);
2236 redisAssert(de
!= NULL
);
2237 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2238 dictGetEntryVal(de
) = (void*) c
;
2240 dictDelete(server
.sharingpool
,de
->key
);
2243 c
= 0; /* If the pool is empty we want to add this object */
2248 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2249 redisAssert(retval
== DICT_OK
);
2256 /* Check if the nul-terminated string 's' can be represented by a long
2257 * (that is, is a number that fits into long without any other space or
2258 * character before or after the digits).
2260 * If so, the function returns REDIS_OK and *longval is set to the value
2261 * of the number. Otherwise REDIS_ERR is returned */
2262 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2263 char buf
[32], *endptr
;
2267 value
= strtol(s
, &endptr
, 10);
2268 if (endptr
[0] != '\0') return REDIS_ERR
;
2269 slen
= snprintf(buf
,32,"%ld",value
);
2271 /* If the number converted back into a string is not identical
2272 * then it's not possible to encode the string as integer */
2273 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2274 if (longval
) *longval
= value
;
2278 /* Try to encode a string object in order to save space */
2279 static int tryObjectEncoding(robj
*o
) {
2283 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2284 return REDIS_ERR
; /* Already encoded */
2286 /* It's not save to encode shared objects: shared objects can be shared
2287 * everywhere in the "object space" of Redis. Encoded objects can only
2288 * appear as "values" (and not, for instance, as keys) */
2289 if (o
->refcount
> 1) return REDIS_ERR
;
2291 /* Currently we try to encode only strings */
2292 redisAssert(o
->type
== REDIS_STRING
);
2294 /* Check if we can represent this string as a long integer */
2295 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2297 /* Ok, this object can be encoded */
2298 o
->encoding
= REDIS_ENCODING_INT
;
2300 o
->ptr
= (void*) value
;
2304 /* Get a decoded version of an encoded object (returned as a new object).
2305 * If the object is already raw-encoded just increment the ref count. */
2306 static robj
*getDecodedObject(robj
*o
) {
2309 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2313 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2316 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2317 dec
= createStringObject(buf
,strlen(buf
));
2320 redisAssert(1 != 1);
2324 /* Compare two string objects via strcmp() or alike.
2325 * Note that the objects may be integer-encoded. In such a case we
2326 * use snprintf() to get a string representation of the numbers on the stack
2327 * and compare the strings, it's much faster than calling getDecodedObject().
2329 * Important note: if objects are not integer encoded, but binary-safe strings,
2330 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2332 static int compareStringObjects(robj
*a
, robj
*b
) {
2333 redisAssert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2334 char bufa
[128], bufb
[128], *astr
, *bstr
;
2337 if (a
== b
) return 0;
2338 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2339 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2345 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2346 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2352 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2355 static size_t stringObjectLen(robj
*o
) {
2356 redisAssert(o
->type
== REDIS_STRING
);
2357 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2358 return sdslen(o
->ptr
);
2362 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2366 /*============================ DB saving/loading ============================ */
/* Write the one-byte 'type' code to 'fp'. Returns 0 on success, -1 when
 * the byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
/* Write the time 't' to 'fp' as a 4-byte value: the time is first narrowed
 * to int32_t and the resulting bytes are written as they are laid out in
 * memory. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t narrowed = (int32_t) t;
    size_t written = fwrite(&narrowed,4,1,fp);

    return (written == 0) ? -1 : 0;
}
2379 /* check rdbLoadLen() comments for more info */
2380 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2381 unsigned char buf
[2];
2384 /* Save a 6 bit len */
2385 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2386 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2387 } else if (len
< (1<<14)) {
2388 /* Save a 14 bit len */
2389 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2391 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2393 /* Save a 32 bit len */
2394 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2395 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2397 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2402 /* String objects in the form "2391" "-100" without any space and with a
2403 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2404 * encoded as integers to save space */
2405 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2407 char *endptr
, buf
[32];
2409 /* Check if it's possible to encode this value as a number */
2410 value
= strtoll(s
, &endptr
, 10);
2411 if (endptr
[0] != '\0') return 0;
2412 snprintf(buf
,32,"%lld",value
);
2414 /* If the number converted back into a string is not identical
2415 * then it's not possible to encode the string as integer */
2416 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2418 /* Finally check if it fits in our ranges */
2419 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2420 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2421 enc
[1] = value
&0xFF;
2423 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2424 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2425 enc
[1] = value
&0xFF;
2426 enc
[2] = (value
>>8)&0xFF;
2428 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2429 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2430 enc
[1] = value
&0xFF;
2431 enc
[2] = (value
>>8)&0xFF;
2432 enc
[3] = (value
>>16)&0xFF;
2433 enc
[4] = (value
>>24)&0xFF;
2440 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2441 unsigned int comprlen
, outlen
;
2445 /* We require at least four bytes compression for this to be worth it */
2446 outlen
= sdslen(obj
->ptr
)-4;
2447 if (outlen
<= 0) return 0;
2448 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2449 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2450 if (comprlen
== 0) {
2454 /* Data compressed! Let's save it on disk */
2455 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2456 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2457 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2458 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2459 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2468 /* Save a string object as [len][data] on disk. If the object is a string
2469 * representation of an integer value we try to save it in a special form */
2470 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2474 len
= sdslen(obj
->ptr
);
2476 /* Try integer encoding */
2478 unsigned char buf
[5];
2479 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2480 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2485 /* Try LZF compression - under 20 bytes it's unable to compress even
2486 * aaaaaaaaaaaaaaaaaa so skip it */
2490 retval
= rdbSaveLzfStringObject(fp
,obj
);
2491 if (retval
== -1) return -1;
2492 if (retval
> 0) return 0;
2493 /* retval == 0 means data can't be compressed, save the old way */
2496 /* Store verbatim */
2497 if (rdbSaveLen(fp
,len
) == -1) return -1;
2498 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2502 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2503 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2506 obj
= getDecodedObject(obj
);
2507 retval
= rdbSaveStringObjectRaw(fp
,obj
);
/* Save a double value on 'fp'.
 *
 * A double is stored as one unsigned length byte followed by that many
 * characters holding its "%.17g" representation. Three values of the
 * length byte are reserved and are not followed by any data:
 *
 *   253: not a number
 *   254: positive infinity
 *   255: negative infinity
 *
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char out[128];
    int total = 1;  /* the special values take the marker byte only */

    if (isnan(val)) {
        out[0] = 253;
    } else if (!isfinite(val)) {
        out[0] = (val < 0) ? 255 : 254;
    } else {
        snprintf((char*)out+1,sizeof(out)-1,"%.17g",val);
        out[0] = strlen((char*)out+1);
        total = out[0]+1;
    }
    if (fwrite(out,total,1,fp) == 0) return -1;
    return 0;
}
2539 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2540 static int rdbSave(char *filename
) {
2541 dictIterator
*di
= NULL
;
2546 time_t now
= time(NULL
);
2548 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2549 fp
= fopen(tmpfile
,"w");
2551 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2554 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2555 for (j
= 0; j
< server
.dbnum
; j
++) {
2556 redisDb
*db
= server
.db
+j
;
2558 if (dictSize(d
) == 0) continue;
2559 di
= dictGetIterator(d
);
2565 /* Write the SELECT DB opcode */
2566 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2567 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2569 /* Iterate this DB writing every entry */
2570 while((de
= dictNext(di
)) != NULL
) {
2571 robj
*key
= dictGetEntryKey(de
);
2572 robj
*o
= dictGetEntryVal(de
);
2573 time_t expiretime
= getExpire(db
,key
);
2575 /* Save the expire time */
2576 if (expiretime
!= -1) {
2577 /* If this key is already expired skip it */
2578 if (expiretime
< now
) continue;
2579 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2580 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2582 /* Save the key and associated value */
2583 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2584 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2585 if (o
->type
== REDIS_STRING
) {
2586 /* Save a string value */
2587 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
2588 } else if (o
->type
== REDIS_LIST
) {
2589 /* Save a list value */
2590 list
*list
= o
->ptr
;
2594 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
2595 while((ln
= listYield(list
))) {
2596 robj
*eleobj
= listNodeValue(ln
);
2598 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2600 } else if (o
->type
== REDIS_SET
) {
2601 /* Save a set value */
2603 dictIterator
*di
= dictGetIterator(set
);
2606 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
2607 while((de
= dictNext(di
)) != NULL
) {
2608 robj
*eleobj
= dictGetEntryKey(de
);
2610 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2612 dictReleaseIterator(di
);
2613 } else if (o
->type
== REDIS_ZSET
) {
2614 /* Save a sorted set value */
2616 dictIterator
*di
= dictGetIterator(zs
->dict
);
2619 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) goto werr
;
2620 while((de
= dictNext(di
)) != NULL
) {
2621 robj
*eleobj
= dictGetEntryKey(de
);
2622 double *score
= dictGetEntryVal(de
);
2624 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2625 if (rdbSaveDoubleValue(fp
,*score
) == -1) goto werr
;
2627 dictReleaseIterator(di
);
2629 redisAssert(0 != 0);
2632 dictReleaseIterator(di
);
2635 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2637 /* Make sure data will not remain on the OS's output buffers */
2642 /* Use RENAME to make sure the DB file is changed atomically only
2643 * if the generated DB file is ok. */
2644 if (rename(tmpfile
,filename
) == -1) {
2645 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
2649 redisLog(REDIS_NOTICE
,"DB saved on disk");
2651 server
.lastsave
= time(NULL
);
2657 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2658 if (di
) dictReleaseIterator(di
);
2662 static int rdbSaveBackground(char *filename
) {
2665 if (server
.bgsavechildpid
!= -1) return REDIS_ERR
;
2666 if ((childpid
= fork()) == 0) {
2669 if (rdbSave(filename
) == REDIS_OK
) {
2676 if (childpid
== -1) {
2677 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2681 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2682 server
.bgsavechildpid
= childpid
;
2685 return REDIS_OK
; /* unreached */
/* Remove the temporary dump file "temp-<pid>.rdb" that belongs to the
 * saving child whose pid is 'childpid'. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,256,"temp-%d.rdb", (int) childpid);
    unlink(path);
}
/* Read a one-byte type code from 'fp'. Returns the code (0-255), or -1 if
 * the byte could not be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char code;

    if (fread(&code,1,1,fp) == 0) {
        return -1;
    }
    return code;
}
/* Read a time stored as a 4-byte int32_t from 'fp'. Returns the time, or -1
 * if the bytes could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stored;

    if (fread(&stored,4,1,fp) == 0) {
        return -1;
    }
    return (time_t) stored;
}
2707 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2708 * of this file for a description of how this are stored on disk.
2710 * isencoded is set to 1 if the readed length is not actually a length but
2711 * an "encoding type", check the above comments for more info */
2712 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2713 unsigned char buf
[2];
2716 if (isencoded
) *isencoded
= 0;
2718 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2723 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2724 type
= (buf
[0]&0xC0)>>6;
2725 if (type
== REDIS_RDB_6BITLEN
) {
2726 /* Read a 6 bit len */
2728 } else if (type
== REDIS_RDB_ENCVAL
) {
2729 /* Read a 6 bit len encoding type */
2730 if (isencoded
) *isencoded
= 1;
2732 } else if (type
== REDIS_RDB_14BITLEN
) {
2733 /* Read a 14 bit len */
2734 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2735 return ((buf
[0]&0x3F)<<8)|buf
[1];
2737 /* Read a 32 bit len */
2738 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2744 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2745 unsigned char enc
[4];
2748 if (enctype
== REDIS_RDB_ENC_INT8
) {
2749 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2750 val
= (signed char)enc
[0];
2751 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2753 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2754 v
= enc
[0]|(enc
[1]<<8);
2756 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2758 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2759 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2762 val
= 0; /* anti-warning */
2765 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2768 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2769 unsigned int len
, clen
;
2770 unsigned char *c
= NULL
;
2773 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2774 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2775 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2776 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2777 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2778 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2780 return createObject(REDIS_STRING
,val
);
2787 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2792 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2795 case REDIS_RDB_ENC_INT8
:
2796 case REDIS_RDB_ENC_INT16
:
2797 case REDIS_RDB_ENC_INT32
:
2798 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2799 case REDIS_RDB_ENC_LZF
:
2800 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2806 if (len
== REDIS_RDB_LENERR
) return NULL
;
2807 val
= sdsnewlen(NULL
,len
);
2808 if (len
&& fread(val
,len
,1,fp
) == 0) {
2812 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2815 /* For information about double serialization check rdbSaveDoubleValue() */
2816 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
2820 if (fread(&len
,1,1,fp
) == 0) return -1;
2822 case 255: *val
= R_NegInf
; return 0;
2823 case 254: *val
= R_PosInf
; return 0;
2824 case 253: *val
= R_Nan
; return 0;
2826 if (fread(buf
,len
,1,fp
) == 0) return -1;
2827 sscanf(buf
, "%lg", val
);
2832 static int rdbLoad(char *filename
) {
2834 robj
*keyobj
= NULL
;
2836 int type
, retval
, rdbver
;
2837 dict
*d
= server
.db
[0].dict
;
2838 redisDb
*db
= server
.db
+0;
2840 time_t expiretime
= -1, now
= time(NULL
);
2842 fp
= fopen(filename
,"r");
2843 if (!fp
) return REDIS_ERR
;
2844 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2846 if (memcmp(buf
,"REDIS",5) != 0) {
2848 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2851 rdbver
= atoi(buf
+5);
2854 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2861 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2862 if (type
== REDIS_EXPIRETIME
) {
2863 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2864 /* We read the time so we need to read the object type again */
2865 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2867 if (type
== REDIS_EOF
) break;
2868 /* Handle SELECT DB opcode as a special case */
2869 if (type
== REDIS_SELECTDB
) {
2870 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2872 if (dbid
>= (unsigned)server
.dbnum
) {
2873 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2876 db
= server
.db
+dbid
;
2881 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2883 if (type
== REDIS_STRING
) {
2884 /* Read string value */
2885 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2886 tryObjectEncoding(o
);
2887 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2888 /* Read list/set value */
2891 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2893 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2894 /* Load every single element of the list/set */
2898 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2899 tryObjectEncoding(ele
);
2900 if (type
== REDIS_LIST
) {
2901 listAddNodeTail((list
*)o
->ptr
,ele
);
2903 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
2906 } else if (type
== REDIS_ZSET
) {
2907 /* Read sorted set value */
2911 if ((zsetlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2913 o
= createZsetObject();
2915 /* Load every single element of the list/set */
2918 double *score
= zmalloc(sizeof(double));
2920 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2921 tryObjectEncoding(ele
);
2922 if (rdbLoadDoubleValue(fp
,score
) == -1) goto eoferr
;
2923 dictAdd(zs
->dict
,ele
,score
);
2924 zslInsert(zs
->zsl
,*score
,ele
);
2925 incrRefCount(ele
); /* added to skiplist */
2928 redisAssert(0 != 0);
2930 /* Add the new object in the hash table */
2931 retval
= dictAdd(d
,keyobj
,o
);
2932 if (retval
== DICT_ERR
) {
2933 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2936 /* Set the expire time if needed */
2937 if (expiretime
!= -1) {
2938 setExpire(db
,keyobj
,expiretime
);
2939 /* Delete this key if already expired */
2940 if (expiretime
< now
) deleteKey(db
,keyobj
);
2948 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2949 if (keyobj
) decrRefCount(keyobj
);
2950 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2952 return REDIS_ERR
; /* Just to avoid warning */
2955 /*================================== Commands =============================== */
2957 static void authCommand(redisClient
*c
) {
2958 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2959 c
->authenticated
= 1;
2960 addReply(c
,shared
.ok
);
2962 c
->authenticated
= 0;
2963 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2967 static void pingCommand(redisClient
*c
) {
2968 addReply(c
,shared
.pong
);
2971 static void echoCommand(redisClient
*c
) {
2972 addReplyBulkLen(c
,c
->argv
[1]);
2973 addReply(c
,c
->argv
[1]);
2974 addReply(c
,shared
.crlf
);
2977 /*=================================== Strings =============================== */
2979 static void setGenericCommand(redisClient
*c
, int nx
) {
2982 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2983 if (retval
== DICT_ERR
) {
2985 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2986 incrRefCount(c
->argv
[2]);
2988 addReply(c
,shared
.czero
);
2992 incrRefCount(c
->argv
[1]);
2993 incrRefCount(c
->argv
[2]);
2996 removeExpire(c
->db
,c
->argv
[1]);
2997 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3000 static void setCommand(redisClient
*c
) {
3001 setGenericCommand(c
,0);
3004 static void setnxCommand(redisClient
*c
) {
3005 setGenericCommand(c
,1);
3008 static void getCommand(redisClient
*c
) {
3009 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3012 addReply(c
,shared
.nullbulk
);
3014 if (o
->type
!= REDIS_STRING
) {
3015 addReply(c
,shared
.wrongtypeerr
);
3017 addReplyBulkLen(c
,o
);
3019 addReply(c
,shared
.crlf
);
3024 static void getsetCommand(redisClient
*c
) {
3026 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
3027 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3029 incrRefCount(c
->argv
[1]);
3031 incrRefCount(c
->argv
[2]);
3033 removeExpire(c
->db
,c
->argv
[1]);
3036 static void mgetCommand(redisClient
*c
) {
3039 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
3040 for (j
= 1; j
< c
->argc
; j
++) {
3041 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
3043 addReply(c
,shared
.nullbulk
);
3045 if (o
->type
!= REDIS_STRING
) {
3046 addReply(c
,shared
.nullbulk
);
3048 addReplyBulkLen(c
,o
);
3050 addReply(c
,shared
.crlf
);
3056 static void msetGenericCommand(redisClient
*c
, int nx
) {
3059 if ((c
->argc
% 2) == 0) {
3060 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
3063 /* Handle the NX flag. The MSETNX semantic is to return zero and
3064 * set nothing at all if at least one key already exists. */
3066 for (j
= 1; j
< c
->argc
; j
+= 2) {
3067 if (dictFind(c
->db
->dict
,c
->argv
[j
]) != NULL
) {
3068 addReply(c
, shared
.czero
);
3074 for (j
= 1; j
< c
->argc
; j
+= 2) {
3077 tryObjectEncoding(c
->argv
[j
+1]);
3078 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3079 if (retval
== DICT_ERR
) {
3080 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3081 incrRefCount(c
->argv
[j
+1]);
3083 incrRefCount(c
->argv
[j
]);
3084 incrRefCount(c
->argv
[j
+1]);
3086 removeExpire(c
->db
,c
->argv
[j
]);
3088 server
.dirty
+= (c
->argc
-1)/2;
3089 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3092 static void msetCommand(redisClient
*c
) {
3093 msetGenericCommand(c
,0);
3096 static void msetnxCommand(redisClient
*c
) {
3097 msetGenericCommand(c
,1);
3100 static void incrDecrCommand(redisClient
*c
, long long incr
) {
3105 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3109 if (o
->type
!= REDIS_STRING
) {
3114 if (o
->encoding
== REDIS_ENCODING_RAW
)
3115 value
= strtoll(o
->ptr
, &eptr
, 10);
3116 else if (o
->encoding
== REDIS_ENCODING_INT
)
3117 value
= (long)o
->ptr
;
3119 redisAssert(1 != 1);
3124 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
3125 tryObjectEncoding(o
);
3126 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
3127 if (retval
== DICT_ERR
) {
3128 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
3129 removeExpire(c
->db
,c
->argv
[1]);
3131 incrRefCount(c
->argv
[1]);
3134 addReply(c
,shared
.colon
);
3136 addReply(c
,shared
.crlf
);
3139 static void incrCommand(redisClient
*c
) {
3140 incrDecrCommand(c
,1);
3143 static void decrCommand(redisClient
*c
) {
3144 incrDecrCommand(c
,-1);
3147 static void incrbyCommand(redisClient
*c
) {
3148 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3149 incrDecrCommand(c
,incr
);
3152 static void decrbyCommand(redisClient
*c
) {
3153 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3154 incrDecrCommand(c
,-incr
);
3157 /* ========================= Type agnostic commands ========================= */
3159 static void delCommand(redisClient
*c
) {
3162 for (j
= 1; j
< c
->argc
; j
++) {
3163 if (deleteKey(c
->db
,c
->argv
[j
])) {
3170 addReply(c
,shared
.czero
);
3173 addReply(c
,shared
.cone
);
3176 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
3181 static void existsCommand(redisClient
*c
) {
3182 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
3185 static void selectCommand(redisClient
*c
) {
3186 int id
= atoi(c
->argv
[1]->ptr
);
3188 if (selectDb(c
,id
) == REDIS_ERR
) {
3189 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
3191 addReply(c
,shared
.ok
);
3195 static void randomkeyCommand(redisClient
*c
) {
3199 de
= dictGetRandomKey(c
->db
->dict
);
3200 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
3203 addReply(c
,shared
.plus
);
3204 addReply(c
,shared
.crlf
);
3206 addReply(c
,shared
.plus
);
3207 addReply(c
,dictGetEntryKey(de
));
3208 addReply(c
,shared
.crlf
);
3212 static void keysCommand(redisClient
*c
) {
3215 sds pattern
= c
->argv
[1]->ptr
;
3216 int plen
= sdslen(pattern
);
3217 unsigned long numkeys
= 0, keyslen
= 0;
3218 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3220 di
= dictGetIterator(c
->db
->dict
);
3222 decrRefCount(lenobj
);
3223 while((de
= dictNext(di
)) != NULL
) {
3224 robj
*keyobj
= dictGetEntryKey(de
);
3226 sds key
= keyobj
->ptr
;
3227 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3228 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3229 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3231 addReply(c
,shared
.space
);
3234 keyslen
+= sdslen(key
);
3238 dictReleaseIterator(di
);
3239 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3240 addReply(c
,shared
.crlf
);
3243 static void dbsizeCommand(redisClient
*c
) {
3245 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
3248 static void lastsaveCommand(redisClient
*c
) {
3250 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
3253 static void typeCommand(redisClient
*c
) {
3257 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3262 case REDIS_STRING
: type
= "+string"; break;
3263 case REDIS_LIST
: type
= "+list"; break;
3264 case REDIS_SET
: type
= "+set"; break;
3265 case REDIS_ZSET
: type
= "+zset"; break;
3266 default: type
= "unknown"; break;
3269 addReplySds(c
,sdsnew(type
));
3270 addReply(c
,shared
.crlf
);
3273 static void saveCommand(redisClient
*c
) {
3274 if (server
.bgsavechildpid
!= -1) {
3275 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3278 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3279 addReply(c
,shared
.ok
);
3281 addReply(c
,shared
.err
);
3285 static void bgsaveCommand(redisClient
*c
) {
3286 if (server
.bgsavechildpid
!= -1) {
3287 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3290 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3291 addReply(c
,shared
.ok
);
3293 addReply(c
,shared
.err
);
3297 static void shutdownCommand(redisClient
*c
) {
3298 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
3299 /* Kill the saving child if there is a background saving in progress.
3300 We want to avoid race conditions, for instance our saving child may
3301 overwrite the synchronous saving done by SHUTDOWN. */
3302 if (server
.bgsavechildpid
!= -1) {
3303 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3304 kill(server
.bgsavechildpid
,SIGKILL
);
3305 rdbRemoveTempFile(server
.bgsavechildpid
);
3308 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3309 if (server
.daemonize
)
3310 unlink(server
.pidfile
);
3311 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3312 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3315 /* Ooops.. error saving! The best we can do is to continue operating.
3316 * Note that if there was a background saving process, in the next
3317 * cron() Redis will be notified that the background saving aborted,
3318 * handling special stuff like slaves pending for synchronization... */
3319 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3320 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3324 static void renameGenericCommand(redisClient
*c
, int nx
) {
3327 /* To use the same key as src and dst is probably an error */
3328 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3329 addReply(c
,shared
.sameobjecterr
);
3333 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3335 addReply(c
,shared
.nokeyerr
);
3339 deleteIfVolatile(c
->db
,c
->argv
[2]);
3340 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3343 addReply(c
,shared
.czero
);
3346 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3348 incrRefCount(c
->argv
[2]);
3350 deleteKey(c
->db
,c
->argv
[1]);
3352 addReply(c
,nx
? shared
.cone
: shared
.ok
);
3355 static void renameCommand(redisClient
*c
) {
3356 renameGenericCommand(c
,0);
3359 static void renamenxCommand(redisClient
*c
) {
3360 renameGenericCommand(c
,1);
3363 static void moveCommand(redisClient
*c
) {
3368 /* Obtain source and target DB pointers */
3371 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3372 addReply(c
,shared
.outofrangeerr
);
3376 selectDb(c
,srcid
); /* Back to the source DB */
3378 /* If the user is moving using as target the same
3379 * DB as the source DB it is probably an error. */
3381 addReply(c
,shared
.sameobjecterr
);
3385 /* Check if the element exists and get a reference */
3386 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3388 addReply(c
,shared
.czero
);
3392 /* Try to add the element to the target DB */
3393 deleteIfVolatile(dst
,c
->argv
[1]);
3394 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3395 addReply(c
,shared
.czero
);
3398 incrRefCount(c
->argv
[1]);
3401 /* OK! key moved, free the entry in the source DB */
3402 deleteKey(src
,c
->argv
[1]);
3404 addReply(c
,shared
.cone
);
3407 /* =================================== Lists ================================ */
/* Shared body of LPUSH/RPUSH. 'where' selects listAddNodeHead() (REDIS_HEAD)
 * or listAddNodeTail(). argv[1] is the key, argv[2] the pushed value.
 * Two paths are visible: one creates a new list object, adds the value and
 * registers it in the DB dict (retaining both key and value); the other
 * rejects a non-REDIS_LIST value with shared.wrongtypeerr and otherwise adds
 * the value to the existing list. Replies shared.ok at the end. */
3408 static void pushGenericCommand(redisClient
*c
, int where
) {
3412 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3414 lobj
= createListObject();
3416 if (where
== REDIS_HEAD
) {
3417 listAddNodeHead(list
,c
->argv
[2]);
3419 listAddNodeTail(list
,c
->argv
[2]);
3421 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
3422 incrRefCount(c
->argv
[1]);
3423 incrRefCount(c
->argv
[2]);
3425 if (lobj
->type
!= REDIS_LIST
) {
3426 addReply(c
,shared
.wrongtypeerr
);
3430 if (where
== REDIS_HEAD
) {
3431 listAddNodeHead(list
,c
->argv
[2]);
3433 listAddNodeTail(list
,c
->argv
[2]);
3435 incrRefCount(c
->argv
[2]);
3438 addReply(c
,shared
.ok
);
3441 static void lpushCommand(redisClient
*c
) {
3442 pushGenericCommand(c
,REDIS_HEAD
);
3445 static void rpushCommand(redisClient
*c
) {
3446 pushGenericCommand(c
,REDIS_TAIL
);
/* LLEN: looks argv[1] up for reading. Replies visible here: shared.czero,
 * shared.wrongtypeerr when the value is not a REDIS_LIST, otherwise an
 * integer reply (":%d\r\n") carrying listLength(). */
3449 static void llenCommand(redisClient
*c
) {
3453 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3455 addReply(c
,shared
.czero
);
3458 if (o
->type
!= REDIS_LIST
) {
3459 addReply(c
,shared
.wrongtypeerr
);
3462 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
/* LINDEX: argv[1] is the key, argv[2] the index (parsed with atoi, so
 * non-numeric input is read as 0). Replies visible here: shared.nullbulk
 * (twice), shared.wrongtypeerr for a non-list value, or the element found by
 * listIndex() sent as a bulk reply followed by shared.crlf. */
3467 static void lindexCommand(redisClient
*c
) {
3469 int index
= atoi(c
->argv
[2]->ptr
);
3471 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3473 addReply(c
,shared
.nullbulk
);
3475 if (o
->type
!= REDIS_LIST
) {
3476 addReply(c
,shared
.wrongtypeerr
);
3478 list
*list
= o
->ptr
;
3481 ln
= listIndex(list
, index
);
3483 addReply(c
,shared
.nullbulk
);
3485 robj
*ele
= listNodeValue(ln
);
3486 addReplyBulkLen(c
,ele
);
3488 addReply(c
,shared
.crlf
);
/* LSET: argv[1] is the key, argv[2] the index (atoi), argv[3] the new value.
 * Replies visible here: shared.nokeyerr, shared.wrongtypeerr for a non-list
 * value, shared.outofrangeerr, or shared.ok after the node returned by
 * listIndex() has its value pointer replaced by argv[3] (which is retained
 * with incrRefCount). */
3494 static void lsetCommand(redisClient
*c
) {
3496 int index
= atoi(c
->argv
[2]->ptr
);
3498 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3500 addReply(c
,shared
.nokeyerr
);
3502 if (o
->type
!= REDIS_LIST
) {
3503 addReply(c
,shared
.wrongtypeerr
);
3505 list
*list
= o
->ptr
;
3508 ln
= listIndex(list
, index
);
3510 addReply(c
,shared
.outofrangeerr
);
3512 robj
*ele
= listNodeValue(ln
);
3515 listNodeValue(ln
) = c
->argv
[3];
3516 incrRefCount(c
->argv
[3]);
3517 addReply(c
,shared
.ok
);
/* Shared body of LPOP/RPOP. 'where' == REDIS_HEAD picks listFirst(); the
 * other visible assignment uses listLast(). Replies visible here:
 * shared.nullbulk (twice), shared.wrongtypeerr for a non-list value, or the
 * node value as a bulk reply followed by shared.crlf, after which the node
 * is removed with listDelNode(). */
3524 static void popGenericCommand(redisClient
*c
, int where
) {
3527 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3529 addReply(c
,shared
.nullbulk
);
3531 if (o
->type
!= REDIS_LIST
) {
3532 addReply(c
,shared
.wrongtypeerr
);
3534 list
*list
= o
->ptr
;
3537 if (where
== REDIS_HEAD
)
3538 ln
= listFirst(list
);
3540 ln
= listLast(list
);
3543 addReply(c
,shared
.nullbulk
);
3545 robj
*ele
= listNodeValue(ln
);
3546 addReplyBulkLen(c
,ele
);
3548 addReply(c
,shared
.crlf
);
3549 listDelNode(list
,ln
);
3556 static void lpopCommand(redisClient
*c
) {
3557 popGenericCommand(c
,REDIS_HEAD
);
3560 static void rpopCommand(redisClient
*c
) {
3561 popGenericCommand(c
,REDIS_TAIL
);
/* LRANGE: argv[1] is the key, argv[2]/argv[3] the start/end indexes (atoi).
 * Negative indexes are offset by the list length and then clamped to 0; 'end'
 * is clamped to llen-1. start > end or start >= llen yields
 * shared.emptymultibulk. Otherwise a "*<rangelen>" header is sent followed by
 * one bulk reply per element, starting at listIndex(list,start).
 * Other replies visible: shared.nullmultibulk and shared.wrongtypeerr. */
3564 static void lrangeCommand(redisClient
*c
) {
3566 int start
= atoi(c
->argv
[2]->ptr
);
3567 int end
= atoi(c
->argv
[3]->ptr
);
3569 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3571 addReply(c
,shared
.nullmultibulk
);
3573 if (o
->type
!= REDIS_LIST
) {
3574 addReply(c
,shared
.wrongtypeerr
);
3576 list
*list
= o
->ptr
;
3578 int llen
= listLength(list
);
3582 /* convert negative indexes */
3583 if (start
< 0) start
= llen
+start
;
3584 if (end
< 0) end
= llen
+end
;
3585 if (start
< 0) start
= 0;
3586 if (end
< 0) end
= 0;
3588 /* indexes sanity checks */
3589 if (start
> end
|| start
>= llen
) {
3590 /* Out of range start or start > end result in empty list */
3591 addReply(c
,shared
.emptymultibulk
);
3594 if (end
>= llen
) end
= llen
-1;
3595 rangelen
= (end
-start
)+1;
3597 /* Return the result in form of a multi-bulk reply */
3598 ln
= listIndex(list
, start
);
3599 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3600 for (j
= 0; j
< rangelen
; j
++) {
3601 ele
= listNodeValue(ln
);
3602 addReplyBulkLen(c
,ele
);
3604 addReply(c
,shared
.crlf
);
/* LTRIM: argv[1] is the key, argv[2]/argv[3] the start/end indexes (atoi),
 * normalised exactly like the visible LRANGE code (negative values offset by
 * llen, then clamped). 'ltrim' nodes are then deleted from the head and
 * 'rtrim' nodes from the tail; the assignments to ltrim/rtrim are not visible
 * in this excerpt. Replies visible: shared.nokeyerr, shared.wrongtypeerr,
 * shared.ok. */
3611 static void ltrimCommand(redisClient
*c
) {
3613 int start
= atoi(c
->argv
[2]->ptr
);
3614 int end
= atoi(c
->argv
[3]->ptr
);
3616 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3618 addReply(c
,shared
.nokeyerr
);
3620 if (o
->type
!= REDIS_LIST
) {
3621 addReply(c
,shared
.wrongtypeerr
);
3623 list
*list
= o
->ptr
;
3625 int llen
= listLength(list
);
3626 int j
, ltrim
, rtrim
;
3628 /* convert negative indexes */
3629 if (start
< 0) start
= llen
+start
;
3630 if (end
< 0) end
= llen
+end
;
3631 if (start
< 0) start
= 0;
3632 if (end
< 0) end
= 0;
3634 /* indexes sanity checks */
3635 if (start
> end
|| start
>= llen
) {
3636 /* Out of range start or start > end result in empty list */
3640 if (end
>= llen
) end
= llen
-1;
3645 /* Remove list elements to perform the trim */
3646 for (j
= 0; j
< ltrim
; j
++) {
3647 ln
= listFirst(list
);
3648 listDelNode(list
,ln
);
3650 for (j
= 0; j
< rtrim
; j
++) {
3651 ln
= listLast(list
);
3652 listDelNode(list
,ln
);
3655 addReply(c
,shared
.ok
);
/* LREM: argv[1] is the key, argv[2] the count (atoi), argv[3] the value to
 * remove. The walk starts at list->tail or list->head depending on
 * 'fromtail' and follows ->prev or ->next accordingly; the successor is saved
 * before a matching node (compareStringObjects() == 0) is deleted. The walk
 * stops once 'removed' equals a non-zero 'toremove'. The reply is the integer
 * 'removed'. Other replies visible: shared.czero, shared.wrongtypeerr. */
3660 static void lremCommand(redisClient
*c
) {
3663 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3665 addReply(c
,shared
.czero
);
3667 if (o
->type
!= REDIS_LIST
) {
3668 addReply(c
,shared
.wrongtypeerr
);
3670 list
*list
= o
->ptr
;
3671 listNode
*ln
, *next
;
3672 int toremove
= atoi(c
->argv
[2]->ptr
);
3677 toremove
= -toremove
;
3680 ln
= fromtail
? list
->tail
: list
->head
;
3682 robj
*ele
= listNodeValue(ln
);
3684 next
= fromtail
? ln
->prev
: ln
->next
;
3685 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3686 listDelNode(list
,ln
);
3689 if (toremove
&& removed
== toremove
) break;
3693 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3698 /* This is the semantic of this command:
3699 * RPOPLPUSH srclist dstlist:
3700 * IF LLEN(srclist) > 0
3701 * element = RPOP srclist
3702 * LPUSH dstlist element
3709 * The idea is to be able to get an element from a list in a reliable way
3710 * since the element is not just returned but pushed against another list
3711 * as well. This command was originally proposed by Ezra Zygmuntowicz.
/* RPOPLPUSH: argv[1] is the source list key, argv[2] the destination key.
 * Takes the last node of the source list, creates the destination list when
 * needed (registering it in the DB dict), pushes the element on the
 * destination head, sends it to the client as a bulk reply and finally
 * deletes the node from the source list. Replies visible for the failure
 * paths: shared.nullbulk (twice) and shared.wrongtypeerr (source or
 * destination not a REDIS_LIST). */
3713 static void rpoplpushcommand(redisClient
*c
) {
3716 sobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3718 addReply(c
,shared
.nullbulk
);
3720 if (sobj
->type
!= REDIS_LIST
) {
3721 addReply(c
,shared
.wrongtypeerr
);
3723 list
*srclist
= sobj
->ptr
;
3724 listNode
*ln
= listLast(srclist
);
3727 addReply(c
,shared
.nullbulk
);
3729 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3730 robj
*ele
= listNodeValue(ln
);
3735 /* Create the list if the key does not exist */
3736 dobj
= createListObject();
3737 dictAdd(c
->db
->dict
,c
->argv
[2],dobj
);
3738 incrRefCount(c
->argv
[2]);
3739 } else if (dobj
->type
!= REDIS_LIST
) {
3740 addReply(c
,shared
.wrongtypeerr
);
3743 /* Add the element to the target list */
3744 dstlist
= dobj
->ptr
;
3745 listAddNodeHead(dstlist
,ele
);
3748 /* Send the element to the client as reply as well */
3749 addReplyBulkLen(c
,ele
);
3751 addReply(c
,shared
.crlf
);
3753 /* Finally remove the element from the source list */
3754 listDelNode(srclist
,ln
);
3762 /* ==================================== Sets ================================ */
/* SADD: argv[1] is the set key, argv[2] the member. A new set object is
 * created and registered in the DB dict on one path; a non-REDIS_SET value is
 * rejected with shared.wrongtypeerr on the other. The member is inserted with
 * dictAdd(set->ptr, member, NULL): DICT_OK retains the member and leads to
 * shared.cone; shared.czero is the other visible reply. */
3764 static void saddCommand(redisClient
*c
) {
3767 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3769 set
= createSetObject();
3770 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3771 incrRefCount(c
->argv
[1]);
3773 if (set
->type
!= REDIS_SET
) {
3774 addReply(c
,shared
.wrongtypeerr
);
3778 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3779 incrRefCount(c
->argv
[2]);
3781 addReply(c
,shared
.cone
);
3783 addReply(c
,shared
.czero
);
/* SREM: argv[1] is the set key, argv[2] the member. When dictDelete()
 * returns DICT_OK the set's hash table is shrunk if htNeedsResize() says so
 * and shared.cone is sent. Other replies visible: shared.czero (twice) and
 * shared.wrongtypeerr for a non-set value. */
3787 static void sremCommand(redisClient
*c
) {
3790 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3792 addReply(c
,shared
.czero
);
3794 if (set
->type
!= REDIS_SET
) {
3795 addReply(c
,shared
.wrongtypeerr
);
3798 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
3800 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3801 addReply(c
,shared
.cone
);
3803 addReply(c
,shared
.czero
);
/* SMOVE: argv[1] is the source set key, argv[2] the destination set key,
 * argv[3] the member. A missing source replies shared.czero; a source or
 * destination of the wrong type replies shared.wrongtypeerr. The member is
 * deleted from the source first (shared.czero when dictDelete() returns
 * DICT_ERR), the destination set is created and registered when needed, and
 * the member is retained only if dictAdd() into the destination returns
 * DICT_OK. Replies shared.cone at the end. */
3808 static void smoveCommand(redisClient
*c
) {
3809 robj
*srcset
, *dstset
;
3811 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3812 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3814 /* If the source key does not exist return 0, if it's of the wrong type
3816 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
3817 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
3820 /* Error if the destination key is not a set as well */
3821 if (dstset
&& dstset
->type
!= REDIS_SET
) {
3822 addReply(c
,shared
.wrongtypeerr
);
3825 /* Remove the element from the source set */
3826 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
3827 /* Key not found in the src set! return zero */
3828 addReply(c
,shared
.czero
);
3832 /* Add the element to the destination set */
3834 dstset
= createSetObject();
3835 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
3836 incrRefCount(c
->argv
[2]);
3838 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
3839 incrRefCount(c
->argv
[3]);
3840 addReply(c
,shared
.cone
);
/* SISMEMBER: argv[1] is the set key, argv[2] the member. Replies shared.cone
 * when dictFind() locates the member in the set; other replies visible are
 * shared.czero (twice) and shared.wrongtypeerr for a non-set value. */
3843 static void sismemberCommand(redisClient
*c
) {
3846 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3848 addReply(c
,shared
.czero
);
3850 if (set
->type
!= REDIS_SET
) {
3851 addReply(c
,shared
.wrongtypeerr
);
3854 if (dictFind(set
->ptr
,c
->argv
[2]))
3855 addReply(c
,shared
.cone
);
3857 addReply(c
,shared
.czero
);
/* SCARD: looks argv[1] up for reading. Replies visible here: shared.czero,
 * shared.wrongtypeerr for a non-set value, otherwise an integer reply built
 * with the ":%lu\r\n" format (its argument is not visible in this excerpt). */
3861 static void scardCommand(redisClient
*c
) {
3865 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3867 addReply(c
,shared
.czero
);
3870 if (o
->type
!= REDIS_SET
) {
3871 addReply(c
,shared
.wrongtypeerr
);
3874 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
/* SPOP: picks an entry with dictGetRandomKey(), sends its key as a bulk
 * reply, then deletes it from the set and shrinks the hash table when
 * htNeedsResize() says so. Other replies visible: shared.nullbulk (twice)
 * and shared.wrongtypeerr for a non-set value. */
3880 static void spopCommand(redisClient
*c
) {
3884 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3886 addReply(c
,shared
.nullbulk
);
3888 if (set
->type
!= REDIS_SET
) {
3889 addReply(c
,shared
.wrongtypeerr
);
3892 de
= dictGetRandomKey(set
->ptr
);
3894 addReply(c
,shared
.nullbulk
);
3896 robj
*ele
= dictGetEntryKey(de
);
3898 addReplyBulkLen(c
,ele
);
3900 addReply(c
,shared
.crlf
);
3901 dictDelete(set
->ptr
,ele
);
3902 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
/* SRANDMEMBER: like the visible SPOP code but read-only: the key is looked
 * up with lookupKeyRead(), an entry is picked with dictGetRandomKey() and its
 * key is sent as a bulk reply; nothing is deleted in the visible lines.
 * Other replies visible: shared.nullbulk (twice), shared.wrongtypeerr. */
3908 static void srandmemberCommand(redisClient
*c
) {
3912 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3914 addReply(c
,shared
.nullbulk
);
3916 if (set
->type
!= REDIS_SET
) {
3917 addReply(c
,shared
.wrongtypeerr
);
3920 de
= dictGetRandomKey(set
->ptr
);
3922 addReply(c
,shared
.nullbulk
);
3924 robj
*ele
= dictGetEntryKey(de
);
3926 addReplyBulkLen(c
,ele
);
3928 addReply(c
,shared
.crlf
);
/* qsort() comparator for an array of 'dict *': s1 and s2 point at array
 * slots, and the result is the difference of the two dictSize() values, so
 * smaller sets sort first.
 * NOTE(review): dictSize() is printed with "%lu" elsewhere in this file, so
 * the subtraction is presumably unsigned and is narrowed to int on return;
 * the sign could be wrong for very large size differences - confirm. */
3933 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3934 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3936 return dictSize(*d1
)-dictSize(*d2
);
/* Shared body of SINTER (dstkey == NULL at the visible call site) and
 * SINTERSTORE (dstkey == target key). 'setskeys' holds 'setsnum' key objects.
 * The sets' dicts are collected in 'dv', sorted by cardinality, and every
 * member of dv[0] is tested with dictFind() against dv[1..setsnum-1]; members
 * found everywhere are either sent as bulk replies or added to 'dstset'.
 * For the multi-bulk form the length header object is created up front and
 * its ->ptr is filled in with "*<cardinality>" after the scan; for the store
 * form the result replaces 'dstkey' in the DB and the reply is the integer
 * dictSize() of the result. */
3939 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, unsigned long setsnum
, robj
*dstkey
) {
3940 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3943 robj
*lenobj
= NULL
, *dstset
= NULL
;
3944 unsigned long j
, cardinality
= 0;
3946 for (j
= 0; j
< setsnum
; j
++) {
3950 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3951 lookupKeyRead(c
->db
,setskeys
[j
]);
3955 deleteKey(c
->db
,dstkey
);
3956 addReply(c
,shared
.ok
);
3958 addReply(c
,shared
.nullmultibulk
);
3962 if (setobj
->type
!= REDIS_SET
) {
3964 addReply(c
,shared
.wrongtypeerr
);
3967 dv
[j
] = setobj
->ptr
;
3969 /* Sort sets from the smallest to largest, this will improve our
3970 * algorithm's performance */
3971 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3973 /* The first thing we should output is the total number of elements...
3974 * since this is a multi-bulk write, but at this stage we don't know
3975 * the intersection set size, so we use a trick, append an empty object
3976 * to the output list and save the pointer to later modify it with the
3979 lenobj
= createObject(REDIS_STRING
,NULL
);
3981 decrRefCount(lenobj
);
3983 /* If we have a target key where to store the resulting set
3984 * create this key with an empty set inside */
3985 dstset
= createSetObject();
3988 /* Iterate all the elements of the first (smallest) set, and test
3989 * the element against all the other sets, if at least one set does
3990 * not include the element it is discarded */
3991 di
= dictGetIterator(dv
[0]);
3993 while((de
= dictNext(di
)) != NULL
) {
3996 for (j
= 1; j
< setsnum
; j
++)
3997 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
3999 continue; /* at least one set does not contain the member */
4000 ele
= dictGetEntryKey(de
);
4002 addReplyBulkLen(c
,ele
);
4004 addReply(c
,shared
.crlf
);
4007 dictAdd(dstset
->ptr
,ele
,NULL
);
4011 dictReleaseIterator(di
);
4014 /* Store the resulting set into the target */
4015 deleteKey(c
->db
,dstkey
);
4016 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4017 incrRefCount(dstkey
);
4021 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%lu\r\n",cardinality
);
4023 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4024 dictSize((dict
*)dstset
->ptr
)));
4030 static void sinterCommand(redisClient
*c
) {
4031 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
4034 static void sinterstoreCommand(redisClient
*c
) {
4035 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
4038 #define REDIS_OP_UNION 0
4039 #define REDIS_OP_DIFF 1
/* Shared body of SUNION/SUNIONSTORE/SDIFF/SDIFFSTORE. 'setskeys' holds
 * 'setsnum' key objects, 'dstkey' is the target key (NULL at the non-STORE
 * call sites) and 'op' is REDIS_OP_UNION or REDIS_OP_DIFF.
 * A temporary set 'dstset' is filled by scanning every input dict: for a
 * union, or for the first set of a diff, members are dictAdd()ed; for the
 * following sets of a diff, members are dictDelete()d. A NULL dv[] slot is
 * treated as an empty set, and a diff stops early when its first set is
 * missing or the running cardinality reaches 0.
 * The result is then either streamed as a multi-bulk reply or stored under
 * 'dstkey' with an integer dictSize() reply. */
4041 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
4042 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4045 robj
*dstset
= NULL
;
4046 int j
, cardinality
= 0;
4048 for (j
= 0; j
< setsnum
; j
++) {
4052 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4053 lookupKeyRead(c
->db
,setskeys
[j
]);
4058 if (setobj
->type
!= REDIS_SET
) {
4060 addReply(c
,shared
.wrongtypeerr
);
4063 dv
[j
] = setobj
->ptr
;
4066 /* We need a temp set object to store our union. If the dstkey
4067 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4068 * this set object will be the resulting object to set into the target key*/
4069 dstset
= createSetObject();
4071 /* Iterate all the elements of all the sets, add every element a single
4072 * time to the result set */
4073 for (j
= 0; j
< setsnum
; j
++) {
4074 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
4075 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
4077 di
= dictGetIterator(dv
[j
]);
4079 while((de
= dictNext(di
)) != NULL
) {
4082 /* dictAdd will not add the same element multiple times */
4083 ele
= dictGetEntryKey(de
);
4084 if (op
== REDIS_OP_UNION
|| j
== 0) {
4085 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
4089 } else if (op
== REDIS_OP_DIFF
) {
4090 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
4095 dictReleaseIterator(di
);
4097 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
4100 /* Output the content of the resulting set, if not in STORE mode */
4102 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
4103 di
= dictGetIterator(dstset
->ptr
);
4104 while((de
= dictNext(di
)) != NULL
) {
4107 ele
= dictGetEntryKey(de
);
4108 addReplyBulkLen(c
,ele
);
4110 addReply(c
,shared
.crlf
);
4112 dictReleaseIterator(di
);
4114 /* If we have a target key where to store the resulting set
4115 * create this key with the result set inside */
4116 deleteKey(c
->db
,dstkey
);
4117 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4118 incrRefCount(dstkey
);
4123 decrRefCount(dstset
);
4125 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4126 dictSize((dict
*)dstset
->ptr
)));
4132 static void sunionCommand(redisClient
*c
) {
4133 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
4136 static void sunionstoreCommand(redisClient
*c
) {
4137 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
4140 static void sdiffCommand(redisClient
*c
) {
4141 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
4144 static void sdiffstoreCommand(redisClient
*c
) {
4145 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
4148 /* ==================================== ZSets =============================== */
4150 /* ZSETs are ordered sets using two data structures to hold the same elements
4151 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4154 * The elements are added to a hash table mapping Redis objects to scores.
4155 * At the same time the elements are added to a skip list mapping scores
4156 * to Redis objects (so objects are sorted by scores in this "view"). */
4158 /* This skiplist implementation is almost a C translation of the original
4159 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4160 * Alternative to Balanced Trees", modified in three ways:
4161 * a) this implementation allows for repeated values.
4162 * b) the comparison is not just by key (our 'score') but by satellite data.
4163 * c) there is a back pointer, so it's a doubly linked list with the back
4164 * pointers being only at "level 1". This allows to traverse the list
4165 * from tail to head, useful for ZREVRANGE. */
/* Allocates a skiplist node together with a 'forward' array of 'level'
 * pointers. 'score' and 'obj' are the node payload; the lines that store
 * them and return the node are not visible in this excerpt. */
4167 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
4168 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
4170 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
/* Allocates a skiplist whose header is a ZSKIPLIST_MAXLEVEL-level node with
 * score 0 and a NULL object; every forward pointer of the header and its
 * backward pointer start out NULL. */
4176 static zskiplist
*zslCreate(void) {
4180 zsl
= zmalloc(sizeof(*zsl
));
4183 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
4184 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
4185 zsl
->header
->forward
[j
] = NULL
;
4186 zsl
->header
->backward
= NULL
;
/* Releases one skiplist node: drops a reference on the node's object and
 * frees its 'forward' array. */
4191 static void zslFreeNode(zskiplistNode
*node
) {
4192 decrRefCount(node
->obj
);
4193 zfree(node
->forward
);
/* Releases a whole skiplist. The first real node is read from the header's
 * level-0 forward pointer before the header's forward array is freed; each
 * node's level-0 successor is saved in 'next' while walking. */
4197 static void zslFree(zskiplist
*zsl
) {
4198 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
4200 zfree(zsl
->header
->forward
);
4203 next
= node
->forward
[0];
/* Picks a random level for a new skiplist node: the loop keeps going while
 * the low 16 bits of random() are below ZSKIPLIST_P * 0xFFFF.
 * NOTE(review): no clamp to ZSKIPLIST_MAXLEVEL is visible in this excerpt,
 * while zslInsert() indexes update[ZSKIPLIST_MAXLEVEL] by the returned level
 * - confirm the result is bounded. */
4210 static int zslRandomLevel(void) {
4212 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
/* Inserts (score, obj) into the skiplist. The search descends from the top
 * level and passes every node that sorts before the new element, ordering by
 * score first and by compareStringObjects() on equal scores. update[i] is
 * the node after which the new node is linked at level i; levels above the
 * current list level use the header. At level 0 the new node's backward
 * pointer is NULL when its predecessor is the header, and its successor's
 * backward pointer is set to the new node. */
4217 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
4218 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4222 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4223 while (x
->forward
[i
] &&
4224 (x
->forward
[i
]->score
< score
||
4225 (x
->forward
[i
]->score
== score
&&
4226 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4230 /* we assume the key is not already inside, since we allow duplicated
4231 * scores, and the re-insertion of score and redis object should never
4232 * happen since the caller of zslInsert() should test in the hash table
4233 * if the element is already inside or not. */
4234 level
= zslRandomLevel();
4235 if (level
> zsl
->level
) {
4236 for (i
= zsl
->level
; i
< level
; i
++)
4237 update
[i
] = zsl
->header
;
4240 x
= zslCreateNode(level
,score
,obj
);
4241 for (i
= 0; i
< level
; i
++) {
4242 x
->forward
[i
] = update
[i
]->forward
[i
];
4243 update
[i
]->forward
[i
] = x
;
4245 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
4247 x
->forward
[0]->backward
= x
;
4253 /* Delete an element with matching score/object from the skiplist. */
/* Removes the node matching both 'score' and 'obj'. The search uses the same
 * ordering as the visible zslInsert() loop. A candidate is accepted only when
 * its score equals 'score' and compareStringObjects() returns 0; it is then
 * unlinked from every level where update[i] points at it, the level-0
 * backward pointer (or zsl->tail) is repaired, and empty top levels are
 * trimmed. Both visible return statements yield 0 (not found); the success
 * return is not visible in this excerpt. */
4254 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
4255 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4259 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4260 while (x
->forward
[i
] &&
4261 (x
->forward
[i
]->score
< score
||
4262 (x
->forward
[i
]->score
== score
&&
4263 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4267 /* We may have multiple elements with the same score, what we need
4268 * is to find the element with both the right score and object. */
4270 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
4271 for (i
= 0; i
< zsl
->level
; i
++) {
4272 if (update
[i
]->forward
[i
] != x
) break;
4273 update
[i
]->forward
[i
] = x
->forward
[i
];
4275 if (x
->forward
[0]) {
4276 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4279 zsl
->tail
= x
->backward
;
4282 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4287 return 0; /* not found */
4289 return 0; /* not found */
4292 /* Delete all the elements with score between min and max from the skiplist.
4293 * Min and max are inclusive, so every element with score >= min && score <= max is deleted.
4294 * Note that this function takes the reference to the hash table view of the
4295 * sorted set, in order to remove the elements from the hash table too. */
/* Range delete on the skiplist. The search passes every node whose score is
 * below 'min'; the delete loop then runs while the current node's score is
 * <= 'max'. Each visited node is unlinked from the levels where update[i]
 * points at it, the level-0 backward pointer (or zsl->tail) is repaired, and
 * the node's object is also removed from 'dict' (the hash table view of the
 * sorted set). Returns the 'removed' counter. */
4296 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4297 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4298 unsigned long removed
= 0;
4302 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4303 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
4307 /* Range delete: every node visited from here on with score <= max is
4308 * unlinked; no object comparison is involved. */
4310 while (x
&& x
->score
<= max
) {
4311 zskiplistNode
*next
;
4313 for (i
= 0; i
< zsl
->level
; i
++) {
4314 if (update
[i
]->forward
[i
] != x
) break;
4315 update
[i
]->forward
[i
] = x
->forward
[i
];
4317 if (x
->forward
[0]) {
4318 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4321 zsl
->tail
= x
->backward
;
4323 next
= x
->forward
[0];
4324 dictDelete(dict
,x
->obj
);
4326 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4332 return removed
; /* the 'removed' counter, not a found/not-found flag */
4335 /* Find the first node having a score equal or greater than the specified one.
4336 * Returns NULL if there is no match. */
/* Descends the skiplist passing every node whose score is below 'score' and
 * returns the level-0 successor of the node where the search stopped (NULL
 * when there is none). */
4337 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4342 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4343 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
4346 /* Every node up to x has a score below 'score', so the next
4347 * level-0 node is the first one with score >= 'score', if any. */
4348 return x
->forward
[0];
4351 /* The actual Z-commands implementations */
4353 /* This generic command implements both ZADD and ZINCRBY.
4354 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4355 * the increment if the operation is a ZINCRBY (doincrement == 1). */
/* Shared body of ZADD and ZINCRBY. 'key' is the sorted set key, 'ele' the
 * member, 'scoreval' the score or increment and 'doincrement' the mode flag.
 * A missing key gets a fresh zset registered in the DB dict; a non-REDIS_ZSET
 * value is rejected with shared.wrongtypeerr. The new score lives in a
 * zmalloc()ed double. dictAdd() returning DICT_OK means a new member: it is
 * retained twice (hash table and skiplist) and inserted in the skiplist.
 * Otherwise the existing entry is looked up and, when the score differs, the
 * member is deleted and re-inserted in the skiplist and the stored score is
 * replaced. Replies visible: addReplyDouble(*score), shared.cone for the
 * new-member path and shared.czero for the update path. */
4356 static void zaddGenericCommand(redisClient
*c
, robj
*key
, robj
*ele
, double scoreval
, int doincrement
) {
4361 zsetobj
= lookupKeyWrite(c
->db
,key
);
4362 if (zsetobj
== NULL
) {
4363 zsetobj
= createZsetObject();
4364 dictAdd(c
->db
->dict
,key
,zsetobj
);
4367 if (zsetobj
->type
!= REDIS_ZSET
) {
4368 addReply(c
,shared
.wrongtypeerr
);
4374 /* Ok now since we implement both ZADD and ZINCRBY here the code
4375 * needs to handle the two different conditions. It's all about setting
4376 * '*score', that is, the new score to set, to the right value. */
4377 score
= zmalloc(sizeof(double));
4381 /* Read the old score. If the element was not present starts from 0 */
4382 de
= dictFind(zs
->dict
,ele
);
4384 double *oldscore
= dictGetEntryVal(de
);
4385 *score
= *oldscore
+ scoreval
;
4393 /* What follows is a simple remove and re-insert operation that is common
4394 * to both ZADD and ZINCRBY... */
4395 if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) {
4396 /* case 1: New element */
4397 incrRefCount(ele
); /* added to hash */
4398 zslInsert(zs
->zsl
,*score
,ele
);
4399 incrRefCount(ele
); /* added to skiplist */
4402 addReplyDouble(c
,*score
);
4404 addReply(c
,shared
.cone
);
4409 /* case 2: Score update operation */
4410 de
= dictFind(zs
->dict
,ele
);
4411 redisAssert(de
!= NULL
);
4412 oldscore
= dictGetEntryVal(de
);
4413 if (*score
!= *oldscore
) {
4416 /* Remove and insert the element in the skip list with new score */
4417 deleted
= zslDelete(zs
->zsl
,*oldscore
,ele
);
4418 redisAssert(deleted
!= 0);
4419 zslInsert(zs
->zsl
,*score
,ele
);
4421 /* Update the score in the hash table */
4422 dictReplace(zs
->dict
,ele
,score
);
4428 addReplyDouble(c
,*score
);
4430 addReply(c
,shared
.czero
);
/* ZADD entry point: argv[1] is the key, argv[2] the score (parsed with
 * strtod and no end-pointer check), argv[3] the member. Calls the generic
 * routine with doincrement = 0. */
4434 static void zaddCommand(redisClient
*c
) {
4437 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4438 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0);
/* ZINCRBY entry point: argv[1] is the key, argv[2] the increment (parsed
 * with strtod and no end-pointer check), argv[3] the member. Calls the
 * generic routine with doincrement = 1. */
4441 static void zincrbyCommand(redisClient
*c
) {
4444 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4445 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1);
/* ZREM: argv[1] is the sorted set key, argv[2] the member. A missing key
 * replies shared.czero and a non-REDIS_ZSET value shared.wrongtypeerr. The
 * member's score is read from the hash table entry and used to delete the
 * node from the skiplist (asserted to succeed); the hash table entry is then
 * removed and the table shrunk when htNeedsResize() says so.
 * Replies shared.cone at the end. */
4448 static void zremCommand(redisClient
*c
) {
4452 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4453 if (zsetobj
== NULL
) {
4454 addReply(c
,shared
.czero
);
4460 if (zsetobj
->type
!= REDIS_ZSET
) {
4461 addReply(c
,shared
.wrongtypeerr
);
4465 de
= dictFind(zs
->dict
,c
->argv
[2]);
4467 addReply(c
,shared
.czero
);
4470 /* Delete from the skiplist */
4471 oldscore
= dictGetEntryVal(de
);
4472 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
4473 redisAssert(deleted
!= 0);
4475 /* Delete from the hash table */
4476 dictDelete(zs
->dict
,c
->argv
[2]);
4477 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4479 addReply(c
,shared
.cone
);
/* ZREMRANGEBYSCORE: argv[1] is the key, argv[2]/argv[3] the min/max scores
 * (strtod, no end-pointer check). A missing key replies shared.czero and a
 * non-REDIS_ZSET value shared.wrongtypeerr. Otherwise zslDeleteRange()
 * removes the range from both the skiplist and the hash table, the table is
 * shrunk when needed, server.dirty grows by the number deleted and that
 * number is the integer reply. */
4483 static void zremrangebyscoreCommand(redisClient
*c
) {
4484 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4485 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4489 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4490 if (zsetobj
== NULL
) {
4491 addReply(c
,shared
.czero
);
4495 if (zsetobj
->type
!= REDIS_ZSET
) {
4496 addReply(c
,shared
.wrongtypeerr
);
4500 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
4501 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4502 server
.dirty
+= deleted
;
4503 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
/* Shared body of ZRANGE (reverse == 0) and ZREVRANGE (reverse == 1).
 * argv[1] is the key, argv[2]/argv[3] the start/end indexes (atoi),
 * normalised against zsl->length the same way the visible LRANGE code does.
 * After a "*<rangelen>" header, 'rangelen' elements are sent as bulk replies
 * while stepping through ->backward when 'reverse' is set and through
 * ->forward[0] otherwise. Other replies visible: shared.nullmultibulk,
 * shared.wrongtypeerr, shared.emptymultibulk. */
4507 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
4509 int start
= atoi(c
->argv
[2]->ptr
);
4510 int end
= atoi(c
->argv
[3]->ptr
);
4512 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4514 addReply(c
,shared
.nullmultibulk
);
4516 if (o
->type
!= REDIS_ZSET
) {
4517 addReply(c
,shared
.wrongtypeerr
);
4519 zset
*zsetobj
= o
->ptr
;
4520 zskiplist
*zsl
= zsetobj
->zsl
;
4523 int llen
= zsl
->length
;
4527 /* convert negative indexes */
4528 if (start
< 0) start
= llen
+start
;
4529 if (end
< 0) end
= llen
+end
;
4530 if (start
< 0) start
= 0;
4531 if (end
< 0) end
= 0;
4533 /* indexes sanity checks */
4534 if (start
> end
|| start
>= llen
) {
4535 /* Out of range start or start > end result in empty list */
4536 addReply(c
,shared
.emptymultibulk
);
4539 if (end
>= llen
) end
= llen
-1;
4540 rangelen
= (end
-start
)+1;
4542 /* Return the result in form of a multi-bulk reply */
4548 ln
= zsl
->header
->forward
[0];
4550 ln
= ln
->forward
[0];
4553 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
4554 for (j
= 0; j
< rangelen
; j
++) {
4556 addReplyBulkLen(c
,ele
);
4558 addReply(c
,shared
.crlf
);
4559 ln
= reverse
? ln
->backward
: ln
->forward
[0];
4565 static void zrangeCommand(redisClient
*c
) {
4566 zrangeGenericCommand(c
,0);
4569 static void zrevrangeCommand(redisClient
*c
) {
4570 zrangeGenericCommand(c
,1);
/* ZRANGEBYSCORE key min max [LIMIT offset count]. Accepts exactly 4 or 7
 * arguments; with 7, argv[4] must be "limit" (case-insensitive) or
 * shared.syntaxerr is sent, and a negative offset is raised to 0. The scan
 * starts at zslFirstWithScore(zsl,min) and continues while the node score is
 * <= max; limit == 0 stops it and a positive limit is decremented per
 * emitted element. The multi-bulk length object is created before the scan
 * and its ->ptr is filled in with "*<rangelen>" afterwards.
 * NOTE(review): argv[2] and argv[3] are parsed in the declarations, before
 * the argc check below - confirm the command table guarantees argc >= 4. */
4573 static void zrangebyscoreCommand(redisClient
*c
) {
4575 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4576 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4577 int offset
= 0, limit
= -1;
4579 if (c
->argc
!= 4 && c
->argc
!= 7) {
4580 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
4582 } else if (c
->argc
== 7 && strcasecmp(c
->argv
[4]->ptr
,"limit")) {
4583 addReply(c
,shared
.syntaxerr
);
4585 } else if (c
->argc
== 7) {
4586 offset
= atoi(c
->argv
[5]->ptr
);
4587 limit
= atoi(c
->argv
[6]->ptr
);
4588 if (offset
< 0) offset
= 0;
4591 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4593 addReply(c
,shared
.nullmultibulk
);
4595 if (o
->type
!= REDIS_ZSET
) {
4596 addReply(c
,shared
.wrongtypeerr
);
4598 zset
*zsetobj
= o
->ptr
;
4599 zskiplist
*zsl
= zsetobj
->zsl
;
4602 unsigned int rangelen
= 0;
4604 /* Get the first node with the score >= min */
4605 ln
= zslFirstWithScore(zsl
,min
);
4607 /* No element matching the specified interval */
4608 addReply(c
,shared
.emptymultibulk
);
4612 /* We don't know in advance how many matching elements there
4613 * are in the list, so we push this object that will represent
4614 * the multi-bulk length in the output buffer, and will "fix"
4616 lenobj
= createObject(REDIS_STRING
,NULL
);
4618 decrRefCount(lenobj
);
4620 while(ln
&& ln
->score
<= max
) {
4623 ln
= ln
->forward
[0];
4626 if (limit
== 0) break;
4628 addReplyBulkLen(c
,ele
);
4630 addReply(c
,shared
.crlf
);
4631 ln
= ln
->forward
[0];
4633 if (limit
> 0) limit
--;
4635 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
/* ZCARD: looks argv[1] up for reading. Replies visible here: shared.czero,
 * shared.wrongtypeerr for a non-REDIS_ZSET value, otherwise an integer reply
 * (":%lu\r\n") carrying the skiplist length. */
4640 static void zcardCommand(redisClient
*c
) {
4644 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4646 addReply(c
,shared
.czero
);
4649 if (o
->type
!= REDIS_ZSET
) {
4650 addReply(c
,shared
.wrongtypeerr
);
4653 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",zs
->zsl
->length
));
/* ZSCORE: argv[1] is the sorted set key, argv[2] the member. The member is
 * looked up in the zset's hash table and the stored double is sent with
 * addReplyDouble(). Other replies visible: shared.nullbulk (twice) and
 * shared.wrongtypeerr for a non-REDIS_ZSET value. */
4658 static void zscoreCommand(redisClient
*c
) {
4662 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4664 addReply(c
,shared
.nullbulk
);
4667 if (o
->type
!= REDIS_ZSET
) {
4668 addReply(c
,shared
.wrongtypeerr
);
4673 de
= dictFind(zs
->dict
,c
->argv
[2]);
4675 addReply(c
,shared
.nullbulk
);
4677 double *score
= dictGetEntryVal(de
);
4679 addReplyDouble(c
,*score
);
4685 /* ========================= Non type-specific commands ==================== */
4687 static void flushdbCommand(redisClient
*c
) {
4688 server
.dirty
+= dictSize(c
->db
->dict
);
4689 dictEmpty(c
->db
->dict
);
4690 dictEmpty(c
->db
->expires
);
4691 addReply(c
,shared
.ok
);
/* FLUSHALL: adds the value returned by emptyDb() to server.dirty, replies
 * shared.ok and then calls rdbSave() on server.dbfilename. The reply is
 * queued before the save is started. */
4694 static void flushallCommand(redisClient
*c
) {
4695 server
.dirty
+= emptyDb();
4696 addReply(c
,shared
.ok
);
4697 rdbSave(server
.dbfilename
);
/* Allocates a redisSortOperation and stores 'pattern' in it (the pointer is
 * copied, not duplicated). 'type' is the operation kind; the lines storing
 * it and returning the object are not visible in this excerpt. */
4701 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
4702 redisSortOperation
*so
= zmalloc(sizeof(*so
));
4704 so
->pattern
= pattern
;
4708 /* Return the value associated to the key with a name obtained
4709 * substituting the first occurrence of '*' in 'pattern' with 'subst' */
/* Builds a key name from 'pattern' by replacing its first '*' with the
 * string in 'subst' and looks that key up in 'db' with lookupKeyRead().
 * The pattern "#" is special-cased. The name is assembled in a stack buffer
 * of REDIS_SORTKEY_MAX+1 bytes as prefix + substitution + postfix; NULL is
 * returned when sdslen(pattern)+sdslen(subst)-1 exceeds REDIS_SORTKEY_MAX.
 * 'subst' is replaced by its decoded form and released again with
 * decrRefCount() in the visible paths.
 * NOTE(review): the size-check return sits after getDecodedObject() and no
 * decrRefCount() is visible on that path - confirm the reference is not
 * leaked. */
4710 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
4714 int prefixlen
, sublen
, postfixlen
;
4715 /* Exploit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4719 char buf
[REDIS_SORTKEY_MAX
+1];
4722 /* If the pattern is "#" return the substitution object itself in order
4723 * to implement the "SORT ... GET #" feature. */
4724 spat
= pattern
->ptr
;
4725 if (spat
[0] == '#' && spat
[1] == '\0') {
4729 /* The substitution object may be specially encoded. If so we create
4730 * a decoded object on the fly. Otherwise getDecodedObject will just
4731 * increment the ref count, that we'll decrement later. */
4732 subst
= getDecodedObject(subst
);
4735 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
4736 p
= strchr(spat
,'*');
4738 decrRefCount(subst
);
4743 sublen
= sdslen(ssub
);
4744 postfixlen
= sdslen(spat
)-(prefixlen
+1);
4745 memcpy(keyname
.buf
,spat
,prefixlen
);
4746 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
4747 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
4748 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
4749 keyname
.len
= prefixlen
+sublen
+postfixlen
;
4751 initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(long)*2))
4752 decrRefCount(subst
);
4754 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4755 return lookupKeyRead(db
,&keyobj
);
4758 /* sortCompare() is used by qsort in sortCommand(). Since qsort_r, which takes
4759 * an additional context parameter, is not standard but BSD-specific, we have to
4760 * pass the sorting parameters via the global 'server' structure */
/* qsort() comparator over redisSortObject items. It reads its mode from the
 * global 'server': numeric mode compares the precomputed u.score values;
 * alpha mode compares with strcoll(), either on the u.cmpobj objects (BY
 * pattern, with NULL compare objects handled separately) or on the decoded
 * element objects. The result is negated when server.sort_desc is set. */
4761 static int sortCompare(const void *s1
, const void *s2
) {
4762 const redisSortObject
*so1
= s1
, *so2
= s2
;
4765 if (!server
.sort_alpha
) {
4766 /* Numeric sorting. Here it's trivial as we precomputed scores */
4767 if (so1
->u
.score
> so2
->u
.score
) {
4769 } else if (so1
->u
.score
< so2
->u
.score
) {
4775 /* Alphanumeric sorting */
4776 if (server
.sort_bypattern
) {
4777 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
4778 /* At least one compare object is NULL */
4779 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
4781 else if (so1
->u
.cmpobj
== NULL
)
4786 /* We have both the objects, use strcoll */
4787 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
4790 /* Compare elements directly */
4793 dec1
= getDecodedObject(so1
->obj
);
4794 dec2
= getDecodedObject(so2
->obj
);
4795 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
4800 return server
.sort_desc
? -cmp
: cmp
;
4803 /* The SORT command is the most complex command in Redis. Warning: this code
4804 * is optimized for speed and a bit less for readability */
4805 static void sortCommand(redisClient
*c
) {
4808 int desc
= 0, alpha
= 0;
4809 int limit_start
= 0, limit_count
= -1, start
, end
;
4810 int j
, dontsort
= 0, vectorlen
;
4811 int getop
= 0; /* GET operation counter */
4812 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
4813 redisSortObject
*vector
; /* Resulting vector to sort */
4815 /* Lookup the key to sort. It must be of the right types */
4816 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
4817 if (sortval
== NULL
) {
4818 addReply(c
,shared
.nokeyerr
);
4821 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
4822 sortval
->type
!= REDIS_ZSET
)
4824 addReply(c
,shared
.wrongtypeerr
);
4828 /* Create a list of operations to perform for every sorted element.
4829 * Operations can be GET/DEL/INCR/DECR */
4830 operations
= listCreate();
4831 listSetFreeMethod(operations
,zfree
);
4834 /* Now we need to protect sortval incrementing its count, in the future
4835 * SORT may have options able to overwrite/delete keys during the sorting
4836 * and the sorted key itself may get destroyed */
4837 incrRefCount(sortval
);
4839 /* The SORT command has an SQL-alike syntax, parse it */
4840 while(j
< c
->argc
) {
4841 int leftargs
= c
->argc
-j
-1;
4842 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
4844 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
4846 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
4848 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
4849 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
4850 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
4852 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
4853 storekey
= c
->argv
[j
+1];
4855 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
4856 sortby
= c
->argv
[j
+1];
4857 /* If the BY pattern does not contain '*', i.e. it is constant,
4858 * we don't need to sort nor to lookup the weight keys. */
4859 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
4861 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
4862 listAddNodeTail(operations
,createSortOperation(
4863 REDIS_SORT_GET
,c
->argv
[j
+1]));
4867 decrRefCount(sortval
);
4868 listRelease(operations
);
4869 addReply(c
,shared
.syntaxerr
);
4875 /* Load the sorting vector with all the objects to sort */
4876 switch(sortval
->type
) {
4877 case REDIS_LIST
: vectorlen
= listLength((list
*)sortval
->ptr
); break;
4878 case REDIS_SET
: vectorlen
= dictSize((dict
*)sortval
->ptr
); break;
4879 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
4880 default: vectorlen
= 0; redisAssert(0); /* Avoid GCC warning */
4882 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
4885 if (sortval
->type
== REDIS_LIST
) {
4886 list
*list
= sortval
->ptr
;
4890 while((ln
= listYield(list
))) {
4891 robj
*ele
= ln
->value
;
4892 vector
[j
].obj
= ele
;
4893 vector
[j
].u
.score
= 0;
4894 vector
[j
].u
.cmpobj
= NULL
;
4902 if (sortval
->type
== REDIS_SET
) {
4905 zset
*zs
= sortval
->ptr
;
4909 di
= dictGetIterator(set
);
4910 while((setele
= dictNext(di
)) != NULL
) {
4911 vector
[j
].obj
= dictGetEntryKey(setele
);
4912 vector
[j
].u
.score
= 0;
4913 vector
[j
].u
.cmpobj
= NULL
;
4916 dictReleaseIterator(di
);
4918 redisAssert(j
== vectorlen
);
4920 /* Now it's time to load the right scores in the sorting vector */
4921 if (dontsort
== 0) {
4922 for (j
= 0; j
< vectorlen
; j
++) {
4926 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
4927 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
4929 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
4931 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
4932 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
4934 /* Don't need to decode the object if it's
4935 * integer-encoded (the only encoding supported) so
4936 * far. We can just cast it */
4937 if (byval
->encoding
== REDIS_ENCODING_INT
) {
4938 vector
[j
].u
.score
= (long)byval
->ptr
;
4940 redisAssert(1 != 1);
4945 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
4946 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
4948 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
4949 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
4951 redisAssert(1 != 1);
4958 /* We are ready to sort the vector... perform a bit of sanity check
4959 * on the LIMIT option too. We'll use a partial version of quicksort. */
4960 start
= (limit_start
< 0) ? 0 : limit_start
;
4961 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
4962 if (start
>= vectorlen
) {
4963 start
= vectorlen
-1;
4966 if (end
>= vectorlen
) end
= vectorlen
-1;
4968 if (dontsort
== 0) {
4969 server
.sort_desc
= desc
;
4970 server
.sort_alpha
= alpha
;
4971 server
.sort_bypattern
= sortby
? 1 : 0;
4972 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
4973 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
4975 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
4978 /* Send command output to the output buffer, performing the specified
4979 * GET/DEL/INCR/DECR operations if any. */
4980 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
4981 if (storekey
== NULL
) {
4982 /* STORE option not specified, send the sorting result to client */
4983 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
4984 for (j
= start
; j
<= end
; j
++) {
4987 addReplyBulkLen(c
,vector
[j
].obj
);
4988 addReply(c
,vector
[j
].obj
);
4989 addReply(c
,shared
.crlf
);
4991 listRewind(operations
);
4992 while((ln
= listYield(operations
))) {
4993 redisSortOperation
*sop
= ln
->value
;
4994 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
4997 if (sop
->type
== REDIS_SORT_GET
) {
4998 if (!val
|| val
->type
!= REDIS_STRING
) {
4999 addReply(c
,shared
.nullbulk
);
5001 addReplyBulkLen(c
,val
);
5003 addReply(c
,shared
.crlf
);
5006 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5011 robj
*listObject
= createListObject();
5012 list
*listPtr
= (list
*) listObject
->ptr
;
5014 /* STORE option specified, set the sorting result as a List object */
5015 for (j
= start
; j
<= end
; j
++) {
5018 listAddNodeTail(listPtr
,vector
[j
].obj
);
5019 incrRefCount(vector
[j
].obj
);
5021 listRewind(operations
);
5022 while((ln
= listYield(operations
))) {
5023 redisSortOperation
*sop
= ln
->value
;
5024 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5027 if (sop
->type
== REDIS_SORT_GET
) {
5028 if (!val
|| val
->type
!= REDIS_STRING
) {
5029 listAddNodeTail(listPtr
,createStringObject("",0));
5031 listAddNodeTail(listPtr
,val
);
5035 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5039 if (dictReplace(c
->db
->dict
,storekey
,listObject
)) {
5040 incrRefCount(storekey
);
5042 /* Note: we add 1 because the DB is dirty anyway since even if the
5043 * SORT result is empty a new key is set and maybe the old content
5045 server
.dirty
+= 1+outputlen
;
5046 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",outputlen
));
5050 decrRefCount(sortval
);
5051 listRelease(operations
);
5052 for (j
= 0; j
< vectorlen
; j
++) {
5053 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
5054 decrRefCount(vector
[j
].u
.cmpobj
);
5059 /* Create the string returned by the INFO command. This is decoupled
5060 * by the INFO command itself as we need to report the same information
5061 * on memory corruption problems. */
5062 static sds
genRedisInfoString(void) {
5064 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5067 info
= sdscatprintf(sdsempty(),
5068 "redis_version:%s\r\n"
5070 "multiplexing_api:%s\r\n"
5071 "uptime_in_seconds:%ld\r\n"
5072 "uptime_in_days:%ld\r\n"
5073 "connected_clients:%d\r\n"
5074 "connected_slaves:%d\r\n"
5075 "used_memory:%zu\r\n"
5076 "changes_since_last_save:%lld\r\n"
5077 "bgsave_in_progress:%d\r\n"
5078 "last_save_time:%ld\r\n"
5079 "total_connections_received:%lld\r\n"
5080 "total_commands_processed:%lld\r\n"
5083 (sizeof(long) == 8) ? "64" : "32",
5087 listLength(server
.clients
)-listLength(server
.slaves
),
5088 listLength(server
.slaves
),
5091 server
.bgsavechildpid
!= -1,
5093 server
.stat_numconnections
,
5094 server
.stat_numcommands
,
5095 server
.masterhost
== NULL
? "master" : "slave"
5097 if (server
.masterhost
) {
5098 info
= sdscatprintf(info
,
5099 "master_host:%s\r\n"
5100 "master_port:%d\r\n"
5101 "master_link_status:%s\r\n"
5102 "master_last_io_seconds_ago:%d\r\n"
5105 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
5107 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
5110 for (j
= 0; j
< server
.dbnum
; j
++) {
5111 long long keys
, vkeys
;
5113 keys
= dictSize(server
.db
[j
].dict
);
5114 vkeys
= dictSize(server
.db
[j
].expires
);
5115 if (keys
|| vkeys
) {
5116 info
= sdscatprintf(info
, "db%d:keys=%lld,expires=%lld\r\n",
5123 static void infoCommand(redisClient
*c
) {
5124 sds info
= genRedisInfoString();
5125 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",sdslen(info
)));
5126 addReplySds(c
,info
);
5127 addReply(c
,shared
.crlf
);
/* MONITOR command. In the visible code the client gets the REDIS_SLAVE and
 * REDIS_MONITOR flags, is appended to server.monitors and receives
 * shared.ok as reply. A client that already has REDIS_SLAVE set is ignored
 * and gets no reply at all. */
5130 static void monitorCommand(redisClient
*c
) {
5131 /* Ignore MONITOR if the client is already a slave or in monitor mode */
5132 if (c
->flags
& REDIS_SLAVE
) return;
5134 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
5136 listAddNodeTail(server
.monitors
,c
);
5137 addReply(c
,shared
.ok
);
5140 /* ================================= Expire ================================= */
5141 static int removeExpire(redisDb
*db
, robj
*key
) {
5142 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
/* Associate the expire time 'when' with 'key' by adding an entry to
 * db->expires; the time_t is stored directly in the entry value, cast to
 * a pointer. The code branches on dictAdd() returning DICT_ERR; callers in
 * this file treat a non-zero return value as "expire set". */
5149 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
5150 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
5158 /* Return the expire time of the specified key, or -1 if no expire
5159 * is associated with this key (i.e. the key is non volatile) */
5160 static time_t getExpire(redisDb
*db
, robj
*key
) {
5163 /* No expire? return ASAP */
5164 if (dictSize(db
->expires
) == 0 ||
5165 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
5167 return (time_t) dictGetEntryVal(de
);
5170 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
5174 /* No expire? return ASAP */
5175 if (dictSize(db
->expires
) == 0 ||
5176 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5178 /* Lookup the expire */
5179 when
= (time_t) dictGetEntryVal(de
);
5180 if (time(NULL
) <= when
) return 0;
5182 /* Delete the key */
5183 dictDelete(db
->expires
,key
);
5184 return dictDelete(db
->dict
,key
) == DICT_OK
;
/* Delete 'key' from 'db' if it has an associated expire, regardless of
 * whether the expire time has been reached. Returns 0 when the key has no
 * entry in db->expires, otherwise the result of comparing the deletion
 * from db->dict with DICT_OK. */
5187 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
5190 /* No expire? return ASAP */
5191 if (dictSize(db
->expires
) == 0 ||
5192 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5194 /* Delete the key: first its expire entry, then the key itself */
5196 dictDelete(db
->expires
,key
);
5197 return dictDelete(db
->dict
,key
) == DICT_OK
;
/* Shared implementation behind expireCommand() and expireatCommand():
 * 'seconds' is a time to live relative to the current time.
 *
 * The visible code looks the key up in c->db->dict and then takes one of
 * these paths: reply shared.czero; delete the key (incrementing
 * server.dirty if something was deleted) and reply shared.cone; or compute
 * the absolute time time(NULL)+seconds and register it with setExpire(),
 * replying shared.cone or shared.czero depending on its result.
 * NOTE(review): the conditions selecting each path are not visible here --
 * confirm against the complete source. */
5200 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
5203 de
= dictFind(c
->db
->dict
,key
);
5205 addReply(c
,shared
.czero
);
5209 if (deleteKey(c
->db
,key
)) server
.dirty
++;
5210 addReply(c
, shared
.cone
);
5213 time_t when
= time(NULL
)+seconds
;
5214 if (setExpire(c
->db
,key
,when
)) {
5215 addReply(c
,shared
.cone
);
5218 addReply(c
,shared
.czero
);
5224 static void expireCommand(redisClient
*c
) {
5225 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
5228 static void expireatCommand(redisClient
*c
) {
5229 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
5232 static void ttlCommand(redisClient
*c
) {
5236 expire
= getExpire(c
->db
,c
->argv
[1]);
5238 ttl
= (int) (expire
-time(NULL
));
5239 if (ttl
< 0) ttl
= -1;
5241 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
5244 /* =============================== Replication ============================= */
/* Write 'size' bytes from 'ptr' to 'fd', waiting for the descriptor to
 * become writable with aeWait() in slices of 1000 (the third aeWait()
 * argument). The elapsed time since the call started is compared against
 * 'timeout', expressed in seconds. Returns -1 if write() fails.
 * NOTE(review): the enclosing loop, the timeout action and the final
 * return value are not visible here; 'ret' is initialised to 'size', so
 * presumably that is what is returned on success -- confirm. */
5246 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5247 ssize_t nwritten
, ret
= size
;
5248 time_t start
= time(NULL
);
5252 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
5253 nwritten
= write(fd
,ptr
,size
);
5254 if (nwritten
== -1) return -1;
5258 if ((time(NULL
)-start
) > timeout
) {
/* Read from 'fd' into 'ptr' (at most 'size' bytes per read() call), waiting
 * for the descriptor to become readable with aeWait() in slices of 1000
 * (the third aeWait() argument). The elapsed time since the call started
 * is compared against 'timeout', expressed in seconds. Returns -1 if
 * read() fails.
 * NOTE(review): the enclosing loop, the timeout action and the final
 * return value are not visible here; 'totread' presumably accumulates the
 * bytes read and is returned on success -- confirm. */
5266 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5267 ssize_t nread
, totread
= 0;
5268 time_t start
= time(NULL
);
5272 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
5273 nread
= read(fd
,ptr
,size
);
5274 if (nread
== -1) return -1;
5279 if ((time(NULL
)-start
) > timeout
) {
/* Read a line from 'fd' into 'ptr', one byte at a time via syncRead() with
 * the given 'timeout'. Returns -1 as soon as syncRead() fails. When the
 * byte before the current position is a '\r' it is overwritten with a
 * string terminator, so a trailing CR is stripped.
 * NOTE(review): the loop, the use of 'size' and the success return value
 * are not visible here -- confirm against the complete source. */
5287 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5294 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
5297 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
5308 static void syncCommand(redisClient
*c
) {
5309 /* ignore SYNC if already slave or in monitor mode */
5310 if (c
->flags
& REDIS_SLAVE
) return;
5312 /* SYNC can't be issued when the server has pending data to send to
5313 * the client about already issued commands. We need a fresh reply
5314 * buffer registering the differences between the BGSAVE and the current
5315 * dataset, so that we can copy to other slaves if needed. */
5316 if (listLength(c
->reply
) != 0) {
5317 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5321 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
5322 /* Here we need to check if there is a background saving operation
5323 * in progress, or if it is required to start one */
5324 if (server
.bgsavechildpid
!= -1) {
5325 /* Ok a background save is in progress. Let's check if it is a good
5326 * one for replication, i.e. if there is another slave that is
5327 * registering differences since the server forked to save */
5331 listRewind(server
.slaves
);
5332 while((ln
= listYield(server
.slaves
))) {
5334 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
5337 /* Perfect, the server is already registering differences for
5338 * another slave. Set the right state, and copy the buffer. */
5339 listRelease(c
->reply
);
5340 c
->reply
= listDup(slave
->reply
);
5341 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5342 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
5344 /* No way, we need to wait for the next BGSAVE in order to
5345 * register differences */
5346 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5347 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
5350 /* Ok we don't have a BGSAVE in progress, let's start one */
5351 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
5352 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5353 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
5354 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
5357 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5360 c
->flags
|= REDIS_SLAVE
;
5362 listAddNodeTail(server
.slaves
,c
);
5366 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
5367 redisClient
*slave
= privdata
;
5369 REDIS_NOTUSED(mask
);
5370 char buf
[REDIS_IOBUF_LEN
];
5371 ssize_t nwritten
, buflen
;
5373 if (slave
->repldboff
== 0) {
5374 /* Write the bulk write count before to transfer the DB. In theory here
5375 * we don't know how much room there is in the output buffer of the
5376 * socket, but in practice SO_SNDLOWAT (the minimum count for output
5377 * operations) will never be smaller than the few bytes we need. */
5380 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5382 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
5390 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
5391 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
5393 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
5394 (buflen
== 0) ? "premature EOF" : strerror(errno
));
5398 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
5399 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
5404 slave
->repldboff
+= nwritten
;
5405 if (slave
->repldboff
== slave
->repldbsize
) {
5406 close(slave
->repldbfd
);
5407 slave
->repldbfd
= -1;
5408 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5409 slave
->replstate
= REDIS_REPL_ONLINE
;
5410 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
5411 sendReplyToClient
, slave
) == AE_ERR
) {
5415 addReplySds(slave
,sdsempty());
5416 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
5420 /* This function is called at the end of every background saving.
5421 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5422 * otherwise REDIS_ERR is passed to the function.
5424 * The goal of this function is to handle slaves waiting for a successful
5425 * background saving in order to perform non-blocking synchronization. */
5426 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
5428 int startbgsave
= 0;
5430 listRewind(server
.slaves
);
5431 while((ln
= listYield(server
.slaves
))) {
5432 redisClient
*slave
= ln
->value
;
5434 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
5436 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5437 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
5438 struct redis_stat buf
;
5440 if (bgsaveerr
!= REDIS_OK
) {
5442 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
5445 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
5446 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
5448 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
5451 slave
->repldboff
= 0;
5452 slave
->repldbsize
= buf
.st_size
;
5453 slave
->replstate
= REDIS_REPL_SEND_BULK
;
5454 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5455 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
5462 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5463 listRewind(server
.slaves
);
5464 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
5465 while((ln
= listYield(server
.slaves
))) {
5466 redisClient
*slave
= ln
->value
;
5468 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
5475 static int syncWithMaster(void) {
5476 char buf
[1024], tmpfile
[256], authcmd
[1024];
5478 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
5482 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
5487 /* AUTH with the master if required. */
5488 if(server
.masterauth
) {
5489 snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
);
5490 if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) {
5492 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",
5496 /* Read the AUTH result. */
5497 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5499 redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s",
5503 if (buf
[0] != '+') {
5505 redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?");
5510 /* Issue the SYNC command */
5511 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
5513 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
5517 /* Read the bulk write count */
5518 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5520 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
5524 if (buf
[0] != '$') {
5526 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5529 dumpsize
= atoi(buf
+1);
5530 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
5531 /* Read the bulk write data on a temp file */
5532 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
5533 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
5536 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
5540 int nread
, nwritten
;
5542 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
5544 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
5550 nwritten
= write(dfd
,buf
,nread
);
5551 if (nwritten
== -1) {
5552 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
5560 if (rename(tmpfile
,server
.dbfilename
) == -1) {
5561 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
5567 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
5568 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
5572 server
.master
= createClient(fd
);
5573 server
.master
->flags
|= REDIS_MASTER
;
5574 server
.replstate
= REDIS_REPL_CONNECTED
;
5578 static void slaveofCommand(redisClient
*c
) {
5579 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
5580 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
5581 if (server
.masterhost
) {
5582 sdsfree(server
.masterhost
);
5583 server
.masterhost
= NULL
;
5584 if (server
.master
) freeClient(server
.master
);
5585 server
.replstate
= REDIS_REPL_NONE
;
5586 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
5589 sdsfree(server
.masterhost
);
5590 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
5591 server
.masterport
= atoi(c
->argv
[2]->ptr
);
5592 if (server
.master
) freeClient(server
.master
);
5593 server
.replstate
= REDIS_REPL_CONNECT
;
5594 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
5595 server
.masterhost
, server
.masterport
);
5597 addReply(c
,shared
.ok
);
5600 /* ============================ Maxmemory directive ======================== */
5602 /* This function gets called when 'maxmemory' is set on the config file to limit
5603 * the max memory used by the server, and we are out of memory.
5604 * This function will try to, in order:
5606 * - Free objects from the free list
5607 * - Try to remove keys with an EXPIRE set
5609 * It is not possible to free enough memory to reach used-memory < maxmemory
5610 * the server will start refusing commands that will enlarge even more the
5613 static void freeMemoryIfNeeded(void) {
5614 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
5615 if (listLength(server
.objfreelist
)) {
5618 listNode
*head
= listFirst(server
.objfreelist
);
5619 o
= listNodeValue(head
);
5620 listDelNode(server
.objfreelist
,head
);
5623 int j
, k
, freed
= 0;
5625 for (j
= 0; j
< server
.dbnum
; j
++) {
5627 robj
*minkey
= NULL
;
5628 struct dictEntry
*de
;
5630 if (dictSize(server
.db
[j
].expires
)) {
5632 /* From a sample of three keys drop the one nearest to
5633 * the natural expire */
5634 for (k
= 0; k
< 3; k
++) {
5637 de
= dictGetRandomKey(server
.db
[j
].expires
);
5638 t
= (time_t) dictGetEntryVal(de
);
5639 if (minttl
== -1 || t
< minttl
) {
5640 minkey
= dictGetEntryKey(de
);
5644 deleteKey(server
.db
+j
,minkey
);
5647 if (!freed
) return; /* nothing to free... */
5652 /* ============================== Append Only file ========================== */
5654 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
5655 sds buf
= sdsempty();
5661 /* The DB this command was targeting is not the same as the last command
5662 * we appended. Issuing a SELECT command is needed. */
5663 if (dictid
!= server
.appendseldb
) {
5666 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
5667 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
5668 strlen(seldb
),seldb
);
5669 server
.appendseldb
= dictid
;
5672 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5673 * EXPIREs into EXPIREATs calls */
5674 if (cmd
->proc
== expireCommand
) {
5677 tmpargv
[0] = createStringObject("EXPIREAT",8);
5678 tmpargv
[1] = argv
[1];
5679 incrRefCount(argv
[1]);
5680 when
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10);
5681 tmpargv
[2] = createObject(REDIS_STRING
,
5682 sdscatprintf(sdsempty(),"%ld",when
));
5686 /* Append the actual command */
5687 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
5688 for (j
= 0; j
< argc
; j
++) {
5691 o
= getDecodedObject(o
);
5692 buf
= sdscatprintf(buf
,"$%lu\r\n",sdslen(o
->ptr
));
5693 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
5694 buf
= sdscatlen(buf
,"\r\n",2);
5698 /* Free the objects from the modified argv for EXPIREAT */
5699 if (cmd
->proc
== expireCommand
) {
5700 for (j
= 0; j
< 3; j
++)
5701 decrRefCount(argv
[j
]);
5704 /* We want to perform a single write. This should be guaranteed atomic
5705 * at least if the filesystem we are writing is a real physical one.
5706 * While this will save us against the server being killed I don't think
5707 * there is much to do about the whole server stopping for power problems
5709 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
5710 if (nwritten
!= (signed)sdslen(buf
)) {
5711 /* Ooops, we are in troubles. The best thing to do for now is
5712 * to simply exit instead to give the illusion that everything is
5713 * working as expected. */
5714 if (nwritten
== -1) {
5715 redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
));
5717 redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
));
5721 /* If a background append only file rewriting is in progress we want to
5722 * accumulate the differences between the child DB and the current one
5723 * in a buffer, so that when the child process will do its work we
5724 * can append the differences to the new append only file. */
5725 if (server
.bgrewritechildpid
!= -1)
5726 server
.bgrewritebuf
= sdscatlen(server
.bgrewritebuf
,buf
,sdslen(buf
));
5730 if (server
.appendfsync
== APPENDFSYNC_ALWAYS
||
5731 (server
.appendfsync
== APPENDFSYNC_EVERYSEC
&&
5732 now
-server
.lastfsync
> 1))
5734 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
5735 server
.lastfsync
= now
;
5739 /* In Redis commands are always executed in the context of a client, so in
5740 * order to load the append only file we need to create a fake client.
 *
 * The client structure is allocated with zmalloc(). The visible code gives
 * it an empty query buffer and an empty reply list whose elements are
 * released with decrRefCount() and duplicated with dupClientReplyValue(). */
5741 static struct redisClient
*createFakeClient(void) {
5742 struct redisClient
*c
= zmalloc(sizeof(*c
));
5746 c
->querybuf
= sdsempty();
5750 /* We set the fake client as a slave waiting for the synchronization
5751 * so that Redis will not try to send replies to this client. */
5752 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5753 c
->reply
= listCreate();
5754 listSetFreeMethod(c
->reply
,decrRefCount
);
5755 listSetDupMethod(c
->reply
,dupClientReplyValue
);
5759 static void freeFakeClient(struct redisClient
*c
) {
5760 sdsfree(c
->querybuf
);
5761 listRelease(c
->reply
);
5765 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5766 * error (the append only file is zero-length) REDIS_ERR is returned. On
5767 * fatal error an error message is logged and the program exists. */
5768 int loadAppendOnlyFile(char *filename
) {
5769 struct redisClient
*fakeClient
;
5770 FILE *fp
= fopen(filename
,"r");
5771 struct redis_stat sb
;
5773 if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size
== 0)
5777 redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
));
5781 fakeClient
= createFakeClient();
5788 struct redisCommand
*cmd
;
5790 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) {
5796 if (buf
[0] != '*') goto fmterr
;
5798 argv
= zmalloc(sizeof(robj
*)*argc
);
5799 for (j
= 0; j
< argc
; j
++) {
5800 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
;
5801 if (buf
[0] != '$') goto fmterr
;
5802 len
= strtol(buf
+1,NULL
,10);
5803 argsds
= sdsnewlen(NULL
,len
);
5804 if (len
&& fread(argsds
,len
,1,fp
) == 0) goto fmterr
;
5805 argv
[j
] = createObject(REDIS_STRING
,argsds
);
5806 if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */
5809 /* Command lookup */
5810 cmd
= lookupCommand(argv
[0]->ptr
);
5812 redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
);
5815 /* Try object sharing and encoding */
5816 if (server
.shareobjects
) {
5818 for(j
= 1; j
< argc
; j
++)
5819 argv
[j
] = tryObjectSharing(argv
[j
]);
5821 if (cmd
->flags
& REDIS_CMD_BULK
)
5822 tryObjectEncoding(argv
[argc
-1]);
5823 /* Run the command in the context of a fake client */
5824 fakeClient
->argc
= argc
;
5825 fakeClient
->argv
= argv
;
5826 cmd
->proc(fakeClient
);
5827 /* Discard the reply objects list from the fake client */
5828 while(listLength(fakeClient
->reply
))
5829 listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
));
5830 /* Clean up, ready for the next command */
5831 for (j
= 0; j
< argc
; j
++) decrRefCount(argv
[j
]);
5835 freeFakeClient(fakeClient
);
5840 redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file");
5842 redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
));
5846 redisLog(REDIS_WARNING
,"Bad file format reading the append only file");
5850 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
5851 static int fwriteBulk(FILE *fp
, robj
*obj
) {
5853 obj
= getDecodedObject(obj
);
5854 snprintf(buf
,sizeof(buf
),"$%ld\r\n",(long)sdslen(obj
->ptr
));
5855 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) goto err
;
5856 if (fwrite(obj
->ptr
,sdslen(obj
->ptr
),1,fp
) == 0) goto err
;
5857 if (fwrite("\r\n",2,1,fp
) == 0) goto err
;
/* Write a double value in bulk format $<count>\r\n<payload>\r\n
 *
 * The value is formatted with "%.17g". Returns 1 on success, 0 on write
 * error. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char buf[128], dbuf[128];

    /* dbuf holds the payload already followed by CRLF, so the advertised
     * count is its length minus the two trailing bytes. */
    snprintf(dbuf,sizeof(dbuf),"%.17g\r\n",d);
    snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(dbuf)-2);
    if (fwrite(buf,strlen(buf),1,fp) == 0) return 0;
    if (fwrite(dbuf,strlen(dbuf),1,fp) == 0) return 0;
    return 1;
}
/* Write a long value in bulk format $<count>\r\n<payload>\r\n
 *
 * Returns 1 on success, 0 on write error. */
static int fwriteBulkLong(FILE *fp, long l) {
    char buf[128], lbuf[128];

    /* lbuf holds the payload already followed by CRLF, so the advertised
     * count is its length minus the two trailing bytes. */
    snprintf(lbuf,sizeof(lbuf),"%ld\r\n",l);
    snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(lbuf)-2);
    if (fwrite(buf,strlen(buf),1,fp) == 0) return 0;
    if (fwrite(lbuf,strlen(lbuf),1,fp) == 0) return 0;
    return 1;
}
5887 /* Write a sequence of commands able to fully rebuild the dataset into
5888 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
5889 static int rewriteAppendOnlyFile(char *filename
) {
5890 dictIterator
*di
= NULL
;
5895 time_t now
= time(NULL
);
5897 /* Note that we have to use a different temp name here compared to the
5898 * one used by rewriteAppendOnlyFileBackground() function. */
5899 snprintf(tmpfile
,256,"temp-rewriteaof-%d.aof", (int) getpid());
5900 fp
= fopen(tmpfile
,"w");
5902 redisLog(REDIS_WARNING
, "Failed rewriting the append only file: %s", strerror(errno
));
5905 for (j
= 0; j
< server
.dbnum
; j
++) {
5906 char selectcmd
[] = "*2\r\n$6\r\nSELECT\r\n";
5907 redisDb
*db
= server
.db
+j
;
5909 if (dictSize(d
) == 0) continue;
5910 di
= dictGetIterator(d
);
5916 /* SELECT the new DB */
5917 if (fwrite(selectcmd
,sizeof(selectcmd
)-1,1,fp
) == 0) goto werr
;
5918 if (fwriteBulkLong(fp
,j
) == 0) goto werr
;
5920 /* Iterate this DB writing every entry */
5921 while((de
= dictNext(di
)) != NULL
) {
5922 robj
*key
= dictGetEntryKey(de
);
5923 robj
*o
= dictGetEntryVal(de
);
5924 time_t expiretime
= getExpire(db
,key
);
5926 /* Save the key and associated value */
5927 if (o
->type
== REDIS_STRING
) {
5928 /* Emit a SET command */
5929 char cmd
[]="*3\r\n$3\r\nSET\r\n";
5930 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5932 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5933 if (fwriteBulk(fp
,o
) == 0) goto werr
;
5934 } else if (o
->type
== REDIS_LIST
) {
5935 /* Emit the RPUSHes needed to rebuild the list */
5936 list
*list
= o
->ptr
;
5940 while((ln
= listYield(list
))) {
5941 char cmd
[]="*3\r\n$5\r\nRPUSH\r\n";
5942 robj
*eleobj
= listNodeValue(ln
);
5944 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5945 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5946 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
5948 } else if (o
->type
== REDIS_SET
) {
5949 /* Emit the SADDs needed to rebuild the set */
5951 dictIterator
*di
= dictGetIterator(set
);
5954 while((de
= dictNext(di
)) != NULL
) {
5955 char cmd
[]="*3\r\n$4\r\nSADD\r\n";
5956 robj
*eleobj
= dictGetEntryKey(de
);
5958 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5959 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5960 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
5962 dictReleaseIterator(di
);
5963 } else if (o
->type
== REDIS_ZSET
) {
5964 /* Emit the ZADDs needed to rebuild the sorted set */
5966 dictIterator
*di
= dictGetIterator(zs
->dict
);
5969 while((de
= dictNext(di
)) != NULL
) {
5970 char cmd
[]="*4\r\n$4\r\nZADD\r\n";
5971 robj
*eleobj
= dictGetEntryKey(de
);
5972 double *score
= dictGetEntryVal(de
);
5974 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5975 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5976 if (fwriteBulkDouble(fp
,*score
) == 0) goto werr
;
5977 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
5979 dictReleaseIterator(di
);
5981 redisAssert(0 != 0);
5983 /* Save the expire time */
5984 if (expiretime
!= -1) {
5985 char cmd
[]="*3\r\n$6\r\nEXPIRE\r\n";
5986 /* If this key is already expired skip it */
5987 if (expiretime
< now
) continue;
5988 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5989 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5990 if (fwriteBulkLong(fp
,expiretime
) == 0) goto werr
;
5993 dictReleaseIterator(di
);
5996 /* Make sure data will not remain on the OS's output buffers */
6001 /* Use RENAME to make sure the DB file is changed atomically only
6002 * if the generated DB file is ok. */
6003 if (rename(tmpfile
,filename
) == -1) {
6004 redisLog(REDIS_WARNING
,"Error moving temp append only file on the final destination: %s", strerror(errno
));
6008 redisLog(REDIS_NOTICE
,"SYNC append only file rewrite performed");
6014 redisLog(REDIS_WARNING
,"Write error writing append only fileon disk: %s", strerror(errno
));
6015 if (di
) dictReleaseIterator(di
);
6019 /* This is how rewriting of the append only file in background works:
6021 * 1) The user calls BGREWRITEAOF
6022 * 2) Redis calls this function, that forks():
6023 * 2a) the child rewrites the append only file in a temp file.
6024 * 2b) the parent accumulates differences in server.bgrewritebuf.
6025 * 3) When the child finishes '2a' it exits.
6026 * 4) The parent will trap the exit code, if it's OK, will append the
6027 * data accumulated into server.bgrewritebuf into the temp file, and
6028 * finally will rename(2) the temp file in the actual file name.
6029 * Then the new file is reopened as the new append only file. Profit! */
/* Start rewriting the append only file in a forked process.
 *
 * Returns REDIS_ERR immediately when server.bgrewritechildpid is not -1.
 * Otherwise fork() is called. The branch taken when fork() returns 0
 * formats the name "temp-rewriteaof-bg-<pid>.aof" with getpid() and passes
 * it to rewriteAppendOnlyFile(). Later code treats childpid == -1 as a
 * fork failure and logs it; otherwise the child pid is logged, stored in
 * server.bgrewritechildpid, and server.appendseldb is reset to -1.
 *
 * NOTE(review): several lines of this function are absent from this
 * excerpt (local declarations, the statements that follow the
 * rewriteAppendOnlyFile() check, the error-path return, closing braces).
 * Confirm exit codes and return values against the complete source. */
6031 static int rewriteAppendOnlyFileBackground(void) {
/* Only one background rewrite may be tracked at a time. */
6034 if (server
.bgrewritechildpid
!= -1) return REDIS_ERR
;
6035 if ((childpid
= fork()) == 0) {
6040 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6041 if (rewriteAppendOnlyFile(tmpfile
) == REDIS_OK
) {
6048 if (childpid
== -1) {
6049 redisLog(REDIS_WARNING
,
6050 "Can't rewrite append only file in background: fork: %s",
6054 redisLog(REDIS_NOTICE
,
6055 "Background append only file rewriting started by pid %d",childpid
);
6056 server
.bgrewritechildpid
= childpid
;
6057 /* We set appendseldb to -1 in order to force the next call to the
6058 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6059 * accumulated by the parent into server.bgrewritebuf will start
6060 * with a SELECT statement and it will be safe to merge. */
6061 server
.appendseldb
= -1;
6064 return REDIS_OK
; /* unreached */
/* Command handler that asks for a background rewrite of the append only
 * file on behalf of client c (presumably bound to BGREWRITEAOF -- the
 * command table is not visible here).
 *
 * When server.bgrewritechildpid is not -1 the client receives the
 * "-ERR background append only file rewriting already in progress" reply.
 * The handler calls rewriteAppendOnlyFileBackground() and replies
 * shared.ok when it returns REDIS_OK; a shared.err reply is also present.
 *
 * NOTE(review): the return after the first error reply and the else line
 * in front of the shared.err reply are not visible in this excerpt. */
6067 static void bgrewriteaofCommand(redisClient
*c
) {
6068 if (server
.bgrewritechildpid
!= -1) {
6069 addReplySds(c
,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6072 if (rewriteAppendOnlyFileBackground() == REDIS_OK
) {
6073 addReply(c
,shared
.ok
);
6075 addReply(c
,shared
.err
);
/* Build the name "temp-rewriteaof-bg-<childpid>.aof" for the given child
 * pid. This is the same pattern rewriteAppendOnlyFileBackground() formats
 * with getpid() in the forked process.
 *
 * NOTE(review): the buffer declaration and the call that actually removes
 * the file are not visible in this excerpt (presumably unlink(tmpfile));
 * confirm against the complete source. */
6079 static void aofRemoveTempFile(pid_t childpid
) {
6082 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) childpid
);
6086 /* ================================= Debugging ============================== */
/* DEBUG command handler. Dispatches on c->argv[1]->ptr, compared without
 * regard to case:
 *
 *   "segfault" - the branch body is not visible in this excerpt.
 *   "reload"   - calls rdbSave(server.dbfilename) and then
 *                rdbLoad(server.dbfilename); replies shared.err when
 *                either call does not return REDIS_OK, otherwise logs
 *                "DB reloaded by DEBUG RELOAD" and replies shared.ok.
 *   "object"   - only when c->argc == 3: looks up c->argv[2] in
 *                c->db->dict. A shared.nokeyerr reply is present
 *                (presumably for a failed lookup; the check itself is not
 *                visible). Otherwise replies with the addresses and
 *                refcounts of the key and value objects plus an encoding
 *                field.
 *
 * A "-ERR Syntax error" reply listing the supported forms is also present,
 * presumably as the final else branch.
 *
 * NOTE(review): returns, else lines, local declarations of key/val and the
 * last sdscatprintf() argument are not visible in this excerpt. */
6088 static void debugCommand(redisClient
*c
) {
6089 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
6091 } else if (!strcasecmp(c
->argv
[1]->ptr
,"reload")) {
6092 if (rdbSave(server
.dbfilename
) != REDIS_OK
) {
6093 addReply(c
,shared
.err
);
6097 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6098 addReply(c
,shared
.err
);
6101 redisLog(REDIS_WARNING
,"DB reloaded by DEBUG RELOAD");
6102 addReply(c
,shared
.ok
);
6103 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
6104 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
6108 addReply(c
,shared
.nokeyerr
);
6111 key
= dictGetEntryKey(de
);
6112 val
= dictGetEntryVal(de
);
6113 addReplySds(c
,sdscatprintf(sdsempty(),
6114 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6115 (void*)key
, key
->refcount
, (void*)val
, val
->refcount
,
6118 addReplySds(c
,sdsnew(
6119 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
/* Report a failed assertion. Logs a banner followed by the expression text
 * estr at REDIS_WARNING level. When HAVE_BACKTRACE is defined it also logs
 * that a SIGSEGV is being forced so that the stack trace gets printed.
 *
 * NOTE(review): the statement that triggers the SIGSEGV and the matching
 * #endif are not visible in this excerpt. */
6123 static void _redisAssert(char *estr
) {
6124 redisLog(REDIS_WARNING
,"=== ASSERTION FAILED ===");
6125 redisLog(REDIS_WARNING
,"==> %s\n",estr
);
6126 #ifdef HAVE_BACKTRACE
6127 redisLog(REDIS_WARNING
,"(forcing SIGSEGV in order to print the stack trace)");
6132 /* =================================== Main! ================================ */
/* Open /proc/sys/vm/overcommit_memory for reading and fetch one line of at
 * most 63 characters from it with fgets().
 *
 * NOTE(review): the buffer declaration, the fopen() failure check, the
 * body of the fgets() failure branch and the return statements are not
 * visible in this excerpt. The caller compares the result with 0, so
 * presumably the parsed integer is returned -- confirm. */
6135 int linuxOvercommitMemoryValue(void) {
6136 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
6140 if (fgets(buf
,64,fp
) == NULL
) {
/* Log a REDIS_WARNING message explaining how to set vm.overcommit_memory
 * to 1 when linuxOvercommitMemoryValue() returns 0. Nothing is logged for
 * any other return value. */
6149 void linuxOvercommitMemoryWarning(void) {
6150 if (linuxOvercommitMemoryValue() == 0) {
6151 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6154 #endif /* __linux__ */
/* Detach the server from its controlling terminal.
 *
 * Steps visible here: fork() and exit(0) in every process where fork() did
 * not return 0; setsid() to start a new session; open /dev/null read-write
 * and dup2() it over stdin, stdout and stderr, closing the extra descriptor
 * when it is above STDERR_FILENO; finally open server.pidfile for writing
 * and print the pid followed by a newline.
 *
 * NOTE(review): `fork() != 0` is also true for -1, so a failed fork makes
 * the process exit with status 0 and no message.
 * NOTE(review): the declarations of fd and fp, any NULL check on fp, and
 * the fclose() call are not visible in this excerpt. */
6156 static void daemonize(void) {
6160 if (fork() != 0) exit(0); /* parent exits */
6161 setsid(); /* create a new session */
6163 /* Every output goes to /dev/null. If Redis is daemonized but
6164 * the 'logfile' is set to 'stdout' in the configuration file
6165 * it will not log at all. */
6166 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
6167 dup2(fd
, STDIN_FILENO
);
6168 dup2(fd
, STDOUT_FILENO
);
6169 dup2(fd
, STDERR_FILENO
);
6170 if (fd
> STDERR_FILENO
) close(fd
);
6172 /* Try to write the pid file */
6173 fp
= fopen(server
.pidfile
,"w");
6175 fprintf(fp
,"%d\n",getpid());
/* Program entry point. Steps visible in this excerpt, in order:
 *
 *  - reset the save parameters and load the configuration file named by
 *    argv[1] (the condition guarding this branch is not visible);
 *  - when argc > 2, print the usage line to stderr;
 *  - a warning that no config file was given and defaults are in use;
 *  - call daemonize() when server.daemonize is set;
 *  - log the startup line with REDIS_VERSION;
 *  - call linuxOvercommitMemoryWarning();
 *  - when server.appendonly is set, load server.appendfilename with
 *    loadAppendOnlyFile(); an rdbLoad(server.dbfilename) call is also
 *    present, presumably as the else branch;
 *  - register acceptHandler for AE_READABLE events on server.fd, calling
 *    oom() when aeCreateFileEvent() returns AE_ERR;
 *  - log that the server is ready on server.port;
 *  - delete the event loop with aeDeleteEventLoop().
 *
 * NOTE(review): the lines between these steps (server initialisation, the
 * call that runs the event loop, exit/return statements, preprocessor
 * guards) are not visible in this excerpt. */
6180 int main(int argc
, char **argv
) {
6183 resetServerSaveParams();
6184 loadServerConfig(argv
[1]);
6185 } else if (argc
> 2) {
6186 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
6189 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6192 if (server
.daemonize
) daemonize();
6193 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
6195 linuxOvercommitMemoryWarning();
6197 if (server
.appendonly
) {
6198 if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
)
6199 redisLog(REDIS_NOTICE
,"DB loaded from append only file");
6201 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
6202 redisLog(REDIS_NOTICE
,"DB loaded from disk");
6204 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
6205 acceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
6206 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
6208 aeDeleteEventLoop(server
.el
);
6212 /* ============================= Backtrace support ========================= */
6214 #ifdef HAVE_BACKTRACE
6215 static char *findFuncName(void *pointer
, unsigned long *offset
);
/* Return, as void*, the instruction pointer saved in the signal context uc.
 * The field that is read is selected at compile time:
 *
 *   FreeBSD      uc_mcontext.mc_eip
 *   dietlibc     uc_mcontext.eip
 *   Apple        uc_mcontext->__ss.__rip or uc_mcontext->__ss.__eip
 *   Linux x86    uc_mcontext.gregs[REG_EIP]
 *   Linux IA64   uc_mcontext.sc_ip
 *
 * NOTE(review): several preprocessor lines (the inner #ifdef/#else/#endif
 * pairs of the Apple branches and the final fallback branch) are not
 * visible in this excerpt.
 * NOTE(review): the Linux x86 test spells the macro __X86_64__; GCC and
 * Clang predefine the lowercase __x86_64__, so this spelling probably never
 * matches on 64-bit Linux -- confirm before relying on it. */
6217 static void *getMcontextEip(ucontext_t
*uc
) {
6218 #if defined(__FreeBSD__)
6219 return (void*) uc
->uc_mcontext
.mc_eip
;
6220 #elif defined(__dietlibc__)
6221 return (void*) uc
->uc_mcontext
.eip
;
6222 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6224 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6226 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6228 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6229 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6230 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6232 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6234 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
6235 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
];
6236 #elif defined(__ia64__) /* Linux IA64 */
6237 return (void*) uc
->uc_mcontext
.sc_ip
;
/* Handler for fatal signals; setupSigSegvAction() installs it through
 * sa_sigaction. sig is the signal number, info is unused, and secret is
 * cast to a ucontext_t pointer.
 *
 * Visible behaviour: logs the Redis version and signal number, logs the
 * string returned by genRedisInfoString(), captures up to 100 frames with
 * backtrace(), replaces trace[1] with the address from getMcontextEip()
 * when that is not NULL, resolves the frames with backtrace_symbols(), and
 * walks the frames starting at index 1. For each frame the
 * backtrace_symbols() text is logged when findFuncName() finds no name, or
 * when the decimal number after '+' in that text is smaller than the
 * offset findFuncName() reported. A second log call that prints the name
 * and offset from findFuncName() is also present, presumably as the else
 * branch.
 *
 * NOTE(review): the declarations of trace and infostring, the else line,
 * closing braces and whatever ends the handler are not visible in this
 * excerpt. */
6243 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
6245 char **messages
= NULL
;
6246 int i
, trace_size
= 0;
6247 unsigned long offset
=0;
6248 ucontext_t
*uc
= (ucontext_t
*) secret
;
6250 REDIS_NOTUSED(info
);
6252 redisLog(REDIS_WARNING
,
6253 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
6254 infostring
= genRedisInfoString();
6255 redisLog(REDIS_WARNING
, "%s",infostring
);
6256 /* It's not safe to sdsfree() the returned string under memory
6257 * corruption conditions. Let it leak as we are going to abort */
6259 trace_size
= backtrace(trace
, 100);
6260 /* overwrite sigaction with caller's address */
6261 if (getMcontextEip(uc
) != NULL
) {
6262 trace
[1] = getMcontextEip(uc
);
6264 messages
= backtrace_symbols(trace
, trace_size
);
6266 for (i
=1; i
<trace_size
; ++i
) {
6267 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
6269 p
= strchr(messages
[i
],'+');
6270 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
6271 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
6273 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
6276 // free(messages); Don't call free() with possibly corrupted memory.
6280 static void setupSigSegvAction(void) {
6281 struct sigaction act
;
6283 sigemptyset (&act
.sa_mask
);
6284 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6285 * is used. Otherwise, sa_handler is used */
6286 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
6287 act
.sa_sigaction
= segvHandler
;
6288 sigaction (SIGSEGV
, &act
, NULL
);
6289 sigaction (SIGBUS
, &act
, NULL
);
6290 sigaction (SIGFPE
, &act
, NULL
);
6291 sigaction (SIGILL
, &act
, NULL
);
6292 sigaction (SIGBUS
, &act
, NULL
);
6296 #include "staticsymbols.h"
6297 /* This function tries to convert a pointer into a function name. It's used in
6298 * order to provide a backtrace under segmentation fault that's able to
6299 * display functions declared as static (otherwise the backtrace is useless). */
/* Look up pointer in symsTable and return the name of the best match.
 *
 * The table is scanned until an entry whose pointer field is zero. Only
 * entries whose address is not above pointer are considered (a pointer
 * value of (unsigned long)-1 is never matched), and a candidate is taken
 * when nothing was selected yet or when its distance from pointer is
 * smaller than minoff. Returns NULL when ret is -1, otherwise
 * symsTable[ret].name.
 *
 * NOTE(review): the declarations of i and ret, the statements that update
 * minoff and ret, and the store through the offset parameter are not
 * visible in this excerpt. */
6300 static char *findFuncName(void *pointer
, unsigned long *offset
){
6302 unsigned long off
, minoff
= 0;
6304 /* Try to match against the Symbol with the smallest offset */
6305 for (i
=0; symsTable
[i
].pointer
; i
++) {
6306 unsigned long lp
= (unsigned long) pointer
;
6308 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
6309 off
=lp
-symsTable
[i
].pointer
;
6310 if (ret
< 0 || off
< minoff
) {
6316 if (ret
== -1) return NULL
;
6318 return symsTable
[ret
].name
;
6320 #else /* HAVE_BACKTRACE */
/* Variant of setupSigSegvAction() compiled when HAVE_BACKTRACE is not
 * defined.
 * NOTE(review): the body is not visible in this excerpt; presumably it is
 * empty so that callers need no conditional compilation -- confirm. */
6321 static void setupSigSegvAction(void) {
6323 #endif /* HAVE_BACKTRACE */