2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
/* Release version string of this server build. */
#define REDIS_VERSION "1.1.91"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
64 #include "solarisfixes.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
/* Default client idle timeout in seconds (5 minutes). Presumably the default
 * for server.maxidletime, which closeTimedoutClients() compares against
 * time(NULL) deltas. */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
/* Max number of random keys sampled from each DB's expires dict in one pass
 * of the serverCron() expire loop. Another pass is run while more than
 * REDIS_EXPIRELOOKUPS_PER_CRON/4 keys were expired in the previous one. */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD 3
/* Max number of iovecs used for each writev call */
#define REDIS_WRITEV_IOVEC_COUNT 256
/* Hash table parameters */
/* A hash table whose percentage of used slots (used*100/slots) falls below
 * this value is resized to save memory; see htNeedsResize(). */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */

/* Command flags, combined with '|' in the cmdTable entries */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short, these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4
/* Object types (stored in the 'type' field of a redis object) */
#define REDIS_STRING 0

/* Objects encoding (stored in the 'encoding' field of a redis object) */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255
/* Defines related to the dump file format. To store 32 bits lengths for short
 * keys requires a lot of space, so we check the most significant 2 bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 =>  01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside.
 *
 * REDIS_RDB_6BITLEN, REDIS_RDB_14BITLEN, REDIS_RDB_32BITLEN and
 * REDIS_RDB_ENCVAL are the two-bit prefixes 00, 01, 10 and 11. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * accordingly to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
/* Client flags: single bits, OR-ed together in redisClient.flags and tested
 * with '&' (e.g. c->flags & REDIS_SLAVE) */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
/* While server.replstate is REDIS_REPL_CONNECT, serverCron() calls
 * syncWithMaster() on each run. */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
/* List related stuff */

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_ASC 1
#define REDIS_SORT_DESC 2
#define REDIS_SORTKEY_MAX 1024

/* Log levels. redisLog() only emits a message when its level is
 * >= server.verbosity. */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro: evaluates V and discards the result so the compiler
 * does not complain about unused variables/parameters. The argument is
 * parenthesized so that expression arguments such as REDIS_NOTUSED(a+b)
 * expand to ((void) (a+b)) rather than the ill-formed ((void) a+b). */
#define REDIS_NOTUSED(V) ((void) (V))

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */

/* Append only defines */
#define APPENDFSYNC_NO 0
#define APPENDFSYNC_ALWAYS 1
#define APPENDFSYNC_EVERYSEC 2
/* Assertion macro used instead of assert(): when the expression is false the
 * stringified expression is passed to _redisAssert() (which can print a
 * stack trace) and the process exits with status 1. It is not affected by
 * NDEBUG, and being an expression it can be used wherever assert() can. */
#define redisAssert(_e) (!(_e) ? (_redisAssert(#_e), exit(1)) : (void)0)
static void _redisAssert(char *estr);
205 /*================================= Data types ============================== */
207 /* A redis object, that is a type able to hold a string / list / set */
208 typedef struct redisObject
{
211 unsigned char encoding
;
212 unsigned char notused
[2];
/* Macro used to initialize a Redis object allocated on the stack.
 * Note that this macro is kept near the structure definition to make sure
 * we'll update it when the structure is changed, to avoid bugs like
 * bug #85 introduced exactly in this way. */
220 #define initStaticStringObject(_var,_ptr) do { \
222 _var.type = REDIS_STRING; \
223 _var.encoding = REDIS_ENCODING_RAW; \
227 typedef struct redisDb
{
/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list. */
235 typedef struct redisClient
{
240 robj
**argv
, **mbargv
;
242 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
243 int multibulk
; /* multi bulk command format active */
246 time_t lastinteraction
; /* time of the last interaction, used for timeout */
247 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
248 int slaveseldb
; /* slave selected db, if this client is a slave */
249 int authenticated
; /* when requirepass is non-NULL */
250 int replstate
; /* replication state if this is a slave */
251 int repldbfd
; /* replication DB file descriptor */
252 long repldboff
; /* replication DB file offset */
253 off_t repldbsize
; /* replication DB file size */
261 /* Global server state structure */
267 unsigned int sharingpoolsize
;
268 long long dirty
; /* changes to DB from the last save */
270 list
*slaves
, *monitors
;
271 char neterr
[ANET_ERR_LEN
];
273 int cronloops
; /* number of times the cron function run */
274 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
275 time_t lastsave
; /* Unix time of last save succeeede */
276 size_t usedmemory
; /* Used memory in megabytes */
277 /* Fields used only for stats */
278 time_t stat_starttime
; /* server start time */
279 long long stat_numcommands
; /* number of processed commands */
280 long long stat_numconnections
; /* number of connections received */
293 pid_t bgsavechildpid
;
294 pid_t bgrewritechildpid
;
295 sds bgrewritebuf
; /* buffer taken by parent during oppend only rewrite */
296 struct saveparam
*saveparams
;
301 char *appendfilename
;
304 /* Replication related */
309 redisClient
*master
; /* client that is master for this slave */
311 unsigned int maxclients
;
312 unsigned long maxmemory
;
313 /* Sort parameters - qsort_r() is only available under BSD so we
314 * have to take this state global, in order to pass it to sortCompare() */
320 typedef void redisCommandProc(redisClient
*c
);
321 struct redisCommand
{
323 redisCommandProc
*proc
;
328 struct redisFunctionSym
{
330 unsigned long pointer
;
333 typedef struct _redisSortObject
{
341 typedef struct _redisSortOperation
{
344 } redisSortOperation
;
346 /* ZSETs use a specialized version of Skiplists */
348 typedef struct zskiplistNode
{
349 struct zskiplistNode
**forward
;
350 struct zskiplistNode
*backward
;
355 typedef struct zskiplist
{
356 struct zskiplistNode
*header
, *tail
;
357 unsigned long length
;
361 typedef struct zset
{
366 /* Our shared "common" objects */
368 struct sharedObjectsStruct
{
369 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
370 *colon
, *nullbulk
, *nullmultibulk
,
371 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
372 *outofrangeerr
, *plus
,
373 *select0
, *select1
, *select2
, *select3
, *select4
,
374 *select5
, *select6
, *select7
, *select8
, *select9
;
/* Globals that act as constants: these doubles are used when serializing
 * doubles to disk. They are assigned at runtime rather than with static
 * initializers, to avoid strange compiler optimizations. */
static double R_Zero;
static double R_PosInf;
static double R_NegInf;
static double R_Nan;
383 /*================================ Prototypes =============================== */
385 static void freeStringObject(robj
*o
);
386 static void freeListObject(robj
*o
);
387 static void freeSetObject(robj
*o
);
388 static void decrRefCount(void *o
);
389 static robj
*createObject(int type
, void *ptr
);
390 static void freeClient(redisClient
*c
);
391 static int rdbLoad(char *filename
);
392 static void addReply(redisClient
*c
, robj
*obj
);
393 static void addReplySds(redisClient
*c
, sds s
);
394 static void incrRefCount(robj
*o
);
395 static int rdbSaveBackground(char *filename
);
396 static robj
*createStringObject(char *ptr
, size_t len
);
397 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
398 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
399 static int syncWithMaster(void);
400 static robj
*tryObjectSharing(robj
*o
);
401 static int tryObjectEncoding(robj
*o
);
402 static robj
*getDecodedObject(robj
*o
);
403 static int removeExpire(redisDb
*db
, robj
*key
);
404 static int expireIfNeeded(redisDb
*db
, robj
*key
);
405 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
406 static int deleteKey(redisDb
*db
, robj
*key
);
407 static time_t getExpire(redisDb
*db
, robj
*key
);
408 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
409 static void updateSlavesWaitingBgsave(int bgsaveerr
);
410 static void freeMemoryIfNeeded(void);
411 static int processCommand(redisClient
*c
);
412 static void setupSigSegvAction(void);
413 static void rdbRemoveTempFile(pid_t childpid
);
414 static void aofRemoveTempFile(pid_t childpid
);
415 static size_t stringObjectLen(robj
*o
);
416 static void processInputBuffer(redisClient
*c
);
417 static zskiplist
*zslCreate(void);
418 static void zslFree(zskiplist
*zsl
);
419 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
420 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
422 static void authCommand(redisClient
*c
);
423 static void pingCommand(redisClient
*c
);
424 static void echoCommand(redisClient
*c
);
425 static void setCommand(redisClient
*c
);
426 static void setnxCommand(redisClient
*c
);
427 static void getCommand(redisClient
*c
);
428 static void delCommand(redisClient
*c
);
429 static void existsCommand(redisClient
*c
);
430 static void incrCommand(redisClient
*c
);
431 static void decrCommand(redisClient
*c
);
432 static void incrbyCommand(redisClient
*c
);
433 static void decrbyCommand(redisClient
*c
);
434 static void selectCommand(redisClient
*c
);
435 static void randomkeyCommand(redisClient
*c
);
436 static void keysCommand(redisClient
*c
);
437 static void dbsizeCommand(redisClient
*c
);
438 static void lastsaveCommand(redisClient
*c
);
439 static void saveCommand(redisClient
*c
);
440 static void bgsaveCommand(redisClient
*c
);
441 static void bgrewriteaofCommand(redisClient
*c
);
442 static void shutdownCommand(redisClient
*c
);
443 static void moveCommand(redisClient
*c
);
444 static void renameCommand(redisClient
*c
);
445 static void renamenxCommand(redisClient
*c
);
446 static void lpushCommand(redisClient
*c
);
447 static void rpushCommand(redisClient
*c
);
448 static void lpopCommand(redisClient
*c
);
449 static void rpopCommand(redisClient
*c
);
450 static void llenCommand(redisClient
*c
);
451 static void lindexCommand(redisClient
*c
);
452 static void lrangeCommand(redisClient
*c
);
453 static void ltrimCommand(redisClient
*c
);
454 static void typeCommand(redisClient
*c
);
455 static void lsetCommand(redisClient
*c
);
456 static void saddCommand(redisClient
*c
);
457 static void sremCommand(redisClient
*c
);
458 static void smoveCommand(redisClient
*c
);
459 static void sismemberCommand(redisClient
*c
);
460 static void scardCommand(redisClient
*c
);
461 static void spopCommand(redisClient
*c
);
462 static void srandmemberCommand(redisClient
*c
);
463 static void sinterCommand(redisClient
*c
);
464 static void sinterstoreCommand(redisClient
*c
);
465 static void sunionCommand(redisClient
*c
);
466 static void sunionstoreCommand(redisClient
*c
);
467 static void sdiffCommand(redisClient
*c
);
468 static void sdiffstoreCommand(redisClient
*c
);
469 static void syncCommand(redisClient
*c
);
470 static void flushdbCommand(redisClient
*c
);
471 static void flushallCommand(redisClient
*c
);
472 static void sortCommand(redisClient
*c
);
473 static void lremCommand(redisClient
*c
);
474 static void rpoplpushcommand(redisClient
*c
);
475 static void infoCommand(redisClient
*c
);
476 static void mgetCommand(redisClient
*c
);
477 static void monitorCommand(redisClient
*c
);
478 static void expireCommand(redisClient
*c
);
479 static void expireatCommand(redisClient
*c
);
480 static void getsetCommand(redisClient
*c
);
481 static void ttlCommand(redisClient
*c
);
482 static void slaveofCommand(redisClient
*c
);
483 static void debugCommand(redisClient
*c
);
484 static void msetCommand(redisClient
*c
);
485 static void msetnxCommand(redisClient
*c
);
486 static void zaddCommand(redisClient
*c
);
487 static void zincrbyCommand(redisClient
*c
);
488 static void zrangeCommand(redisClient
*c
);
489 static void zrangebyscoreCommand(redisClient
*c
);
490 static void zrevrangeCommand(redisClient
*c
);
491 static void zcardCommand(redisClient
*c
);
492 static void zremCommand(redisClient
*c
);
493 static void zscoreCommand(redisClient
*c
);
494 static void zremrangebyscoreCommand(redisClient
*c
);
496 /*================================= Globals ================================= */
/* The file-scope (static) instance of struct redisServer that holds all
 * global server state (databases, clients, persistence, replication, ...). */
static struct redisServer server; /* server global state */
500 static struct redisCommand cmdTable
[] = {
501 {"get",getCommand
,2,REDIS_CMD_INLINE
},
502 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
503 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
504 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
505 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
506 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
507 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
508 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
509 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
510 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
511 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
512 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
513 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
514 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
515 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
516 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
517 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
518 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
519 {"rpoplpush",rpoplpushcommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
520 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
521 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
522 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
523 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
524 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
525 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
526 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
527 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
528 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
529 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
530 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
531 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
532 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
533 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
534 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
535 {"zincrby",zincrbyCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
536 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
537 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
538 {"zrange",zrangeCommand
,4,REDIS_CMD_INLINE
},
539 {"zrangebyscore",zrangebyscoreCommand
,-4,REDIS_CMD_INLINE
},
540 {"zrevrange",zrevrangeCommand
,4,REDIS_CMD_INLINE
},
541 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
542 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
543 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
544 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
545 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
546 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
547 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
548 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
549 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
550 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
551 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
552 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
553 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
554 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
555 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
556 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
557 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
558 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
559 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
560 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
561 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
562 {"bgrewriteaof",bgrewriteaofCommand
,1,REDIS_CMD_INLINE
},
563 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
564 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
565 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
566 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
567 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
568 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
569 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
570 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
571 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
572 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
573 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
574 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
578 /*============================ Utility functions ============================ */
580 /* Glob-style pattern matching. */
581 int stringmatchlen(const char *pattern
, int patternLen
,
582 const char *string
, int stringLen
, int nocase
)
587 while (pattern
[1] == '*') {
592 return 1; /* match */
594 if (stringmatchlen(pattern
+1, patternLen
-1,
595 string
, stringLen
, nocase
))
596 return 1; /* match */
600 return 0; /* no match */
604 return 0; /* no match */
614 not = pattern
[0] == '^';
621 if (pattern
[0] == '\\') {
624 if (pattern
[0] == string
[0])
626 } else if (pattern
[0] == ']') {
628 } else if (patternLen
== 0) {
632 } else if (pattern
[1] == '-' && patternLen
>= 3) {
633 int start
= pattern
[0];
634 int end
= pattern
[2];
642 start
= tolower(start
);
648 if (c
>= start
&& c
<= end
)
652 if (pattern
[0] == string
[0])
655 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
665 return 0; /* no match */
671 if (patternLen
>= 2) {
678 if (pattern
[0] != string
[0])
679 return 0; /* no match */
681 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
682 return 0; /* no match */
690 if (stringLen
== 0) {
691 while(*pattern
== '*') {
698 if (patternLen
== 0 && stringLen
== 0)
703 static void redisLog(int level
, const char *fmt
, ...) {
707 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
711 if (level
>= server
.verbosity
) {
717 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
718 fprintf(fp
,"%s %c ",buf
,c
[level
]);
719 vfprintf(fp
, fmt
, ap
);
725 if (server
.logfile
) fclose(fp
);
728 /*====================== Hash table type implementation ==================== */
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and redis objects as values (objects can hold SDS strings,
734 static void dictVanillaFree(void *privdata
, void *val
)
736 DICT_NOTUSED(privdata
);
740 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
744 DICT_NOTUSED(privdata
);
746 l1
= sdslen((sds
)key1
);
747 l2
= sdslen((sds
)key2
);
748 if (l1
!= l2
) return 0;
749 return memcmp(key1
, key2
, l1
) == 0;
752 static void dictRedisObjectDestructor(void *privdata
, void *val
)
754 DICT_NOTUSED(privdata
);
759 static int dictObjKeyCompare(void *privdata
, const void *key1
,
762 const robj
*o1
= key1
, *o2
= key2
;
763 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
766 static unsigned int dictObjHash(const void *key
) {
768 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
771 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
774 robj
*o1
= (robj
*) key1
, *o2
= (robj
*) key2
;
777 o1
= getDecodedObject(o1
);
778 o2
= getDecodedObject(o2
);
779 cmp
= sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
785 static unsigned int dictEncObjHash(const void *key
) {
786 robj
*o
= (robj
*) key
;
788 o
= getDecodedObject(o
);
789 unsigned int hash
= dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
794 static dictType setDictType
= {
795 dictEncObjHash
, /* hash function */
798 dictEncObjKeyCompare
, /* key compare */
799 dictRedisObjectDestructor
, /* key destructor */
800 NULL
/* val destructor */
803 static dictType zsetDictType
= {
804 dictEncObjHash
, /* hash function */
807 dictEncObjKeyCompare
, /* key compare */
808 dictRedisObjectDestructor
, /* key destructor */
809 dictVanillaFree
/* val destructor of malloc(sizeof(double)) */
812 static dictType hashDictType
= {
813 dictObjHash
, /* hash function */
816 dictObjKeyCompare
, /* key compare */
817 dictRedisObjectDestructor
, /* key destructor */
818 dictRedisObjectDestructor
/* val destructor */
821 /* ========================= Random utility functions ======================= */
823 /* Redis generally does not try to recover from out of memory conditions
824 * when allocating objects or strings, it is not clear if it will be possible
825 * to report this condition to the client since the networking layer itself
826 * is based on heap allocation for send buffers, so we simply abort.
827 * At least the code will be simpler to read... */
828 static void oom(const char *msg
) {
829 redisLog(REDIS_WARNING
, "%s: Out of memory\n",msg
);
834 /* ====================== Redis server networking stuff ===================== */
835 static void closeTimedoutClients(void) {
838 time_t now
= time(NULL
);
840 listRewind(server
.clients
);
841 while ((ln
= listYield(server
.clients
)) != NULL
) {
842 c
= listNodeValue(ln
);
843 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
844 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
845 (now
- c
->lastinteraction
> server
.maxidletime
)) {
846 redisLog(REDIS_DEBUG
,"Closing idle client");
852 static int htNeedsResize(dict
*dict
) {
853 long long size
, used
;
855 size
= dictSlots(dict
);
856 used
= dictSize(dict
);
857 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
858 (used
*100/size
< REDIS_HT_MINFILL
));
/* If the percentage of used slots in the HT falls below REDIS_HT_MINFILL
 * we resize the hash table to save memory */
863 static void tryResizeHashTables(void) {
866 for (j
= 0; j
< server
.dbnum
; j
++) {
867 if (htNeedsResize(server
.db
[j
].dict
)) {
868 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
869 dictResize(server
.db
[j
].dict
);
870 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
872 if (htNeedsResize(server
.db
[j
].expires
))
873 dictResize(server
.db
[j
].expires
);
877 /* A background saving child (BGSAVE) terminated its work. Handle this. */
878 void backgroundSaveDoneHandler(int statloc
) {
879 int exitcode
= WEXITSTATUS(statloc
);
880 int bysignal
= WIFSIGNALED(statloc
);
882 if (!bysignal
&& exitcode
== 0) {
883 redisLog(REDIS_NOTICE
,
884 "Background saving terminated with success");
886 server
.lastsave
= time(NULL
);
887 } else if (!bysignal
&& exitcode
!= 0) {
888 redisLog(REDIS_WARNING
, "Background saving error");
890 redisLog(REDIS_WARNING
,
891 "Background saving terminated by signal");
892 rdbRemoveTempFile(server
.bgsavechildpid
);
894 server
.bgsavechildpid
= -1;
895 /* Possibly there are slaves waiting for a BGSAVE in order to be served
896 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
897 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
900 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
902 void backgroundRewriteDoneHandler(int statloc
) {
903 int exitcode
= WEXITSTATUS(statloc
);
904 int bysignal
= WIFSIGNALED(statloc
);
906 if (!bysignal
&& exitcode
== 0) {
910 redisLog(REDIS_NOTICE
,
911 "Background append only file rewriting terminated with success");
912 /* Now it's time to flush the differences accumulated by the parent */
913 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) server
.bgrewritechildpid
);
914 fd
= open(tmpfile
,O_WRONLY
|O_APPEND
);
916 redisLog(REDIS_WARNING
, "Not able to open the temp append only file produced by the child: %s", strerror(errno
));
919 /* Flush our data... */
920 if (write(fd
,server
.bgrewritebuf
,sdslen(server
.bgrewritebuf
)) !=
921 (signed) sdslen(server
.bgrewritebuf
)) {
922 redisLog(REDIS_WARNING
, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno
));
926 redisLog(REDIS_NOTICE
,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server
.bgrewritebuf
));
927 /* Now our work is to rename the temp file into the stable file. And
928 * switch the file descriptor used by the server for append only. */
929 if (rename(tmpfile
,server
.appendfilename
) == -1) {
930 redisLog(REDIS_WARNING
,"Can't rename the temp append only file into the stable one: %s", strerror(errno
));
934 /* Mission completed... almost */
935 redisLog(REDIS_NOTICE
,"Append only file successfully rewritten.");
936 if (server
.appendfd
!= -1) {
937 /* If append only is actually enabled... */
938 close(server
.appendfd
);
939 server
.appendfd
= fd
;
941 server
.appendseldb
= -1; /* Make sure it will issue SELECT */
942 redisLog(REDIS_NOTICE
,"The new append only file was selected for future appends.");
944 /* If append only is disabled we just generate a dump in this
945 * format. Why not? */
948 } else if (!bysignal
&& exitcode
!= 0) {
949 redisLog(REDIS_WARNING
, "Background append only file rewriting error");
951 redisLog(REDIS_WARNING
,
952 "Background append only file rewriting terminated by signal");
955 sdsfree(server
.bgrewritebuf
);
956 server
.bgrewritebuf
= sdsempty();
957 aofRemoveTempFile(server
.bgrewritechildpid
);
958 server
.bgrewritechildpid
= -1;
961 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
962 int j
, loops
= server
.cronloops
++;
963 REDIS_NOTUSED(eventLoop
);
965 REDIS_NOTUSED(clientData
);
967 /* Update the global state with the amount of used memory */
968 server
.usedmemory
= zmalloc_used_memory();
970 /* Show some info about non-empty databases */
971 for (j
= 0; j
< server
.dbnum
; j
++) {
972 long long size
, used
, vkeys
;
974 size
= dictSlots(server
.db
[j
].dict
);
975 used
= dictSize(server
.db
[j
].dict
);
976 vkeys
= dictSize(server
.db
[j
].expires
);
977 if (!(loops
% 5) && (used
|| vkeys
)) {
978 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
979 /* dictPrintStats(server.dict); */
983 /* We don't want to resize the hash tables while a bacground saving
984 * is in progress: the saving child is created using fork() that is
985 * implemented with a copy-on-write semantic in most modern systems, so
986 * if we resize the HT while there is the saving child at work actually
987 * a lot of memory movements in the parent will cause a lot of pages
989 if (server
.bgsavechildpid
== -1) tryResizeHashTables();
991 /* Show information about connected clients */
993 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
994 listLength(server
.clients
)-listLength(server
.slaves
),
995 listLength(server
.slaves
),
997 dictSize(server
.sharingpool
));
1000 /* Close connections of timedout clients */
1001 if (server
.maxidletime
&& !(loops
% 10))
1002 closeTimedoutClients();
1004 /* Check if a background saving or AOF rewrite in progress terminated */
1005 if (server
.bgsavechildpid
!= -1 || server
.bgrewritechildpid
!= -1) {
1009 if ((pid
= wait3(&statloc
,WNOHANG
,NULL
)) != 0) {
1010 if (pid
== server
.bgsavechildpid
) {
1011 backgroundSaveDoneHandler(statloc
);
1013 backgroundRewriteDoneHandler(statloc
);
1017 /* If there is not a background saving in progress check if
1018 * we have to save now */
1019 time_t now
= time(NULL
);
1020 for (j
= 0; j
< server
.saveparamslen
; j
++) {
1021 struct saveparam
*sp
= server
.saveparams
+j
;
1023 if (server
.dirty
>= sp
->changes
&&
1024 now
-server
.lastsave
> sp
->seconds
) {
1025 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
1026 sp
->changes
, sp
->seconds
);
1027 rdbSaveBackground(server
.dbfilename
);
1033 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1034 * will use few CPU cycles if there are few expiring keys, otherwise
1035 * it will get more aggressive to avoid that too much memory is used by
1036 * keys that can be removed from the keyspace. */
1037 for (j
= 0; j
< server
.dbnum
; j
++) {
1039 redisDb
*db
= server
.db
+j
;
1041 /* Continue to expire if at the end of the cycle more than 25%
1042 * of the keys were expired. */
1044 int num
= dictSize(db
->expires
);
1045 time_t now
= time(NULL
);
1048 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
1049 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
1054 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
1055 t
= (time_t) dictGetEntryVal(de
);
1057 deleteKey(db
,dictGetEntryKey(de
));
1061 } while (expired
> REDIS_EXPIRELOOKUPS_PER_CRON
/4);
1064 /* Check if we should connect to a MASTER */
1065 if (server
.replstate
== REDIS_REPL_CONNECT
) {
1066 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
1067 if (syncWithMaster() == REDIS_OK
) {
1068 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
1074 static void createSharedObjects(void) {
1075 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
1076 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
1077 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
1078 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
1079 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
1080 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
1081 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
1082 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
1083 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
1085 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
1086 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
1087 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1088 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
1089 "-ERR no such key\r\n"));
1090 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
1091 "-ERR syntax error\r\n"));
1092 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
1093 "-ERR source and destination objects are the same\r\n"));
1094 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
1095 "-ERR index out of range\r\n"));
1096 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
1097 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
1098 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
1099 shared
.select0
= createStringObject("select 0\r\n",10);
1100 shared
.select1
= createStringObject("select 1\r\n",10);
1101 shared
.select2
= createStringObject("select 2\r\n",10);
1102 shared
.select3
= createStringObject("select 3\r\n",10);
1103 shared
.select4
= createStringObject("select 4\r\n",10);
1104 shared
.select5
= createStringObject("select 5\r\n",10);
1105 shared
.select6
= createStringObject("select 6\r\n",10);
1106 shared
.select7
= createStringObject("select 7\r\n",10);
1107 shared
.select8
= createStringObject("select 8\r\n",10);
1108 shared
.select9
= createStringObject("select 9\r\n",10);
1111 static void appendServerSaveParams(time_t seconds
, int changes
) {
1112 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1113 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1114 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1115 server
.saveparamslen
++;
1118 static void resetServerSaveParams() {
1119 zfree(server
.saveparams
);
1120 server
.saveparams
= NULL
;
1121 server
.saveparamslen
= 0;
1124 static void initServerConfig() {
1125 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1126 server
.port
= REDIS_SERVERPORT
;
1127 server
.verbosity
= REDIS_DEBUG
;
1128 server
.maxidletime
= REDIS_MAXIDLETIME
;
1129 server
.saveparams
= NULL
;
1130 server
.logfile
= NULL
; /* NULL = log on standard output */
1131 server
.bindaddr
= NULL
;
1132 server
.glueoutputbuf
= 1;
1133 server
.daemonize
= 0;
1134 server
.appendonly
= 0;
1135 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1136 server
.lastfsync
= time(NULL
);
1137 server
.appendfd
= -1;
1138 server
.appendseldb
= -1; /* Make sure the first time will not match */
1139 server
.pidfile
= "/var/run/redis.pid";
1140 server
.dbfilename
= "dump.rdb";
1141 server
.appendfilename
= "appendonly.aof";
1142 server
.requirepass
= NULL
;
1143 server
.shareobjects
= 0;
1144 server
.sharingpoolsize
= 1024;
1145 server
.maxclients
= 0;
1146 server
.maxmemory
= 0;
1147 resetServerSaveParams();
1149 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1150 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1151 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1152 /* Replication related */
1154 server
.masterauth
= NULL
;
1155 server
.masterhost
= NULL
;
1156 server
.masterport
= 6379;
1157 server
.master
= NULL
;
1158 server
.replstate
= REDIS_REPL_NONE
;
1160 /* Double constants initialization */
1162 R_PosInf
= 1.0/R_Zero
;
1163 R_NegInf
= -1.0/R_Zero
;
1164 R_Nan
= R_Zero
/R_Zero
;
1167 static void initServer() {
1170 signal(SIGHUP
, SIG_IGN
);
1171 signal(SIGPIPE
, SIG_IGN
);
1172 setupSigSegvAction();
1174 server
.clients
= listCreate();
1175 server
.slaves
= listCreate();
1176 server
.monitors
= listCreate();
1177 server
.objfreelist
= listCreate();
1178 createSharedObjects();
1179 server
.el
= aeCreateEventLoop();
1180 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1181 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
1182 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1183 if (server
.fd
== -1) {
1184 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
1187 for (j
= 0; j
< server
.dbnum
; j
++) {
1188 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1189 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1190 server
.db
[j
].id
= j
;
1192 server
.cronloops
= 0;
1193 server
.bgsavechildpid
= -1;
1194 server
.bgrewritechildpid
= -1;
1195 server
.bgrewritebuf
= sdsempty();
1196 server
.lastsave
= time(NULL
);
1198 server
.usedmemory
= 0;
1199 server
.stat_numcommands
= 0;
1200 server
.stat_numconnections
= 0;
1201 server
.stat_starttime
= time(NULL
);
1202 aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
);
1204 if (server
.appendonly
) {
1205 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644);
1206 if (server
.appendfd
== -1) {
1207 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1214 /* Empty the whole database */
1215 static long long emptyDb() {
1217 long long removed
= 0;
1219 for (j
= 0; j
< server
.dbnum
; j
++) {
1220 removed
+= dictSize(server
.db
[j
].dict
);
1221 dictEmpty(server
.db
[j
].dict
);
1222 dictEmpty(server
.db
[j
].expires
);
/* Parse a boolean config argument.
 *
 * Returns 1 for "yes", 0 for "no" (both case-insensitive), and -1 for
 * anything else. The config loader treats -1 as an invalid argument.
 * Previously, unrecognised input fell off the end of the function without
 * a return value, so the loader's "== -1" checks read an indeterminate
 * value. */
static int yesnotoi(char *s) {
    if (!strcasecmp(s,"yes")) return 1;
    else if (!strcasecmp(s,"no")) return 0;
    else return -1;
}
1233 /* I agree, this is a very rudimental way to load a configuration...
1234 will improve later if the config gets more complex */
1235 static void loadServerConfig(char *filename
) {
1237 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1241 if (filename
[0] == '-' && filename
[1] == '\0')
1244 if ((fp
= fopen(filename
,"r")) == NULL
) {
1245 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1250 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1256 line
= sdstrim(line
," \t\r\n");
1258 /* Skip comments and blank lines*/
1259 if (line
[0] == '#' || line
[0] == '\0') {
1264 /* Split into arguments */
1265 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1266 sdstolower(argv
[0]);
1268 /* Execute config directives */
1269 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1270 server
.maxidletime
= atoi(argv
[1]);
1271 if (server
.maxidletime
< 0) {
1272 err
= "Invalid timeout value"; goto loaderr
;
1274 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1275 server
.port
= atoi(argv
[1]);
1276 if (server
.port
< 1 || server
.port
> 65535) {
1277 err
= "Invalid port"; goto loaderr
;
1279 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1280 server
.bindaddr
= zstrdup(argv
[1]);
1281 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1282 int seconds
= atoi(argv
[1]);
1283 int changes
= atoi(argv
[2]);
1284 if (seconds
< 1 || changes
< 0) {
1285 err
= "Invalid save parameters"; goto loaderr
;
1287 appendServerSaveParams(seconds
,changes
);
1288 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1289 if (chdir(argv
[1]) == -1) {
1290 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1291 argv
[1], strerror(errno
));
1294 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1295 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1296 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1297 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1299 err
= "Invalid log level. Must be one of debug, notice, warning";
1302 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1305 server
.logfile
= zstrdup(argv
[1]);
1306 if (!strcasecmp(server
.logfile
,"stdout")) {
1307 zfree(server
.logfile
);
1308 server
.logfile
= NULL
;
1310 if (server
.logfile
) {
1311 /* Test if we are able to open the file. The server will not
1312 * be able to abort just for this problem later... */
1313 logfp
= fopen(server
.logfile
,"a");
1314 if (logfp
== NULL
) {
1315 err
= sdscatprintf(sdsempty(),
1316 "Can't open the log file: %s", strerror(errno
));
1321 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1322 server
.dbnum
= atoi(argv
[1]);
1323 if (server
.dbnum
< 1) {
1324 err
= "Invalid number of databases"; goto loaderr
;
1326 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1327 server
.maxclients
= atoi(argv
[1]);
1328 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1329 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1330 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1331 server
.masterhost
= sdsnew(argv
[1]);
1332 server
.masterport
= atoi(argv
[2]);
1333 server
.replstate
= REDIS_REPL_CONNECT
;
1334 } else if (!strcasecmp(argv
[0],"masterauth") && argc
== 2) {
1335 server
.masterauth
= zstrdup(argv
[1]);
1336 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1337 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1338 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1340 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1341 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1342 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1344 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1345 server
.sharingpoolsize
= atoi(argv
[1]);
1346 if (server
.sharingpoolsize
< 1) {
1347 err
= "invalid object sharing pool size"; goto loaderr
;
1349 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1350 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1351 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1353 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1354 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1355 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1357 } else if (!strcasecmp(argv
[0],"appendfsync") && argc
== 2) {
1358 if (!strcasecmp(argv
[1],"no")) {
1359 server
.appendfsync
= APPENDFSYNC_NO
;
1360 } else if (!strcasecmp(argv
[1],"always")) {
1361 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1362 } else if (!strcasecmp(argv
[1],"everysec")) {
1363 server
.appendfsync
= APPENDFSYNC_EVERYSEC
;
1365 err
= "argument must be 'no', 'always' or 'everysec'";
1368 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1369 server
.requirepass
= zstrdup(argv
[1]);
1370 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1371 server
.pidfile
= zstrdup(argv
[1]);
1372 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1373 server
.dbfilename
= zstrdup(argv
[1]);
1375 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1377 for (j
= 0; j
< argc
; j
++)
1382 if (fp
!= stdin
) fclose(fp
);
1386 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1387 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1388 fprintf(stderr
, ">>> '%s'\n", line
);
1389 fprintf(stderr
, "%s\n", err
);
1393 static void freeClientArgv(redisClient
*c
) {
1396 for (j
= 0; j
< c
->argc
; j
++)
1397 decrRefCount(c
->argv
[j
]);
1398 for (j
= 0; j
< c
->mbargc
; j
++)
1399 decrRefCount(c
->mbargv
[j
]);
1404 static void freeClient(redisClient
*c
) {
1407 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1408 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1409 sdsfree(c
->querybuf
);
1410 listRelease(c
->reply
);
1413 ln
= listSearchKey(server
.clients
,c
);
1414 redisAssert(ln
!= NULL
);
1415 listDelNode(server
.clients
,ln
);
1416 if (c
->flags
& REDIS_SLAVE
) {
1417 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1419 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1420 ln
= listSearchKey(l
,c
);
1421 redisAssert(ln
!= NULL
);
1424 if (c
->flags
& REDIS_MASTER
) {
1425 server
.master
= NULL
;
1426 server
.replstate
= REDIS_REPL_CONNECT
;
1433 #define GLUEREPLY_UP_TO (1024)
1434 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1436 char buf
[GLUEREPLY_UP_TO
];
1440 listRewind(c
->reply
);
1441 while((ln
= listYield(c
->reply
))) {
1445 objlen
= sdslen(o
->ptr
);
1446 if (copylen
+ objlen
<= GLUEREPLY_UP_TO
) {
1447 memcpy(buf
+copylen
,o
->ptr
,objlen
);
1449 listDelNode(c
->reply
,ln
);
1451 if (copylen
== 0) return;
1455 /* Now the output buffer is empty, add the new single element */
1456 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,copylen
));
1457 listAddNodeHead(c
->reply
,o
);
1460 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1461 redisClient
*c
= privdata
;
1462 int nwritten
= 0, totwritten
= 0, objlen
;
1465 REDIS_NOTUSED(mask
);
1467 /* Use writev() if we have enough buffers to send */
1468 if (!server
.glueoutputbuf
&&
1469 listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD
&&
1470 !(c
->flags
& REDIS_MASTER
))
1472 sendReplyToClientWritev(el
, fd
, privdata
, mask
);
1476 while(listLength(c
->reply
)) {
1477 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1478 glueReplyBuffersIfNeeded(c
);
1480 o
= listNodeValue(listFirst(c
->reply
));
1481 objlen
= sdslen(o
->ptr
);
1484 listDelNode(c
->reply
,listFirst(c
->reply
));
1488 if (c
->flags
& REDIS_MASTER
) {
1489 /* Don't reply to a master */
1490 nwritten
= objlen
- c
->sentlen
;
1492 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1493 if (nwritten
<= 0) break;
1495 c
->sentlen
+= nwritten
;
1496 totwritten
+= nwritten
;
1497 /* If we fully sent the object on head go to the next one */
1498 if (c
->sentlen
== objlen
) {
1499 listDelNode(c
->reply
,listFirst(c
->reply
));
1502 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1503 * bytes, in a single threaded server it's a good idea to serve
1504 * other clients as well, even if a very large request comes from
1505 * super fast link that is always able to accept data (in real world
1506 * scenario think about 'KEYS *' against the loopback interfae) */
1507 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1509 if (nwritten
== -1) {
1510 if (errno
== EAGAIN
) {
1513 redisLog(REDIS_DEBUG
,
1514 "Error writing to client: %s", strerror(errno
));
1519 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1520 if (listLength(c
->reply
) == 0) {
1522 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1526 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
)
1528 redisClient
*c
= privdata
;
1529 int nwritten
= 0, totwritten
= 0, objlen
, willwrite
;
1531 struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
];
1532 int offset
, ion
= 0;
1534 REDIS_NOTUSED(mask
);
1537 while (listLength(c
->reply
)) {
1538 offset
= c
->sentlen
;
1542 /* fill-in the iov[] array */
1543 for(node
= listFirst(c
->reply
); node
; node
= listNextNode(node
)) {
1544 o
= listNodeValue(node
);
1545 objlen
= sdslen(o
->ptr
);
1547 if (totwritten
+ objlen
- offset
> REDIS_MAX_WRITE_PER_EVENT
)
1550 if(ion
== REDIS_WRITEV_IOVEC_COUNT
)
1551 break; /* no more iovecs */
1553 iov
[ion
].iov_base
= ((char*)o
->ptr
) + offset
;
1554 iov
[ion
].iov_len
= objlen
- offset
;
1555 willwrite
+= objlen
- offset
;
1556 offset
= 0; /* just for the first item */
1563 /* write all collected blocks at once */
1564 if((nwritten
= writev(fd
, iov
, ion
)) < 0) {
1565 if (errno
!= EAGAIN
) {
1566 redisLog(REDIS_DEBUG
,
1567 "Error writing to client: %s", strerror(errno
));
1574 totwritten
+= nwritten
;
1575 offset
= c
->sentlen
;
1577 /* remove written robjs from c->reply */
1578 while (nwritten
&& listLength(c
->reply
)) {
1579 o
= listNodeValue(listFirst(c
->reply
));
1580 objlen
= sdslen(o
->ptr
);
1582 if(nwritten
>= objlen
- offset
) {
1583 listDelNode(c
->reply
, listFirst(c
->reply
));
1584 nwritten
-= objlen
- offset
;
1588 c
->sentlen
+= nwritten
;
1596 c
->lastinteraction
= time(NULL
);
1598 if (listLength(c
->reply
) == 0) {
1600 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1604 static struct redisCommand
*lookupCommand(char *name
) {
1606 while(cmdTable
[j
].name
!= NULL
) {
1607 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1613 /* resetClient prepare the client to process the next command */
1614 static void resetClient(redisClient
*c
) {
1620 /* If this function gets called we already read a whole
1621 * command, argments are in the client argv/argc fields.
1622 * processCommand() execute the command or prepare the
1623 * server for a bulk read from the client.
1625 * If 1 is returned the client is still alive and valid and
1626 * and other operations can be performed by the caller. Otherwise
1627 * if 0 is returned the client was destroied (i.e. after QUIT). */
1628 static int processCommand(redisClient
*c
) {
1629 struct redisCommand
*cmd
;
1632 /* Free some memory if needed (maxmemory setting) */
1633 if (server
.maxmemory
) freeMemoryIfNeeded();
1635 /* Handle the multi bulk command type. This is an alternative protocol
1636 * supported by Redis in order to receive commands that are composed of
1637 * multiple binary-safe "bulk" arguments. The latency of processing is
1638 * a bit higher but this allows things like multi-sets, so if this
1639 * protocol is used only for MSET and similar commands this is a big win. */
1640 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1641 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1642 if (c
->multibulk
<= 0) {
1646 decrRefCount(c
->argv
[c
->argc
-1]);
1650 } else if (c
->multibulk
) {
1651 if (c
->bulklen
== -1) {
1652 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1653 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1657 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1658 decrRefCount(c
->argv
[0]);
1659 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1661 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1666 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1670 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1671 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1675 if (c
->multibulk
== 0) {
1679 /* Here we need to swap the multi-bulk argc/argv with the
1680 * normal argc/argv of the client structure. */
1682 c
->argv
= c
->mbargv
;
1683 c
->mbargv
= auxargv
;
1686 c
->argc
= c
->mbargc
;
1687 c
->mbargc
= auxargc
;
1689 /* We need to set bulklen to something different than -1
1690 * in order for the code below to process the command without
1691 * to try to read the last argument of a bulk command as
1692 * a special argument. */
1694 /* continue below and process the command */
1701 /* -- end of multi bulk commands processing -- */
1703 /* The QUIT command is handled as a special case. Normal command
1704 * procs are unable to close the client connection safely */
1705 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1709 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1711 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1714 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1715 (c
->argc
< -cmd
->arity
)) {
1717 sdscatprintf(sdsempty(),
1718 "-ERR wrong number of arguments for '%s' command\r\n",
1722 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1723 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1726 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1727 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1729 decrRefCount(c
->argv
[c
->argc
-1]);
1730 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1732 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1737 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1738 /* It is possible that the bulk read is already in the
1739 * buffer. Check this condition and handle it accordingly.
1740 * This is just a fast path, alternative to call processInputBuffer().
1741 * It's a good idea since the code is small and this condition
1742 * happens most of the times. */
1743 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1744 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1746 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1751 /* Let's try to share objects on the command arguments vector */
1752 if (server
.shareobjects
) {
1754 for(j
= 1; j
< c
->argc
; j
++)
1755 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1757 /* Let's try to encode the bulk object to save space. */
1758 if (cmd
->flags
& REDIS_CMD_BULK
)
1759 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1761 /* Check if the user is authenticated */
1762 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1763 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1768 /* Exec the command */
1769 dirty
= server
.dirty
;
1771 if (server
.appendonly
&& server
.dirty
-dirty
)
1772 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1773 if (server
.dirty
-dirty
&& listLength(server
.slaves
))
1774 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1775 if (listLength(server
.monitors
))
1776 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1777 server
.stat_numcommands
++;
1779 /* Prepare the client for the next command */
1780 if (c
->flags
& REDIS_CLOSE
) {
1788 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1792 /* (args*2)+1 is enough room for args, spaces, newlines */
1793 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1795 if (argc
<= REDIS_STATIC_ARGS
) {
1798 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1801 for (j
= 0; j
< argc
; j
++) {
1802 if (j
!= 0) outv
[outc
++] = shared
.space
;
1803 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1806 lenobj
= createObject(REDIS_STRING
,
1807 sdscatprintf(sdsempty(),"%lu\r\n",
1808 (unsigned long) stringObjectLen(argv
[j
])));
1809 lenobj
->refcount
= 0;
1810 outv
[outc
++] = lenobj
;
1812 outv
[outc
++] = argv
[j
];
1814 outv
[outc
++] = shared
.crlf
;
1816 /* Increment all the refcounts at start and decrement at end in order to
1817 * be sure to free objects if there is no slave in a replication state
1818 * able to be feed with commands */
1819 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1821 while((ln
= listYield(slaves
))) {
1822 redisClient
*slave
= ln
->value
;
1824 /* Don't feed slaves that are still waiting for BGSAVE to start */
1825 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1827 /* Feed all the other slaves, MONITORs and so on */
1828 if (slave
->slaveseldb
!= dictid
) {
1832 case 0: selectcmd
= shared
.select0
; break;
1833 case 1: selectcmd
= shared
.select1
; break;
1834 case 2: selectcmd
= shared
.select2
; break;
1835 case 3: selectcmd
= shared
.select3
; break;
1836 case 4: selectcmd
= shared
.select4
; break;
1837 case 5: selectcmd
= shared
.select5
; break;
1838 case 6: selectcmd
= shared
.select6
; break;
1839 case 7: selectcmd
= shared
.select7
; break;
1840 case 8: selectcmd
= shared
.select8
; break;
1841 case 9: selectcmd
= shared
.select9
; break;
1843 selectcmd
= createObject(REDIS_STRING
,
1844 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1845 selectcmd
->refcount
= 0;
1848 addReply(slave
,selectcmd
);
1849 slave
->slaveseldb
= dictid
;
1851 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1853 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1854 if (outv
!= static_outv
) zfree(outv
);
1857 static void processInputBuffer(redisClient
*c
) {
1859 if (c
->bulklen
== -1) {
1860 /* Read the first line of the query */
1861 char *p
= strchr(c
->querybuf
,'\n');
1868 query
= c
->querybuf
;
1869 c
->querybuf
= sdsempty();
1870 querylen
= 1+(p
-(query
));
1871 if (sdslen(query
) > querylen
) {
1872 /* leave data after the first line of the query in the buffer */
1873 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1875 *p
= '\0'; /* remove "\n" */
1876 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1877 sdsupdatelen(query
);
1879 /* Now we can split the query in arguments */
1880 if (sdslen(query
) == 0) {
1881 /* Ignore empty query */
1885 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1888 if (c
->argv
) zfree(c
->argv
);
1889 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1891 for (j
= 0; j
< argc
; j
++) {
1892 if (sdslen(argv
[j
])) {
1893 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1900 /* Execute the command. If the client is still valid
1901 * after processCommand() return and there is something
1902 * on the query buffer try to process the next command. */
1903 if (c
->argc
&& processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1905 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
1906 redisLog(REDIS_DEBUG
, "Client protocol error");
1911 /* Bulk read handling. Note that if we are at this point
1912 the client already sent a command terminated with a newline,
1913 we are reading the bulk data that is actually the last
1914 argument of the command. */
1915 int qbl
= sdslen(c
->querybuf
);
1917 if (c
->bulklen
<= qbl
) {
1918 /* Copy everything but the final CRLF as final argument */
1919 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1921 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1922 /* Process the command. If the client is still valid after
1923 * the processing and there is more data in the buffer
1924 * try to parse it. */
1925 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1931 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1932 redisClient
*c
= (redisClient
*) privdata
;
1933 char buf
[REDIS_IOBUF_LEN
];
1936 REDIS_NOTUSED(mask
);
1938 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1940 if (errno
== EAGAIN
) {
1943 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1947 } else if (nread
== 0) {
1948 redisLog(REDIS_DEBUG
, "Client closed connection");
1953 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1954 c
->lastinteraction
= time(NULL
);
1958 processInputBuffer(c
);
1961 static int selectDb(redisClient
*c
, int id
) {
1962 if (id
< 0 || id
>= server
.dbnum
)
1964 c
->db
= &server
.db
[id
];
1968 static void *dupClientReplyValue(void *o
) {
1969 incrRefCount((robj
*)o
);
1973 static redisClient
*createClient(int fd
) {
1974 redisClient
*c
= zmalloc(sizeof(*c
));
1976 anetNonBlock(NULL
,fd
);
1977 anetTcpNoDelay(NULL
,fd
);
1978 if (!c
) return NULL
;
1981 c
->querybuf
= sdsempty();
1990 c
->lastinteraction
= time(NULL
);
1991 c
->authenticated
= 0;
1992 c
->replstate
= REDIS_REPL_NONE
;
1993 c
->reply
= listCreate();
1994 listSetFreeMethod(c
->reply
,decrRefCount
);
1995 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1996 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1997 readQueryFromClient
, c
) == AE_ERR
) {
2001 listAddNodeTail(server
.clients
,c
);
2005 static void addReply(redisClient
*c
, robj
*obj
) {
2006 if (listLength(c
->reply
) == 0 &&
2007 (c
->replstate
== REDIS_REPL_NONE
||
2008 c
->replstate
== REDIS_REPL_ONLINE
) &&
2009 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
2010 sendReplyToClient
, c
) == AE_ERR
) return;
2011 listAddNodeTail(c
->reply
,getDecodedObject(obj
));
2014 static void addReplySds(redisClient
*c
, sds s
) {
2015 robj
*o
= createObject(REDIS_STRING
,s
);
2020 static void addReplyDouble(redisClient
*c
, double d
) {
2023 snprintf(buf
,sizeof(buf
),"%.17g",d
);
2024 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2025 (unsigned long) strlen(buf
),buf
));
2028 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
2031 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
2032 len
= sdslen(obj
->ptr
);
2034 long n
= (long)obj
->ptr
;
2041 while((n
= n
/10) != 0) {
2045 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len
));
2048 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2053 REDIS_NOTUSED(mask
);
2054 REDIS_NOTUSED(privdata
);
2056 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
2057 if (cfd
== AE_ERR
) {
2058 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
2061 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
2062 if ((c
= createClient(cfd
)) == NULL
) {
2063 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
2064 close(cfd
); /* May be already closed, just ingore errors */
2067 /* If maxclient directive is set and this is one client more... close the
2068 * connection. Note that we create the client instead to check before
2069 * for this condition, since now the socket is already set in nonblocking
2070 * mode and we can send an error for free using the Kernel I/O */
2071 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
2072 char *err
= "-ERR max number of clients reached\r\n";
2074 /* That's a best effort error message, don't check write errors */
2075 if (write(c
->fd
,err
,strlen(err
)) == -1) {
2076 /* Nothing to do, Just to avoid the warning... */
2081 server
.stat_numconnections
++;
2084 /* ======================= Redis objects implementation ===================== */
2086 static robj
*createObject(int type
, void *ptr
) {
2089 if (listLength(server
.objfreelist
)) {
2090 listNode
*head
= listFirst(server
.objfreelist
);
2091 o
= listNodeValue(head
);
2092 listDelNode(server
.objfreelist
,head
);
2094 o
= zmalloc(sizeof(*o
));
2097 o
->encoding
= REDIS_ENCODING_RAW
;
2103 static robj
*createStringObject(char *ptr
, size_t len
) {
2104 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
2107 static robj
*createListObject(void) {
2108 list
*l
= listCreate();
2110 listSetFreeMethod(l
,decrRefCount
);
2111 return createObject(REDIS_LIST
,l
);
2114 static robj
*createSetObject(void) {
2115 dict
*d
= dictCreate(&setDictType
,NULL
);
2116 return createObject(REDIS_SET
,d
);
2119 static robj
*createZsetObject(void) {
2120 zset
*zs
= zmalloc(sizeof(*zs
));
2122 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
2123 zs
->zsl
= zslCreate();
2124 return createObject(REDIS_ZSET
,zs
);
2127 static void freeStringObject(robj
*o
) {
2128 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2133 static void freeListObject(robj
*o
) {
2134 listRelease((list
*) o
->ptr
);
2137 static void freeSetObject(robj
*o
) {
2138 dictRelease((dict
*) o
->ptr
);
2141 static void freeZsetObject(robj
*o
) {
2144 dictRelease(zs
->dict
);
2149 static void freeHashObject(robj
*o
) {
2150 dictRelease((dict
*) o
->ptr
);
2153 static void incrRefCount(robj
*o
) {
2155 #ifdef DEBUG_REFCOUNT
2156 if (o
->type
== REDIS_STRING
)
2157 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
2161 static void decrRefCount(void *obj
) {
2164 #ifdef DEBUG_REFCOUNT
2165 if (o
->type
== REDIS_STRING
)
2166 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
2168 if (--(o
->refcount
) == 0) {
2170 case REDIS_STRING
: freeStringObject(o
); break;
2171 case REDIS_LIST
: freeListObject(o
); break;
2172 case REDIS_SET
: freeSetObject(o
); break;
2173 case REDIS_ZSET
: freeZsetObject(o
); break;
2174 case REDIS_HASH
: freeHashObject(o
); break;
2175 default: redisAssert(0 != 0); break;
2177 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2178 !listAddNodeHead(server
.objfreelist
,o
))
2183 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2184 dictEntry
*de
= dictFind(db
->dict
,key
);
2185 return de
? dictGetEntryVal(de
) : NULL
;
2188 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2189 expireIfNeeded(db
,key
);
2190 return lookupKey(db
,key
);
2193 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2194 deleteIfVolatile(db
,key
);
2195 return lookupKey(db
,key
);
2198 static int deleteKey(redisDb
*db
, robj
*key
) {
2201 /* We need to protect key from destruction: after the first dictDelete()
2202 * it may happen that 'key' is no longer valid if we don't increment
2203 * it's count. This may happen when we get the object reference directly
2204 * from the hash table with dictRandomKey() or dict iterators */
2206 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2207 retval
= dictDelete(db
->dict
,key
);
2210 return retval
== DICT_OK
;
2213 /* Try to share an object against the shared objects pool */
2214 static robj
*tryObjectSharing(robj
*o
) {
2215 struct dictEntry
*de
;
2218 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2220 redisAssert(o
->type
== REDIS_STRING
);
2221 de
= dictFind(server
.sharingpool
,o
);
2223 robj
*shared
= dictGetEntryKey(de
);
2225 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2226 dictGetEntryVal(de
) = (void*) c
;
2227 incrRefCount(shared
);
2231 /* Here we are using a stream algorihtm: Every time an object is
2232 * shared we increment its count, everytime there is a miss we
2233 * recrement the counter of a random object. If this object reaches
2234 * zero we remove the object and put the current object instead. */
2235 if (dictSize(server
.sharingpool
) >=
2236 server
.sharingpoolsize
) {
2237 de
= dictGetRandomKey(server
.sharingpool
);
2238 redisAssert(de
!= NULL
);
2239 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2240 dictGetEntryVal(de
) = (void*) c
;
2242 dictDelete(server
.sharingpool
,de
->key
);
2245 c
= 0; /* If the pool is empty we want to add this object */
2250 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2251 redisAssert(retval
== DICT_OK
);
2258 /* Check if the nul-terminated string 's' can be represented by a long
2259 * (that is, is a number that fits into long without any other space or
2260 * character before or after the digits).
2262 * If so, the function returns REDIS_OK and *longval is set to the value
2263 * of the number. Otherwise REDIS_ERR is returned */
2264 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2265 char buf
[32], *endptr
;
2269 value
= strtol(s
, &endptr
, 10);
2270 if (endptr
[0] != '\0') return REDIS_ERR
;
2271 slen
= snprintf(buf
,32,"%ld",value
);
2273 /* If the number converted back into a string is not identical
2274 * then it's not possible to encode the string as integer */
2275 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2276 if (longval
) *longval
= value
;
2280 /* Try to encode a string object in order to save space */
2281 static int tryObjectEncoding(robj
*o
) {
2285 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2286 return REDIS_ERR
; /* Already encoded */
2288 /* It's not save to encode shared objects: shared objects can be shared
2289 * everywhere in the "object space" of Redis. Encoded objects can only
2290 * appear as "values" (and not, for instance, as keys) */
2291 if (o
->refcount
> 1) return REDIS_ERR
;
2293 /* Currently we try to encode only strings */
2294 redisAssert(o
->type
== REDIS_STRING
);
2296 /* Check if we can represent this string as a long integer */
2297 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2299 /* Ok, this object can be encoded */
2300 o
->encoding
= REDIS_ENCODING_INT
;
2302 o
->ptr
= (void*) value
;
2306 /* Get a decoded version of an encoded object (returned as a new object).
2307 * If the object is already raw-encoded just increment the ref count. */
2308 static robj
*getDecodedObject(robj
*o
) {
2311 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2315 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2318 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2319 dec
= createStringObject(buf
,strlen(buf
));
2322 redisAssert(1 != 1);
2326 /* Compare two string objects via strcmp() or alike.
2327 * Note that the objects may be integer-encoded. In such a case we
2328 * use snprintf() to get a string representation of the numbers on the stack
2329 * and compare the strings, it's much faster than calling getDecodedObject().
2331 * Important note: if objects are not integer encoded, but binary-safe strings,
2332 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2334 static int compareStringObjects(robj
*a
, robj
*b
) {
2335 redisAssert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2336 char bufa
[128], bufb
[128], *astr
, *bstr
;
2339 if (a
== b
) return 0;
2340 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2341 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2347 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2348 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2354 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2357 static size_t stringObjectLen(robj
*o
) {
2358 redisAssert(o
->type
== REDIS_STRING
);
2359 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2360 return sdslen(o
->ptr
);
2364 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2368 /*============================ DB saving/loading ============================ */
2370 static int rdbSaveType(FILE *fp
, unsigned char type
) {
2371 if (fwrite(&type
,1,1,fp
) == 0) return -1;
2375 static int rdbSaveTime(FILE *fp
, time_t t
) {
2376 int32_t t32
= (int32_t) t
;
2377 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
2381 /* check rdbLoadLen() comments for more info */
2382 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2383 unsigned char buf
[2];
2386 /* Save a 6 bit len */
2387 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2388 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2389 } else if (len
< (1<<14)) {
2390 /* Save a 14 bit len */
2391 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2393 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2395 /* Save a 32 bit len */
2396 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2397 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2399 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2404 /* String objects in the form "2391" "-100" without any space and with a
2405 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2406 * encoded as integers to save space */
2407 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2409 char *endptr
, buf
[32];
2411 /* Check if it's possible to encode this value as a number */
2412 value
= strtoll(s
, &endptr
, 10);
2413 if (endptr
[0] != '\0') return 0;
2414 snprintf(buf
,32,"%lld",value
);
2416 /* If the number converted back into a string is not identical
2417 * then it's not possible to encode the string as integer */
2418 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2420 /* Finally check if it fits in our ranges */
2421 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2422 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2423 enc
[1] = value
&0xFF;
2425 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2426 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2427 enc
[1] = value
&0xFF;
2428 enc
[2] = (value
>>8)&0xFF;
2430 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2431 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2432 enc
[1] = value
&0xFF;
2433 enc
[2] = (value
>>8)&0xFF;
2434 enc
[3] = (value
>>16)&0xFF;
2435 enc
[4] = (value
>>24)&0xFF;
2442 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2443 unsigned int comprlen
, outlen
;
2447 /* We require at least four bytes compression for this to be worth it */
2448 outlen
= sdslen(obj
->ptr
)-4;
2449 if (outlen
<= 0) return 0;
2450 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2451 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2452 if (comprlen
== 0) {
2456 /* Data compressed! Let's save it on disk */
2457 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2458 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2459 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2460 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2461 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2470 /* Save a string objet as [len][data] on disk. If the object is a string
2471 * representation of an integer value we try to safe it in a special form */
2472 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2476 len
= sdslen(obj
->ptr
);
2478 /* Try integer encoding */
2480 unsigned char buf
[5];
2481 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2482 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2487 /* Try LZF compression - under 20 bytes it's unable to compress even
2488 * aaaaaaaaaaaaaaaaaa so skip it */
2492 retval
= rdbSaveLzfStringObject(fp
,obj
);
2493 if (retval
== -1) return -1;
2494 if (retval
> 0) return 0;
2495 /* retval == 0 means data can't be compressed, save the old way */
2498 /* Store verbatim */
2499 if (rdbSaveLen(fp
,len
) == -1) return -1;
2500 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2504 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2505 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2508 obj
= getDecodedObject(obj
);
2509 retval
= rdbSaveStringObjectRaw(fp
,obj
);
2514 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
2515 * 8 bit integer specifing the length of the representation.
2516 * This 8 bit integer has special values in order to specify the following
2522 static int rdbSaveDoubleValue(FILE *fp
, double val
) {
2523 unsigned char buf
[128];
2529 } else if (!isfinite(val
)) {
2531 buf
[0] = (val
< 0) ? 255 : 254;
2533 snprintf((char*)buf
+1,sizeof(buf
)-1,"%.17g",val
);
2534 buf
[0] = strlen((char*)buf
+1);
2537 if (fwrite(buf
,len
,1,fp
) == 0) return -1;
2541 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2542 static int rdbSave(char *filename
) {
2543 dictIterator
*di
= NULL
;
2548 time_t now
= time(NULL
);
2550 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2551 fp
= fopen(tmpfile
,"w");
2553 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2556 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2557 for (j
= 0; j
< server
.dbnum
; j
++) {
2558 redisDb
*db
= server
.db
+j
;
2560 if (dictSize(d
) == 0) continue;
2561 di
= dictGetIterator(d
);
2567 /* Write the SELECT DB opcode */
2568 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2569 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2571 /* Iterate this DB writing every entry */
2572 while((de
= dictNext(di
)) != NULL
) {
2573 robj
*key
= dictGetEntryKey(de
);
2574 robj
*o
= dictGetEntryVal(de
);
2575 time_t expiretime
= getExpire(db
,key
);
2577 /* Save the expire time */
2578 if (expiretime
!= -1) {
2579 /* If this key is already expired skip it */
2580 if (expiretime
< now
) continue;
2581 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2582 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2584 /* Save the key and associated value */
2585 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2586 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2587 if (o
->type
== REDIS_STRING
) {
2588 /* Save a string value */
2589 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
2590 } else if (o
->type
== REDIS_LIST
) {
2591 /* Save a list value */
2592 list
*list
= o
->ptr
;
2596 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
2597 while((ln
= listYield(list
))) {
2598 robj
*eleobj
= listNodeValue(ln
);
2600 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2602 } else if (o
->type
== REDIS_SET
) {
2603 /* Save a set value */
2605 dictIterator
*di
= dictGetIterator(set
);
2608 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
2609 while((de
= dictNext(di
)) != NULL
) {
2610 robj
*eleobj
= dictGetEntryKey(de
);
2612 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2614 dictReleaseIterator(di
);
2615 } else if (o
->type
== REDIS_ZSET
) {
2616 /* Save a set value */
2618 dictIterator
*di
= dictGetIterator(zs
->dict
);
2621 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) goto werr
;
2622 while((de
= dictNext(di
)) != NULL
) {
2623 robj
*eleobj
= dictGetEntryKey(de
);
2624 double *score
= dictGetEntryVal(de
);
2626 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2627 if (rdbSaveDoubleValue(fp
,*score
) == -1) goto werr
;
2629 dictReleaseIterator(di
);
2631 redisAssert(0 != 0);
2634 dictReleaseIterator(di
);
2637 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2639 /* Make sure data will not remain on the OS's output buffers */
2644 /* Use RENAME to make sure the DB file is changed atomically only
2645 * if the generate DB file is ok. */
2646 if (rename(tmpfile
,filename
) == -1) {
2647 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
2651 redisLog(REDIS_NOTICE
,"DB saved on disk");
2653 server
.lastsave
= time(NULL
);
2659 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2660 if (di
) dictReleaseIterator(di
);
2664 static int rdbSaveBackground(char *filename
) {
2667 if (server
.bgsavechildpid
!= -1) return REDIS_ERR
;
2668 if ((childpid
= fork()) == 0) {
2671 if (rdbSave(filename
) == REDIS_OK
) {
2678 if (childpid
== -1) {
2679 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2683 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2684 server
.bgsavechildpid
= childpid
;
2687 return REDIS_OK
; /* unreached */
2690 static void rdbRemoveTempFile(pid_t childpid
) {
2693 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
2697 static int rdbLoadType(FILE *fp
) {
2699 if (fread(&type
,1,1,fp
) == 0) return -1;
2703 static time_t rdbLoadTime(FILE *fp
) {
2705 if (fread(&t32
,4,1,fp
) == 0) return -1;
2706 return (time_t) t32
;
2709 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2710 * of this file for a description of how this are stored on disk.
2712 * isencoded is set to 1 if the readed length is not actually a length but
2713 * an "encoding type", check the above comments for more info */
2714 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2715 unsigned char buf
[2];
2718 if (isencoded
) *isencoded
= 0;
2720 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2725 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2726 type
= (buf
[0]&0xC0)>>6;
2727 if (type
== REDIS_RDB_6BITLEN
) {
2728 /* Read a 6 bit len */
2730 } else if (type
== REDIS_RDB_ENCVAL
) {
2731 /* Read a 6 bit len encoding type */
2732 if (isencoded
) *isencoded
= 1;
2734 } else if (type
== REDIS_RDB_14BITLEN
) {
2735 /* Read a 14 bit len */
2736 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2737 return ((buf
[0]&0x3F)<<8)|buf
[1];
2739 /* Read a 32 bit len */
2740 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2746 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2747 unsigned char enc
[4];
2750 if (enctype
== REDIS_RDB_ENC_INT8
) {
2751 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2752 val
= (signed char)enc
[0];
2753 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2755 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2756 v
= enc
[0]|(enc
[1]<<8);
2758 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2760 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2761 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2764 val
= 0; /* anti-warning */
2767 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2770 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2771 unsigned int len
, clen
;
2772 unsigned char *c
= NULL
;
2775 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2776 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2777 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2778 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2779 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2780 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2782 return createObject(REDIS_STRING
,val
);
2789 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2794 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2797 case REDIS_RDB_ENC_INT8
:
2798 case REDIS_RDB_ENC_INT16
:
2799 case REDIS_RDB_ENC_INT32
:
2800 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2801 case REDIS_RDB_ENC_LZF
:
2802 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2808 if (len
== REDIS_RDB_LENERR
) return NULL
;
2809 val
= sdsnewlen(NULL
,len
);
2810 if (len
&& fread(val
,len
,1,fp
) == 0) {
2814 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2817 /* For information about double serialization check rdbSaveDoubleValue() */
2818 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
2822 if (fread(&len
,1,1,fp
) == 0) return -1;
2824 case 255: *val
= R_NegInf
; return 0;
2825 case 254: *val
= R_PosInf
; return 0;
2826 case 253: *val
= R_Nan
; return 0;
2828 if (fread(buf
,len
,1,fp
) == 0) return -1;
2830 sscanf(buf
, "%lg", val
);
2835 static int rdbLoad(char *filename
) {
2837 robj
*keyobj
= NULL
;
2839 int type
, retval
, rdbver
;
2840 dict
*d
= server
.db
[0].dict
;
2841 redisDb
*db
= server
.db
+0;
2843 time_t expiretime
= -1, now
= time(NULL
);
2845 fp
= fopen(filename
,"r");
2846 if (!fp
) return REDIS_ERR
;
2847 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2849 if (memcmp(buf
,"REDIS",5) != 0) {
2851 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2854 rdbver
= atoi(buf
+5);
2857 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2864 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2865 if (type
== REDIS_EXPIRETIME
) {
2866 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2867 /* We read the time so we need to read the object type again */
2868 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2870 if (type
== REDIS_EOF
) break;
2871 /* Handle SELECT DB opcode as a special case */
2872 if (type
== REDIS_SELECTDB
) {
2873 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2875 if (dbid
>= (unsigned)server
.dbnum
) {
2876 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2879 db
= server
.db
+dbid
;
2884 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2886 if (type
== REDIS_STRING
) {
2887 /* Read string value */
2888 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2889 tryObjectEncoding(o
);
2890 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2891 /* Read list/set value */
2894 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2896 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2897 /* Load every single element of the list/set */
2901 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2902 tryObjectEncoding(ele
);
2903 if (type
== REDIS_LIST
) {
2904 listAddNodeTail((list
*)o
->ptr
,ele
);
2906 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
2909 } else if (type
== REDIS_ZSET
) {
2910 /* Read list/set value */
2914 if ((zsetlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2916 o
= createZsetObject();
2918 /* Load every single element of the list/set */
2921 double *score
= zmalloc(sizeof(double));
2923 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2924 tryObjectEncoding(ele
);
2925 if (rdbLoadDoubleValue(fp
,score
) == -1) goto eoferr
;
2926 dictAdd(zs
->dict
,ele
,score
);
2927 zslInsert(zs
->zsl
,*score
,ele
);
2928 incrRefCount(ele
); /* added to skiplist */
2931 redisAssert(0 != 0);
2933 /* Add the new object in the hash table */
2934 retval
= dictAdd(d
,keyobj
,o
);
2935 if (retval
== DICT_ERR
) {
2936 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2939 /* Set the expire time if needed */
2940 if (expiretime
!= -1) {
2941 setExpire(db
,keyobj
,expiretime
);
2942 /* Delete this key if already expired */
2943 if (expiretime
< now
) deleteKey(db
,keyobj
);
2951 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2952 if (keyobj
) decrRefCount(keyobj
);
2953 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2955 return REDIS_ERR
; /* Just to avoid warning */
2958 /*================================== Commands =============================== */
2960 static void authCommand(redisClient
*c
) {
2961 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2962 c
->authenticated
= 1;
2963 addReply(c
,shared
.ok
);
2965 c
->authenticated
= 0;
2966 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2970 static void pingCommand(redisClient
*c
) {
2971 addReply(c
,shared
.pong
);
2974 static void echoCommand(redisClient
*c
) {
2975 addReplyBulkLen(c
,c
->argv
[1]);
2976 addReply(c
,c
->argv
[1]);
2977 addReply(c
,shared
.crlf
);
2980 /*=================================== Strings =============================== */
2982 static void setGenericCommand(redisClient
*c
, int nx
) {
2985 if (nx
) deleteIfVolatile(c
->db
,c
->argv
[1]);
2986 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2987 if (retval
== DICT_ERR
) {
2989 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2990 incrRefCount(c
->argv
[2]);
2992 addReply(c
,shared
.czero
);
2996 incrRefCount(c
->argv
[1]);
2997 incrRefCount(c
->argv
[2]);
3000 removeExpire(c
->db
,c
->argv
[1]);
3001 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3004 static void setCommand(redisClient
*c
) {
3005 setGenericCommand(c
,0);
3008 static void setnxCommand(redisClient
*c
) {
3009 setGenericCommand(c
,1);
3012 static void getCommand(redisClient
*c
) {
3013 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3016 addReply(c
,shared
.nullbulk
);
3018 if (o
->type
!= REDIS_STRING
) {
3019 addReply(c
,shared
.wrongtypeerr
);
3021 addReplyBulkLen(c
,o
);
3023 addReply(c
,shared
.crlf
);
3028 static void getsetCommand(redisClient
*c
) {
3030 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
3031 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3033 incrRefCount(c
->argv
[1]);
3035 incrRefCount(c
->argv
[2]);
3037 removeExpire(c
->db
,c
->argv
[1]);
3040 static void mgetCommand(redisClient
*c
) {
3043 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
3044 for (j
= 1; j
< c
->argc
; j
++) {
3045 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
3047 addReply(c
,shared
.nullbulk
);
3049 if (o
->type
!= REDIS_STRING
) {
3050 addReply(c
,shared
.nullbulk
);
3052 addReplyBulkLen(c
,o
);
3054 addReply(c
,shared
.crlf
);
3060 static void msetGenericCommand(redisClient
*c
, int nx
) {
3061 int j
, busykeys
= 0;
3063 if ((c
->argc
% 2) == 0) {
3064 addReplySds(c
,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3067 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3068 * set nothing at all if at least one already key exists. */
3070 for (j
= 1; j
< c
->argc
; j
+= 2) {
3071 if (lookupKeyWrite(c
->db
,c
->argv
[j
]) != NULL
) {
3077 addReply(c
, shared
.czero
);
3081 for (j
= 1; j
< c
->argc
; j
+= 2) {
3084 tryObjectEncoding(c
->argv
[j
+1]);
3085 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3086 if (retval
== DICT_ERR
) {
3087 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3088 incrRefCount(c
->argv
[j
+1]);
3090 incrRefCount(c
->argv
[j
]);
3091 incrRefCount(c
->argv
[j
+1]);
3093 removeExpire(c
->db
,c
->argv
[j
]);
3095 server
.dirty
+= (c
->argc
-1)/2;
3096 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3099 static void msetCommand(redisClient
*c
) {
3100 msetGenericCommand(c
,0);
3103 static void msetnxCommand(redisClient
*c
) {
3104 msetGenericCommand(c
,1);
3107 static void incrDecrCommand(redisClient
*c
, long long incr
) {
3112 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3116 if (o
->type
!= REDIS_STRING
) {
3121 if (o
->encoding
== REDIS_ENCODING_RAW
)
3122 value
= strtoll(o
->ptr
, &eptr
, 10);
3123 else if (o
->encoding
== REDIS_ENCODING_INT
)
3124 value
= (long)o
->ptr
;
3126 redisAssert(1 != 1);
3131 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
3132 tryObjectEncoding(o
);
3133 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
3134 if (retval
== DICT_ERR
) {
3135 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
3136 removeExpire(c
->db
,c
->argv
[1]);
3138 incrRefCount(c
->argv
[1]);
3141 addReply(c
,shared
.colon
);
3143 addReply(c
,shared
.crlf
);
3146 static void incrCommand(redisClient
*c
) {
3147 incrDecrCommand(c
,1);
3150 static void decrCommand(redisClient
*c
) {
3151 incrDecrCommand(c
,-1);
3154 static void incrbyCommand(redisClient
*c
) {
3155 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3156 incrDecrCommand(c
,incr
);
3159 static void decrbyCommand(redisClient
*c
) {
3160 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3161 incrDecrCommand(c
,-incr
);
3164 /* ========================= Type agnostic commands ========================= */
3166 static void delCommand(redisClient
*c
) {
3169 for (j
= 1; j
< c
->argc
; j
++) {
3170 if (deleteKey(c
->db
,c
->argv
[j
])) {
3177 addReply(c
,shared
.czero
);
3180 addReply(c
,shared
.cone
);
3183 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
3188 static void existsCommand(redisClient
*c
) {
3189 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
3192 static void selectCommand(redisClient
*c
) {
3193 int id
= atoi(c
->argv
[1]->ptr
);
3195 if (selectDb(c
,id
) == REDIS_ERR
) {
3196 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
3198 addReply(c
,shared
.ok
);
3202 static void randomkeyCommand(redisClient
*c
) {
3206 de
= dictGetRandomKey(c
->db
->dict
);
3207 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
3210 addReply(c
,shared
.plus
);
3211 addReply(c
,shared
.crlf
);
3213 addReply(c
,shared
.plus
);
3214 addReply(c
,dictGetEntryKey(de
));
3215 addReply(c
,shared
.crlf
);
3219 static void keysCommand(redisClient
*c
) {
3222 sds pattern
= c
->argv
[1]->ptr
;
3223 int plen
= sdslen(pattern
);
3224 unsigned long numkeys
= 0, keyslen
= 0;
3225 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3227 di
= dictGetIterator(c
->db
->dict
);
3229 decrRefCount(lenobj
);
3230 while((de
= dictNext(di
)) != NULL
) {
3231 robj
*keyobj
= dictGetEntryKey(de
);
3233 sds key
= keyobj
->ptr
;
3234 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3235 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3236 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3238 addReply(c
,shared
.space
);
3241 keyslen
+= sdslen(key
);
3245 dictReleaseIterator(di
);
3246 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3247 addReply(c
,shared
.crlf
);
3250 static void dbsizeCommand(redisClient
*c
) {
3252 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
3255 static void lastsaveCommand(redisClient
*c
) {
3257 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
3260 static void typeCommand(redisClient
*c
) {
3264 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3269 case REDIS_STRING
: type
= "+string"; break;
3270 case REDIS_LIST
: type
= "+list"; break;
3271 case REDIS_SET
: type
= "+set"; break;
3272 case REDIS_ZSET
: type
= "+zset"; break;
3273 default: type
= "unknown"; break;
3276 addReplySds(c
,sdsnew(type
));
3277 addReply(c
,shared
.crlf
);
3280 static void saveCommand(redisClient
*c
) {
3281 if (server
.bgsavechildpid
!= -1) {
3282 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3285 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3286 addReply(c
,shared
.ok
);
3288 addReply(c
,shared
.err
);
3292 static void bgsaveCommand(redisClient
*c
) {
3293 if (server
.bgsavechildpid
!= -1) {
3294 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3297 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3298 addReply(c
,shared
.ok
);
3300 addReply(c
,shared
.err
);
3304 static void shutdownCommand(redisClient
*c
) {
3305 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
3306 /* Kill the saving child if there is a background saving in progress.
3307 We want to avoid race conditions, for instance our saving child may
3308 overwrite the synchronous saving did by SHUTDOWN. */
3309 if (server
.bgsavechildpid
!= -1) {
3310 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3311 kill(server
.bgsavechildpid
,SIGKILL
);
3312 rdbRemoveTempFile(server
.bgsavechildpid
);
3315 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3316 if (server
.daemonize
)
3317 unlink(server
.pidfile
);
3318 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3319 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3322 /* Ooops.. error saving! The best we can do is to continue operating.
3323 * Note that if there was a background saving process, in the next
3324 * cron() Redis will be notified that the background saving aborted,
3325 * handling special stuff like slaves pending for synchronization... */
3326 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3327 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3331 static void renameGenericCommand(redisClient
*c
, int nx
) {
3334 /* To use the same key as src and dst is probably an error */
3335 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3336 addReply(c
,shared
.sameobjecterr
);
3340 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3342 addReply(c
,shared
.nokeyerr
);
3346 deleteIfVolatile(c
->db
,c
->argv
[2]);
3347 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3350 addReply(c
,shared
.czero
);
3353 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3355 incrRefCount(c
->argv
[2]);
3357 deleteKey(c
->db
,c
->argv
[1]);
3359 addReply(c
,nx
? shared
.cone
: shared
.ok
);
3362 static void renameCommand(redisClient
*c
) {
3363 renameGenericCommand(c
,0);
3366 static void renamenxCommand(redisClient
*c
) {
3367 renameGenericCommand(c
,1);
3370 static void moveCommand(redisClient
*c
) {
3375 /* Obtain source and target DB pointers */
3378 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3379 addReply(c
,shared
.outofrangeerr
);
3383 selectDb(c
,srcid
); /* Back to the source DB */
3385 /* If the user is moving using as target the same
3386 * DB as the source DB it is probably an error. */
3388 addReply(c
,shared
.sameobjecterr
);
3392 /* Check if the element exists and get a reference */
3393 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3395 addReply(c
,shared
.czero
);
3399 /* Try to add the element to the target DB */
3400 deleteIfVolatile(dst
,c
->argv
[1]);
3401 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3402 addReply(c
,shared
.czero
);
3405 incrRefCount(c
->argv
[1]);
3408 /* OK! key moved, free the entry in the source DB */
3409 deleteKey(src
,c
->argv
[1]);
3411 addReply(c
,shared
.cone
);
3414 /* =================================== Lists ================================ */
3415 static void pushGenericCommand(redisClient
*c
, int where
) {
3419 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3421 lobj
= createListObject();
3423 if (where
== REDIS_HEAD
) {
3424 listAddNodeHead(list
,c
->argv
[2]);
3426 listAddNodeTail(list
,c
->argv
[2]);
3428 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
3429 incrRefCount(c
->argv
[1]);
3430 incrRefCount(c
->argv
[2]);
3432 if (lobj
->type
!= REDIS_LIST
) {
3433 addReply(c
,shared
.wrongtypeerr
);
3437 if (where
== REDIS_HEAD
) {
3438 listAddNodeHead(list
,c
->argv
[2]);
3440 listAddNodeTail(list
,c
->argv
[2]);
3442 incrRefCount(c
->argv
[2]);
3445 addReply(c
,shared
.ok
);
3448 static void lpushCommand(redisClient
*c
) {
3449 pushGenericCommand(c
,REDIS_HEAD
);
3452 static void rpushCommand(redisClient
*c
) {
3453 pushGenericCommand(c
,REDIS_TAIL
);
3456 static void llenCommand(redisClient
*c
) {
3460 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3462 addReply(c
,shared
.czero
);
3465 if (o
->type
!= REDIS_LIST
) {
3466 addReply(c
,shared
.wrongtypeerr
);
3469 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3474 static void lindexCommand(redisClient
*c
) {
3476 int index
= atoi(c
->argv
[2]->ptr
);
3478 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3480 addReply(c
,shared
.nullbulk
);
3482 if (o
->type
!= REDIS_LIST
) {
3483 addReply(c
,shared
.wrongtypeerr
);
3485 list
*list
= o
->ptr
;
3488 ln
= listIndex(list
, index
);
3490 addReply(c
,shared
.nullbulk
);
3492 robj
*ele
= listNodeValue(ln
);
3493 addReplyBulkLen(c
,ele
);
3495 addReply(c
,shared
.crlf
);
3501 static void lsetCommand(redisClient
*c
) {
3503 int index
= atoi(c
->argv
[2]->ptr
);
3505 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3507 addReply(c
,shared
.nokeyerr
);
3509 if (o
->type
!= REDIS_LIST
) {
3510 addReply(c
,shared
.wrongtypeerr
);
3512 list
*list
= o
->ptr
;
3515 ln
= listIndex(list
, index
);
3517 addReply(c
,shared
.outofrangeerr
);
3519 robj
*ele
= listNodeValue(ln
);
3522 listNodeValue(ln
) = c
->argv
[3];
3523 incrRefCount(c
->argv
[3]);
3524 addReply(c
,shared
.ok
);
3531 static void popGenericCommand(redisClient
*c
, int where
) {
3534 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3536 addReply(c
,shared
.nullbulk
);
3538 if (o
->type
!= REDIS_LIST
) {
3539 addReply(c
,shared
.wrongtypeerr
);
3541 list
*list
= o
->ptr
;
3544 if (where
== REDIS_HEAD
)
3545 ln
= listFirst(list
);
3547 ln
= listLast(list
);
3550 addReply(c
,shared
.nullbulk
);
3552 robj
*ele
= listNodeValue(ln
);
3553 addReplyBulkLen(c
,ele
);
3555 addReply(c
,shared
.crlf
);
3556 listDelNode(list
,ln
);
3563 static void lpopCommand(redisClient
*c
) {
3564 popGenericCommand(c
,REDIS_HEAD
);
3567 static void rpopCommand(redisClient
*c
) {
3568 popGenericCommand(c
,REDIS_TAIL
);
3571 static void lrangeCommand(redisClient
*c
) {
3573 int start
= atoi(c
->argv
[2]->ptr
);
3574 int end
= atoi(c
->argv
[3]->ptr
);
3576 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3578 addReply(c
,shared
.nullmultibulk
);
3580 if (o
->type
!= REDIS_LIST
) {
3581 addReply(c
,shared
.wrongtypeerr
);
3583 list
*list
= o
->ptr
;
3585 int llen
= listLength(list
);
3589 /* convert negative indexes */
3590 if (start
< 0) start
= llen
+start
;
3591 if (end
< 0) end
= llen
+end
;
3592 if (start
< 0) start
= 0;
3593 if (end
< 0) end
= 0;
3595 /* indexes sanity checks */
3596 if (start
> end
|| start
>= llen
) {
3597 /* Out of range start or start > end result in empty list */
3598 addReply(c
,shared
.emptymultibulk
);
3601 if (end
>= llen
) end
= llen
-1;
3602 rangelen
= (end
-start
)+1;
3604 /* Return the result in form of a multi-bulk reply */
3605 ln
= listIndex(list
, start
);
3606 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3607 for (j
= 0; j
< rangelen
; j
++) {
3608 ele
= listNodeValue(ln
);
3609 addReplyBulkLen(c
,ele
);
3611 addReply(c
,shared
.crlf
);
3618 static void ltrimCommand(redisClient
*c
) {
3620 int start
= atoi(c
->argv
[2]->ptr
);
3621 int end
= atoi(c
->argv
[3]->ptr
);
3623 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3625 addReply(c
,shared
.nokeyerr
);
3627 if (o
->type
!= REDIS_LIST
) {
3628 addReply(c
,shared
.wrongtypeerr
);
3630 list
*list
= o
->ptr
;
3632 int llen
= listLength(list
);
3633 int j
, ltrim
, rtrim
;
3635 /* convert negative indexes */
3636 if (start
< 0) start
= llen
+start
;
3637 if (end
< 0) end
= llen
+end
;
3638 if (start
< 0) start
= 0;
3639 if (end
< 0) end
= 0;
3641 /* indexes sanity checks */
3642 if (start
> end
|| start
>= llen
) {
3643 /* Out of range start or start > end result in empty list */
3647 if (end
>= llen
) end
= llen
-1;
3652 /* Remove list elements to perform the trim */
3653 for (j
= 0; j
< ltrim
; j
++) {
3654 ln
= listFirst(list
);
3655 listDelNode(list
,ln
);
3657 for (j
= 0; j
< rtrim
; j
++) {
3658 ln
= listLast(list
);
3659 listDelNode(list
,ln
);
3662 addReply(c
,shared
.ok
);
3667 static void lremCommand(redisClient
*c
) {
3670 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3672 addReply(c
,shared
.czero
);
3674 if (o
->type
!= REDIS_LIST
) {
3675 addReply(c
,shared
.wrongtypeerr
);
3677 list
*list
= o
->ptr
;
3678 listNode
*ln
, *next
;
3679 int toremove
= atoi(c
->argv
[2]->ptr
);
3684 toremove
= -toremove
;
3687 ln
= fromtail
? list
->tail
: list
->head
;
3689 robj
*ele
= listNodeValue(ln
);
3691 next
= fromtail
? ln
->prev
: ln
->next
;
3692 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3693 listDelNode(list
,ln
);
3696 if (toremove
&& removed
== toremove
) break;
3700 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3705 /* This is the semantic of this command:
3706 * RPOPLPUSH srclist dstlist:
3707 * IF LLEN(srclist) > 0
3708 * element = RPOP srclist
3709 * LPUSH dstlist element
3716 * The idea is to be able to get an element from a list in a reliable way
3717 * since the element is not just returned but pushed against another list
3718 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3720 static void rpoplpushcommand(redisClient
*c
) {
3723 sobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3725 addReply(c
,shared
.nullbulk
);
3727 if (sobj
->type
!= REDIS_LIST
) {
3728 addReply(c
,shared
.wrongtypeerr
);
3730 list
*srclist
= sobj
->ptr
;
3731 listNode
*ln
= listLast(srclist
);
3734 addReply(c
,shared
.nullbulk
);
3736 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3737 robj
*ele
= listNodeValue(ln
);
3742 /* Create the list if the key does not exist */
3743 dobj
= createListObject();
3744 dictAdd(c
->db
->dict
,c
->argv
[2],dobj
);
3745 incrRefCount(c
->argv
[2]);
3746 } else if (dobj
->type
!= REDIS_LIST
) {
3747 addReply(c
,shared
.wrongtypeerr
);
3750 /* Add the element to the target list */
3751 dstlist
= dobj
->ptr
;
3752 listAddNodeHead(dstlist
,ele
);
3755 /* Send the element to the client as reply as well */
3756 addReplyBulkLen(c
,ele
);
3758 addReply(c
,shared
.crlf
);
3760 /* Finally remove the element from the source list */
3761 listDelNode(srclist
,ln
);
3769 /* ==================================== Sets ================================ */
3771 static void saddCommand(redisClient
*c
) {
3774 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3776 set
= createSetObject();
3777 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3778 incrRefCount(c
->argv
[1]);
3780 if (set
->type
!= REDIS_SET
) {
3781 addReply(c
,shared
.wrongtypeerr
);
3785 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3786 incrRefCount(c
->argv
[2]);
3788 addReply(c
,shared
.cone
);
3790 addReply(c
,shared
.czero
);
3794 static void sremCommand(redisClient
*c
) {
3797 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3799 addReply(c
,shared
.czero
);
3801 if (set
->type
!= REDIS_SET
) {
3802 addReply(c
,shared
.wrongtypeerr
);
3805 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
3807 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3808 addReply(c
,shared
.cone
);
3810 addReply(c
,shared
.czero
);
3815 static void smoveCommand(redisClient
*c
) {
3816 robj
*srcset
, *dstset
;
3818 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3819 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3821 /* If the source key does not exist return 0, if it's of the wrong type
3823 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
3824 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
3827 /* Error if the destination key is not a set as well */
3828 if (dstset
&& dstset
->type
!= REDIS_SET
) {
3829 addReply(c
,shared
.wrongtypeerr
);
3832 /* Remove the element from the source set */
3833 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
3834 /* Key not found in the src set! return zero */
3835 addReply(c
,shared
.czero
);
3839 /* Add the element to the destination set */
3841 dstset
= createSetObject();
3842 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
3843 incrRefCount(c
->argv
[2]);
3845 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
3846 incrRefCount(c
->argv
[3]);
3847 addReply(c
,shared
.cone
);
3850 static void sismemberCommand(redisClient
*c
) {
3853 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3855 addReply(c
,shared
.czero
);
3857 if (set
->type
!= REDIS_SET
) {
3858 addReply(c
,shared
.wrongtypeerr
);
3861 if (dictFind(set
->ptr
,c
->argv
[2]))
3862 addReply(c
,shared
.cone
);
3864 addReply(c
,shared
.czero
);
3868 static void scardCommand(redisClient
*c
) {
3872 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3874 addReply(c
,shared
.czero
);
3877 if (o
->type
!= REDIS_SET
) {
3878 addReply(c
,shared
.wrongtypeerr
);
3881 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
3887 static void spopCommand(redisClient
*c
) {
3891 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3893 addReply(c
,shared
.nullbulk
);
3895 if (set
->type
!= REDIS_SET
) {
3896 addReply(c
,shared
.wrongtypeerr
);
3899 de
= dictGetRandomKey(set
->ptr
);
3901 addReply(c
,shared
.nullbulk
);
3903 robj
*ele
= dictGetEntryKey(de
);
3905 addReplyBulkLen(c
,ele
);
3907 addReply(c
,shared
.crlf
);
3908 dictDelete(set
->ptr
,ele
);
3909 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3915 static void srandmemberCommand(redisClient
*c
) {
3919 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3921 addReply(c
,shared
.nullbulk
);
3923 if (set
->type
!= REDIS_SET
) {
3924 addReply(c
,shared
.wrongtypeerr
);
3927 de
= dictGetRandomKey(set
->ptr
);
3929 addReply(c
,shared
.nullbulk
);
3931 robj
*ele
= dictGetEntryKey(de
);
3933 addReplyBulkLen(c
,ele
);
3935 addReply(c
,shared
.crlf
);
3940 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3941 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3943 return dictSize(*d1
)-dictSize(*d2
);
3946 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, unsigned long setsnum
, robj
*dstkey
) {
3947 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3950 robj
*lenobj
= NULL
, *dstset
= NULL
;
3951 unsigned long j
, cardinality
= 0;
3953 for (j
= 0; j
< setsnum
; j
++) {
3957 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3958 lookupKeyRead(c
->db
,setskeys
[j
]);
3962 deleteKey(c
->db
,dstkey
);
3963 addReply(c
,shared
.ok
);
3965 addReply(c
,shared
.nullmultibulk
);
3969 if (setobj
->type
!= REDIS_SET
) {
3971 addReply(c
,shared
.wrongtypeerr
);
3974 dv
[j
] = setobj
->ptr
;
3976 /* Sort sets from the smallest to largest, this will improve our
3977 * algorithm's performace */
3978 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3980 /* The first thing we should output is the total number of elements...
3981 * since this is a multi-bulk write, but at this stage we don't know
3982 * the intersection set size, so we use a trick, append an empty object
3983 * to the output list and save the pointer to later modify it with the
3986 lenobj
= createObject(REDIS_STRING
,NULL
);
3988 decrRefCount(lenobj
);
3990 /* If we have a target key where to store the resulting set
3991 * create this key with an empty set inside */
3992 dstset
= createSetObject();
3995 /* Iterate all the elements of the first (smallest) set, and test
3996 * the element against all the other sets, if at least one set does
3997 * not include the element it is discarded */
3998 di
= dictGetIterator(dv
[0]);
4000 while((de
= dictNext(di
)) != NULL
) {
4003 for (j
= 1; j
< setsnum
; j
++)
4004 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
4006 continue; /* at least one set does not contain the member */
4007 ele
= dictGetEntryKey(de
);
4009 addReplyBulkLen(c
,ele
);
4011 addReply(c
,shared
.crlf
);
4014 dictAdd(dstset
->ptr
,ele
,NULL
);
4018 dictReleaseIterator(di
);
4021 /* Store the resulting set into the target */
4022 deleteKey(c
->db
,dstkey
);
4023 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4024 incrRefCount(dstkey
);
4028 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%lu\r\n",cardinality
);
4030 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4031 dictSize((dict
*)dstset
->ptr
)));
4037 static void sinterCommand(redisClient
*c
) {
4038 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
4041 static void sinterstoreCommand(redisClient
*c
) {
4042 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
4045 #define REDIS_OP_UNION 0
4046 #define REDIS_OP_DIFF 1
4048 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
4049 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4052 robj
*dstset
= NULL
;
4053 int j
, cardinality
= 0;
4055 for (j
= 0; j
< setsnum
; j
++) {
4059 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4060 lookupKeyRead(c
->db
,setskeys
[j
]);
4065 if (setobj
->type
!= REDIS_SET
) {
4067 addReply(c
,shared
.wrongtypeerr
);
4070 dv
[j
] = setobj
->ptr
;
4073 /* We need a temp set object to store our union. If the dstkey
4074 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4075 * this set object will be the resulting object to set into the target key*/
4076 dstset
= createSetObject();
4078 /* Iterate all the elements of all the sets, add every element a single
4079 * time to the result set */
4080 for (j
= 0; j
< setsnum
; j
++) {
4081 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
4082 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
4084 di
= dictGetIterator(dv
[j
]);
4086 while((de
= dictNext(di
)) != NULL
) {
4089 /* dictAdd will not add the same element multiple times */
4090 ele
= dictGetEntryKey(de
);
4091 if (op
== REDIS_OP_UNION
|| j
== 0) {
4092 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
4096 } else if (op
== REDIS_OP_DIFF
) {
4097 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
4102 dictReleaseIterator(di
);
4104 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
4107 /* Output the content of the resulting set, if not in STORE mode */
4109 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
4110 di
= dictGetIterator(dstset
->ptr
);
4111 while((de
= dictNext(di
)) != NULL
) {
4114 ele
= dictGetEntryKey(de
);
4115 addReplyBulkLen(c
,ele
);
4117 addReply(c
,shared
.crlf
);
4119 dictReleaseIterator(di
);
4121 /* If we have a target key where to store the resulting set
4122 * create this key with the result set inside */
4123 deleteKey(c
->db
,dstkey
);
4124 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4125 incrRefCount(dstkey
);
4130 decrRefCount(dstset
);
4132 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4133 dictSize((dict
*)dstset
->ptr
)));
4139 static void sunionCommand(redisClient
*c
) {
4140 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
4143 static void sunionstoreCommand(redisClient
*c
) {
4144 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
4147 static void sdiffCommand(redisClient
*c
) {
4148 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
4151 static void sdiffstoreCommand(redisClient
*c
) {
4152 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
4155 /* ==================================== ZSets =============================== */
4157 /* ZSETs are ordered sets using two data structures to hold the same elements
4158 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4161 * The elements are added to an hash table mapping Redis objects to scores.
4162 * At the same time the elements are added to a skip list mapping scores
4163 * to Redis objects (so objects are sorted by scores in this "view"). */
4165 /* This skiplist implementation is almost a C translation of the original
4166 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4167 * Alternative to Balanced Trees", modified in three ways:
4168 * a) this implementation allows for repeated values.
4169 * b) the comparison is not just by key (our 'score') but by satellite data.
4170 * c) there is a back pointer, so it's a doubly linked list with the back
4171 * pointers being only at "level 1". This allows to traverse the list
4172 * from tail to head, useful for ZREVRANGE. */
4174 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
4175 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
4177 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
4183 static zskiplist
*zslCreate(void) {
4187 zsl
= zmalloc(sizeof(*zsl
));
4190 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
4191 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
4192 zsl
->header
->forward
[j
] = NULL
;
4193 zsl
->header
->backward
= NULL
;
4198 static void zslFreeNode(zskiplistNode
*node
) {
4199 decrRefCount(node
->obj
);
4200 zfree(node
->forward
);
4204 static void zslFree(zskiplist
*zsl
) {
4205 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
4207 zfree(zsl
->header
->forward
);
4210 next
= node
->forward
[0];
4217 static int zslRandomLevel(void) {
4219 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
4224 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
4225 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4229 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4230 while (x
->forward
[i
] &&
4231 (x
->forward
[i
]->score
< score
||
4232 (x
->forward
[i
]->score
== score
&&
4233 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4237 /* we assume the key is not already inside, since we allow duplicated
4238 * scores, and the re-insertion of score and redis object should never
4239 * happpen since the caller of zslInsert() should test in the hash table
4240 * if the element is already inside or not. */
4241 level
= zslRandomLevel();
4242 if (level
> zsl
->level
) {
4243 for (i
= zsl
->level
; i
< level
; i
++)
4244 update
[i
] = zsl
->header
;
4247 x
= zslCreateNode(level
,score
,obj
);
4248 for (i
= 0; i
< level
; i
++) {
4249 x
->forward
[i
] = update
[i
]->forward
[i
];
4250 update
[i
]->forward
[i
] = x
;
4252 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
4254 x
->forward
[0]->backward
= x
;
4260 /* Delete an element with matching score/object from the skiplist. */
4261 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
4262 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4266 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4267 while (x
->forward
[i
] &&
4268 (x
->forward
[i
]->score
< score
||
4269 (x
->forward
[i
]->score
== score
&&
4270 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4274 /* We may have multiple elements with the same score, what we need
4275 * is to find the element with both the right score and object. */
4277 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
4278 for (i
= 0; i
< zsl
->level
; i
++) {
4279 if (update
[i
]->forward
[i
] != x
) break;
4280 update
[i
]->forward
[i
] = x
->forward
[i
];
4282 if (x
->forward
[0]) {
4283 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4286 zsl
->tail
= x
->backward
;
4289 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4294 return 0; /* not found */
4296 return 0; /* not found */
4299 /* Delete all the elements with score between min and max from the skiplist.
4300 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4301 * Note that this function takes the reference to the hash table view of the
4302 * sorted set, in order to remove the elements from the hash table too. */
4303 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4304 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4305 unsigned long removed
= 0;
4309 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4310 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
4314 /* We may have multiple elements with the same score, what we need
4315 * is to find the element with both the right score and object. */
4317 while (x
&& x
->score
<= max
) {
4318 zskiplistNode
*next
;
4320 for (i
= 0; i
< zsl
->level
; i
++) {
4321 if (update
[i
]->forward
[i
] != x
) break;
4322 update
[i
]->forward
[i
] = x
->forward
[i
];
4324 if (x
->forward
[0]) {
4325 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4328 zsl
->tail
= x
->backward
;
4330 next
= x
->forward
[0];
4331 dictDelete(dict
,x
->obj
);
4333 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4339 return removed
; /* not found */
4342 /* Find the first node having a score equal or greater than the specified one.
4343 * Returns NULL if there is no match. */
4344 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4349 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4350 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
4353 /* We may have multiple elements with the same score, what we need
4354 * is to find the element with both the right score and object. */
4355 return x
->forward
[0];
4358 /* The actual Z-commands implementations */
4360 /* This generic command implements both ZADD and ZINCRBY.
4361 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4362 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4363 static void zaddGenericCommand(redisClient
*c
, robj
*key
, robj
*ele
, double scoreval
, int doincrement
) {
4368 zsetobj
= lookupKeyWrite(c
->db
,key
);
4369 if (zsetobj
== NULL
) {
4370 zsetobj
= createZsetObject();
4371 dictAdd(c
->db
->dict
,key
,zsetobj
);
4374 if (zsetobj
->type
!= REDIS_ZSET
) {
4375 addReply(c
,shared
.wrongtypeerr
);
4381 /* Ok now since we implement both ZADD and ZINCRBY here the code
4382 * needs to handle the two different conditions. It's all about setting
4383 * '*score', that is, the new score to set, to the right value. */
4384 score
= zmalloc(sizeof(double));
4388 /* Read the old score. If the element was not present starts from 0 */
4389 de
= dictFind(zs
->dict
,ele
);
4391 double *oldscore
= dictGetEntryVal(de
);
4392 *score
= *oldscore
+ scoreval
;
4400 /* What follows is a simple remove and re-insert operation that is common
4401 * to both ZADD and ZINCRBY... */
4402 if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) {
4403 /* case 1: New element */
4404 incrRefCount(ele
); /* added to hash */
4405 zslInsert(zs
->zsl
,*score
,ele
);
4406 incrRefCount(ele
); /* added to skiplist */
4409 addReplyDouble(c
,*score
);
4411 addReply(c
,shared
.cone
);
4416 /* case 2: Score update operation */
4417 de
= dictFind(zs
->dict
,ele
);
4418 redisAssert(de
!= NULL
);
4419 oldscore
= dictGetEntryVal(de
);
4420 if (*score
!= *oldscore
) {
4423 /* Remove and insert the element in the skip list with new score */
4424 deleted
= zslDelete(zs
->zsl
,*oldscore
,ele
);
4425 redisAssert(deleted
!= 0);
4426 zslInsert(zs
->zsl
,*score
,ele
);
4428 /* Update the score in the hash table */
4429 dictReplace(zs
->dict
,ele
,score
);
4435 addReplyDouble(c
,*score
);
4437 addReply(c
,shared
.czero
);
4441 static void zaddCommand(redisClient
*c
) {
4444 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4445 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0);
4448 static void zincrbyCommand(redisClient
*c
) {
4451 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4452 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1);
4455 static void zremCommand(redisClient
*c
) {
4459 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4460 if (zsetobj
== NULL
) {
4461 addReply(c
,shared
.czero
);
4467 if (zsetobj
->type
!= REDIS_ZSET
) {
4468 addReply(c
,shared
.wrongtypeerr
);
4472 de
= dictFind(zs
->dict
,c
->argv
[2]);
4474 addReply(c
,shared
.czero
);
4477 /* Delete from the skiplist */
4478 oldscore
= dictGetEntryVal(de
);
4479 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
4480 redisAssert(deleted
!= 0);
4482 /* Delete from the hash table */
4483 dictDelete(zs
->dict
,c
->argv
[2]);
4484 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4486 addReply(c
,shared
.cone
);
4490 static void zremrangebyscoreCommand(redisClient
*c
) {
4491 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4492 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4496 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4497 if (zsetobj
== NULL
) {
4498 addReply(c
,shared
.czero
);
4502 if (zsetobj
->type
!= REDIS_ZSET
) {
4503 addReply(c
,shared
.wrongtypeerr
);
4507 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
4508 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4509 server
.dirty
+= deleted
;
4510 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
4514 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
4516 int start
= atoi(c
->argv
[2]->ptr
);
4517 int end
= atoi(c
->argv
[3]->ptr
);
4519 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4521 addReply(c
,shared
.nullmultibulk
);
4523 if (o
->type
!= REDIS_ZSET
) {
4524 addReply(c
,shared
.wrongtypeerr
);
4526 zset
*zsetobj
= o
->ptr
;
4527 zskiplist
*zsl
= zsetobj
->zsl
;
4530 int llen
= zsl
->length
;
4534 /* convert negative indexes */
4535 if (start
< 0) start
= llen
+start
;
4536 if (end
< 0) end
= llen
+end
;
4537 if (start
< 0) start
= 0;
4538 if (end
< 0) end
= 0;
4540 /* indexes sanity checks */
4541 if (start
> end
|| start
>= llen
) {
4542 /* Out of range start or start > end result in empty list */
4543 addReply(c
,shared
.emptymultibulk
);
4546 if (end
>= llen
) end
= llen
-1;
4547 rangelen
= (end
-start
)+1;
4549 /* Return the result in form of a multi-bulk reply */
4555 ln
= zsl
->header
->forward
[0];
4557 ln
= ln
->forward
[0];
4560 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
4561 for (j
= 0; j
< rangelen
; j
++) {
4563 addReplyBulkLen(c
,ele
);
4565 addReply(c
,shared
.crlf
);
4566 ln
= reverse
? ln
->backward
: ln
->forward
[0];
4572 static void zrangeCommand(redisClient
*c
) {
4573 zrangeGenericCommand(c
,0);
4576 static void zrevrangeCommand(redisClient
*c
) {
4577 zrangeGenericCommand(c
,1);
4580 static void zrangebyscoreCommand(redisClient
*c
) {
4582 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4583 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4584 int offset
= 0, limit
= -1;
4586 if (c
->argc
!= 4 && c
->argc
!= 7) {
4588 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4590 } else if (c
->argc
== 7 && strcasecmp(c
->argv
[4]->ptr
,"limit")) {
4591 addReply(c
,shared
.syntaxerr
);
4593 } else if (c
->argc
== 7) {
4594 offset
= atoi(c
->argv
[5]->ptr
);
4595 limit
= atoi(c
->argv
[6]->ptr
);
4596 if (offset
< 0) offset
= 0;
4599 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4601 addReply(c
,shared
.nullmultibulk
);
4603 if (o
->type
!= REDIS_ZSET
) {
4604 addReply(c
,shared
.wrongtypeerr
);
4606 zset
*zsetobj
= o
->ptr
;
4607 zskiplist
*zsl
= zsetobj
->zsl
;
4610 unsigned int rangelen
= 0;
4612 /* Get the first node with the score >= min */
4613 ln
= zslFirstWithScore(zsl
,min
);
4615 /* No element matching the speciifed interval */
4616 addReply(c
,shared
.emptymultibulk
);
4620 /* We don't know in advance how many matching elements there
4621 * are in the list, so we push this object that will represent
4622 * the multi-bulk length in the output buffer, and will "fix"
4624 lenobj
= createObject(REDIS_STRING
,NULL
);
4626 decrRefCount(lenobj
);
4628 while(ln
&& ln
->score
<= max
) {
4631 ln
= ln
->forward
[0];
4634 if (limit
== 0) break;
4636 addReplyBulkLen(c
,ele
);
4638 addReply(c
,shared
.crlf
);
4639 ln
= ln
->forward
[0];
4641 if (limit
> 0) limit
--;
4643 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
4648 static void zcardCommand(redisClient
*c
) {
4652 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4654 addReply(c
,shared
.czero
);
4657 if (o
->type
!= REDIS_ZSET
) {
4658 addReply(c
,shared
.wrongtypeerr
);
4661 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",zs
->zsl
->length
));
4666 static void zscoreCommand(redisClient
*c
) {
4670 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4672 addReply(c
,shared
.nullbulk
);
4675 if (o
->type
!= REDIS_ZSET
) {
4676 addReply(c
,shared
.wrongtypeerr
);
4681 de
= dictFind(zs
->dict
,c
->argv
[2]);
4683 addReply(c
,shared
.nullbulk
);
4685 double *score
= dictGetEntryVal(de
);
4687 addReplyDouble(c
,*score
);
4693 /* ========================= Non type-specific commands ==================== */
4695 static void flushdbCommand(redisClient
*c
) {
4696 server
.dirty
+= dictSize(c
->db
->dict
);
4697 dictEmpty(c
->db
->dict
);
4698 dictEmpty(c
->db
->expires
);
4699 addReply(c
,shared
.ok
);
4702 static void flushallCommand(redisClient
*c
) {
4703 server
.dirty
+= emptyDb();
4704 addReply(c
,shared
.ok
);
4705 rdbSave(server
.dbfilename
);
4709 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
4710 redisSortOperation
*so
= zmalloc(sizeof(*so
));
4712 so
->pattern
= pattern
;
4716 /* Return the value associated to the key with a name obtained
4717 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4718 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
4722 int prefixlen
, sublen
, postfixlen
;
4723 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4727 char buf
[REDIS_SORTKEY_MAX
+1];
4730 /* If the pattern is "#" return the substitution object itself in order
4731 * to implement the "SORT ... GET #" feature. */
4732 spat
= pattern
->ptr
;
4733 if (spat
[0] == '#' && spat
[1] == '\0') {
4737 /* The substitution object may be specially encoded. If so we create
4738 * a decoded object on the fly. Otherwise getDecodedObject will just
4739 * increment the ref count, that we'll decrement later. */
4740 subst
= getDecodedObject(subst
);
4743 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
4744 p
= strchr(spat
,'*');
4746 decrRefCount(subst
);
4751 sublen
= sdslen(ssub
);
4752 postfixlen
= sdslen(spat
)-(prefixlen
+1);
4753 memcpy(keyname
.buf
,spat
,prefixlen
);
4754 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
4755 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
4756 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
4757 keyname
.len
= prefixlen
+sublen
+postfixlen
;
4759 initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(long)*2))
4760 decrRefCount(subst
);
4762 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4763 return lookupKeyRead(db
,&keyobj
);
4766 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4767 * the additional parameter is not standard but a BSD-specific we have to
4768 * pass sorting parameters via the global 'server' structure */
4769 static int sortCompare(const void *s1
, const void *s2
) {
4770 const redisSortObject
*so1
= s1
, *so2
= s2
;
4773 if (!server
.sort_alpha
) {
4774 /* Numeric sorting. Here it's trivial as we precomputed scores */
4775 if (so1
->u
.score
> so2
->u
.score
) {
4777 } else if (so1
->u
.score
< so2
->u
.score
) {
4783 /* Alphanumeric sorting */
4784 if (server
.sort_bypattern
) {
4785 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
4786 /* At least one compare object is NULL */
4787 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
4789 else if (so1
->u
.cmpobj
== NULL
)
4794 /* We have both the objects, use strcoll */
4795 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
4798 /* Compare elements directly */
4801 dec1
= getDecodedObject(so1
->obj
);
4802 dec2
= getDecodedObject(so2
->obj
);
4803 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
4808 return server
.sort_desc
? -cmp
: cmp
;
4811 /* The SORT command is the most complex command in Redis. Warning: this code
4812 * is optimized for speed and a bit less for readability */
4813 static void sortCommand(redisClient
*c
) {
4816 int desc
= 0, alpha
= 0;
4817 int limit_start
= 0, limit_count
= -1, start
, end
;
4818 int j
, dontsort
= 0, vectorlen
;
4819 int getop
= 0; /* GET operation counter */
4820 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
4821 redisSortObject
*vector
; /* Resulting vector to sort */
4823 /* Lookup the key to sort. It must be of the right types */
4824 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
4825 if (sortval
== NULL
) {
4826 addReply(c
,shared
.nokeyerr
);
4829 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
4830 sortval
->type
!= REDIS_ZSET
)
4832 addReply(c
,shared
.wrongtypeerr
);
4836 /* Create a list of operations to perform for every sorted element.
4837 * Operations can be GET/DEL/INCR/DECR */
4838 operations
= listCreate();
4839 listSetFreeMethod(operations
,zfree
);
4842 /* Now we need to protect sortval incrementing its count, in the future
4843 * SORT may have options able to overwrite/delete keys during the sorting
4844 * and the sorted key itself may get destroied */
4845 incrRefCount(sortval
);
4847 /* The SORT command has an SQL-alike syntax, parse it */
4848 while(j
< c
->argc
) {
4849 int leftargs
= c
->argc
-j
-1;
4850 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
4852 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
4854 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
4856 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
4857 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
4858 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
4860 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
4861 storekey
= c
->argv
[j
+1];
4863 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
4864 sortby
= c
->argv
[j
+1];
4865 /* If the BY pattern does not contain '*', i.e. it is constant,
4866 * we don't need to sort nor to lookup the weight keys. */
4867 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
4869 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
4870 listAddNodeTail(operations
,createSortOperation(
4871 REDIS_SORT_GET
,c
->argv
[j
+1]));
4875 decrRefCount(sortval
);
4876 listRelease(operations
);
4877 addReply(c
,shared
.syntaxerr
);
4883 /* Load the sorting vector with all the objects to sort */
4884 switch(sortval
->type
) {
4885 case REDIS_LIST
: vectorlen
= listLength((list
*)sortval
->ptr
); break;
4886 case REDIS_SET
: vectorlen
= dictSize((dict
*)sortval
->ptr
); break;
4887 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
4888 default: vectorlen
= 0; redisAssert(0); /* Avoid GCC warning */
4890 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
4893 if (sortval
->type
== REDIS_LIST
) {
4894 list
*list
= sortval
->ptr
;
4898 while((ln
= listYield(list
))) {
4899 robj
*ele
= ln
->value
;
4900 vector
[j
].obj
= ele
;
4901 vector
[j
].u
.score
= 0;
4902 vector
[j
].u
.cmpobj
= NULL
;
4910 if (sortval
->type
== REDIS_SET
) {
4913 zset
*zs
= sortval
->ptr
;
4917 di
= dictGetIterator(set
);
4918 while((setele
= dictNext(di
)) != NULL
) {
4919 vector
[j
].obj
= dictGetEntryKey(setele
);
4920 vector
[j
].u
.score
= 0;
4921 vector
[j
].u
.cmpobj
= NULL
;
4924 dictReleaseIterator(di
);
4926 redisAssert(j
== vectorlen
);
4928 /* Now it's time to load the right scores in the sorting vector */
4929 if (dontsort
== 0) {
4930 for (j
= 0; j
< vectorlen
; j
++) {
4934 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
4935 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
4937 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
4939 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
4940 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
4942 /* Don't need to decode the object if it's
4943 * integer-encoded (the only encoding supported) so
4944 * far. We can just cast it */
4945 if (byval
->encoding
== REDIS_ENCODING_INT
) {
4946 vector
[j
].u
.score
= (long)byval
->ptr
;
4948 redisAssert(1 != 1);
4953 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
4954 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
4956 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
4957 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
4959 redisAssert(1 != 1);
4966 /* We are ready to sort the vector... perform a bit of sanity check
4967 * on the LIMIT option too. We'll use a partial version of quicksort. */
4968 start
= (limit_start
< 0) ? 0 : limit_start
;
4969 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
4970 if (start
>= vectorlen
) {
4971 start
= vectorlen
-1;
4974 if (end
>= vectorlen
) end
= vectorlen
-1;
4976 if (dontsort
== 0) {
4977 server
.sort_desc
= desc
;
4978 server
.sort_alpha
= alpha
;
4979 server
.sort_bypattern
= sortby
? 1 : 0;
4980 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
4981 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
4983 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
4986 /* Send command output to the output buffer, performing the specified
4987 * GET/DEL/INCR/DECR operations if any. */
4988 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
4989 if (storekey
== NULL
) {
4990 /* STORE option not specified, sent the sorting result to client */
4991 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
4992 for (j
= start
; j
<= end
; j
++) {
4995 addReplyBulkLen(c
,vector
[j
].obj
);
4996 addReply(c
,vector
[j
].obj
);
4997 addReply(c
,shared
.crlf
);
4999 listRewind(operations
);
5000 while((ln
= listYield(operations
))) {
5001 redisSortOperation
*sop
= ln
->value
;
5002 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5005 if (sop
->type
== REDIS_SORT_GET
) {
5006 if (!val
|| val
->type
!= REDIS_STRING
) {
5007 addReply(c
,shared
.nullbulk
);
5009 addReplyBulkLen(c
,val
);
5011 addReply(c
,shared
.crlf
);
5014 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5019 robj
*listObject
= createListObject();
5020 list
*listPtr
= (list
*) listObject
->ptr
;
5022 /* STORE option specified, set the sorting result as a List object */
5023 for (j
= start
; j
<= end
; j
++) {
5026 listAddNodeTail(listPtr
,vector
[j
].obj
);
5027 incrRefCount(vector
[j
].obj
);
5029 listRewind(operations
);
5030 while((ln
= listYield(operations
))) {
5031 redisSortOperation
*sop
= ln
->value
;
5032 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5035 if (sop
->type
== REDIS_SORT_GET
) {
5036 if (!val
|| val
->type
!= REDIS_STRING
) {
5037 listAddNodeTail(listPtr
,createStringObject("",0));
5039 listAddNodeTail(listPtr
,val
);
5043 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5047 if (dictReplace(c
->db
->dict
,storekey
,listObject
)) {
5048 incrRefCount(storekey
);
5050 /* Note: we add 1 because the DB is dirty anyway since even if the
5051 * SORT result is empty a new key is set and maybe the old content
5053 server
.dirty
+= 1+outputlen
;
5054 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",outputlen
));
5058 decrRefCount(sortval
);
5059 listRelease(operations
);
5060 for (j
= 0; j
< vectorlen
; j
++) {
5061 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
5062 decrRefCount(vector
[j
].u
.cmpobj
);
5067 /* Create the string returned by the INFO command. This is decoupled
5068 * by the INFO command itself as we need to report the same information
5069 * on memory corruption problems. */
5070 static sds
genRedisInfoString(void) {
5072 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5075 info
= sdscatprintf(sdsempty(),
5076 "redis_version:%s\r\n"
5078 "multiplexing_api:%s\r\n"
5079 "uptime_in_seconds:%ld\r\n"
5080 "uptime_in_days:%ld\r\n"
5081 "connected_clients:%d\r\n"
5082 "connected_slaves:%d\r\n"
5083 "used_memory:%zu\r\n"
5084 "changes_since_last_save:%lld\r\n"
5085 "bgsave_in_progress:%d\r\n"
5086 "last_save_time:%ld\r\n"
5087 "total_connections_received:%lld\r\n"
5088 "total_commands_processed:%lld\r\n"
5091 (sizeof(long) == 8) ? "64" : "32",
5095 listLength(server
.clients
)-listLength(server
.slaves
),
5096 listLength(server
.slaves
),
5099 server
.bgsavechildpid
!= -1,
5101 server
.stat_numconnections
,
5102 server
.stat_numcommands
,
5103 server
.masterhost
== NULL
? "master" : "slave"
5105 if (server
.masterhost
) {
5106 info
= sdscatprintf(info
,
5107 "master_host:%s\r\n"
5108 "master_port:%d\r\n"
5109 "master_link_status:%s\r\n"
5110 "master_last_io_seconds_ago:%d\r\n"
5113 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
5115 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
5118 for (j
= 0; j
< server
.dbnum
; j
++) {
5119 long long keys
, vkeys
;
5121 keys
= dictSize(server
.db
[j
].dict
);
5122 vkeys
= dictSize(server
.db
[j
].expires
);
5123 if (keys
|| vkeys
) {
5124 info
= sdscatprintf(info
, "db%d:keys=%lld,expires=%lld\r\n",
5131 static void infoCommand(redisClient
*c
) {
5132 sds info
= genRedisInfoString();
5133 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
5134 (unsigned long)sdslen(info
)));
5135 addReplySds(c
,info
);
5136 addReply(c
,shared
.crlf
);
5139 static void monitorCommand(redisClient
*c
) {
5140 /* ignore MONITOR if aleady slave or in monitor mode */
5141 if (c
->flags
& REDIS_SLAVE
) return;
5143 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
5145 listAddNodeTail(server
.monitors
,c
);
5146 addReply(c
,shared
.ok
);
5149 /* ================================= Expire ================================= */
5150 static int removeExpire(redisDb
*db
, robj
*key
) {
5151 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
5158 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
5159 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
5167 /* Return the expire time of the specified key, or -1 if no expire
5168 * is associated with this key (i.e. the key is non volatile) */
5169 static time_t getExpire(redisDb
*db
, robj
*key
) {
5172 /* No expire? return ASAP */
5173 if (dictSize(db
->expires
) == 0 ||
5174 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
5176 return (time_t) dictGetEntryVal(de
);
5179 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
5183 /* No expire? return ASAP */
5184 if (dictSize(db
->expires
) == 0 ||
5185 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5187 /* Lookup the expire */
5188 when
= (time_t) dictGetEntryVal(de
);
5189 if (time(NULL
) <= when
) return 0;
5191 /* Delete the key */
5192 dictDelete(db
->expires
,key
);
5193 return dictDelete(db
->dict
,key
) == DICT_OK
;
5196 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
5199 /* No expire? return ASAP */
5200 if (dictSize(db
->expires
) == 0 ||
5201 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5203 /* Delete the key */
5205 dictDelete(db
->expires
,key
);
5206 return dictDelete(db
->dict
,key
) == DICT_OK
;
5209 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
5212 de
= dictFind(c
->db
->dict
,key
);
5214 addReply(c
,shared
.czero
);
5218 if (deleteKey(c
->db
,key
)) server
.dirty
++;
5219 addReply(c
, shared
.cone
);
5222 time_t when
= time(NULL
)+seconds
;
5223 if (setExpire(c
->db
,key
,when
)) {
5224 addReply(c
,shared
.cone
);
5227 addReply(c
,shared
.czero
);
5233 static void expireCommand(redisClient
*c
) {
5234 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
5237 static void expireatCommand(redisClient
*c
) {
5238 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
5241 static void ttlCommand(redisClient
*c
) {
5245 expire
= getExpire(c
->db
,c
->argv
[1]);
5247 ttl
= (int) (expire
-time(NULL
));
5248 if (ttl
< 0) ttl
= -1;
5250 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
5253 /* =============================== Replication ============================= */
5255 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5256 ssize_t nwritten
, ret
= size
;
5257 time_t start
= time(NULL
);
5261 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
5262 nwritten
= write(fd
,ptr
,size
);
5263 if (nwritten
== -1) return -1;
5267 if ((time(NULL
)-start
) > timeout
) {
5275 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5276 ssize_t nread
, totread
= 0;
5277 time_t start
= time(NULL
);
5281 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
5282 nread
= read(fd
,ptr
,size
);
5283 if (nread
== -1) return -1;
5288 if ((time(NULL
)-start
) > timeout
) {
5296 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5303 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
5306 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
5317 static void syncCommand(redisClient
*c
) {
5318 /* ignore SYNC if aleady slave or in monitor mode */
5319 if (c
->flags
& REDIS_SLAVE
) return;
5321 /* SYNC can't be issued when the server has pending data to send to
5322 * the client about already issued commands. We need a fresh reply
5323 * buffer registering the differences between the BGSAVE and the current
5324 * dataset, so that we can copy to other slaves if needed. */
5325 if (listLength(c
->reply
) != 0) {
5326 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5330 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
5331 /* Here we need to check if there is a background saving operation
5332 * in progress, or if it is required to start one */
5333 if (server
.bgsavechildpid
!= -1) {
5334 /* Ok a background save is in progress. Let's check if it is a good
5335 * one for replication, i.e. if there is another slave that is
5336 * registering differences since the server forked to save */
5340 listRewind(server
.slaves
);
5341 while((ln
= listYield(server
.slaves
))) {
5343 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
5346 /* Perfect, the server is already registering differences for
5347 * another slave. Set the right state, and copy the buffer. */
5348 listRelease(c
->reply
);
5349 c
->reply
= listDup(slave
->reply
);
5350 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5351 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
5353 /* No way, we need to wait for the next BGSAVE in order to
5354 * register differences */
5355 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5356 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
5359 /* Ok we don't have a BGSAVE in progress, let's start one */
5360 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
5361 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5362 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
5363 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
5366 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5369 c
->flags
|= REDIS_SLAVE
;
5371 listAddNodeTail(server
.slaves
,c
);
5375 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
5376 redisClient
*slave
= privdata
;
5378 REDIS_NOTUSED(mask
);
5379 char buf
[REDIS_IOBUF_LEN
];
5380 ssize_t nwritten
, buflen
;
5382 if (slave
->repldboff
== 0) {
5383 /* Write the bulk write count before to transfer the DB. In theory here
5384 * we don't know how much room there is in the output buffer of the
5385 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5386 * operations) will never be smaller than the few bytes we need. */
5389 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5391 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
5399 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
5400 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
5402 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
5403 (buflen
== 0) ? "premature EOF" : strerror(errno
));
5407 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
5408 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
5413 slave
->repldboff
+= nwritten
;
5414 if (slave
->repldboff
== slave
->repldbsize
) {
5415 close(slave
->repldbfd
);
5416 slave
->repldbfd
= -1;
5417 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5418 slave
->replstate
= REDIS_REPL_ONLINE
;
5419 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
5420 sendReplyToClient
, slave
) == AE_ERR
) {
5424 addReplySds(slave
,sdsempty());
5425 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
5429 /* This function is called at the end of every backgrond saving.
5430 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5431 * otherwise REDIS_ERR is passed to the function.
5433 * The goal of this function is to handle slaves waiting for a successful
5434 * background saving in order to perform non-blocking synchronization. */
5435 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
5437 int startbgsave
= 0;
5439 listRewind(server
.slaves
);
5440 while((ln
= listYield(server
.slaves
))) {
5441 redisClient
*slave
= ln
->value
;
5443 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
5445 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5446 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
5447 struct redis_stat buf
;
5449 if (bgsaveerr
!= REDIS_OK
) {
5451 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
5454 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
5455 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
5457 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
5460 slave
->repldboff
= 0;
5461 slave
->repldbsize
= buf
.st_size
;
5462 slave
->replstate
= REDIS_REPL_SEND_BULK
;
5463 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5464 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
5471 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5472 listRewind(server
.slaves
);
5473 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
5474 while((ln
= listYield(server
.slaves
))) {
5475 redisClient
*slave
= ln
->value
;
5477 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
5484 static int syncWithMaster(void) {
5485 char buf
[1024], tmpfile
[256], authcmd
[1024];
5487 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
5491 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
5496 /* AUTH with the master if required. */
5497 if(server
.masterauth
) {
5498 snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
);
5499 if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) {
5501 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",
5505 /* Read the AUTH result. */
5506 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5508 redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s",
5512 if (buf
[0] != '+') {
5514 redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?");
5519 /* Issue the SYNC command */
5520 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
5522 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
5526 /* Read the bulk write count */
5527 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5529 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
5533 if (buf
[0] != '$') {
5535 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5538 dumpsize
= atoi(buf
+1);
5539 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
5540 /* Read the bulk write data on a temp file */
5541 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
5542 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
5545 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
5549 int nread
, nwritten
;
5551 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
5553 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
5559 nwritten
= write(dfd
,buf
,nread
);
5560 if (nwritten
== -1) {
5561 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
5569 if (rename(tmpfile
,server
.dbfilename
) == -1) {
5570 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
5576 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
5577 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
5581 server
.master
= createClient(fd
);
5582 server
.master
->flags
|= REDIS_MASTER
;
5583 server
.replstate
= REDIS_REPL_CONNECTED
;
5587 static void slaveofCommand(redisClient
*c
) {
5588 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
5589 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
5590 if (server
.masterhost
) {
5591 sdsfree(server
.masterhost
);
5592 server
.masterhost
= NULL
;
5593 if (server
.master
) freeClient(server
.master
);
5594 server
.replstate
= REDIS_REPL_NONE
;
5595 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
5598 sdsfree(server
.masterhost
);
5599 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
5600 server
.masterport
= atoi(c
->argv
[2]->ptr
);
5601 if (server
.master
) freeClient(server
.master
);
5602 server
.replstate
= REDIS_REPL_CONNECT
;
5603 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
5604 server
.masterhost
, server
.masterport
);
5606 addReply(c
,shared
.ok
);
5609 /* ============================ Maxmemory directive ======================== */
5611 /* This function gets called when 'maxmemory' is set on the config file to limit
5612 * the max memory used by the server, and we are out of memory.
5613 * This function will try to, in order:
5615 * - Free objects from the free list
5616 * - Try to remove keys with an EXPIRE set
5618 * It is not possible to free enough memory to reach used-memory < maxmemory
5619 * the server will start refusing commands that will enlarge even more the
5622 static void freeMemoryIfNeeded(void) {
5623 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
5624 if (listLength(server
.objfreelist
)) {
5627 listNode
*head
= listFirst(server
.objfreelist
);
5628 o
= listNodeValue(head
);
5629 listDelNode(server
.objfreelist
,head
);
5632 int j
, k
, freed
= 0;
5634 for (j
= 0; j
< server
.dbnum
; j
++) {
5636 robj
*minkey
= NULL
;
5637 struct dictEntry
*de
;
5639 if (dictSize(server
.db
[j
].expires
)) {
5641 /* From a sample of three keys drop the one nearest to
5642 * the natural expire */
5643 for (k
= 0; k
< 3; k
++) {
5646 de
= dictGetRandomKey(server
.db
[j
].expires
);
5647 t
= (time_t) dictGetEntryVal(de
);
5648 if (minttl
== -1 || t
< minttl
) {
5649 minkey
= dictGetEntryKey(de
);
5653 deleteKey(server
.db
+j
,minkey
);
5656 if (!freed
) return; /* nothing to free... */
5661 /* ============================== Append Only file ========================== */
5663 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
5664 sds buf
= sdsempty();
5670 /* The DB this command was targetting is not the same as the last command
5671 * we appendend. To issue a SELECT command is needed. */
5672 if (dictid
!= server
.appendseldb
) {
5675 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
5676 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
5677 (unsigned long)strlen(seldb
),seldb
);
5678 server
.appendseldb
= dictid
;
5681 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5682 * EXPIREs into EXPIREATs calls */
5683 if (cmd
->proc
== expireCommand
) {
5686 tmpargv
[0] = createStringObject("EXPIREAT",8);
5687 tmpargv
[1] = argv
[1];
5688 incrRefCount(argv
[1]);
5689 when
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10);
5690 tmpargv
[2] = createObject(REDIS_STRING
,
5691 sdscatprintf(sdsempty(),"%ld",when
));
5695 /* Append the actual command */
5696 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
5697 for (j
= 0; j
< argc
; j
++) {
5700 o
= getDecodedObject(o
);
5701 buf
= sdscatprintf(buf
,"$%lu\r\n",(unsigned long)sdslen(o
->ptr
));
5702 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
5703 buf
= sdscatlen(buf
,"\r\n",2);
5707 /* Free the objects from the modified argv for EXPIREAT */
5708 if (cmd
->proc
== expireCommand
) {
5709 for (j
= 0; j
< 3; j
++)
5710 decrRefCount(argv
[j
]);
5713 /* We want to perform a single write. This should be guaranteed atomic
5714 * at least if the filesystem we are writing is a real physical one.
5715 * While this will save us against the server being killed I don't think
5716 * there is much to do about the whole server stopping for power problems
5718 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
5719 if (nwritten
!= (signed)sdslen(buf
)) {
5720 /* Ooops, we are in troubles. The best thing to do for now is
5721 * to simply exit instead to give the illusion that everything is
5722 * working as expected. */
5723 if (nwritten
== -1) {
5724 redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
));
5726 redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
));
5730 /* If a background append only file rewriting is in progress we want to
5731 * accumulate the differences between the child DB and the current one
5732 * in a buffer, so that when the child process will do its work we
5733 * can append the differences to the new append only file. */
5734 if (server
.bgrewritechildpid
!= -1)
5735 server
.bgrewritebuf
= sdscatlen(server
.bgrewritebuf
,buf
,sdslen(buf
));
5739 if (server
.appendfsync
== APPENDFSYNC_ALWAYS
||
5740 (server
.appendfsync
== APPENDFSYNC_EVERYSEC
&&
5741 now
-server
.lastfsync
> 1))
5743 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
5744 server
.lastfsync
= now
;
5748 /* In Redis commands are always executed in the context of a client, so in
5749 * order to load the append only file we need to create a fake client. */
5750 static struct redisClient
*createFakeClient(void) {
5751 struct redisClient
*c
= zmalloc(sizeof(*c
));
5755 c
->querybuf
= sdsempty();
5759 /* We set the fake client as a slave waiting for the synchronization
5760 * so that Redis will not try to send replies to this client. */
5761 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5762 c
->reply
= listCreate();
5763 listSetFreeMethod(c
->reply
,decrRefCount
);
5764 listSetDupMethod(c
->reply
,dupClientReplyValue
);
5768 static void freeFakeClient(struct redisClient
*c
) {
5769 sdsfree(c
->querybuf
);
5770 listRelease(c
->reply
);
5774 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5775 * error (the append only file is zero-length) REDIS_ERR is returned. On
5776 * fatal error an error message is logged and the program exists. */
5777 int loadAppendOnlyFile(char *filename
) {
5778 struct redisClient
*fakeClient
;
5779 FILE *fp
= fopen(filename
,"r");
5780 struct redis_stat sb
;
5782 if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size
== 0)
5786 redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
));
5790 fakeClient
= createFakeClient();
5797 struct redisCommand
*cmd
;
5799 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) {
5805 if (buf
[0] != '*') goto fmterr
;
5807 argv
= zmalloc(sizeof(robj
*)*argc
);
5808 for (j
= 0; j
< argc
; j
++) {
5809 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
;
5810 if (buf
[0] != '$') goto fmterr
;
5811 len
= strtol(buf
+1,NULL
,10);
5812 argsds
= sdsnewlen(NULL
,len
);
5813 if (len
&& fread(argsds
,len
,1,fp
) == 0) goto fmterr
;
5814 argv
[j
] = createObject(REDIS_STRING
,argsds
);
5815 if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */
5818 /* Command lookup */
5819 cmd
= lookupCommand(argv
[0]->ptr
);
5821 redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
);
5824 /* Try object sharing and encoding */
5825 if (server
.shareobjects
) {
5827 for(j
= 1; j
< argc
; j
++)
5828 argv
[j
] = tryObjectSharing(argv
[j
]);
5830 if (cmd
->flags
& REDIS_CMD_BULK
)
5831 tryObjectEncoding(argv
[argc
-1]);
5832 /* Run the command in the context of a fake client */
5833 fakeClient
->argc
= argc
;
5834 fakeClient
->argv
= argv
;
5835 cmd
->proc(fakeClient
);
5836 /* Discard the reply objects list from the fake client */
5837 while(listLength(fakeClient
->reply
))
5838 listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
));
5839 /* Clean up, ready for the next command */
5840 for (j
= 0; j
< argc
; j
++) decrRefCount(argv
[j
]);
5844 freeFakeClient(fakeClient
);
5849 redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file");
5851 redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
));
5855 redisLog(REDIS_WARNING
,"Bad file format reading the append only file");
5859 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
5860 static int fwriteBulk(FILE *fp
, robj
*obj
) {
5862 obj
= getDecodedObject(obj
);
5863 snprintf(buf
,sizeof(buf
),"$%ld\r\n",(long)sdslen(obj
->ptr
));
5864 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) goto err
;
5865 if (fwrite(obj
->ptr
,sdslen(obj
->ptr
),1,fp
) == 0) goto err
;
5866 if (fwrite("\r\n",2,1,fp
) == 0) goto err
;
5874 /* Write a double value in bulk format $<count>\r\n<payload>\r\n */
5875 static int fwriteBulkDouble(FILE *fp
, double d
) {
5876 char buf
[128], dbuf
[128];
5878 snprintf(dbuf
,sizeof(dbuf
),"%.17g\r\n",d
);
5879 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(dbuf
)-2);
5880 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
5881 if (fwrite(dbuf
,strlen(dbuf
),1,fp
) == 0) return 0;
5885 /* Write a long value in bulk format $<count>\r\n<payload>\r\n */
5886 static int fwriteBulkLong(FILE *fp
, long l
) {
5887 char buf
[128], lbuf
[128];
5889 snprintf(lbuf
,sizeof(lbuf
),"%ld\r\n",l
);
5890 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(lbuf
)-2);
5891 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
5892 if (fwrite(lbuf
,strlen(lbuf
),1,fp
) == 0) return 0;
/* Rewrites the whole dataset as protocol commands into a temp file named
 * "temp-rewriteaof-<pid>.aof", then rename()s it onto 'filename'.
 * For every non-empty DB it emits SELECT <j>, then for each key:
 *   - string:     SET key value
 *   - list:       one RPUSH key element per element
 *   - set:        one SADD key member per member
 *   - sorted set: one ZADD key score member per member
 *   - any other type trips redisAssert()
 * followed by an EXPIRE for keys that have an expire time.
 * Failed writes jump to 'werr'; the trailing error path logs strerror(errno)
 * and releases 'di' when it is non-NULL.
 * NOTE(review): the embedded line numbers show that this copy is missing
 * several lines of the function (e.g. 5900-5903, 5910, 6006-6009), so some
 * declarations, error checks, returns and closing braces are not visible
 * here — compare with the canonical source before relying on this text. */
5896 /* Write a sequence of commands able to fully rebuild the dataset into
5897 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
5898 static int rewriteAppendOnlyFile(char *filename
) {
5899 dictIterator
*di
= NULL
;
5904 time_t now
= time(NULL
);
5906 /* Note that we have to use a different temp name here compared to the
5907 * one used by rewriteAppendOnlyFileBackground() function. */
5908 snprintf(tmpfile
,256,"temp-rewriteaof-%d.aof", (int) getpid());
5909 fp
= fopen(tmpfile
,"w");
5911 redisLog(REDIS_WARNING
, "Failed rewriting the append only file: %s", strerror(errno
));
5914 for (j
= 0; j
< server
.dbnum
; j
++) {
5915 char selectcmd
[] = "*2\r\n$6\r\nSELECT\r\n";
5916 redisDb
*db
= server
.db
+j
;
5918 if (dictSize(d
) == 0) continue;
5919 di
= dictGetIterator(d
);
5925 /* SELECT the new DB */
5926 if (fwrite(selectcmd
,sizeof(selectcmd
)-1,1,fp
) == 0) goto werr
;
5927 if (fwriteBulkLong(fp
,j
) == 0) goto werr
;
5929 /* Iterate this DB writing every entry */
5930 while((de
= dictNext(di
)) != NULL
) {
5931 robj
*key
= dictGetEntryKey(de
);
5932 robj
*o
= dictGetEntryVal(de
);
5933 time_t expiretime
= getExpire(db
,key
);
5935 /* Save the key and associated value */
5936 if (o
->type
== REDIS_STRING
) {
5937 /* Emit a SET command */
5938 char cmd
[]="*3\r\n$3\r\nSET\r\n";
5939 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5941 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5942 if (fwriteBulk(fp
,o
) == 0) goto werr
;
5943 } else if (o
->type
== REDIS_LIST
) {
5944 /* Emit the RPUSHes needed to rebuild the list */
5945 list
*list
= o
->ptr
;
5949 while((ln
= listYield(list
))) {
5950 char cmd
[]="*3\r\n$5\r\nRPUSH\r\n";
5951 robj
*eleobj
= listNodeValue(ln
);
5953 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5954 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5955 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
5957 } else if (o
->type
== REDIS_SET
) {
5958 /* Emit the SADDs needed to rebuild the set */
/* NOTE(review): this inner 'di' shadows the function-level iterator. A
 * 'goto werr' from the loop below reaches the error path, which releases
 * only the outer 'di', so this inner iterator is not released on that
 * path. The sorted-set branch below has the same pattern. */
5960 dictIterator
*di
= dictGetIterator(set
);
5963 while((de
= dictNext(di
)) != NULL
) {
5964 char cmd
[]="*3\r\n$4\r\nSADD\r\n";
5965 robj
*eleobj
= dictGetEntryKey(de
);
5967 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5968 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5969 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
5971 dictReleaseIterator(di
);
5972 } else if (o
->type
== REDIS_ZSET
) {
5973 /* Emit the ZADDs needed to rebuild the sorted set */
5975 dictIterator
*di
= dictGetIterator(zs
->dict
);
5978 while((de
= dictNext(di
)) != NULL
) {
/* ZADD takes key, score, member: hence the 4-element multi-bulk header. */
5979 char cmd
[]="*4\r\n$4\r\nZADD\r\n";
5980 robj
*eleobj
= dictGetEntryKey(de
);
5981 double *score
= dictGetEntryVal(de
);
5983 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5984 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5985 if (fwriteBulkDouble(fp
,*score
) == 0) goto werr
;
5986 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
5988 dictReleaseIterator(di
);
/* Any other value type is unexpected here. */
5990 redisAssert(0 != 0);
/* NOTE(review): 'expiretime' is an absolute time (it is compared with
 * now = time(NULL) below), but EXPIRE takes a TTL in seconds relative to
 * when the command is executed. Replaying this file would therefore set
 * expires far in the future. Writing EXPIREAT (if this server version
 * supports it) or 'expiretime - now' would preserve the expire.
 * Also, the value of an already-expired key has been written above before
 * the 'continue' skips its expire, so such a key is recreated with no
 * expire at all on reload — the check belongs before the value is emitted. */
5992 /* Save the expire time */
5993 if (expiretime
!= -1) {
5994 char cmd
[]="*3\r\n$6\r\nEXPIRE\r\n";
5995 /* If this key is already expired skip it */
5996 if (expiretime
< now
) continue;
5997 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
5998 if (fwriteBulk(fp
,key
) == 0) goto werr
;
5999 if (fwriteBulkLong(fp
,expiretime
) == 0) goto werr
;
6002 dictReleaseIterator(di
);
6005 /* Make sure data will not remain on the OS's output buffers */
6010 /* Use RENAME to make sure the DB file is changed atomically only
6011 * if the generated DB file is ok. */
6012 if (rename(tmpfile
,filename
) == -1) {
6013 redisLog(REDIS_WARNING
,"Error moving temp append only file on the final destination: %s", strerror(errno
));
6017 redisLog(REDIS_NOTICE
,"SYNC append only file rewrite performed");
/* Error path for every 'goto werr' above (the 'werr:' label line itself is
 * not visible in this copy).
 * NOTE(review): the log message below reads "fileon" — a space is missing. */
6023 redisLog(REDIS_WARNING
,"Write error writing append only fileon disk: %s", strerror(errno
));
6024 if (di
) dictReleaseIterator(di
);
/* rewriteAppendOnlyFileBackground() returns REDIS_ERR without forking when
 * server.bgrewritechildpid != -1 (a rewrite child is already recorded).
 * Otherwise it fork()s. The child calls rewriteAppendOnlyFile() on
 * "temp-rewriteaof-bg-<child pid>.aof". If fork() returned -1, a warning is
 * logged. Otherwise the parent logs the child pid, stores it in
 * server.bgrewritechildpid and sets server.appendseldb to -1.
 * NOTE(review): in this copy the explanatory comment that follows has no
 * closing delimiter before the function definition (original line 6039 is
 * absent). The child's exit path and the parent's error return are not
 * visible either — check against the canonical source. */
6028 /* This is how rewriting of the append only file in background works:
6030 * 1) The user calls BGREWRITEAOF
6031 * 2) Redis calls this function, that forks():
6032 * 2a) the child rewrites the append only file in a temp file.
6033 * 2b) the parent accumulates differences in server.bgrewritebuf.
6034 * 3) When the child has finished '2a' it exits.
6035 * 4) The parent will trap the exit code, if it's OK, will append the
6036 * data accumulated into server.bgrewritebuf into the temp file, and
6037 * finally will rename(2) the temp file in the actual file name.
6038 * Then the new file is reopened as the new append only file. Profit!
6040 static int rewriteAppendOnlyFileBackground(void) {
6043 if (server
.bgrewritechildpid
!= -1) return REDIS_ERR
;
6044 if ((childpid
= fork()) == 0) {
6049 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6050 if (rewriteAppendOnlyFile(tmpfile
) == REDIS_OK
) {
6057 if (childpid
== -1) {
6058 redisLog(REDIS_WARNING
,
6059 "Can't rewrite append only file in background: fork: %s",
6063 redisLog(REDIS_NOTICE
,
6064 "Background append only file rewriting started by pid %d",childpid
);
6065 server
.bgrewritechildpid
= childpid
;
6066 /* We set appendseldb to -1 in order to force the next call to the
6067 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6068 * accumulated by the parent into server.bgrewritebuf will start
6069 * with a SELECT statement and it will be safe to merge. */
6070 server
.appendseldb
= -1;
6073 return REDIS_OK
; /* unreached */
/* BGREWRITEAOF command handler.
 * When server.bgrewritechildpid != -1 a rewrite is already running and the
 * client gets an "-ERR ... already in progress" reply. Otherwise it calls
 * rewriteAppendOnlyFileBackground() and replies shared.ok on REDIS_OK and
 * shared.err on failure.
 * NOTE(review): the early return after the "already in progress" reply
 * and the 'else' before shared.err are not visible in this copy (original
 * lines 6079-6080 and 6083 are absent). */
6076 static void bgrewriteaofCommand(redisClient
*c
) {
6077 if (server
.bgrewritechildpid
!= -1) {
6078 addReplySds(c
,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6081 if (rewriteAppendOnlyFileBackground() == REDIS_OK
) {
6082 addReply(c
,shared
.ok
);
6084 addReply(c
,shared
.err
);
/* Builds the name of the temp file that a background-rewrite child with pid
 * 'childpid' writes to. It uses the same "temp-rewriteaof-bg-%d.aof" pattern
 * as rewriteAppendOnlyFileBackground(), which formats it with the child's
 * getpid(). The statement that removes the file is not visible in this copy
 * (presumably unlink(); original lines 6092-6093 are absent). */
6088 static void aofRemoveTempFile(pid_t childpid
) {
6091 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) childpid
);
6095 /* ================================= Debugging ============================== */
/* DEBUG <subcommand> handler (subcommand names compared case-insensitively):
 *   SEGFAULT     - body not visible in this copy (original line 6099 absent)
 *   RELOAD       - rdbSave() then rdbLoad() of server.dbfilename; replies
 *                  shared.err when either call fails, otherwise logs
 *                  "DB reloaded by DEBUG RELOAD" and replies shared.ok
 *   OBJECT <key> - only with exactly 3 arguments; looks the key up in the
 *                  client's current DB, replies shared.nokeyerr (presumably
 *                  when dictFind() finds nothing), otherwise a status line
 *                  with key/value addresses, refcounts and the encoding
 *   otherwise    - "-ERR Syntax error" reply
 * Several return statements, conditions and braces are absent from this copy.
 * NOTE(review): c->argv[1] is read without a visible argc check; presumably
 * the command table enforces at least 2 arguments — verify. */
6097 static void debugCommand(redisClient
*c
) {
6098 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
6100 } else if (!strcasecmp(c
->argv
[1]->ptr
,"reload")) {
6101 if (rdbSave(server
.dbfilename
) != REDIS_OK
) {
6102 addReply(c
,shared
.err
);
6106 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6107 addReply(c
,shared
.err
);
6110 redisLog(REDIS_WARNING
,"DB reloaded by DEBUG RELOAD");
6111 addReply(c
,shared
.ok
);
6112 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
6113 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
6117 addReply(c
,shared
.nokeyerr
);
6120 key
= dictGetEntryKey(de
);
6121 val
= dictGetEntryVal(de
);
6122 addReplySds(c
,sdscatprintf(sdsempty(),
6123 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6124 (void*)key
, key
->refcount
, (void*)val
, val
->refcount
,
6127 addReplySds(c
,sdsnew(
6128 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
/* Reports a failed assertion: logs a banner and the failed expression text
 * 'estr' at REDIS_WARNING level. When HAVE_BACKTRACE is defined it also logs
 * that a SIGSEGV is being forced to obtain a stack trace. The statement that
 * forces it and the matching #endif are not visible in this copy.
 * NOTE(review): identifiers beginning with an underscore are reserved at
 * file scope (C11 7.1.3); consider renaming when this code is refactored. */
6132 static void _redisAssert(char *estr
) {
6133 redisLog(REDIS_WARNING
,"=== ASSERTION FAILED ===");
6134 redisLog(REDIS_WARNING
,"==> %s\n",estr
);
6135 #ifdef HAVE_BACKTRACE
6136 redisLog(REDIS_WARNING
,"(forcing SIGSEGV in order to print the stack trace)");
6141 /* =================================== Main! ================================ */
/* Reads the first line (at most 63 characters, per the fgets() size of 64)
 * of /proc/sys/vm/overcommit_memory. The fopen() failure check, the
 * conversion of the line to an int and the return statements are not
 * visible in this copy. The caller, linuxOvercommitMemoryWarning(), compares
 * the result with 0. */
6144 int linuxOvercommitMemoryValue(void) {
6145 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
6149 if (fgets(buf
,64,fp
) == NULL
) {
/* Logs a REDIS_WARNING with the suggested sysctl fix when
 * linuxOvercommitMemoryValue() reports 0, because background saves may then
 * fail under low-memory conditions (as the message itself explains). */
6158 void linuxOvercommitMemoryWarning(void) {
6159 if (linuxOvercommitMemoryValue() == 0) {
6160 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6163 #endif /* __linux__ */
/* Detaches the server from the terminal. The parent exits after fork().
 * The child prints its pid, starts a new session with setsid(), redirects
 * stdin/stdout/stderr to /dev/null, and writes its pid to server.pidfile.
 * NOTE(review): fork() returns -1 on failure, which also satisfies
 * '!= 0', so a failed fork makes the process exit(0) as if it were the
 * parent. The fopen() result check for the pid file is also not visible in
 * this copy (original line 6184 is absent). */
6165 static void daemonize(void) {
6169 if (fork() != 0) exit(0); /* parent exits */
6170 printf("New pid: %d\n", getpid());
6171 setsid(); /* create a new session */
6173 /* Every output goes to /dev/null. If Redis is daemonized but
6174 * the 'logfile' is set to 'stdout' in the configuration file
6175 * it will not log at all. */
6176 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
6177 dup2(fd
, STDIN_FILENO
);
6178 dup2(fd
, STDOUT_FILENO
);
6179 dup2(fd
, STDERR_FILENO
);
/* Close the /dev/null descriptor only when it is not one of the three
 * standard descriptors it was just duplicated onto. */
6180 if (fd
> STDERR_FILENO
) close(fd
);
6182 /* Try to write the pid file */
6183 fp
= fopen(server
.pidfile
,"w");
6185 fprintf(fp
,"%d\n",getpid());
/* Program entry point. What this copy shows:
 *  - A config file path in argv[1] is loaded after resetServerSaveParams()
 *    (the argc test selecting this branch is not visible; presumably
 *    argc == 2). More than two arguments print a usage message on stderr,
 *    and the no-config case logs a warning.
 *  - Daemonizes when server.daemonize is set, logs the version string, and
 *    emits the overcommit warning.
 *  - Loads the dataset from server.appendfilename when server.appendonly is
 *    set, and (presumably, the 'else' is not visible) from server.dbfilename
 *    otherwise.
 *  - Registers acceptHandler for AE_READABLE events on server.fd, calling
 *    oom() if that fails, logs the listening port, and deletes the event
 *    loop at the end.
 * Original lines such as 6191-6192, 6197-6198, 6200, 6213 and 6217 are absent
 * from this copy, so part of the startup sequence is not visible here. */
6190 int main(int argc
, char **argv
) {
6193 resetServerSaveParams();
6194 loadServerConfig(argv
[1]);
6195 } else if (argc
> 2) {
6196 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
6199 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6201 if (server
.daemonize
) daemonize();
6203 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
6205 linuxOvercommitMemoryWarning();
6207 if (server
.appendonly
) {
6208 if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
)
6209 redisLog(REDIS_NOTICE
,"DB loaded from append only file");
6211 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
6212 redisLog(REDIS_NOTICE
,"DB loaded from disk");
6214 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
6215 acceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
6216 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
6218 aeDeleteEventLoop(server
.el
);
6222 /* ============================= Backtrace support ========================= */
6224 #ifdef HAVE_BACKTRACE
/* Forward declaration: segvHandler() below calls findFuncName(), which is
 * defined further down (after staticsymbols.h is included). */
6225 static char *findFuncName(void *pointer
, unsigned long *offset
);
/* Returns the instruction pointer saved in the signal context 'uc'. The
 * mcontext field is chosen per platform: FreeBSD, dietlibc, Mac OS X before
 * and after 10.6 (__rip vs __eip), Linux x86 (gregs[REG_EIP]) and Linux
 * IA64. Some nested #if/#else lines and the fallback for other platforms are
 * not visible in this copy.
 * NOTE(review): GCC and Clang predefine __x86_64__ (lower case), not
 * __X86_64__, so the "Linux x86" branch is in practice selected only by
 * __i386__. Also, glibc's <sys/ucontext.h> names the x86-64 register
 * REG_RIP, not REG_EIP — verify both before changing this code. */
6227 static void *getMcontextEip(ucontext_t
*uc
) {
6228 #if defined(__FreeBSD__)
6229 return (void*) uc
->uc_mcontext
.mc_eip
;
6230 #elif defined(__dietlibc__)
6231 return (void*) uc
->uc_mcontext
.eip
;
6232 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6234 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6236 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6238 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6239 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6240 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6242 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6244 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
6245 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
];
6246 #elif defined(__ia64__) /* Linux IA64 */
6247 return (void*) uc
->uc_mcontext
.sc_ip
;
/* Fatal-signal handler installed by setupSigSegvAction(). It logs the Redis
 * version and the signal number, then the string returned by
 * genRedisInfoString(), then a backtrace of up to 100 frames. Frame 1 is
 * replaced with the instruction pointer from the signal context when
 * getMcontextEip() returns non-NULL. Frames from index 1 onward are printed
 * either as the backtrace_symbols() text or as
 * "<i> redis-server <addr> <name> + <offset>", using the static symbol
 * lookup done by findFuncName(). The declarations of 'trace' and
 * 'infostring' and the end of the function are not visible in this copy.
 * NOTE(review): backtrace_symbols() allocates its result (hence the
 * commented-out free() below), and logging helpers are presumably not
 * async-signal-safe either; this is best-effort crash reporting. */
6253 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
6255 char **messages
= NULL
;
6256 int i
, trace_size
= 0;
6257 unsigned long offset
=0;
6258 ucontext_t
*uc
= (ucontext_t
*) secret
;
6260 REDIS_NOTUSED(info
);
6262 redisLog(REDIS_WARNING
,
6263 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
6264 infostring
= genRedisInfoString();
6265 redisLog(REDIS_WARNING
, "%s",infostring
);
6266 /* It's not safe to sdsfree() the returned string under memory
6267 * corruption conditions. Let it leak as we are going to abort */
6269 trace_size
= backtrace(trace
, 100);
6270 /* overwrite the sigaction (signal trampoline) frame with the caller's address */
6271 if (getMcontextEip(uc
) != NULL
) {
6272 trace
[1] = getMcontextEip(uc
);
6274 messages
= backtrace_symbols(trace
, trace_size
);
6276 for (i
=1; i
<trace_size
; ++i
) {
6277 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
6279 p
= strchr(messages
[i
],'+');
/* Print the backtrace_symbols() line when no static symbol matched, or when
 * the offset it reports after '+' is smaller than the static-table offset
 * (it names a closer symbol). Otherwise print the static-table name.
 * NOTE(review): the offset is parsed as decimal, which fits the Mac OS X
 * "name + 123" format; glibc prints hex offsets ("+0x7b"), for which
 * strtol(..., 10) yields 0. */
6280 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
6281 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
6283 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
6286 // free(messages); Don't call free() with possibly corrupted memory.
/* Installs segvHandler() for SIGSEGV, SIGBUS, SIGFPE and SIGILL.
 * The flags have these effects:
 *  - SA_SIGINFO selects the three-argument sa_sigaction form.
 *  - SA_RESETHAND restores the default disposition once the handler is
 *    entered.
 *  - SA_NODEFER leaves the signal unblocked while the handler runs.
 *  - SA_ONSTACK requests the alternate signal stack; no sigaltstack() call
 *    is visible here, and without one the flag has no effect.
 * NOTE(review): SIGBUS is registered twice below; the second call is
 * redundant. */
6290 static void setupSigSegvAction(void) {
6291 struct sigaction act
;
6293 sigemptyset (&act
.sa_mask
);
6294 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6295 * is used. Otherwise, sa_handler is used */
6296 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
6297 act
.sa_sigaction
= segvHandler
;
6298 sigaction (SIGSEGV
, &act
, NULL
);
6299 sigaction (SIGBUS
, &act
, NULL
);
6300 sigaction (SIGFPE
, &act
, NULL
);
6301 sigaction (SIGILL
, &act
, NULL
);
6302 sigaction (SIGBUS
, &act
, NULL
);
6306 #include "staticsymbols.h"
6307 /* This function tries to convert a pointer into a function name. It's used in
6308 * order to provide a backtrace under segmentation fault that's able to
6309 * display functions declared as static (otherwise the backtrace is useless). */
/* Lookup: scans symsTable until the first entry whose .pointer is zero. It
 * considers entries whose start address is at or below 'pointer' and keeps
 * the one with the smallest distance 'off'. It returns that entry's name, or
 * NULL when 'ret' is still -1, meaning no entry qualified. The declarations of
 * 'i' and 'ret', the lines that record 'ret'/'minoff', and the store through
 * 'offset' are not visible in this copy. Presumably 'ret' starts at -1 and
 * *offset receives the winning distance. */
6310 static char *findFuncName(void *pointer
, unsigned long *offset
){
6312 unsigned long off
, minoff
= 0;
6314 /* Try to match against the Symbol with the smallest offset */
6315 for (i
=0; symsTable
[i
].pointer
; i
++) {
6316 unsigned long lp
= (unsigned long) pointer
;
/* An all-ones address is never matched (presumably treated as a bogus
 * frame). */
6318 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
6319 off
=lp
-symsTable
[i
].pointer
;
6320 if (ret
< 0 || off
< minoff
) {
6326 if (ret
== -1) return NULL
;
6328 return symsTable
[ret
].name
;
6330 #else /* HAVE_BACKTRACE */
/* Fallback used when HAVE_BACKTRACE is not defined. This copy does not show
 * its body (original line 6332 is absent); presumably it installs nothing. */
6331 static void setupSigSegvAction(void) {
6333 #endif /* HAVE_BACKTRACE */