2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.3.0"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
64 #include "solarisfixes.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
95 /* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
107 this flag will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short these commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
113 #define REDIS_STRING 0
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
128 /* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpret the length:
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six-bit
136 * number specifies the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
139 * Lengths up to 63 are stored using a single byte, most DB keys, and many
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining six bits specify a special encoding for the object
149 * according to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
156 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157 #define REDIS_SLAVE 2 /* This client is a slave server */
158 #define REDIS_MASTER 4 /* This client is a master server */
159 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
160 #define REDIS_MULTI 16 /* This client is in a MULTI context */
161 #define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */
163 /* Slave replication state - slave side */
164 #define REDIS_REPL_NONE 0 /* No active replication */
165 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
166 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
168 /* Slave replication state - from the point of view of master
169 * Note that in SEND_BULK and ONLINE state the slave receives new updates
170 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
171 * to start the next background saving in order to send updates to it. */
172 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
173 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
174 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
175 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
177 /* List related stuff */
181 /* Sort operations */
182 #define REDIS_SORT_GET 0
183 #define REDIS_SORT_ASC 1
184 #define REDIS_SORT_DESC 2
185 #define REDIS_SORTKEY_MAX 1024
188 #define REDIS_DEBUG 0
189 #define REDIS_NOTICE 1
190 #define REDIS_WARNING 2
192 /* Anti-warning macro... */
193 #define REDIS_NOTUSED(V) ((void) V)
195 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
196 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
198 /* Append only defines */
199 #define APPENDFSYNC_NO 0
200 #define APPENDFSYNC_ALWAYS 1
201 #define APPENDFSYNC_EVERYSEC 2
203 /* We can print the stacktrace, so our assert is defined this way: */
204 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
205 static void _redisAssert(char *estr
);
207 /*================================= Data types ============================== */
209 /* A redis object, that is a type able to hold a string / list / set */
210 typedef struct redisObject
{
213 unsigned char encoding
;
214 unsigned char notused
[2];
218 /* Macro used to initialize a Redis object allocated on the stack.
219 * Note that this macro is kept near the structure definition to make sure
220 * we'll update it when the structure is changed, to avoid bugs like
221 * bug #85 introduced exactly in this way. */
222 #define initStaticStringObject(_var,_ptr) do { \
224 _var.type = REDIS_STRING; \
225 _var.encoding = REDIS_ENCODING_RAW; \
229 typedef struct redisDb
{
230 dict
*dict
; /* The keyspace for this DB */
231 dict
*expires
; /* Timeout of keys with a timeout set */
232 dict
*blockingkeys
; /* Keys with clients waiting for data (BLPOP) */
236 /* Client MULTI/EXEC state */
237 typedef struct multiCmd
{
240 struct redisCommand
*cmd
;
243 typedef struct multiState
{
244 multiCmd
*commands
; /* Array of MULTI commands */
245 int count
; /* Total number of MULTI commands */
248 /* With multiplexing we need to keep per-client state.
249 * Clients are kept in a linked list. */
250 typedef struct redisClient
{
255 robj
**argv
, **mbargv
;
257 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
258 int multibulk
; /* multi bulk command format active */
261 time_t lastinteraction
; /* time of the last interaction, used for timeout */
262 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
264 int slaveseldb
; /* slave selected db, if this client is a slave */
265 int authenticated
; /* when requirepass is non-NULL */
266 int replstate
; /* replication state if this is a slave */
267 int repldbfd
; /* replication DB file descriptor */
268 long repldboff
; /* replication DB file offset */
269 off_t repldbsize
; /* replication DB file size */
270 multiState mstate
; /* MULTI/EXEC state */
271 robj
*blockingkey
; /* The key we waiting to terminate a blocking
272 * operation such as BLPOP. Otherwise NULL. */
273 time_t blockingto
; /* Blocking operation timeout. If UNIX current time
274 * is >= blockingto then the operation timed out. */
282 /* Global server state structure */
287 dict
*sharingpool
; /* Poll used for object sharing */
288 unsigned int sharingpoolsize
;
289 long long dirty
; /* changes to DB from the last save */
291 list
*slaves
, *monitors
;
292 char neterr
[ANET_ERR_LEN
];
294 int cronloops
; /* number of times the cron function run */
295 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
296 time_t lastsave
; /* Unix time of last save succeeede */
297 size_t usedmemory
; /* Used memory in megabytes */
298 /* Fields used only for stats */
299 time_t stat_starttime
; /* server start time */
300 long long stat_numcommands
; /* number of processed commands */
301 long long stat_numconnections
; /* number of connections received */
314 pid_t bgsavechildpid
;
315 pid_t bgrewritechildpid
;
316 sds bgrewritebuf
; /* buffer taken by parent during oppend only rewrite */
317 struct saveparam
*saveparams
;
322 char *appendfilename
;
326 /* Replication related */
331 redisClient
*master
; /* client that is master for this slave */
333 unsigned int maxclients
;
334 unsigned long maxmemory
;
335 unsigned int blockedclients
;
336 /* Sort parameters - qsort_r() is only available under BSD so we
337 * have to take this state global, in order to pass it to sortCompare() */
343 typedef void redisCommandProc(redisClient
*c
);
344 struct redisCommand
{
346 redisCommandProc
*proc
;
351 struct redisFunctionSym
{
353 unsigned long pointer
;
356 typedef struct _redisSortObject
{
364 typedef struct _redisSortOperation
{
367 } redisSortOperation
;
369 /* ZSETs use a specialized version of Skiplists */
371 typedef struct zskiplistNode
{
372 struct zskiplistNode
**forward
;
373 struct zskiplistNode
*backward
;
378 typedef struct zskiplist
{
379 struct zskiplistNode
*header
, *tail
;
380 unsigned long length
;
384 typedef struct zset
{
389 /* Our shared "common" objects */
391 struct sharedObjectsStruct
{
392 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
393 *colon
, *nullbulk
, *nullmultibulk
, *queued
,
394 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
395 *outofrangeerr
, *plus
,
396 *select0
, *select1
, *select2
, *select3
, *select4
,
397 *select5
, *select6
, *select7
, *select8
, *select9
;
400 /* Global vars that are actually used as constants. The following double
401 * values are used for double on-disk serialization, and are initialized
402 * at runtime to avoid strange compiler optimizations. */
404 static double R_Zero
, R_PosInf
, R_NegInf
, R_Nan
;
406 /*================================ Prototypes =============================== */
408 static void freeStringObject(robj
*o
);
409 static void freeListObject(robj
*o
);
410 static void freeSetObject(robj
*o
);
411 static void decrRefCount(void *o
);
412 static robj
*createObject(int type
, void *ptr
);
413 static void freeClient(redisClient
*c
);
414 static int rdbLoad(char *filename
);
415 static void addReply(redisClient
*c
, robj
*obj
);
416 static void addReplySds(redisClient
*c
, sds s
);
417 static void incrRefCount(robj
*o
);
418 static int rdbSaveBackground(char *filename
);
419 static robj
*createStringObject(char *ptr
, size_t len
);
420 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
421 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
422 static int syncWithMaster(void);
423 static robj
*tryObjectSharing(robj
*o
);
424 static int tryObjectEncoding(robj
*o
);
425 static robj
*getDecodedObject(robj
*o
);
426 static int removeExpire(redisDb
*db
, robj
*key
);
427 static int expireIfNeeded(redisDb
*db
, robj
*key
);
428 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
429 static int deleteKey(redisDb
*db
, robj
*key
);
430 static time_t getExpire(redisDb
*db
, robj
*key
);
431 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
432 static void updateSlavesWaitingBgsave(int bgsaveerr
);
433 static void freeMemoryIfNeeded(void);
434 static int processCommand(redisClient
*c
);
435 static void setupSigSegvAction(void);
436 static void rdbRemoveTempFile(pid_t childpid
);
437 static void aofRemoveTempFile(pid_t childpid
);
438 static size_t stringObjectLen(robj
*o
);
439 static void processInputBuffer(redisClient
*c
);
440 static zskiplist
*zslCreate(void);
441 static void zslFree(zskiplist
*zsl
);
442 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
443 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
444 static void initClientMultiState(redisClient
*c
);
445 static void freeClientMultiState(redisClient
*c
);
446 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
);
447 static void unblockClient(redisClient
*c
);
448 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
);
450 static void authCommand(redisClient
*c
);
451 static void pingCommand(redisClient
*c
);
452 static void echoCommand(redisClient
*c
);
453 static void setCommand(redisClient
*c
);
454 static void setnxCommand(redisClient
*c
);
455 static void getCommand(redisClient
*c
);
456 static void delCommand(redisClient
*c
);
457 static void existsCommand(redisClient
*c
);
458 static void incrCommand(redisClient
*c
);
459 static void decrCommand(redisClient
*c
);
460 static void incrbyCommand(redisClient
*c
);
461 static void decrbyCommand(redisClient
*c
);
462 static void selectCommand(redisClient
*c
);
463 static void randomkeyCommand(redisClient
*c
);
464 static void keysCommand(redisClient
*c
);
465 static void dbsizeCommand(redisClient
*c
);
466 static void lastsaveCommand(redisClient
*c
);
467 static void saveCommand(redisClient
*c
);
468 static void bgsaveCommand(redisClient
*c
);
469 static void bgrewriteaofCommand(redisClient
*c
);
470 static void shutdownCommand(redisClient
*c
);
471 static void moveCommand(redisClient
*c
);
472 static void renameCommand(redisClient
*c
);
473 static void renamenxCommand(redisClient
*c
);
474 static void lpushCommand(redisClient
*c
);
475 static void rpushCommand(redisClient
*c
);
476 static void lpopCommand(redisClient
*c
);
477 static void rpopCommand(redisClient
*c
);
478 static void llenCommand(redisClient
*c
);
479 static void lindexCommand(redisClient
*c
);
480 static void lrangeCommand(redisClient
*c
);
481 static void ltrimCommand(redisClient
*c
);
482 static void typeCommand(redisClient
*c
);
483 static void lsetCommand(redisClient
*c
);
484 static void saddCommand(redisClient
*c
);
485 static void sremCommand(redisClient
*c
);
486 static void smoveCommand(redisClient
*c
);
487 static void sismemberCommand(redisClient
*c
);
488 static void scardCommand(redisClient
*c
);
489 static void spopCommand(redisClient
*c
);
490 static void srandmemberCommand(redisClient
*c
);
491 static void sinterCommand(redisClient
*c
);
492 static void sinterstoreCommand(redisClient
*c
);
493 static void sunionCommand(redisClient
*c
);
494 static void sunionstoreCommand(redisClient
*c
);
495 static void sdiffCommand(redisClient
*c
);
496 static void sdiffstoreCommand(redisClient
*c
);
497 static void syncCommand(redisClient
*c
);
498 static void flushdbCommand(redisClient
*c
);
499 static void flushallCommand(redisClient
*c
);
500 static void sortCommand(redisClient
*c
);
501 static void lremCommand(redisClient
*c
);
502 static void rpoplpushcommand(redisClient
*c
);
503 static void infoCommand(redisClient
*c
);
504 static void mgetCommand(redisClient
*c
);
505 static void monitorCommand(redisClient
*c
);
506 static void expireCommand(redisClient
*c
);
507 static void expireatCommand(redisClient
*c
);
508 static void getsetCommand(redisClient
*c
);
509 static void ttlCommand(redisClient
*c
);
510 static void slaveofCommand(redisClient
*c
);
511 static void debugCommand(redisClient
*c
);
512 static void msetCommand(redisClient
*c
);
513 static void msetnxCommand(redisClient
*c
);
514 static void zaddCommand(redisClient
*c
);
515 static void zincrbyCommand(redisClient
*c
);
516 static void zrangeCommand(redisClient
*c
);
517 static void zrangebyscoreCommand(redisClient
*c
);
518 static void zrevrangeCommand(redisClient
*c
);
519 static void zcardCommand(redisClient
*c
);
520 static void zremCommand(redisClient
*c
);
521 static void zscoreCommand(redisClient
*c
);
522 static void zremrangebyscoreCommand(redisClient
*c
);
523 static void multiCommand(redisClient
*c
);
524 static void execCommand(redisClient
*c
);
525 static void blpopCommand(redisClient
*c
);
526 static void brpopCommand(redisClient
*c
);
528 /*================================= Globals ================================= */
531 static struct redisServer server
; /* server global state */
532 static struct redisCommand cmdTable
[] = {
533 {"get",getCommand
,2,REDIS_CMD_INLINE
},
534 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
535 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
536 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
537 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
538 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
539 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
540 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
541 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
542 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
543 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
544 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
545 {"brpop",brpopCommand
,3,REDIS_CMD_INLINE
},
546 {"blpop",blpopCommand
,3,REDIS_CMD_INLINE
},
547 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
548 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
549 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
550 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
551 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
552 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
553 {"rpoplpush",rpoplpushcommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
554 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
555 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
556 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
557 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
558 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
559 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
560 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
561 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
562 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
563 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
564 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
565 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
566 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
567 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
568 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
569 {"zincrby",zincrbyCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
570 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
571 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
572 {"zrange",zrangeCommand
,-4,REDIS_CMD_INLINE
},
573 {"zrangebyscore",zrangebyscoreCommand
,-4,REDIS_CMD_INLINE
},
574 {"zrevrange",zrevrangeCommand
,-4,REDIS_CMD_INLINE
},
575 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
576 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
577 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
578 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
579 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
580 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
581 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
582 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
583 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
584 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
585 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
586 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
587 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
588 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
589 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
590 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
591 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
592 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
593 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
594 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
595 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
596 {"bgrewriteaof",bgrewriteaofCommand
,1,REDIS_CMD_INLINE
},
597 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
598 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
599 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
600 {"multi",multiCommand
,1,REDIS_CMD_INLINE
},
601 {"exec",execCommand
,1,REDIS_CMD_INLINE
},
602 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
603 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
604 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
605 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
606 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
607 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
608 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
609 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
610 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
614 /*============================ Utility functions ============================ */
616 /* Glob-style pattern matching. */
617 int stringmatchlen(const char *pattern
, int patternLen
,
618 const char *string
, int stringLen
, int nocase
)
623 while (pattern
[1] == '*') {
628 return 1; /* match */
630 if (stringmatchlen(pattern
+1, patternLen
-1,
631 string
, stringLen
, nocase
))
632 return 1; /* match */
636 return 0; /* no match */
640 return 0; /* no match */
650 not = pattern
[0] == '^';
657 if (pattern
[0] == '\\') {
660 if (pattern
[0] == string
[0])
662 } else if (pattern
[0] == ']') {
664 } else if (patternLen
== 0) {
668 } else if (pattern
[1] == '-' && patternLen
>= 3) {
669 int start
= pattern
[0];
670 int end
= pattern
[2];
678 start
= tolower(start
);
684 if (c
>= start
&& c
<= end
)
688 if (pattern
[0] == string
[0])
691 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
701 return 0; /* no match */
707 if (patternLen
>= 2) {
714 if (pattern
[0] != string
[0])
715 return 0; /* no match */
717 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
718 return 0; /* no match */
726 if (stringLen
== 0) {
727 while(*pattern
== '*') {
734 if (patternLen
== 0 && stringLen
== 0)
739 static void redisLog(int level
, const char *fmt
, ...) {
743 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
747 if (level
>= server
.verbosity
) {
753 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
754 fprintf(fp
,"%s %c ",buf
,c
[level
]);
755 vfprintf(fp
, fmt
, ap
);
761 if (server
.logfile
) fclose(fp
);
764 /*====================== Hash table type implementation ==================== */
766 /* This is a hash table type that uses the SDS dynamic strings library as
767 * keys and redis objects as values (objects can hold SDS strings,
770 static void dictVanillaFree(void *privdata
, void *val
)
772 DICT_NOTUSED(privdata
);
776 static void dictListDestructor(void *privdata
, void *val
)
778 DICT_NOTUSED(privdata
);
779 listRelease((list
*)val
);
782 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
786 DICT_NOTUSED(privdata
);
788 l1
= sdslen((sds
)key1
);
789 l2
= sdslen((sds
)key2
);
790 if (l1
!= l2
) return 0;
791 return memcmp(key1
, key2
, l1
) == 0;
794 static void dictRedisObjectDestructor(void *privdata
, void *val
)
796 DICT_NOTUSED(privdata
);
801 static int dictObjKeyCompare(void *privdata
, const void *key1
,
804 const robj
*o1
= key1
, *o2
= key2
;
805 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
808 static unsigned int dictObjHash(const void *key
) {
810 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
813 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
816 robj
*o1
= (robj
*) key1
, *o2
= (robj
*) key2
;
819 o1
= getDecodedObject(o1
);
820 o2
= getDecodedObject(o2
);
821 cmp
= sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
827 static unsigned int dictEncObjHash(const void *key
) {
828 robj
*o
= (robj
*) key
;
830 o
= getDecodedObject(o
);
831 unsigned int hash
= dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
836 static dictType setDictType
= {
837 dictEncObjHash
, /* hash function */
840 dictEncObjKeyCompare
, /* key compare */
841 dictRedisObjectDestructor
, /* key destructor */
842 NULL
/* val destructor */
845 static dictType zsetDictType
= {
846 dictEncObjHash
, /* hash function */
849 dictEncObjKeyCompare
, /* key compare */
850 dictRedisObjectDestructor
, /* key destructor */
851 dictVanillaFree
/* val destructor of malloc(sizeof(double)) */
854 static dictType hashDictType
= {
855 dictObjHash
, /* hash function */
858 dictObjKeyCompare
, /* key compare */
859 dictRedisObjectDestructor
, /* key destructor */
860 dictRedisObjectDestructor
/* val destructor */
863 /* Keylist hash table type has unencoded redis objects as keys and
864 * lists as values. It's used for blocking operations (BLPOP) */
865 static dictType keylistDictType
= {
866 dictObjHash
, /* hash function */
869 dictObjKeyCompare
, /* key compare */
870 dictRedisObjectDestructor
, /* key destructor */
871 dictListDestructor
/* val destructor */
874 /* ========================= Random utility functions ======================= */
876 /* Redis generally does not try to recover from out of memory conditions
877 * when allocating objects or strings, it is not clear if it will be possible
878 * to report this condition to the client since the networking layer itself
879 * is based on heap allocation for send buffers, so we simply abort.
880 * At least the code will be simpler to read... */
881 static void oom(const char *msg
) {
882 redisLog(REDIS_WARNING
, "%s: Out of memory\n",msg
);
887 /* ====================== Redis server networking stuff ===================== */
888 static void closeTimedoutClients(void) {
891 time_t now
= time(NULL
);
893 listRewind(server
.clients
);
894 while ((ln
= listYield(server
.clients
)) != NULL
) {
895 c
= listNodeValue(ln
);
896 if (server
.maxidletime
&&
897 !(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
898 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
899 (now
- c
->lastinteraction
> server
.maxidletime
))
901 redisLog(REDIS_DEBUG
,"Closing idle client");
903 } else if (c
->flags
& REDIS_BLOCKED
) {
904 if (c
->blockingto
!= 0 && c
->blockingto
< now
) {
905 addReply(c
,shared
.nullbulk
);
912 static int htNeedsResize(dict
*dict
) {
913 long long size
, used
;
915 size
= dictSlots(dict
);
916 used
= dictSize(dict
);
917 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
918 (used
*100/size
< REDIS_HT_MINFILL
));
921 /* If the percentage of used slots in the HT falls below REDIS_HT_MINFILL
922 * we resize the hash table to save memory */
923 static void tryResizeHashTables(void) {
926 for (j
= 0; j
< server
.dbnum
; j
++) {
927 if (htNeedsResize(server
.db
[j
].dict
)) {
928 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
929 dictResize(server
.db
[j
].dict
);
930 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
932 if (htNeedsResize(server
.db
[j
].expires
))
933 dictResize(server
.db
[j
].expires
);
937 /* A background saving child (BGSAVE) terminated its work. Handle this. */
938 void backgroundSaveDoneHandler(int statloc
) {
939 int exitcode
= WEXITSTATUS(statloc
);
940 int bysignal
= WIFSIGNALED(statloc
);
942 if (!bysignal
&& exitcode
== 0) {
943 redisLog(REDIS_NOTICE
,
944 "Background saving terminated with success");
946 server
.lastsave
= time(NULL
);
947 } else if (!bysignal
&& exitcode
!= 0) {
948 redisLog(REDIS_WARNING
, "Background saving error");
950 redisLog(REDIS_WARNING
,
951 "Background saving terminated by signal");
952 rdbRemoveTempFile(server
.bgsavechildpid
);
954 server
.bgsavechildpid
= -1;
955 /* Possibly there are slaves waiting for a BGSAVE in order to be served
956 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
957 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
960 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
962 void backgroundRewriteDoneHandler(int statloc
) {
963 int exitcode
= WEXITSTATUS(statloc
);
964 int bysignal
= WIFSIGNALED(statloc
);
966 if (!bysignal
&& exitcode
== 0) {
970 redisLog(REDIS_NOTICE
,
971 "Background append only file rewriting terminated with success");
972 /* Now it's time to flush the differences accumulated by the parent */
973 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) server
.bgrewritechildpid
);
974 fd
= open(tmpfile
,O_WRONLY
|O_APPEND
);
976 redisLog(REDIS_WARNING
, "Not able to open the temp append only file produced by the child: %s", strerror(errno
));
979 /* Flush our data... */
980 if (write(fd
,server
.bgrewritebuf
,sdslen(server
.bgrewritebuf
)) !=
981 (signed) sdslen(server
.bgrewritebuf
)) {
982 redisLog(REDIS_WARNING
, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno
));
986 redisLog(REDIS_NOTICE
,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server
.bgrewritebuf
));
987 /* Now our work is to rename the temp file into the stable file. And
988 * switch the file descriptor used by the server for append only. */
989 if (rename(tmpfile
,server
.appendfilename
) == -1) {
990 redisLog(REDIS_WARNING
,"Can't rename the temp append only file into the stable one: %s", strerror(errno
));
994 /* Mission completed... almost */
995 redisLog(REDIS_NOTICE
,"Append only file successfully rewritten.");
996 if (server
.appendfd
!= -1) {
997 /* If append only is actually enabled... */
998 close(server
.appendfd
);
999 server
.appendfd
= fd
;
1001 server
.appendseldb
= -1; /* Make sure it will issue SELECT */
1002 redisLog(REDIS_NOTICE
,"The new append only file was selected for future appends.");
1004 /* If append only is disabled we just generate a dump in this
1005 * format. Why not? */
1008 } else if (!bysignal
&& exitcode
!= 0) {
1009 redisLog(REDIS_WARNING
, "Background append only file rewriting error");
1011 redisLog(REDIS_WARNING
,
1012 "Background append only file rewriting terminated by signal");
1015 sdsfree(server
.bgrewritebuf
);
1016 server
.bgrewritebuf
= sdsempty();
1017 aofRemoveTempFile(server
.bgrewritechildpid
);
1018 server
.bgrewritechildpid
= -1;
1021 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
1022 int j
, loops
= server
.cronloops
++;
1023 REDIS_NOTUSED(eventLoop
);
1025 REDIS_NOTUSED(clientData
);
1027 /* Update the global state with the amount of used memory */
1028 server
.usedmemory
= zmalloc_used_memory();
1030 /* Show some info about non-empty databases */
1031 for (j
= 0; j
< server
.dbnum
; j
++) {
1032 long long size
, used
, vkeys
;
1034 size
= dictSlots(server
.db
[j
].dict
);
1035 used
= dictSize(server
.db
[j
].dict
);
1036 vkeys
= dictSize(server
.db
[j
].expires
);
1037 if (!(loops
% 5) && (used
|| vkeys
)) {
1038 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
1039 /* dictPrintStats(server.dict); */
1043 /* We don't want to resize the hash tables while a background saving
1044 * is in progress: the saving child is created using fork() that is
1045 * implemented with a copy-on-write semantic in most modern systems, so
1046 * if we resize the HT while there is the saving child at work actually
1047 * a lot of memory movements in the parent will cause a lot of pages
1049 if (server
.bgsavechildpid
== -1) tryResizeHashTables();
1051 /* Show information about connected clients */
1053 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1054 listLength(server
.clients
)-listLength(server
.slaves
),
1055 listLength(server
.slaves
),
1057 dictSize(server
.sharingpool
));
1060 /* Close connections of timedout clients */
1061 if ((server
.maxidletime
&& !(loops
% 10)) || server
.blockedclients
)
1062 closeTimedoutClients();
1064 /* Check if a background saving or AOF rewrite in progress terminated */
1065 if (server
.bgsavechildpid
!= -1 || server
.bgrewritechildpid
!= -1) {
1069 if ((pid
= wait3(&statloc
,WNOHANG
,NULL
)) != 0) {
1070 if (pid
== server
.bgsavechildpid
) {
1071 backgroundSaveDoneHandler(statloc
);
1073 backgroundRewriteDoneHandler(statloc
);
1077 /* If there is not a background saving in progress check if
1078 * we have to save now */
1079 time_t now
= time(NULL
);
1080 for (j
= 0; j
< server
.saveparamslen
; j
++) {
1081 struct saveparam
*sp
= server
.saveparams
+j
;
1083 if (server
.dirty
>= sp
->changes
&&
1084 now
-server
.lastsave
> sp
->seconds
) {
1085 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
1086 sp
->changes
, sp
->seconds
);
1087 rdbSaveBackground(server
.dbfilename
);
1093 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1094 * will use few CPU cycles if there are few expiring keys, otherwise
1095 * it will get more aggressive to avoid that too much memory is used by
1096 * keys that can be removed from the keyspace. */
1097 for (j
= 0; j
< server
.dbnum
; j
++) {
1099 redisDb
*db
= server
.db
+j
;
1101 /* Continue to expire if at the end of the cycle more than 25%
1102 * of the keys were expired. */
1104 int num
= dictSize(db
->expires
);
1105 time_t now
= time(NULL
);
1108 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
1109 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
1114 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
1115 t
= (time_t) dictGetEntryVal(de
);
1117 deleteKey(db
,dictGetEntryKey(de
));
1121 } while (expired
> REDIS_EXPIRELOOKUPS_PER_CRON
/4);
1124 /* Check if we should connect to a MASTER */
1125 if (server
.replstate
== REDIS_REPL_CONNECT
) {
1126 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
1127 if (syncWithMaster() == REDIS_OK
) {
1128 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
1134 static void createSharedObjects(void) {
1135 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
1136 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
1137 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
1138 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
1139 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
1140 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
1141 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
1142 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
1143 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
1144 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
1145 shared
.queued
= createObject(REDIS_STRING
,sdsnew("+QUEUED\r\n"));
1146 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
1147 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1148 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
1149 "-ERR no such key\r\n"));
1150 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
1151 "-ERR syntax error\r\n"));
1152 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
1153 "-ERR source and destination objects are the same\r\n"));
1154 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
1155 "-ERR index out of range\r\n"));
1156 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
1157 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
1158 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
1159 shared
.select0
= createStringObject("select 0\r\n",10);
1160 shared
.select1
= createStringObject("select 1\r\n",10);
1161 shared
.select2
= createStringObject("select 2\r\n",10);
1162 shared
.select3
= createStringObject("select 3\r\n",10);
1163 shared
.select4
= createStringObject("select 4\r\n",10);
1164 shared
.select5
= createStringObject("select 5\r\n",10);
1165 shared
.select6
= createStringObject("select 6\r\n",10);
1166 shared
.select7
= createStringObject("select 7\r\n",10);
1167 shared
.select8
= createStringObject("select 8\r\n",10);
1168 shared
.select9
= createStringObject("select 9\r\n",10);
1171 static void appendServerSaveParams(time_t seconds
, int changes
) {
1172 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1173 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1174 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1175 server
.saveparamslen
++;
1178 static void resetServerSaveParams() {
1179 zfree(server
.saveparams
);
1180 server
.saveparams
= NULL
;
1181 server
.saveparamslen
= 0;
1184 static void initServerConfig() {
1185 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1186 server
.port
= REDIS_SERVERPORT
;
1187 server
.verbosity
= REDIS_DEBUG
;
1188 server
.maxidletime
= REDIS_MAXIDLETIME
;
1189 server
.saveparams
= NULL
;
1190 server
.logfile
= NULL
; /* NULL = log on standard output */
1191 server
.bindaddr
= NULL
;
1192 server
.glueoutputbuf
= 1;
1193 server
.daemonize
= 0;
1194 server
.appendonly
= 0;
1195 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1196 server
.lastfsync
= time(NULL
);
1197 server
.appendfd
= -1;
1198 server
.appendseldb
= -1; /* Make sure the first time will not match */
1199 server
.pidfile
= "/var/run/redis.pid";
1200 server
.dbfilename
= "dump.rdb";
1201 server
.appendfilename
= "appendonly.aof";
1202 server
.requirepass
= NULL
;
1203 server
.shareobjects
= 0;
1204 server
.rdbcompression
= 1;
1205 server
.sharingpoolsize
= 1024;
1206 server
.maxclients
= 0;
1207 server
.blockedclients
= 0;
1208 server
.maxmemory
= 0;
1209 resetServerSaveParams();
1211 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1212 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1213 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1214 /* Replication related */
1216 server
.masterauth
= NULL
;
1217 server
.masterhost
= NULL
;
1218 server
.masterport
= 6379;
1219 server
.master
= NULL
;
1220 server
.replstate
= REDIS_REPL_NONE
;
1222 /* Double constants initialization */
1224 R_PosInf
= 1.0/R_Zero
;
1225 R_NegInf
= -1.0/R_Zero
;
1226 R_Nan
= R_Zero
/R_Zero
;
1229 static void initServer() {
1232 signal(SIGHUP
, SIG_IGN
);
1233 signal(SIGPIPE
, SIG_IGN
);
1234 setupSigSegvAction();
1236 server
.clients
= listCreate();
1237 server
.slaves
= listCreate();
1238 server
.monitors
= listCreate();
1239 server
.objfreelist
= listCreate();
1240 createSharedObjects();
1241 server
.el
= aeCreateEventLoop();
1242 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1243 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
1244 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1245 if (server
.fd
== -1) {
1246 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
1249 for (j
= 0; j
< server
.dbnum
; j
++) {
1250 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1251 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1252 server
.db
[j
].blockingkeys
= dictCreate(&keylistDictType
,NULL
);
1253 server
.db
[j
].id
= j
;
1255 server
.cronloops
= 0;
1256 server
.bgsavechildpid
= -1;
1257 server
.bgrewritechildpid
= -1;
1258 server
.bgrewritebuf
= sdsempty();
1259 server
.lastsave
= time(NULL
);
1261 server
.usedmemory
= 0;
1262 server
.stat_numcommands
= 0;
1263 server
.stat_numconnections
= 0;
1264 server
.stat_starttime
= time(NULL
);
1265 aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
);
1267 if (server
.appendonly
) {
1268 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644);
1269 if (server
.appendfd
== -1) {
1270 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1277 /* Empty the whole database */
1278 static long long emptyDb() {
1280 long long removed
= 0;
1282 for (j
= 0; j
< server
.dbnum
; j
++) {
1283 removed
+= dictSize(server
.db
[j
].dict
);
1284 dictEmpty(server
.db
[j
].dict
);
1285 dictEmpty(server
.db
[j
].expires
);
/* Translate a "yes"/"no" configuration argument into an integer.
 *
 * The comparison is case-insensitive (strcasecmp).
 *
 * Returns 1 for "yes", 0 for "no", and -1 for any other string. The config
 * loader relies on the -1 value to reject directives whose argument is
 * neither "yes" nor "no", so every path must return explicitly. */
static int yesnotoi(char *s) {
    if (!strcasecmp(s,"yes")) return 1;
    if (!strcasecmp(s,"no")) return 0;
    return -1; /* neither "yes" nor "no" */
}
1296 /* I agree, this is a very rudimentary way to load a configuration...
1297 It will be improved later if the config gets more complex. */
1298 static void loadServerConfig(char *filename
) {
1300 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1304 if (filename
[0] == '-' && filename
[1] == '\0')
1307 if ((fp
= fopen(filename
,"r")) == NULL
) {
1308 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1313 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1319 line
= sdstrim(line
," \t\r\n");
1321 /* Skip comments and blank lines*/
1322 if (line
[0] == '#' || line
[0] == '\0') {
1327 /* Split into arguments */
1328 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1329 sdstolower(argv
[0]);
1331 /* Execute config directives */
1332 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1333 server
.maxidletime
= atoi(argv
[1]);
1334 if (server
.maxidletime
< 0) {
1335 err
= "Invalid timeout value"; goto loaderr
;
1337 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1338 server
.port
= atoi(argv
[1]);
1339 if (server
.port
< 1 || server
.port
> 65535) {
1340 err
= "Invalid port"; goto loaderr
;
1342 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1343 server
.bindaddr
= zstrdup(argv
[1]);
1344 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1345 int seconds
= atoi(argv
[1]);
1346 int changes
= atoi(argv
[2]);
1347 if (seconds
< 1 || changes
< 0) {
1348 err
= "Invalid save parameters"; goto loaderr
;
1350 appendServerSaveParams(seconds
,changes
);
1351 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1352 if (chdir(argv
[1]) == -1) {
1353 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1354 argv
[1], strerror(errno
));
1357 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1358 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1359 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1360 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1362 err
= "Invalid log level. Must be one of debug, notice, warning";
1365 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1368 server
.logfile
= zstrdup(argv
[1]);
1369 if (!strcasecmp(server
.logfile
,"stdout")) {
1370 zfree(server
.logfile
);
1371 server
.logfile
= NULL
;
1373 if (server
.logfile
) {
1374 /* Test if we are able to open the file. The server will not
1375 * be able to abort just for this problem later... */
1376 logfp
= fopen(server
.logfile
,"a");
1377 if (logfp
== NULL
) {
1378 err
= sdscatprintf(sdsempty(),
1379 "Can't open the log file: %s", strerror(errno
));
1384 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1385 server
.dbnum
= atoi(argv
[1]);
1386 if (server
.dbnum
< 1) {
1387 err
= "Invalid number of databases"; goto loaderr
;
1389 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1390 server
.maxclients
= atoi(argv
[1]);
1391 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1392 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1393 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1394 server
.masterhost
= sdsnew(argv
[1]);
1395 server
.masterport
= atoi(argv
[2]);
1396 server
.replstate
= REDIS_REPL_CONNECT
;
1397 } else if (!strcasecmp(argv
[0],"masterauth") && argc
== 2) {
1398 server
.masterauth
= zstrdup(argv
[1]);
1399 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1400 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1401 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1403 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1404 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1405 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1407 } else if (!strcasecmp(argv
[0],"rdbcompression") && argc
== 2) {
1408 if ((server
.rdbcompression
= yesnotoi(argv
[1])) == -1) {
1409 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1411 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1412 server
.sharingpoolsize
= atoi(argv
[1]);
1413 if (server
.sharingpoolsize
< 1) {
1414 err
= "invalid object sharing pool size"; goto loaderr
;
1416 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1417 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1418 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1420 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1421 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1422 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1424 } else if (!strcasecmp(argv
[0],"appendfsync") && argc
== 2) {
1425 if (!strcasecmp(argv
[1],"no")) {
1426 server
.appendfsync
= APPENDFSYNC_NO
;
1427 } else if (!strcasecmp(argv
[1],"always")) {
1428 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1429 } else if (!strcasecmp(argv
[1],"everysec")) {
1430 server
.appendfsync
= APPENDFSYNC_EVERYSEC
;
1432 err
= "argument must be 'no', 'always' or 'everysec'";
1435 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1436 server
.requirepass
= zstrdup(argv
[1]);
1437 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1438 server
.pidfile
= zstrdup(argv
[1]);
1439 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1440 server
.dbfilename
= zstrdup(argv
[1]);
1442 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1444 for (j
= 0; j
< argc
; j
++)
1449 if (fp
!= stdin
) fclose(fp
);
1453 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1454 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1455 fprintf(stderr
, ">>> '%s'\n", line
);
1456 fprintf(stderr
, "%s\n", err
);
1460 static void freeClientArgv(redisClient
*c
) {
1463 for (j
= 0; j
< c
->argc
; j
++)
1464 decrRefCount(c
->argv
[j
]);
1465 for (j
= 0; j
< c
->mbargc
; j
++)
1466 decrRefCount(c
->mbargv
[j
]);
1471 static void freeClient(redisClient
*c
) {
1474 /* Note that if the client we are freeing is blocked in a blocking
1475 * call, we have to set querybuf to NULL *before* calling unblockClient()
1476 * so that processInputBuffer() does not get called. Also it is important
1477 * to remove the file events after this, because this call adds
1478 * the READABLE event. */
1479 sdsfree(c
->querybuf
);
1481 if (c
->flags
& REDIS_BLOCKED
)
1484 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1485 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1486 listRelease(c
->reply
);
1489 ln
= listSearchKey(server
.clients
,c
);
1490 redisAssert(ln
!= NULL
);
1491 listDelNode(server
.clients
,ln
);
1492 if (c
->flags
& REDIS_SLAVE
) {
1493 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1495 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1496 ln
= listSearchKey(l
,c
);
1497 redisAssert(ln
!= NULL
);
1500 if (c
->flags
& REDIS_MASTER
) {
1501 server
.master
= NULL
;
1502 server
.replstate
= REDIS_REPL_CONNECT
;
1506 freeClientMultiState(c
);
1510 #define GLUEREPLY_UP_TO (1024)
1511 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1513 char buf
[GLUEREPLY_UP_TO
];
1517 listRewind(c
->reply
);
1518 while((ln
= listYield(c
->reply
))) {
1522 objlen
= sdslen(o
->ptr
);
1523 if (copylen
+ objlen
<= GLUEREPLY_UP_TO
) {
1524 memcpy(buf
+copylen
,o
->ptr
,objlen
);
1526 listDelNode(c
->reply
,ln
);
1528 if (copylen
== 0) return;
1532 /* Now the output buffer is empty, add the new single element */
1533 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,copylen
));
1534 listAddNodeHead(c
->reply
,o
);
1537 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1538 redisClient
*c
= privdata
;
1539 int nwritten
= 0, totwritten
= 0, objlen
;
1542 REDIS_NOTUSED(mask
);
1544 /* Use writev() if we have enough buffers to send */
1545 if (!server
.glueoutputbuf
&&
1546 listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD
&&
1547 !(c
->flags
& REDIS_MASTER
))
1549 sendReplyToClientWritev(el
, fd
, privdata
, mask
);
1553 while(listLength(c
->reply
)) {
1554 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1555 glueReplyBuffersIfNeeded(c
);
1557 o
= listNodeValue(listFirst(c
->reply
));
1558 objlen
= sdslen(o
->ptr
);
1561 listDelNode(c
->reply
,listFirst(c
->reply
));
1565 if (c
->flags
& REDIS_MASTER
) {
1566 /* Don't reply to a master */
1567 nwritten
= objlen
- c
->sentlen
;
1569 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1570 if (nwritten
<= 0) break;
1572 c
->sentlen
+= nwritten
;
1573 totwritten
+= nwritten
;
1574 /* If we fully sent the object on head go to the next one */
1575 if (c
->sentlen
== objlen
) {
1576 listDelNode(c
->reply
,listFirst(c
->reply
));
1579 /* Note that we avoid sending more than REDIS_MAX_WRITE_PER_EVENT
1580 * bytes: in a single threaded server it's a good idea to serve
1581 * other clients as well, even if a very large request comes from a
1582 * super fast link that is always able to accept data (as a real world
1583 * scenario think about 'KEYS *' against the loopback interface) */
1584 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1586 if (nwritten
== -1) {
1587 if (errno
== EAGAIN
) {
1590 redisLog(REDIS_DEBUG
,
1591 "Error writing to client: %s", strerror(errno
));
1596 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1597 if (listLength(c
->reply
) == 0) {
1599 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1603 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
)
1605 redisClient
*c
= privdata
;
1606 int nwritten
= 0, totwritten
= 0, objlen
, willwrite
;
1608 struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
];
1609 int offset
, ion
= 0;
1611 REDIS_NOTUSED(mask
);
1614 while (listLength(c
->reply
)) {
1615 offset
= c
->sentlen
;
1619 /* fill-in the iov[] array */
1620 for(node
= listFirst(c
->reply
); node
; node
= listNextNode(node
)) {
1621 o
= listNodeValue(node
);
1622 objlen
= sdslen(o
->ptr
);
1624 if (totwritten
+ objlen
- offset
> REDIS_MAX_WRITE_PER_EVENT
)
1627 if(ion
== REDIS_WRITEV_IOVEC_COUNT
)
1628 break; /* no more iovecs */
1630 iov
[ion
].iov_base
= ((char*)o
->ptr
) + offset
;
1631 iov
[ion
].iov_len
= objlen
- offset
;
1632 willwrite
+= objlen
- offset
;
1633 offset
= 0; /* just for the first item */
1640 /* write all collected blocks at once */
1641 if((nwritten
= writev(fd
, iov
, ion
)) < 0) {
1642 if (errno
!= EAGAIN
) {
1643 redisLog(REDIS_DEBUG
,
1644 "Error writing to client: %s", strerror(errno
));
1651 totwritten
+= nwritten
;
1652 offset
= c
->sentlen
;
1654 /* remove written robjs from c->reply */
1655 while (nwritten
&& listLength(c
->reply
)) {
1656 o
= listNodeValue(listFirst(c
->reply
));
1657 objlen
= sdslen(o
->ptr
);
1659 if(nwritten
>= objlen
- offset
) {
1660 listDelNode(c
->reply
, listFirst(c
->reply
));
1661 nwritten
-= objlen
- offset
;
1665 c
->sentlen
+= nwritten
;
1673 c
->lastinteraction
= time(NULL
);
1675 if (listLength(c
->reply
) == 0) {
1677 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1681 static struct redisCommand
*lookupCommand(char *name
) {
1683 while(cmdTable
[j
].name
!= NULL
) {
1684 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1690 /* resetClient prepares the client to process the next command */
1691 static void resetClient(redisClient
*c
) {
1697 /* Call() is the core of Redis execution of a command */
1698 static void call(redisClient
*c
, struct redisCommand
*cmd
) {
1701 dirty
= server
.dirty
;
1703 if (server
.appendonly
&& server
.dirty
-dirty
)
1704 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1705 if (server
.dirty
-dirty
&& listLength(server
.slaves
))
1706 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1707 if (listLength(server
.monitors
))
1708 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1709 server
.stat_numcommands
++;
1712 /* If this function gets called we already read a whole
1713 * command, arguments are in the client argv/argc fields.
1714 * processCommand() executes the command or prepares the
1715 * server for a bulk read from the client.
1717 * If 1 is returned the client is still alive and valid
1718 * and other operations can be performed by the caller. Otherwise
1719 * if 0 is returned the client was destroyed (i.e. after QUIT). */
1720 static int processCommand(redisClient
*c
) {
1721 struct redisCommand
*cmd
;
1723 /* Free some memory if needed (maxmemory setting) */
1724 if (server
.maxmemory
) freeMemoryIfNeeded();
1726 /* Handle the multi bulk command type. This is an alternative protocol
1727 * supported by Redis in order to receive commands that are composed of
1728 * multiple binary-safe "bulk" arguments. The latency of processing is
1729 * a bit higher but this allows things like multi-sets, so if this
1730 * protocol is used only for MSET and similar commands this is a big win. */
1731 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1732 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1733 if (c
->multibulk
<= 0) {
1737 decrRefCount(c
->argv
[c
->argc
-1]);
1741 } else if (c
->multibulk
) {
1742 if (c
->bulklen
== -1) {
1743 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1744 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1748 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1749 decrRefCount(c
->argv
[0]);
1750 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1752 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1757 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1761 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1762 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1766 if (c
->multibulk
== 0) {
1770 /* Here we need to swap the multi-bulk argc/argv with the
1771 * normal argc/argv of the client structure. */
1773 c
->argv
= c
->mbargv
;
1774 c
->mbargv
= auxargv
;
1777 c
->argc
= c
->mbargc
;
1778 c
->mbargc
= auxargc
;
1780 /* We need to set bulklen to something different than -1
1781 * in order for the code below to process the command without
1782 * trying to read the last argument of a bulk command as
1783 * a special argument. */
1785 /* continue below and process the command */
1792 /* -- end of multi bulk commands processing -- */
1794 /* The QUIT command is handled as a special case. Normal command
1795 * procs are unable to close the client connection safely */
1796 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1800 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1803 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1804 (char*)c
->argv
[0]->ptr
));
1807 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1808 (c
->argc
< -cmd
->arity
)) {
1810 sdscatprintf(sdsempty(),
1811 "-ERR wrong number of arguments for '%s' command\r\n",
1815 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1816 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1819 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1820 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1822 decrRefCount(c
->argv
[c
->argc
-1]);
1823 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1825 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1830 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1831 /* It is possible that the bulk read is already in the
1832 * buffer. Check this condition and handle it accordingly.
1833 * This is just a fast path, an alternative to calling processInputBuffer().
1834 * It's a good idea since the code is small and this condition
1835 * happens most of the time. */
1836 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1837 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1839 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1844 /* Let's try to share objects on the command arguments vector */
1845 if (server
.shareobjects
) {
1847 for(j
= 1; j
< c
->argc
; j
++)
1848 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1850 /* Let's try to encode the bulk object to save space. */
1851 if (cmd
->flags
& REDIS_CMD_BULK
)
1852 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1854 /* Check if the user is authenticated */
1855 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1856 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1861 /* Exec the command */
1862 if (c
->flags
& REDIS_MULTI
&& cmd
->proc
!= execCommand
) {
1863 queueMultiCommand(c
,cmd
);
1864 addReply(c
,shared
.queued
);
1869 /* Prepare the client for the next command */
1870 if (c
->flags
& REDIS_CLOSE
) {
1878 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1882 /* (args*2)+1 is enough room for args, spaces, newlines */
1883 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1885 if (argc
<= REDIS_STATIC_ARGS
) {
1888 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1891 for (j
= 0; j
< argc
; j
++) {
1892 if (j
!= 0) outv
[outc
++] = shared
.space
;
1893 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1896 lenobj
= createObject(REDIS_STRING
,
1897 sdscatprintf(sdsempty(),"%lu\r\n",
1898 (unsigned long) stringObjectLen(argv
[j
])));
1899 lenobj
->refcount
= 0;
1900 outv
[outc
++] = lenobj
;
1902 outv
[outc
++] = argv
[j
];
1904 outv
[outc
++] = shared
.crlf
;
1906 /* Increment all the refcounts at start and decrement at end in order to
1907 * be sure to free objects if there is no slave in a replication state
1908 * able to be fed with commands */
1909 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1911 while((ln
= listYield(slaves
))) {
1912 redisClient
*slave
= ln
->value
;
1914 /* Don't feed slaves that are still waiting for BGSAVE to start */
1915 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1917 /* Feed all the other slaves, MONITORs and so on */
1918 if (slave
->slaveseldb
!= dictid
) {
1922 case 0: selectcmd
= shared
.select0
; break;
1923 case 1: selectcmd
= shared
.select1
; break;
1924 case 2: selectcmd
= shared
.select2
; break;
1925 case 3: selectcmd
= shared
.select3
; break;
1926 case 4: selectcmd
= shared
.select4
; break;
1927 case 5: selectcmd
= shared
.select5
; break;
1928 case 6: selectcmd
= shared
.select6
; break;
1929 case 7: selectcmd
= shared
.select7
; break;
1930 case 8: selectcmd
= shared
.select8
; break;
1931 case 9: selectcmd
= shared
.select9
; break;
1933 selectcmd
= createObject(REDIS_STRING
,
1934 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1935 selectcmd
->refcount
= 0;
1938 addReply(slave
,selectcmd
);
1939 slave
->slaveseldb
= dictid
;
1941 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1943 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1944 if (outv
!= static_outv
) zfree(outv
);
1947 static void processInputBuffer(redisClient
*c
) {
1949 /* Before processing the input buffer, make sure the client is not
1950 * waiting for a blocking operation such as BLPOP. Note that on the first
1951 * iteration the client is never blocked, otherwise processInputBuffer()
1952 * would not be called at all, but after the execution of the first commands
1953 * in the input buffer the client may be blocked, and the "goto again"
1954 * will try to reiterate. The following line will make it return asap. */
1955 if (c
->flags
& REDIS_BLOCKED
) return;
1956 if (c
->bulklen
== -1) {
1957 /* Read the first line of the query */
1958 char *p
= strchr(c
->querybuf
,'\n');
1965 query
= c
->querybuf
;
1966 c
->querybuf
= sdsempty();
1967 querylen
= 1+(p
-(query
));
1968 if (sdslen(query
) > querylen
) {
1969 /* leave data after the first line of the query in the buffer */
1970 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1972 *p
= '\0'; /* remove "\n" */
1973 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1974 sdsupdatelen(query
);
1976 /* Now we can split the query in arguments */
1977 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1980 if (c
->argv
) zfree(c
->argv
);
1981 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1983 for (j
= 0; j
< argc
; j
++) {
1984 if (sdslen(argv
[j
])) {
1985 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1993 /* Execute the command. If the client is still valid
1994 * after processCommand() return and there is something
1995 * on the query buffer try to process the next command. */
1996 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1998 /* Nothing to process, argc == 0. Just process the query
1999 * buffer if it's not empty or return to the caller */
2000 if (sdslen(c
->querybuf
)) goto again
;
2003 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
2004 redisLog(REDIS_DEBUG
, "Client protocol error");
2009 /* Bulk read handling. Note that if we are at this point
2010 the client already sent a command terminated with a newline,
2011 we are reading the bulk data that is actually the last
2012 argument of the command. */
2013 int qbl
= sdslen(c
->querybuf
);
2015 if (c
->bulklen
<= qbl
) {
2016 /* Copy everything but the final CRLF as final argument */
2017 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
2019 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
2020 /* Process the command. If the client is still valid after
2021 * the processing and there is more data in the buffer
2022 * try to parse it. */
2023 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
2029 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2030 redisClient
*c
= (redisClient
*) privdata
;
2031 char buf
[REDIS_IOBUF_LEN
];
2034 REDIS_NOTUSED(mask
);
2036 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
2038 if (errno
== EAGAIN
) {
2041 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
2045 } else if (nread
== 0) {
2046 redisLog(REDIS_DEBUG
, "Client closed connection");
2051 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
2052 c
->lastinteraction
= time(NULL
);
2056 processInputBuffer(c
);
2059 static int selectDb(redisClient
*c
, int id
) {
2060 if (id
< 0 || id
>= server
.dbnum
)
2062 c
->db
= &server
.db
[id
];
2066 static void *dupClientReplyValue(void *o
) {
2067 incrRefCount((robj
*)o
);
2071 static redisClient
*createClient(int fd
) {
2072 redisClient
*c
= zmalloc(sizeof(*c
));
2074 anetNonBlock(NULL
,fd
);
2075 anetTcpNoDelay(NULL
,fd
);
2076 if (!c
) return NULL
;
2079 c
->querybuf
= sdsempty();
2088 c
->lastinteraction
= time(NULL
);
2089 c
->authenticated
= 0;
2090 c
->replstate
= REDIS_REPL_NONE
;
2091 c
->reply
= listCreate();
2092 c
->blockingkey
= NULL
;
2093 listSetFreeMethod(c
->reply
,decrRefCount
);
2094 listSetDupMethod(c
->reply
,dupClientReplyValue
);
2095 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
2096 readQueryFromClient
, c
) == AE_ERR
) {
2100 listAddNodeTail(server
.clients
,c
);
2101 initClientMultiState(c
);
2105 static void addReply(redisClient
*c
, robj
*obj
) {
2106 if (listLength(c
->reply
) == 0 &&
2107 (c
->replstate
== REDIS_REPL_NONE
||
2108 c
->replstate
== REDIS_REPL_ONLINE
) &&
2109 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
2110 sendReplyToClient
, c
) == AE_ERR
) return;
2111 listAddNodeTail(c
->reply
,getDecodedObject(obj
));
2114 static void addReplySds(redisClient
*c
, sds s
) {
2115 robj
*o
= createObject(REDIS_STRING
,s
);
2120 static void addReplyDouble(redisClient
*c
, double d
) {
2123 snprintf(buf
,sizeof(buf
),"%.17g",d
);
2124 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2125 (unsigned long) strlen(buf
),buf
));
2128 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
2131 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
2132 len
= sdslen(obj
->ptr
);
2134 long n
= (long)obj
->ptr
;
2136 /* Compute how many bytes will take this integer as a radix 10 string */
2142 while((n
= n
/10) != 0) {
2146 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len
));
2149 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2154 REDIS_NOTUSED(mask
);
2155 REDIS_NOTUSED(privdata
);
2157 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
2158 if (cfd
== AE_ERR
) {
2159 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
2162 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
2163 if ((c
= createClient(cfd
)) == NULL
) {
2164 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
2165 close(cfd
); /* May be already closed, just ingore errors */
2168 /* If the maxclients directive is set and this client exceeds the limit,
2169 * close the connection. Note that we create the client first instead of
2170 * checking for this condition beforehand, since now the socket is already
2171 * set in nonblocking mode and we can send an error for free using the Kernel I/O */
2172 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
2173 char *err
= "-ERR max number of clients reached\r\n";
2175 /* That's a best effort error message, don't check write errors */
2176 if (write(c
->fd
,err
,strlen(err
)) == -1) {
2177 /* Nothing to do, Just to avoid the warning... */
2182 server
.stat_numconnections
++;
2185 /* ======================= Redis objects implementation ===================== */
2187 static robj
*createObject(int type
, void *ptr
) {
2190 if (listLength(server
.objfreelist
)) {
2191 listNode
*head
= listFirst(server
.objfreelist
);
2192 o
= listNodeValue(head
);
2193 listDelNode(server
.objfreelist
,head
);
2195 o
= zmalloc(sizeof(*o
));
2198 o
->encoding
= REDIS_ENCODING_RAW
;
2204 static robj
*createStringObject(char *ptr
, size_t len
) {
2205 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
2208 static robj
*createListObject(void) {
2209 list
*l
= listCreate();
2211 listSetFreeMethod(l
,decrRefCount
);
2212 return createObject(REDIS_LIST
,l
);
2215 static robj
*createSetObject(void) {
2216 dict
*d
= dictCreate(&setDictType
,NULL
);
2217 return createObject(REDIS_SET
,d
);
2220 static robj
*createZsetObject(void) {
2221 zset
*zs
= zmalloc(sizeof(*zs
));
2223 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
2224 zs
->zsl
= zslCreate();
2225 return createObject(REDIS_ZSET
,zs
);
2228 static void freeStringObject(robj
*o
) {
2229 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2234 static void freeListObject(robj
*o
) {
2235 listRelease((list
*) o
->ptr
);
2238 static void freeSetObject(robj
*o
) {
2239 dictRelease((dict
*) o
->ptr
);
2242 static void freeZsetObject(robj
*o
) {
2245 dictRelease(zs
->dict
);
2250 static void freeHashObject(robj
*o
) {
2251 dictRelease((dict
*) o
->ptr
);
2254 static void incrRefCount(robj
*o
) {
2256 #ifdef DEBUG_REFCOUNT
2257 if (o
->type
== REDIS_STRING
)
2258 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
2262 static void decrRefCount(void *obj
) {
2265 #ifdef DEBUG_REFCOUNT
2266 if (o
->type
== REDIS_STRING
)
2267 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
2269 if (--(o
->refcount
) == 0) {
2271 case REDIS_STRING
: freeStringObject(o
); break;
2272 case REDIS_LIST
: freeListObject(o
); break;
2273 case REDIS_SET
: freeSetObject(o
); break;
2274 case REDIS_ZSET
: freeZsetObject(o
); break;
2275 case REDIS_HASH
: freeHashObject(o
); break;
2276 default: redisAssert(0 != 0); break;
2278 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2279 !listAddNodeHead(server
.objfreelist
,o
))
2284 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2285 dictEntry
*de
= dictFind(db
->dict
,key
);
2286 return de
? dictGetEntryVal(de
) : NULL
;
2289 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2290 expireIfNeeded(db
,key
);
2291 return lookupKey(db
,key
);
2294 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2295 deleteIfVolatile(db
,key
);
2296 return lookupKey(db
,key
);
2299 static int deleteKey(redisDb
*db
, robj
*key
) {
2302 /* We need to protect key from destruction: after the first dictDelete()
2303 * it may happen that 'key' is no longer valid if we don't increment
2304 * its count. This may happen when we get the object reference directly
2305 * from the hash table with dictRandomKey() or dict iterators */
2307 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2308 retval
= dictDelete(db
->dict
,key
);
2311 return retval
== DICT_OK
;
2314 /* Try to share an object against the shared objects pool */
2315 static robj
*tryObjectSharing(robj
*o
) {
2316 struct dictEntry
*de
;
2319 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2321 redisAssert(o
->type
== REDIS_STRING
);
2322 de
= dictFind(server
.sharingpool
,o
);
2324 robj
*shared
= dictGetEntryKey(de
);
2326 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2327 dictGetEntryVal(de
) = (void*) c
;
2328 incrRefCount(shared
);
2332 /* Here we are using a stream algorithm: every time an object is
2333 * shared we increment its count, every time there is a miss we
2334 * decrement the counter of a random object. If this object reaches
2335 * zero we remove the object and put the current object instead. */
2336 if (dictSize(server
.sharingpool
) >=
2337 server
.sharingpoolsize
) {
2338 de
= dictGetRandomKey(server
.sharingpool
);
2339 redisAssert(de
!= NULL
);
2340 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2341 dictGetEntryVal(de
) = (void*) c
;
2343 dictDelete(server
.sharingpool
,de
->key
);
2346 c
= 0; /* If the pool is empty we want to add this object */
2351 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2352 redisAssert(retval
== DICT_OK
);
2359 /* Check if the nul-terminated string 's' can be represented by a long
2360 * (that is, is a number that fits into long without any other space or
2361 * character before or after the digits).
2363 * If so, the function returns REDIS_OK and *longval is set to the value
2364 * of the number. Otherwise REDIS_ERR is returned */
2365 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2366 char buf
[32], *endptr
;
2370 value
= strtol(s
, &endptr
, 10);
2371 if (endptr
[0] != '\0') return REDIS_ERR
;
2372 slen
= snprintf(buf
,32,"%ld",value
);
2374 /* If the number converted back into a string is not identical
2375 * then it's not possible to encode the string as integer */
2376 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2377 if (longval
) *longval
= value
;
2381 /* Try to encode a string object in order to save space */
2382 static int tryObjectEncoding(robj
*o
) {
2386 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2387 return REDIS_ERR
; /* Already encoded */
2389 /* It's not safe to encode shared objects: shared objects can be shared
2390 * everywhere in the "object space" of Redis. Encoded objects can only
2391 * appear as "values" (and not, for instance, as keys) */
2392 if (o
->refcount
> 1) return REDIS_ERR
;
2394 /* Currently we try to encode only strings */
2395 redisAssert(o
->type
== REDIS_STRING
);
2397 /* Check if we can represent this string as a long integer */
2398 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2400 /* Ok, this object can be encoded */
2401 o
->encoding
= REDIS_ENCODING_INT
;
2403 o
->ptr
= (void*) value
;
2407 /* Get a decoded version of an encoded object (returned as a new object).
2408 * If the object is already raw-encoded just increment the ref count. */
2409 static robj
*getDecodedObject(robj
*o
) {
2412 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2416 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2419 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2420 dec
= createStringObject(buf
,strlen(buf
));
2423 redisAssert(1 != 1);
2427 /* Compare two string objects via strcmp() or alike.
2428 * Note that the objects may be integer-encoded. In such a case we
2429 * use snprintf() to get a string representation of the numbers on the stack
2430 * and compare the strings, it's much faster than calling getDecodedObject().
2432 * Important note: if objects are not integer encoded, but binary-safe strings,
2433 * sdscmp() from sds.c will apply memcmp() so this function can be considered
2435 static int compareStringObjects(robj
*a
, robj
*b
) {
2436 redisAssert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2437 char bufa
[128], bufb
[128], *astr
, *bstr
;
2440 if (a
== b
) return 0;
2441 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2442 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2448 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2449 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2455 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2458 static size_t stringObjectLen(robj
*o
) {
2459 redisAssert(o
->type
== REDIS_STRING
);
2460 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2461 return sdslen(o
->ptr
);
2465 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2469 /*============================ DB saving/loading ============================ */
2471 static int rdbSaveType(FILE *fp
, unsigned char type
) {
2472 if (fwrite(&type
,1,1,fp
) == 0) return -1;
2476 static int rdbSaveTime(FILE *fp
, time_t t
) {
2477 int32_t t32
= (int32_t) t
;
2478 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
2482 /* check rdbLoadLen() comments for more info */
2483 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2484 unsigned char buf
[2];
2487 /* Save a 6 bit len */
2488 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2489 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2490 } else if (len
< (1<<14)) {
2491 /* Save a 14 bit len */
2492 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2494 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2496 /* Save a 32 bit len */
2497 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2498 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2500 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2505 /* String objects in the form "2391" "-100" without any space and with a
2506 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2507 * encoded as integers to save space */
2508 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2510 char *endptr
, buf
[32];
2512 /* Check if it's possible to encode this value as a number */
2513 value
= strtoll(s
, &endptr
, 10);
2514 if (endptr
[0] != '\0') return 0;
2515 snprintf(buf
,32,"%lld",value
);
2517 /* If the number converted back into a string is not identical
2518 * then it's not possible to encode the string as integer */
2519 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2521 /* Finally check if it fits in our ranges */
2522 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2523 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2524 enc
[1] = value
&0xFF;
2526 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2527 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2528 enc
[1] = value
&0xFF;
2529 enc
[2] = (value
>>8)&0xFF;
2531 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2532 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2533 enc
[1] = value
&0xFF;
2534 enc
[2] = (value
>>8)&0xFF;
2535 enc
[3] = (value
>>16)&0xFF;
2536 enc
[4] = (value
>>24)&0xFF;
2543 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2544 unsigned int comprlen
, outlen
;
2548 /* We require at least four bytes compression for this to be worth it */
2549 outlen
= sdslen(obj
->ptr
)-4;
2550 if (outlen
<= 0) return 0;
2551 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2552 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2553 if (comprlen
== 0) {
2557 /* Data compressed! Let's save it on disk */
2558 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2559 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2560 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2561 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2562 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2571 /* Save a string object as [len][data] on disk. If the object is a string
2572 * representation of an integer value we try to save it in a special form */
2573 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2577 len
= sdslen(obj
->ptr
);
2579 /* Try integer encoding */
2581 unsigned char buf
[5];
2582 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2583 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2588 /* Try LZF compression - under 20 bytes it's unable to compress even
2589 * aaaaaaaaaaaaaaaaaa so skip it */
2590 if (server
.rdbcompression
&& len
> 20) {
2593 retval
= rdbSaveLzfStringObject(fp
,obj
);
2594 if (retval
== -1) return -1;
2595 if (retval
> 0) return 0;
2596 /* retval == 0 means data can't be compressed, save the old way */
2599 /* Store verbatim */
2600 if (rdbSaveLen(fp
,len
) == -1) return -1;
2601 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2605 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2606 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2609 obj
= getDecodedObject(obj
);
2610 retval
= rdbSaveStringObjectRaw(fp
,obj
);
2615 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
2616 * 8 bit integer specifying the length of the representation.
2617 * This 8 bit integer has special values in order to specify the following
2623 static int rdbSaveDoubleValue(FILE *fp
, double val
) {
2624 unsigned char buf
[128];
2630 } else if (!isfinite(val
)) {
2632 buf
[0] = (val
< 0) ? 255 : 254;
2634 snprintf((char*)buf
+1,sizeof(buf
)-1,"%.17g",val
);
2635 buf
[0] = strlen((char*)buf
+1);
2638 if (fwrite(buf
,len
,1,fp
) == 0) return -1;
2642 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2643 static int rdbSave(char *filename
) {
2644 dictIterator
*di
= NULL
;
2649 time_t now
= time(NULL
);
2651 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2652 fp
= fopen(tmpfile
,"w");
2654 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2657 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2658 for (j
= 0; j
< server
.dbnum
; j
++) {
2659 redisDb
*db
= server
.db
+j
;
2661 if (dictSize(d
) == 0) continue;
2662 di
= dictGetIterator(d
);
2668 /* Write the SELECT DB opcode */
2669 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2670 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2672 /* Iterate this DB writing every entry */
2673 while((de
= dictNext(di
)) != NULL
) {
2674 robj
*key
= dictGetEntryKey(de
);
2675 robj
*o
= dictGetEntryVal(de
);
2676 time_t expiretime
= getExpire(db
,key
);
2678 /* Save the expire time */
2679 if (expiretime
!= -1) {
2680 /* If this key is already expired skip it */
2681 if (expiretime
< now
) continue;
2682 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2683 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2685 /* Save the key and associated value */
2686 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2687 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2688 if (o
->type
== REDIS_STRING
) {
2689 /* Save a string value */
2690 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
2691 } else if (o
->type
== REDIS_LIST
) {
2692 /* Save a list value */
2693 list
*list
= o
->ptr
;
2697 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
2698 while((ln
= listYield(list
))) {
2699 robj
*eleobj
= listNodeValue(ln
);
2701 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2703 } else if (o
->type
== REDIS_SET
) {
2704 /* Save a set value */
2706 dictIterator
*di
= dictGetIterator(set
);
2709 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
2710 while((de
= dictNext(di
)) != NULL
) {
2711 robj
*eleobj
= dictGetEntryKey(de
);
2713 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2715 dictReleaseIterator(di
);
2716 } else if (o
->type
== REDIS_ZSET
) {
2717 /* Save a sorted set value */
2719 dictIterator
*di
= dictGetIterator(zs
->dict
);
2722 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) goto werr
;
2723 while((de
= dictNext(di
)) != NULL
) {
2724 robj
*eleobj
= dictGetEntryKey(de
);
2725 double *score
= dictGetEntryVal(de
);
2727 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2728 if (rdbSaveDoubleValue(fp
,*score
) == -1) goto werr
;
2730 dictReleaseIterator(di
);
2732 redisAssert(0 != 0);
2735 dictReleaseIterator(di
);
2738 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2740 /* Make sure data will not remain on the OS's output buffers */
2745 /* Use RENAME to make sure the DB file is changed atomically only
2746 * if the generated DB file is ok. */
2747 if (rename(tmpfile
,filename
) == -1) {
2748 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
2752 redisLog(REDIS_NOTICE
,"DB saved on disk");
2754 server
.lastsave
= time(NULL
);
2760 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2761 if (di
) dictReleaseIterator(di
);
2765 static int rdbSaveBackground(char *filename
) {
2768 if (server
.bgsavechildpid
!= -1) return REDIS_ERR
;
2769 if ((childpid
= fork()) == 0) {
2772 if (rdbSave(filename
) == REDIS_OK
) {
2779 if (childpid
== -1) {
2780 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2784 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2785 server
.bgsavechildpid
= childpid
;
2788 return REDIS_OK
; /* unreached */
2791 static void rdbRemoveTempFile(pid_t childpid
) {
2794 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
2798 static int rdbLoadType(FILE *fp
) {
2800 if (fread(&type
,1,1,fp
) == 0) return -1;
2804 static time_t rdbLoadTime(FILE *fp
) {
2806 if (fread(&t32
,4,1,fp
) == 0) return -1;
2807 return (time_t) t32
;
2810 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2811 * of this file for a description of how these are stored on disk.
2813 * isencoded is set to 1 if the length read is not actually a length but
2814 * an "encoding type", check the above comments for more info */
2815 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2816 unsigned char buf
[2];
2819 if (isencoded
) *isencoded
= 0;
2821 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2826 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2827 type
= (buf
[0]&0xC0)>>6;
2828 if (type
== REDIS_RDB_6BITLEN
) {
2829 /* Read a 6 bit len */
2831 } else if (type
== REDIS_RDB_ENCVAL
) {
2832 /* Read a 6 bit len encoding type */
2833 if (isencoded
) *isencoded
= 1;
2835 } else if (type
== REDIS_RDB_14BITLEN
) {
2836 /* Read a 14 bit len */
2837 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2838 return ((buf
[0]&0x3F)<<8)|buf
[1];
2840 /* Read a 32 bit len */
2841 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2847 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2848 unsigned char enc
[4];
2851 if (enctype
== REDIS_RDB_ENC_INT8
) {
2852 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2853 val
= (signed char)enc
[0];
2854 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2856 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2857 v
= enc
[0]|(enc
[1]<<8);
2859 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2861 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2862 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2865 val
= 0; /* anti-warning */
2868 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2871 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2872 unsigned int len
, clen
;
2873 unsigned char *c
= NULL
;
2876 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2877 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2878 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2879 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2880 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2881 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2883 return createObject(REDIS_STRING
,val
);
2890 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2895 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2898 case REDIS_RDB_ENC_INT8
:
2899 case REDIS_RDB_ENC_INT16
:
2900 case REDIS_RDB_ENC_INT32
:
2901 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2902 case REDIS_RDB_ENC_LZF
:
2903 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2909 if (len
== REDIS_RDB_LENERR
) return NULL
;
2910 val
= sdsnewlen(NULL
,len
);
2911 if (len
&& fread(val
,len
,1,fp
) == 0) {
2915 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2918 /* For information about double serialization check rdbSaveDoubleValue() */
2919 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
2923 if (fread(&len
,1,1,fp
) == 0) return -1;
2925 case 255: *val
= R_NegInf
; return 0;
2926 case 254: *val
= R_PosInf
; return 0;
2927 case 253: *val
= R_Nan
; return 0;
2929 if (fread(buf
,len
,1,fp
) == 0) return -1;
2931 sscanf(buf
, "%lg", val
);
2936 static int rdbLoad(char *filename
) {
2938 robj
*keyobj
= NULL
;
2940 int type
, retval
, rdbver
;
2941 dict
*d
= server
.db
[0].dict
;
2942 redisDb
*db
= server
.db
+0;
2944 time_t expiretime
= -1, now
= time(NULL
);
2946 fp
= fopen(filename
,"r");
2947 if (!fp
) return REDIS_ERR
;
2948 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2950 if (memcmp(buf
,"REDIS",5) != 0) {
2952 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2955 rdbver
= atoi(buf
+5);
2958 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2965 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2966 if (type
== REDIS_EXPIRETIME
) {
2967 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2968 /* We read the time so we need to read the object type again */
2969 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2971 if (type
== REDIS_EOF
) break;
2972 /* Handle SELECT DB opcode as a special case */
2973 if (type
== REDIS_SELECTDB
) {
2974 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2976 if (dbid
>= (unsigned)server
.dbnum
) {
2977 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2980 db
= server
.db
+dbid
;
2985 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2987 if (type
== REDIS_STRING
) {
2988 /* Read string value */
2989 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2990 tryObjectEncoding(o
);
2991 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2992 /* Read list/set value */
2995 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2997 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2998 /* Load every single element of the list/set */
3002 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
3003 tryObjectEncoding(ele
);
3004 if (type
== REDIS_LIST
) {
3005 listAddNodeTail((list
*)o
->ptr
,ele
);
3007 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
3010 } else if (type
== REDIS_ZSET
) {
3011 /* Read sorted set value */
3015 if ((zsetlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
3017 o
= createZsetObject();
3019 /* Load every single element of the sorted set */
3022 double *score
= zmalloc(sizeof(double));
3024 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
3025 tryObjectEncoding(ele
);
3026 if (rdbLoadDoubleValue(fp
,score
) == -1) goto eoferr
;
3027 dictAdd(zs
->dict
,ele
,score
);
3028 zslInsert(zs
->zsl
,*score
,ele
);
3029 incrRefCount(ele
); /* added to skiplist */
3032 redisAssert(0 != 0);
3034 /* Add the new object in the hash table */
3035 retval
= dictAdd(d
,keyobj
,o
);
3036 if (retval
== DICT_ERR
) {
3037 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
3040 /* Set the expire time if needed */
3041 if (expiretime
!= -1) {
3042 setExpire(db
,keyobj
,expiretime
);
3043 /* Delete this key if already expired */
3044 if (expiretime
< now
) deleteKey(db
,keyobj
);
3052 eoferr
: /* unexpected end of file is handled here with a fatal exit */
3053 if (keyobj
) decrRefCount(keyobj
);
3054 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
3056 return REDIS_ERR
; /* Just to avoid warning */
3059 /*================================== Commands =============================== */
3061 static void authCommand(redisClient
*c
) {
3062 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
3063 c
->authenticated
= 1;
3064 addReply(c
,shared
.ok
);
3066 c
->authenticated
= 0;
3067 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3071 static void pingCommand(redisClient
*c
) {
3072 addReply(c
,shared
.pong
);
3075 static void echoCommand(redisClient
*c
) {
3076 addReplyBulkLen(c
,c
->argv
[1]);
3077 addReply(c
,c
->argv
[1]);
3078 addReply(c
,shared
.crlf
);
3081 /*=================================== Strings =============================== */
3083 static void setGenericCommand(redisClient
*c
, int nx
) {
3086 if (nx
) deleteIfVolatile(c
->db
,c
->argv
[1]);
3087 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3088 if (retval
== DICT_ERR
) {
3090 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3091 incrRefCount(c
->argv
[2]);
3093 addReply(c
,shared
.czero
);
3097 incrRefCount(c
->argv
[1]);
3098 incrRefCount(c
->argv
[2]);
3101 removeExpire(c
->db
,c
->argv
[1]);
3102 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3105 static void setCommand(redisClient
*c
) {
3106 setGenericCommand(c
,0);
3109 static void setnxCommand(redisClient
*c
) {
3110 setGenericCommand(c
,1);
3113 static int getGenericCommand(redisClient
*c
) {
3114 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3117 addReply(c
,shared
.nullbulk
);
3120 if (o
->type
!= REDIS_STRING
) {
3121 addReply(c
,shared
.wrongtypeerr
);
3124 addReplyBulkLen(c
,o
);
3126 addReply(c
,shared
.crlf
);
3132 static void getCommand(redisClient
*c
) {
3133 getGenericCommand(c
);
3136 static void getsetCommand(redisClient
*c
) {
3137 if (getGenericCommand(c
) == REDIS_ERR
) return;
3138 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
3139 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3141 incrRefCount(c
->argv
[1]);
3143 incrRefCount(c
->argv
[2]);
3145 removeExpire(c
->db
,c
->argv
[1]);
3148 static void mgetCommand(redisClient
*c
) {
3151 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
3152 for (j
= 1; j
< c
->argc
; j
++) {
3153 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
3155 addReply(c
,shared
.nullbulk
);
3157 if (o
->type
!= REDIS_STRING
) {
3158 addReply(c
,shared
.nullbulk
);
3160 addReplyBulkLen(c
,o
);
3162 addReply(c
,shared
.crlf
);
3168 static void msetGenericCommand(redisClient
*c
, int nx
) {
3169 int j
, busykeys
= 0;
3171 if ((c
->argc
% 2) == 0) {
3172 addReplySds(c
,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3175 /* Handle the NX flag. The MSETNX semantic is to return zero and set
3176 * nothing at all if at least one key already exists. */
3178 for (j
= 1; j
< c
->argc
; j
+= 2) {
3179 if (lookupKeyWrite(c
->db
,c
->argv
[j
]) != NULL
) {
3185 addReply(c
, shared
.czero
);
3189 for (j
= 1; j
< c
->argc
; j
+= 2) {
3192 tryObjectEncoding(c
->argv
[j
+1]);
3193 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3194 if (retval
== DICT_ERR
) {
3195 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3196 incrRefCount(c
->argv
[j
+1]);
3198 incrRefCount(c
->argv
[j
]);
3199 incrRefCount(c
->argv
[j
+1]);
3201 removeExpire(c
->db
,c
->argv
[j
]);
3203 server
.dirty
+= (c
->argc
-1)/2;
3204 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3207 static void msetCommand(redisClient
*c
) {
3208 msetGenericCommand(c
,0);
3211 static void msetnxCommand(redisClient
*c
) {
3212 msetGenericCommand(c
,1);
3215 static void incrDecrCommand(redisClient
*c
, long long incr
) {
3220 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3224 if (o
->type
!= REDIS_STRING
) {
3229 if (o
->encoding
== REDIS_ENCODING_RAW
)
3230 value
= strtoll(o
->ptr
, &eptr
, 10);
3231 else if (o
->encoding
== REDIS_ENCODING_INT
)
3232 value
= (long)o
->ptr
;
3234 redisAssert(1 != 1);
3239 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
3240 tryObjectEncoding(o
);
3241 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
3242 if (retval
== DICT_ERR
) {
3243 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
3244 removeExpire(c
->db
,c
->argv
[1]);
3246 incrRefCount(c
->argv
[1]);
3249 addReply(c
,shared
.colon
);
3251 addReply(c
,shared
.crlf
);
3254 static void incrCommand(redisClient
*c
) {
3255 incrDecrCommand(c
,1);
3258 static void decrCommand(redisClient
*c
) {
3259 incrDecrCommand(c
,-1);
3262 static void incrbyCommand(redisClient
*c
) {
3263 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3264 incrDecrCommand(c
,incr
);
3267 static void decrbyCommand(redisClient
*c
) {
3268 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3269 incrDecrCommand(c
,-incr
);
3272 /* ========================= Type agnostic commands ========================= */
3274 static void delCommand(redisClient
*c
) {
3277 for (j
= 1; j
< c
->argc
; j
++) {
3278 if (deleteKey(c
->db
,c
->argv
[j
])) {
3285 addReply(c
,shared
.czero
);
3288 addReply(c
,shared
.cone
);
3291 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
3296 static void existsCommand(redisClient
*c
) {
3297 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
3300 static void selectCommand(redisClient
*c
) {
3301 int id
= atoi(c
->argv
[1]->ptr
);
3303 if (selectDb(c
,id
) == REDIS_ERR
) {
3304 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
3306 addReply(c
,shared
.ok
);
3310 static void randomkeyCommand(redisClient
*c
) {
3314 de
= dictGetRandomKey(c
->db
->dict
);
3315 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
3318 addReply(c
,shared
.plus
);
3319 addReply(c
,shared
.crlf
);
3321 addReply(c
,shared
.plus
);
3322 addReply(c
,dictGetEntryKey(de
));
3323 addReply(c
,shared
.crlf
);
3327 static void keysCommand(redisClient
*c
) {
3330 sds pattern
= c
->argv
[1]->ptr
;
3331 int plen
= sdslen(pattern
);
3332 unsigned long numkeys
= 0, keyslen
= 0;
3333 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3335 di
= dictGetIterator(c
->db
->dict
);
3337 decrRefCount(lenobj
);
3338 while((de
= dictNext(di
)) != NULL
) {
3339 robj
*keyobj
= dictGetEntryKey(de
);
3341 sds key
= keyobj
->ptr
;
3342 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3343 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3344 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3346 addReply(c
,shared
.space
);
3349 keyslen
+= sdslen(key
);
3353 dictReleaseIterator(di
);
3354 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3355 addReply(c
,shared
.crlf
);
3358 static void dbsizeCommand(redisClient
*c
) {
3360 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
3363 static void lastsaveCommand(redisClient
*c
) {
3365 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
3368 static void typeCommand(redisClient
*c
) {
3372 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3377 case REDIS_STRING
: type
= "+string"; break;
3378 case REDIS_LIST
: type
= "+list"; break;
3379 case REDIS_SET
: type
= "+set"; break;
3380 case REDIS_ZSET
: type
= "+zset"; break;
3381 default: type
= "unknown"; break;
3384 addReplySds(c
,sdsnew(type
));
3385 addReply(c
,shared
.crlf
);
3388 static void saveCommand(redisClient
*c
) {
3389 if (server
.bgsavechildpid
!= -1) {
3390 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3393 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3394 addReply(c
,shared
.ok
);
3396 addReply(c
,shared
.err
);
3400 static void bgsaveCommand(redisClient
*c
) {
3401 if (server
.bgsavechildpid
!= -1) {
3402 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3405 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3406 char *status
= "+Background saving started\r\n";
3407 addReplySds(c
,sdsnew(status
));
3409 addReply(c
,shared
.err
);
3413 static void shutdownCommand(redisClient
*c
) {
3414 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
/* Kill the saving child if there is a background saving in progress.
   We want to avoid race conditions, for instance our saving child may
   overwrite the synchronous saving performed by SHUTDOWN. */
3418 if (server
.bgsavechildpid
!= -1) {
3419 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3420 kill(server
.bgsavechildpid
,SIGKILL
);
3421 rdbRemoveTempFile(server
.bgsavechildpid
);
3423 if (server
.appendonly
) {
3424 /* Append only file: fsync() the AOF and exit */
3425 fsync(server
.appendfd
);
3428 /* Snapshotting. Perform a SYNC SAVE and exit */
3429 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3430 if (server
.daemonize
)
3431 unlink(server
.pidfile
);
3432 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3433 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3436 /* Ooops.. error saving! The best we can do is to continue operating.
3437 * Note that if there was a background saving process, in the next
3438 * cron() Redis will be notified that the background saving aborted,
3439 * handling special stuff like slaves pending for synchronization... */
3440 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3441 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3446 static void renameGenericCommand(redisClient
*c
, int nx
) {
3449 /* To use the same key as src and dst is probably an error */
3450 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3451 addReply(c
,shared
.sameobjecterr
);
3455 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3457 addReply(c
,shared
.nokeyerr
);
3461 deleteIfVolatile(c
->db
,c
->argv
[2]);
3462 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3465 addReply(c
,shared
.czero
);
3468 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3470 incrRefCount(c
->argv
[2]);
3472 deleteKey(c
->db
,c
->argv
[1]);
3474 addReply(c
,nx
? shared
.cone
: shared
.ok
);
3477 static void renameCommand(redisClient
*c
) {
3478 renameGenericCommand(c
,0);
3481 static void renamenxCommand(redisClient
*c
) {
3482 renameGenericCommand(c
,1);
3485 static void moveCommand(redisClient
*c
) {
3490 /* Obtain source and target DB pointers */
3493 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3494 addReply(c
,shared
.outofrangeerr
);
3498 selectDb(c
,srcid
); /* Back to the source DB */
/* If the target DB is the same as the source DB the request
 * is probably an error. */
3503 addReply(c
,shared
.sameobjecterr
);
3507 /* Check if the element exists and get a reference */
3508 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3510 addReply(c
,shared
.czero
);
3514 /* Try to add the element to the target DB */
3515 deleteIfVolatile(dst
,c
->argv
[1]);
3516 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3517 addReply(c
,shared
.czero
);
3520 incrRefCount(c
->argv
[1]);
3523 /* OK! key moved, free the entry in the source DB */
3524 deleteKey(src
,c
->argv
[1]);
3526 addReply(c
,shared
.cone
);
3529 /* =================================== Lists ================================ */
3530 static void pushGenericCommand(redisClient
*c
, int where
) {
3534 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3536 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3537 addReply(c
,shared
.ok
);
3540 lobj
= createListObject();
3542 if (where
== REDIS_HEAD
) {
3543 listAddNodeHead(list
,c
->argv
[2]);
3545 listAddNodeTail(list
,c
->argv
[2]);
3547 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
3548 incrRefCount(c
->argv
[1]);
3549 incrRefCount(c
->argv
[2]);
3551 if (lobj
->type
!= REDIS_LIST
) {
3552 addReply(c
,shared
.wrongtypeerr
);
3555 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3556 addReply(c
,shared
.ok
);
3560 if (where
== REDIS_HEAD
) {
3561 listAddNodeHead(list
,c
->argv
[2]);
3563 listAddNodeTail(list
,c
->argv
[2]);
3565 incrRefCount(c
->argv
[2]);
3568 addReply(c
,shared
.ok
);
3571 static void lpushCommand(redisClient
*c
) {
3572 pushGenericCommand(c
,REDIS_HEAD
);
3575 static void rpushCommand(redisClient
*c
) {
3576 pushGenericCommand(c
,REDIS_TAIL
);
3579 static void llenCommand(redisClient
*c
) {
3583 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3585 addReply(c
,shared
.czero
);
3588 if (o
->type
!= REDIS_LIST
) {
3589 addReply(c
,shared
.wrongtypeerr
);
3592 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3597 static void lindexCommand(redisClient
*c
) {
3599 int index
= atoi(c
->argv
[2]->ptr
);
3601 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3603 addReply(c
,shared
.nullbulk
);
3605 if (o
->type
!= REDIS_LIST
) {
3606 addReply(c
,shared
.wrongtypeerr
);
3608 list
*list
= o
->ptr
;
3611 ln
= listIndex(list
, index
);
3613 addReply(c
,shared
.nullbulk
);
3615 robj
*ele
= listNodeValue(ln
);
3616 addReplyBulkLen(c
,ele
);
3618 addReply(c
,shared
.crlf
);
3624 static void lsetCommand(redisClient
*c
) {
3626 int index
= atoi(c
->argv
[2]->ptr
);
3628 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3630 addReply(c
,shared
.nokeyerr
);
3632 if (o
->type
!= REDIS_LIST
) {
3633 addReply(c
,shared
.wrongtypeerr
);
3635 list
*list
= o
->ptr
;
3638 ln
= listIndex(list
, index
);
3640 addReply(c
,shared
.outofrangeerr
);
3642 robj
*ele
= listNodeValue(ln
);
3645 listNodeValue(ln
) = c
->argv
[3];
3646 incrRefCount(c
->argv
[3]);
3647 addReply(c
,shared
.ok
);
3654 static void popGenericCommand(redisClient
*c
, int where
) {
3657 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3659 addReply(c
,shared
.nullbulk
);
3661 if (o
->type
!= REDIS_LIST
) {
3662 addReply(c
,shared
.wrongtypeerr
);
3664 list
*list
= o
->ptr
;
3667 if (where
== REDIS_HEAD
)
3668 ln
= listFirst(list
);
3670 ln
= listLast(list
);
3673 addReply(c
,shared
.nullbulk
);
3675 robj
*ele
= listNodeValue(ln
);
3676 addReplyBulkLen(c
,ele
);
3678 addReply(c
,shared
.crlf
);
3679 listDelNode(list
,ln
);
3686 static void lpopCommand(redisClient
*c
) {
3687 popGenericCommand(c
,REDIS_HEAD
);
3690 static void rpopCommand(redisClient
*c
) {
3691 popGenericCommand(c
,REDIS_TAIL
);
3694 static void lrangeCommand(redisClient
*c
) {
3696 int start
= atoi(c
->argv
[2]->ptr
);
3697 int end
= atoi(c
->argv
[3]->ptr
);
3699 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3701 addReply(c
,shared
.nullmultibulk
);
3703 if (o
->type
!= REDIS_LIST
) {
3704 addReply(c
,shared
.wrongtypeerr
);
3706 list
*list
= o
->ptr
;
3708 int llen
= listLength(list
);
3712 /* convert negative indexes */
3713 if (start
< 0) start
= llen
+start
;
3714 if (end
< 0) end
= llen
+end
;
3715 if (start
< 0) start
= 0;
3716 if (end
< 0) end
= 0;
3718 /* indexes sanity checks */
3719 if (start
> end
|| start
>= llen
) {
3720 /* Out of range start or start > end result in empty list */
3721 addReply(c
,shared
.emptymultibulk
);
3724 if (end
>= llen
) end
= llen
-1;
3725 rangelen
= (end
-start
)+1;
3727 /* Return the result in form of a multi-bulk reply */
3728 ln
= listIndex(list
, start
);
3729 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3730 for (j
= 0; j
< rangelen
; j
++) {
3731 ele
= listNodeValue(ln
);
3732 addReplyBulkLen(c
,ele
);
3734 addReply(c
,shared
.crlf
);
3741 static void ltrimCommand(redisClient
*c
) {
3743 int start
= atoi(c
->argv
[2]->ptr
);
3744 int end
= atoi(c
->argv
[3]->ptr
);
3746 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3748 addReply(c
,shared
.ok
);
3750 if (o
->type
!= REDIS_LIST
) {
3751 addReply(c
,shared
.wrongtypeerr
);
3753 list
*list
= o
->ptr
;
3755 int llen
= listLength(list
);
3756 int j
, ltrim
, rtrim
;
3758 /* convert negative indexes */
3759 if (start
< 0) start
= llen
+start
;
3760 if (end
< 0) end
= llen
+end
;
3761 if (start
< 0) start
= 0;
3762 if (end
< 0) end
= 0;
3764 /* indexes sanity checks */
3765 if (start
> end
|| start
>= llen
) {
3766 /* Out of range start or start > end result in empty list */
3770 if (end
>= llen
) end
= llen
-1;
3775 /* Remove list elements to perform the trim */
3776 for (j
= 0; j
< ltrim
; j
++) {
3777 ln
= listFirst(list
);
3778 listDelNode(list
,ln
);
3780 for (j
= 0; j
< rtrim
; j
++) {
3781 ln
= listLast(list
);
3782 listDelNode(list
,ln
);
3785 addReply(c
,shared
.ok
);
3790 static void lremCommand(redisClient
*c
) {
3793 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3795 addReply(c
,shared
.czero
);
3797 if (o
->type
!= REDIS_LIST
) {
3798 addReply(c
,shared
.wrongtypeerr
);
3800 list
*list
= o
->ptr
;
3801 listNode
*ln
, *next
;
3802 int toremove
= atoi(c
->argv
[2]->ptr
);
3807 toremove
= -toremove
;
3810 ln
= fromtail
? list
->tail
: list
->head
;
3812 robj
*ele
= listNodeValue(ln
);
3814 next
= fromtail
? ln
->prev
: ln
->next
;
3815 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3816 listDelNode(list
,ln
);
3819 if (toremove
&& removed
== toremove
) break;
3823 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
/* These are the semantics of this command:
 *  RPOPLPUSH srclist dstlist:
 *   IF LLEN(srclist) > 0
 *     element = RPOP srclist
 *     LPUSH dstlist element
 *     RETURN element
 *   ELSE
 *     RETURN nil
 *   END
 *  END
 *
 * The idea is to be able to get an element from a list in a reliable way
 * since the element is not just returned but pushed against another list
 * as well. This command was originally proposed by Ezra Zygmuntowicz.
 */
3843 static void rpoplpushcommand(redisClient
*c
) {
3846 sobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3848 addReply(c
,shared
.nullbulk
);
3850 if (sobj
->type
!= REDIS_LIST
) {
3851 addReply(c
,shared
.wrongtypeerr
);
3853 list
*srclist
= sobj
->ptr
;
3854 listNode
*ln
= listLast(srclist
);
3857 addReply(c
,shared
.nullbulk
);
3859 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3860 robj
*ele
= listNodeValue(ln
);
3865 /* Create the list if the key does not exist */
3866 dobj
= createListObject();
3867 dictAdd(c
->db
->dict
,c
->argv
[2],dobj
);
3868 incrRefCount(c
->argv
[2]);
3869 } else if (dobj
->type
!= REDIS_LIST
) {
3870 addReply(c
,shared
.wrongtypeerr
);
3873 /* Add the element to the target list */
3874 dstlist
= dobj
->ptr
;
3875 listAddNodeHead(dstlist
,ele
);
3878 /* Send the element to the client as reply as well */
3879 addReplyBulkLen(c
,ele
);
3881 addReply(c
,shared
.crlf
);
3883 /* Finally remove the element from the source list */
3884 listDelNode(srclist
,ln
);
3892 /* ==================================== Sets ================================ */
3894 static void saddCommand(redisClient
*c
) {
3897 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3899 set
= createSetObject();
3900 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3901 incrRefCount(c
->argv
[1]);
3903 if (set
->type
!= REDIS_SET
) {
3904 addReply(c
,shared
.wrongtypeerr
);
3908 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3909 incrRefCount(c
->argv
[2]);
3911 addReply(c
,shared
.cone
);
3913 addReply(c
,shared
.czero
);
3917 static void sremCommand(redisClient
*c
) {
3920 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3922 addReply(c
,shared
.czero
);
3924 if (set
->type
!= REDIS_SET
) {
3925 addReply(c
,shared
.wrongtypeerr
);
3928 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
3930 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3931 addReply(c
,shared
.cone
);
3933 addReply(c
,shared
.czero
);
3938 static void smoveCommand(redisClient
*c
) {
3939 robj
*srcset
, *dstset
;
3941 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3942 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
/* If the source key does not exist return 0, if it's of the wrong type
 * raise an error */
3946 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
3947 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
3950 /* Error if the destination key is not a set as well */
3951 if (dstset
&& dstset
->type
!= REDIS_SET
) {
3952 addReply(c
,shared
.wrongtypeerr
);
3955 /* Remove the element from the source set */
3956 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
3957 /* Key not found in the src set! return zero */
3958 addReply(c
,shared
.czero
);
3962 /* Add the element to the destination set */
3964 dstset
= createSetObject();
3965 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
3966 incrRefCount(c
->argv
[2]);
3968 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
3969 incrRefCount(c
->argv
[3]);
3970 addReply(c
,shared
.cone
);
3973 static void sismemberCommand(redisClient
*c
) {
3976 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3978 addReply(c
,shared
.czero
);
3980 if (set
->type
!= REDIS_SET
) {
3981 addReply(c
,shared
.wrongtypeerr
);
3984 if (dictFind(set
->ptr
,c
->argv
[2]))
3985 addReply(c
,shared
.cone
);
3987 addReply(c
,shared
.czero
);
3991 static void scardCommand(redisClient
*c
) {
3995 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3997 addReply(c
,shared
.czero
);
4000 if (o
->type
!= REDIS_SET
) {
4001 addReply(c
,shared
.wrongtypeerr
);
4004 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4010 static void spopCommand(redisClient
*c
) {
4014 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4016 addReply(c
,shared
.nullbulk
);
4018 if (set
->type
!= REDIS_SET
) {
4019 addReply(c
,shared
.wrongtypeerr
);
4022 de
= dictGetRandomKey(set
->ptr
);
4024 addReply(c
,shared
.nullbulk
);
4026 robj
*ele
= dictGetEntryKey(de
);
4028 addReplyBulkLen(c
,ele
);
4030 addReply(c
,shared
.crlf
);
4031 dictDelete(set
->ptr
,ele
);
4032 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
4038 static void srandmemberCommand(redisClient
*c
) {
4042 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4044 addReply(c
,shared
.nullbulk
);
4046 if (set
->type
!= REDIS_SET
) {
4047 addReply(c
,shared
.wrongtypeerr
);
4050 de
= dictGetRandomKey(set
->ptr
);
4052 addReply(c
,shared
.nullbulk
);
4054 robj
*ele
= dictGetEntryKey(de
);
4056 addReplyBulkLen(c
,ele
);
4058 addReply(c
,shared
.crlf
);
4063 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
4064 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
4066 return dictSize(*d1
)-dictSize(*d2
);
4069 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, unsigned long setsnum
, robj
*dstkey
) {
4070 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4073 robj
*lenobj
= NULL
, *dstset
= NULL
;
4074 unsigned long j
, cardinality
= 0;
4076 for (j
= 0; j
< setsnum
; j
++) {
4080 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4081 lookupKeyRead(c
->db
,setskeys
[j
]);
4085 if (deleteKey(c
->db
,dstkey
))
4087 addReply(c
,shared
.czero
);
4089 addReply(c
,shared
.nullmultibulk
);
4093 if (setobj
->type
!= REDIS_SET
) {
4095 addReply(c
,shared
.wrongtypeerr
);
4098 dv
[j
] = setobj
->ptr
;
/* Sort sets from the smallest to largest, this will improve our
 * algorithm's performance */
4102 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
/* The first thing we should output is the total number of elements...
 * since this is a multi-bulk write, but at this stage we don't know
 * the intersection set size, so we use a trick, append an empty object
 * to the output list and save the pointer to later modify it with the
 * right length */
4110 lenobj
= createObject(REDIS_STRING
,NULL
);
4112 decrRefCount(lenobj
);
4114 /* If we have a target key where to store the resulting set
4115 * create this key with an empty set inside */
4116 dstset
= createSetObject();
4119 /* Iterate all the elements of the first (smallest) set, and test
4120 * the element against all the other sets, if at least one set does
4121 * not include the element it is discarded */
4122 di
= dictGetIterator(dv
[0]);
4124 while((de
= dictNext(di
)) != NULL
) {
4127 for (j
= 1; j
< setsnum
; j
++)
4128 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
4130 continue; /* at least one set does not contain the member */
4131 ele
= dictGetEntryKey(de
);
4133 addReplyBulkLen(c
,ele
);
4135 addReply(c
,shared
.crlf
);
4138 dictAdd(dstset
->ptr
,ele
,NULL
);
4142 dictReleaseIterator(di
);
4145 /* Store the resulting set into the target */
4146 deleteKey(c
->db
,dstkey
);
4147 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4148 incrRefCount(dstkey
);
4152 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%lu\r\n",cardinality
);
4154 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4155 dictSize((dict
*)dstset
->ptr
)));
4161 static void sinterCommand(redisClient
*c
) {
4162 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
4165 static void sinterstoreCommand(redisClient
*c
) {
4166 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
4169 #define REDIS_OP_UNION 0
4170 #define REDIS_OP_DIFF 1
4172 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
4173 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4176 robj
*dstset
= NULL
;
4177 int j
, cardinality
= 0;
4179 for (j
= 0; j
< setsnum
; j
++) {
4183 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4184 lookupKeyRead(c
->db
,setskeys
[j
]);
4189 if (setobj
->type
!= REDIS_SET
) {
4191 addReply(c
,shared
.wrongtypeerr
);
4194 dv
[j
] = setobj
->ptr
;
4197 /* We need a temp set object to store our union. If the dstkey
4198 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4199 * this set object will be the resulting object to set into the target key*/
4200 dstset
= createSetObject();
4202 /* Iterate all the elements of all the sets, add every element a single
4203 * time to the result set */
4204 for (j
= 0; j
< setsnum
; j
++) {
4205 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
4206 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
4208 di
= dictGetIterator(dv
[j
]);
4210 while((de
= dictNext(di
)) != NULL
) {
4213 /* dictAdd will not add the same element multiple times */
4214 ele
= dictGetEntryKey(de
);
4215 if (op
== REDIS_OP_UNION
|| j
== 0) {
4216 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
4220 } else if (op
== REDIS_OP_DIFF
) {
4221 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
4226 dictReleaseIterator(di
);
4228 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
4231 /* Output the content of the resulting set, if not in STORE mode */
4233 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
4234 di
= dictGetIterator(dstset
->ptr
);
4235 while((de
= dictNext(di
)) != NULL
) {
4238 ele
= dictGetEntryKey(de
);
4239 addReplyBulkLen(c
,ele
);
4241 addReply(c
,shared
.crlf
);
4243 dictReleaseIterator(di
);
4245 /* If we have a target key where to store the resulting set
4246 * create this key with the result set inside */
4247 deleteKey(c
->db
,dstkey
);
4248 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4249 incrRefCount(dstkey
);
4254 decrRefCount(dstset
);
4256 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4257 dictSize((dict
*)dstset
->ptr
)));
4263 static void sunionCommand(redisClient
*c
) {
4264 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
4267 static void sunionstoreCommand(redisClient
*c
) {
4268 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
4271 static void sdiffCommand(redisClient
*c
) {
4272 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
4275 static void sdiffstoreCommand(redisClient
*c
) {
4276 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
4279 /* ==================================== ZSets =============================== */
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
4289 /* This skiplist implementation is almost a C translation of the original
4290 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4291 * Alternative to Balanced Trees", modified in three ways:
4292 * a) this implementation allows for repeated values.
4293 * b) the comparison is not just by key (our 'score') but by satellite data.
4294 * c) there is a back pointer, so it's a doubly linked list with the back
4295 * pointers being only at "level 1". This allows to traverse the list
4296 * from tail to head, useful for ZREVRANGE. */
4298 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
4299 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
4301 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
4307 static zskiplist
*zslCreate(void) {
4311 zsl
= zmalloc(sizeof(*zsl
));
4314 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
4315 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
4316 zsl
->header
->forward
[j
] = NULL
;
4317 zsl
->header
->backward
= NULL
;
4322 static void zslFreeNode(zskiplistNode
*node
) {
4323 decrRefCount(node
->obj
);
4324 zfree(node
->forward
);
4328 static void zslFree(zskiplist
*zsl
) {
4329 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
4331 zfree(zsl
->header
->forward
);
4334 next
= node
->forward
[0];
4341 static int zslRandomLevel(void) {
4343 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
4348 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
4349 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4353 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4354 while (x
->forward
[i
] &&
4355 (x
->forward
[i
]->score
< score
||
4356 (x
->forward
[i
]->score
== score
&&
4357 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
/* We assume the key is not already inside, since we allow duplicated
 * scores, and the re-insertion of score and redis object should never
 * happen since the caller of zslInsert() should test in the hash table
 * if the element is already inside or not. */
4365 level
= zslRandomLevel();
4366 if (level
> zsl
->level
) {
4367 for (i
= zsl
->level
; i
< level
; i
++)
4368 update
[i
] = zsl
->header
;
4371 x
= zslCreateNode(level
,score
,obj
);
4372 for (i
= 0; i
< level
; i
++) {
4373 x
->forward
[i
] = update
[i
]->forward
[i
];
4374 update
[i
]->forward
[i
] = x
;
4376 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
4378 x
->forward
[0]->backward
= x
;
4384 /* Delete an element with matching score/object from the skiplist. */
4385 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
4386 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4390 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4391 while (x
->forward
[i
] &&
4392 (x
->forward
[i
]->score
< score
||
4393 (x
->forward
[i
]->score
== score
&&
4394 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4398 /* We may have multiple elements with the same score, what we need
4399 * is to find the element with both the right score and object. */
4401 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
4402 for (i
= 0; i
< zsl
->level
; i
++) {
4403 if (update
[i
]->forward
[i
] != x
) break;
4404 update
[i
]->forward
[i
] = x
->forward
[i
];
4406 if (x
->forward
[0]) {
4407 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4410 zsl
->tail
= x
->backward
;
4413 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4418 return 0; /* not found */
4420 return 0; /* not found */
/* Delete all the elements with score between min and max from the skiplist.
 * Min and max are inclusive, so an element with score >= min && score <= max
 * is deleted.
 * Note that this function takes the reference to the hash table view of the
 * sorted set, in order to remove the elements from the hash table too. */
4427 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4428 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4429 unsigned long removed
= 0;
4433 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4434 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
/* Keep deleting nodes, one after the other, for as long as the score of
 * the current node is <= max. */
4441 while (x
&& x
->score
<= max
) {
4442 zskiplistNode
*next
;
4444 for (i
= 0; i
< zsl
->level
; i
++) {
4445 if (update
[i
]->forward
[i
] != x
) break;
4446 update
[i
]->forward
[i
] = x
->forward
[i
];
4448 if (x
->forward
[0]) {
4449 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4452 zsl
->tail
= x
->backward
;
4454 next
= x
->forward
[0];
4455 dictDelete(dict
,x
->obj
);
4457 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4463 return removed
; /* not found */
4466 /* Find the first node having a score equal or greater than the specified one.
4467 * Returns NULL if there is no match. */
4468 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4473 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4474 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
/* Every node up to x has a score smaller than the requested one, so
 * the node that follows x, if any, is the first match. */
4479 return x
->forward
[0];
4482 /* The actual Z-commands implementations */
4484 /* This generic command implements both ZADD and ZINCRBY.
4485 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4486 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4487 static void zaddGenericCommand(redisClient
*c
, robj
*key
, robj
*ele
, double scoreval
, int doincrement
) {
4492 zsetobj
= lookupKeyWrite(c
->db
,key
);
4493 if (zsetobj
== NULL
) {
4494 zsetobj
= createZsetObject();
4495 dictAdd(c
->db
->dict
,key
,zsetobj
);
4498 if (zsetobj
->type
!= REDIS_ZSET
) {
4499 addReply(c
,shared
.wrongtypeerr
);
4505 /* Ok now since we implement both ZADD and ZINCRBY here the code
4506 * needs to handle the two different conditions. It's all about setting
4507 * '*score', that is, the new score to set, to the right value. */
4508 score
= zmalloc(sizeof(double));
4512 /* Read the old score. If the element was not present starts from 0 */
4513 de
= dictFind(zs
->dict
,ele
);
4515 double *oldscore
= dictGetEntryVal(de
);
4516 *score
= *oldscore
+ scoreval
;
4524 /* What follows is a simple remove and re-insert operation that is common
4525 * to both ZADD and ZINCRBY... */
4526 if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) {
4527 /* case 1: New element */
4528 incrRefCount(ele
); /* added to hash */
4529 zslInsert(zs
->zsl
,*score
,ele
);
4530 incrRefCount(ele
); /* added to skiplist */
4533 addReplyDouble(c
,*score
);
4535 addReply(c
,shared
.cone
);
4540 /* case 2: Score update operation */
4541 de
= dictFind(zs
->dict
,ele
);
4542 redisAssert(de
!= NULL
);
4543 oldscore
= dictGetEntryVal(de
);
4544 if (*score
!= *oldscore
) {
4547 /* Remove and insert the element in the skip list with new score */
4548 deleted
= zslDelete(zs
->zsl
,*oldscore
,ele
);
4549 redisAssert(deleted
!= 0);
4550 zslInsert(zs
->zsl
,*score
,ele
);
4552 /* Update the score in the hash table */
4553 dictReplace(zs
->dict
,ele
,score
);
4559 addReplyDouble(c
,*score
);
4561 addReply(c
,shared
.czero
);
4565 static void zaddCommand(redisClient
*c
) {
4568 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4569 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0);
4572 static void zincrbyCommand(redisClient
*c
) {
4575 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4576 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1);
4579 static void zremCommand(redisClient
*c
) {
4583 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4584 if (zsetobj
== NULL
) {
4585 addReply(c
,shared
.czero
);
4591 if (zsetobj
->type
!= REDIS_ZSET
) {
4592 addReply(c
,shared
.wrongtypeerr
);
4596 de
= dictFind(zs
->dict
,c
->argv
[2]);
4598 addReply(c
,shared
.czero
);
4601 /* Delete from the skiplist */
4602 oldscore
= dictGetEntryVal(de
);
4603 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
4604 redisAssert(deleted
!= 0);
4606 /* Delete from the hash table */
4607 dictDelete(zs
->dict
,c
->argv
[2]);
4608 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4610 addReply(c
,shared
.cone
);
4614 static void zremrangebyscoreCommand(redisClient
*c
) {
4615 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4616 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4620 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4621 if (zsetobj
== NULL
) {
4622 addReply(c
,shared
.czero
);
4626 if (zsetobj
->type
!= REDIS_ZSET
) {
4627 addReply(c
,shared
.wrongtypeerr
);
4631 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
4632 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4633 server
.dirty
+= deleted
;
4634 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
4638 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
4640 int start
= atoi(c
->argv
[2]->ptr
);
4641 int end
= atoi(c
->argv
[3]->ptr
);
4644 if (c
->argc
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) {
4646 } else if (c
->argc
>= 5) {
4647 addReply(c
,shared
.syntaxerr
);
4651 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4653 addReply(c
,shared
.nullmultibulk
);
4655 if (o
->type
!= REDIS_ZSET
) {
4656 addReply(c
,shared
.wrongtypeerr
);
4658 zset
*zsetobj
= o
->ptr
;
4659 zskiplist
*zsl
= zsetobj
->zsl
;
4662 int llen
= zsl
->length
;
4666 /* convert negative indexes */
4667 if (start
< 0) start
= llen
+start
;
4668 if (end
< 0) end
= llen
+end
;
4669 if (start
< 0) start
= 0;
4670 if (end
< 0) end
= 0;
4672 /* indexes sanity checks */
4673 if (start
> end
|| start
>= llen
) {
4674 /* Out of range start or start > end result in empty list */
4675 addReply(c
,shared
.emptymultibulk
);
4678 if (end
>= llen
) end
= llen
-1;
4679 rangelen
= (end
-start
)+1;
4681 /* Return the result in form of a multi-bulk reply */
4687 ln
= zsl
->header
->forward
[0];
4689 ln
= ln
->forward
[0];
4692 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",
4693 withscores
? (rangelen
*2) : rangelen
));
4694 for (j
= 0; j
< rangelen
; j
++) {
4696 addReplyBulkLen(c
,ele
);
4698 addReply(c
,shared
.crlf
);
4700 addReplyDouble(c
,ln
->score
);
4701 ln
= reverse
? ln
->backward
: ln
->forward
[0];
4707 static void zrangeCommand(redisClient
*c
) {
4708 zrangeGenericCommand(c
,0);
4711 static void zrevrangeCommand(redisClient
*c
) {
4712 zrangeGenericCommand(c
,1);
4715 static void zrangebyscoreCommand(redisClient
*c
) {
4717 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4718 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4719 int offset
= 0, limit
= -1;
4721 if (c
->argc
!= 4 && c
->argc
!= 7) {
4723 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4725 } else if (c
->argc
== 7 && strcasecmp(c
->argv
[4]->ptr
,"limit")) {
4726 addReply(c
,shared
.syntaxerr
);
4728 } else if (c
->argc
== 7) {
4729 offset
= atoi(c
->argv
[5]->ptr
);
4730 limit
= atoi(c
->argv
[6]->ptr
);
4731 if (offset
< 0) offset
= 0;
4734 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4736 addReply(c
,shared
.nullmultibulk
);
4738 if (o
->type
!= REDIS_ZSET
) {
4739 addReply(c
,shared
.wrongtypeerr
);
4741 zset
*zsetobj
= o
->ptr
;
4742 zskiplist
*zsl
= zsetobj
->zsl
;
4745 unsigned int rangelen
= 0;
4747 /* Get the first node with the score >= min */
4748 ln
= zslFirstWithScore(zsl
,min
);
4750 /* No element matching the speciifed interval */
4751 addReply(c
,shared
.emptymultibulk
);
4755 /* We don't know in advance how many matching elements there
4756 * are in the list, so we push this object that will represent
4757 * the multi-bulk length in the output buffer, and will "fix"
4759 lenobj
= createObject(REDIS_STRING
,NULL
);
4761 decrRefCount(lenobj
);
4763 while(ln
&& ln
->score
<= max
) {
4766 ln
= ln
->forward
[0];
4769 if (limit
== 0) break;
4771 addReplyBulkLen(c
,ele
);
4773 addReply(c
,shared
.crlf
);
4774 ln
= ln
->forward
[0];
4776 if (limit
> 0) limit
--;
4778 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
4783 static void zcardCommand(redisClient
*c
) {
4787 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4789 addReply(c
,shared
.czero
);
4792 if (o
->type
!= REDIS_ZSET
) {
4793 addReply(c
,shared
.wrongtypeerr
);
4796 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",zs
->zsl
->length
));
4801 static void zscoreCommand(redisClient
*c
) {
4805 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4807 addReply(c
,shared
.nullbulk
);
4810 if (o
->type
!= REDIS_ZSET
) {
4811 addReply(c
,shared
.wrongtypeerr
);
4816 de
= dictFind(zs
->dict
,c
->argv
[2]);
4818 addReply(c
,shared
.nullbulk
);
4820 double *score
= dictGetEntryVal(de
);
4822 addReplyDouble(c
,*score
);
4828 /* ========================= Non type-specific commands ==================== */
4830 static void flushdbCommand(redisClient
*c
) {
4831 server
.dirty
+= dictSize(c
->db
->dict
);
4832 dictEmpty(c
->db
->dict
);
4833 dictEmpty(c
->db
->expires
);
4834 addReply(c
,shared
.ok
);
4837 static void flushallCommand(redisClient
*c
) {
4838 server
.dirty
+= emptyDb();
4839 addReply(c
,shared
.ok
);
4840 rdbSave(server
.dbfilename
);
4844 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
4845 redisSortOperation
*so
= zmalloc(sizeof(*so
));
4847 so
->pattern
= pattern
;
4851 /* Return the value associated to the key with a name obtained
4852 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4853 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
4857 int prefixlen
, sublen
, postfixlen
;
4858 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4862 char buf
[REDIS_SORTKEY_MAX
+1];
4865 /* If the pattern is "#" return the substitution object itself in order
4866 * to implement the "SORT ... GET #" feature. */
4867 spat
= pattern
->ptr
;
4868 if (spat
[0] == '#' && spat
[1] == '\0') {
4872 /* The substitution object may be specially encoded. If so we create
4873 * a decoded object on the fly. Otherwise getDecodedObject will just
4874 * increment the ref count, that we'll decrement later. */
4875 subst
= getDecodedObject(subst
);
4878 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
4879 p
= strchr(spat
,'*');
4881 decrRefCount(subst
);
4886 sublen
= sdslen(ssub
);
4887 postfixlen
= sdslen(spat
)-(prefixlen
+1);
4888 memcpy(keyname
.buf
,spat
,prefixlen
);
4889 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
4890 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
4891 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
4892 keyname
.len
= prefixlen
+sublen
+postfixlen
;
4894 initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(long)*2))
4895 decrRefCount(subst
);
4897 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4898 return lookupKeyRead(db
,&keyobj
);
4901 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4902 * the additional parameter is not standard but a BSD-specific we have to
4903 * pass sorting parameters via the global 'server' structure */
4904 static int sortCompare(const void *s1
, const void *s2
) {
4905 const redisSortObject
*so1
= s1
, *so2
= s2
;
4908 if (!server
.sort_alpha
) {
4909 /* Numeric sorting. Here it's trivial as we precomputed scores */
4910 if (so1
->u
.score
> so2
->u
.score
) {
4912 } else if (so1
->u
.score
< so2
->u
.score
) {
4918 /* Alphanumeric sorting */
4919 if (server
.sort_bypattern
) {
4920 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
4921 /* At least one compare object is NULL */
4922 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
4924 else if (so1
->u
.cmpobj
== NULL
)
4929 /* We have both the objects, use strcoll */
4930 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
4933 /* Compare elements directly */
4936 dec1
= getDecodedObject(so1
->obj
);
4937 dec2
= getDecodedObject(so2
->obj
);
4938 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
4943 return server
.sort_desc
? -cmp
: cmp
;
4946 /* The SORT command is the most complex command in Redis. Warning: this code
4947 * is optimized for speed and a bit less for readability */
4948 static void sortCommand(redisClient
*c
) {
4951 int desc
= 0, alpha
= 0;
4952 int limit_start
= 0, limit_count
= -1, start
, end
;
4953 int j
, dontsort
= 0, vectorlen
;
4954 int getop
= 0; /* GET operation counter */
4955 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
4956 redisSortObject
*vector
; /* Resulting vector to sort */
4958 /* Lookup the key to sort. It must be of the right types */
4959 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
4960 if (sortval
== NULL
) {
4961 addReply(c
,shared
.nullmultibulk
);
4964 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
4965 sortval
->type
!= REDIS_ZSET
)
4967 addReply(c
,shared
.wrongtypeerr
);
4971 /* Create a list of operations to perform for every sorted element.
4972 * Operations can be GET/DEL/INCR/DECR */
4973 operations
= listCreate();
4974 listSetFreeMethod(operations
,zfree
);
4977 /* Now we need to protect sortval incrementing its count, in the future
4978 * SORT may have options able to overwrite/delete keys during the sorting
4979 * and the sorted key itself may get destroied */
4980 incrRefCount(sortval
);
4982 /* The SORT command has an SQL-alike syntax, parse it */
4983 while(j
< c
->argc
) {
4984 int leftargs
= c
->argc
-j
-1;
4985 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
4987 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
4989 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
4991 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
4992 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
4993 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
4995 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
4996 storekey
= c
->argv
[j
+1];
4998 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
4999 sortby
= c
->argv
[j
+1];
5000 /* If the BY pattern does not contain '*', i.e. it is constant,
5001 * we don't need to sort nor to lookup the weight keys. */
5002 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
5004 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
5005 listAddNodeTail(operations
,createSortOperation(
5006 REDIS_SORT_GET
,c
->argv
[j
+1]));
5010 decrRefCount(sortval
);
5011 listRelease(operations
);
5012 addReply(c
,shared
.syntaxerr
);
5018 /* Load the sorting vector with all the objects to sort */
5019 switch(sortval
->type
) {
5020 case REDIS_LIST
: vectorlen
= listLength((list
*)sortval
->ptr
); break;
5021 case REDIS_SET
: vectorlen
= dictSize((dict
*)sortval
->ptr
); break;
5022 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
5023 default: vectorlen
= 0; redisAssert(0); /* Avoid GCC warning */
5025 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
5028 if (sortval
->type
== REDIS_LIST
) {
5029 list
*list
= sortval
->ptr
;
5033 while((ln
= listYield(list
))) {
5034 robj
*ele
= ln
->value
;
5035 vector
[j
].obj
= ele
;
5036 vector
[j
].u
.score
= 0;
5037 vector
[j
].u
.cmpobj
= NULL
;
5045 if (sortval
->type
== REDIS_SET
) {
5048 zset
*zs
= sortval
->ptr
;
5052 di
= dictGetIterator(set
);
5053 while((setele
= dictNext(di
)) != NULL
) {
5054 vector
[j
].obj
= dictGetEntryKey(setele
);
5055 vector
[j
].u
.score
= 0;
5056 vector
[j
].u
.cmpobj
= NULL
;
5059 dictReleaseIterator(di
);
5061 redisAssert(j
== vectorlen
);
5063 /* Now it's time to load the right scores in the sorting vector */
5064 if (dontsort
== 0) {
5065 for (j
= 0; j
< vectorlen
; j
++) {
5069 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
5070 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
5072 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
5074 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
5075 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
5077 /* Don't need to decode the object if it's
5078 * integer-encoded (the only encoding supported) so
5079 * far. We can just cast it */
5080 if (byval
->encoding
== REDIS_ENCODING_INT
) {
5081 vector
[j
].u
.score
= (long)byval
->ptr
;
5083 redisAssert(1 != 1);
5088 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
5089 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
5091 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
5092 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
5094 redisAssert(1 != 1);
5101 /* We are ready to sort the vector... perform a bit of sanity check
5102 * on the LIMIT option too. We'll use a partial version of quicksort. */
5103 start
= (limit_start
< 0) ? 0 : limit_start
;
5104 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
5105 if (start
>= vectorlen
) {
5106 start
= vectorlen
-1;
5109 if (end
>= vectorlen
) end
= vectorlen
-1;
5111 if (dontsort
== 0) {
5112 server
.sort_desc
= desc
;
5113 server
.sort_alpha
= alpha
;
5114 server
.sort_bypattern
= sortby
? 1 : 0;
5115 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
5116 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
5118 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
5121 /* Send command output to the output buffer, performing the specified
5122 * GET/DEL/INCR/DECR operations if any. */
5123 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
5124 if (storekey
== NULL
) {
5125 /* STORE option not specified, sent the sorting result to client */
5126 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
5127 for (j
= start
; j
<= end
; j
++) {
5130 addReplyBulkLen(c
,vector
[j
].obj
);
5131 addReply(c
,vector
[j
].obj
);
5132 addReply(c
,shared
.crlf
);
5134 listRewind(operations
);
5135 while((ln
= listYield(operations
))) {
5136 redisSortOperation
*sop
= ln
->value
;
5137 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5140 if (sop
->type
== REDIS_SORT_GET
) {
5141 if (!val
|| val
->type
!= REDIS_STRING
) {
5142 addReply(c
,shared
.nullbulk
);
5144 addReplyBulkLen(c
,val
);
5146 addReply(c
,shared
.crlf
);
5149 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5154 robj
*listObject
= createListObject();
5155 list
*listPtr
= (list
*) listObject
->ptr
;
5157 /* STORE option specified, set the sorting result as a List object */
5158 for (j
= start
; j
<= end
; j
++) {
5161 listAddNodeTail(listPtr
,vector
[j
].obj
);
5162 incrRefCount(vector
[j
].obj
);
5164 listRewind(operations
);
5165 while((ln
= listYield(operations
))) {
5166 redisSortOperation
*sop
= ln
->value
;
5167 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5170 if (sop
->type
== REDIS_SORT_GET
) {
5171 if (!val
|| val
->type
!= REDIS_STRING
) {
5172 listAddNodeTail(listPtr
,createStringObject("",0));
5174 listAddNodeTail(listPtr
,val
);
5178 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5182 if (dictReplace(c
->db
->dict
,storekey
,listObject
)) {
5183 incrRefCount(storekey
);
5185 /* Note: we add 1 because the DB is dirty anyway since even if the
5186 * SORT result is empty a new key is set and maybe the old content
5188 server
.dirty
+= 1+outputlen
;
5189 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",outputlen
));
5193 decrRefCount(sortval
);
5194 listRelease(operations
);
5195 for (j
= 0; j
< vectorlen
; j
++) {
5196 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
5197 decrRefCount(vector
[j
].u
.cmpobj
);
5202 /* Create the string returned by the INFO command. This is decoupled
5203 * by the INFO command itself as we need to report the same information
5204 * on memory corruption problems. */
5205 static sds
genRedisInfoString(void) {
5207 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5210 info
= sdscatprintf(sdsempty(),
5211 "redis_version:%s\r\n"
5213 "multiplexing_api:%s\r\n"
5214 "uptime_in_seconds:%ld\r\n"
5215 "uptime_in_days:%ld\r\n"
5216 "connected_clients:%d\r\n"
5217 "connected_slaves:%d\r\n"
5218 "blocked_clients:%d\r\n"
5219 "used_memory:%zu\r\n"
5220 "changes_since_last_save:%lld\r\n"
5221 "bgsave_in_progress:%d\r\n"
5222 "last_save_time:%ld\r\n"
5223 "bgrewriteaof_in_progress:%d\r\n"
5224 "total_connections_received:%lld\r\n"
5225 "total_commands_processed:%lld\r\n"
5228 (sizeof(long) == 8) ? "64" : "32",
5232 listLength(server
.clients
)-listLength(server
.slaves
),
5233 listLength(server
.slaves
),
5234 server
.blockedclients
,
5237 server
.bgsavechildpid
!= -1,
5239 server
.bgrewritechildpid
!= -1,
5240 server
.stat_numconnections
,
5241 server
.stat_numcommands
,
5242 server
.masterhost
== NULL
? "master" : "slave"
5244 if (server
.masterhost
) {
5245 info
= sdscatprintf(info
,
5246 "master_host:%s\r\n"
5247 "master_port:%d\r\n"
5248 "master_link_status:%s\r\n"
5249 "master_last_io_seconds_ago:%d\r\n"
5252 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
5254 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
5257 for (j
= 0; j
< server
.dbnum
; j
++) {
5258 long long keys
, vkeys
;
5260 keys
= dictSize(server
.db
[j
].dict
);
5261 vkeys
= dictSize(server
.db
[j
].expires
);
5262 if (keys
|| vkeys
) {
5263 info
= sdscatprintf(info
, "db%d:keys=%lld,expires=%lld\r\n",
5270 static void infoCommand(redisClient
*c
) {
5271 sds info
= genRedisInfoString();
5272 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
5273 (unsigned long)sdslen(info
)));
5274 addReplySds(c
,info
);
5275 addReply(c
,shared
.crlf
);
5278 static void monitorCommand(redisClient
*c
) {
5279 /* ignore MONITOR if aleady slave or in monitor mode */
5280 if (c
->flags
& REDIS_SLAVE
) return;
5282 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
5284 listAddNodeTail(server
.monitors
,c
);
5285 addReply(c
,shared
.ok
);
5288 /* ================================= Expire ================================= */
5289 static int removeExpire(redisDb
*db
, robj
*key
) {
5290 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
5297 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
5298 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
5306 /* Return the expire time of the specified key, or -1 if no expire
5307 * is associated with this key (i.e. the key is non volatile) */
5308 static time_t getExpire(redisDb
*db
, robj
*key
) {
5311 /* No expire? return ASAP */
5312 if (dictSize(db
->expires
) == 0 ||
5313 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
5315 return (time_t) dictGetEntryVal(de
);
5318 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
5322 /* No expire? return ASAP */
5323 if (dictSize(db
->expires
) == 0 ||
5324 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5326 /* Lookup the expire */
5327 when
= (time_t) dictGetEntryVal(de
);
5328 if (time(NULL
) <= when
) return 0;
5330 /* Delete the key */
5331 dictDelete(db
->expires
,key
);
5332 return dictDelete(db
->dict
,key
) == DICT_OK
;
5335 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
5338 /* No expire? return ASAP */
5339 if (dictSize(db
->expires
) == 0 ||
5340 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5342 /* Delete the key */
5344 dictDelete(db
->expires
,key
);
5345 return dictDelete(db
->dict
,key
) == DICT_OK
;
5348 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
5351 de
= dictFind(c
->db
->dict
,key
);
5353 addReply(c
,shared
.czero
);
5357 if (deleteKey(c
->db
,key
)) server
.dirty
++;
5358 addReply(c
, shared
.cone
);
5361 time_t when
= time(NULL
)+seconds
;
5362 if (setExpire(c
->db
,key
,when
)) {
5363 addReply(c
,shared
.cone
);
5366 addReply(c
,shared
.czero
);
5372 static void expireCommand(redisClient
*c
) {
5373 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
5376 static void expireatCommand(redisClient
*c
) {
5377 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
5380 static void ttlCommand(redisClient
*c
) {
5384 expire
= getExpire(c
->db
,c
->argv
[1]);
5386 ttl
= (int) (expire
-time(NULL
));
5387 if (ttl
< 0) ttl
= -1;
5389 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
5392 /* ================================ MULTI/EXEC ============================== */
5394 /* Client state initialization for MULTI/EXEC */
5395 static void initClientMultiState(redisClient
*c
) {
5396 c
->mstate
.commands
= NULL
;
5397 c
->mstate
.count
= 0;
5400 /* Release all the resources associated with MULTI/EXEC state */
5401 static void freeClientMultiState(redisClient
*c
) {
5404 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5406 multiCmd
*mc
= c
->mstate
.commands
+j
;
5408 for (i
= 0; i
< mc
->argc
; i
++)
5409 decrRefCount(mc
->argv
[i
]);
5412 zfree(c
->mstate
.commands
);
5415 /* Add a new command into the MULTI commands queue */
5416 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
) {
5420 c
->mstate
.commands
= zrealloc(c
->mstate
.commands
,
5421 sizeof(multiCmd
)*(c
->mstate
.count
+1));
5422 mc
= c
->mstate
.commands
+c
->mstate
.count
;
5425 mc
->argv
= zmalloc(sizeof(robj
*)*c
->argc
);
5426 memcpy(mc
->argv
,c
->argv
,sizeof(robj
*)*c
->argc
);
5427 for (j
= 0; j
< c
->argc
; j
++)
5428 incrRefCount(mc
->argv
[j
]);
5432 static void multiCommand(redisClient
*c
) {
5433 c
->flags
|= REDIS_MULTI
;
5434 addReply(c
,shared
.ok
);
5437 static void execCommand(redisClient
*c
) {
5442 if (!(c
->flags
& REDIS_MULTI
)) {
5443 addReplySds(c
,sdsnew("-ERR EXEC without MULTI\r\n"));
5447 orig_argv
= c
->argv
;
5448 orig_argc
= c
->argc
;
5449 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->mstate
.count
));
5450 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5451 c
->argc
= c
->mstate
.commands
[j
].argc
;
5452 c
->argv
= c
->mstate
.commands
[j
].argv
;
5453 call(c
,c
->mstate
.commands
[j
].cmd
);
5455 c
->argv
= orig_argv
;
5456 c
->argc
= orig_argc
;
5457 freeClientMultiState(c
);
5458 initClientMultiState(c
);
5459 c
->flags
&= (~REDIS_MULTI
);
5462 /* =========================== Blocking Operations ========================= */
/* Currently Redis blocking operations support is limited to list POP ops,
 * so the current implementation is not fully generic, but it is also not
 * completely specific, so it will not require a rewrite to support new
 * kinds of blocking operations in the future.
 *
 * Still it's important to note that list blocking operations can already
 * be used as a notification mechanism in order to implement other blocking
 * operations at application level, so there must be very strong evidence
 * of usefulness and generality before new blocking operations are
 * implemented.
 *
 * This is how the current blocking POP works, we use BLPOP as example:
 * - If the user calls BLPOP and the key exists and contains a non empty
 *   list then LPOP is called instead. So BLPOP is semantically the same as
 *   LPOP if there is no need to block.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification
 *   for new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blockingkeys) mapping keys to a list of clients
 *   blocking for these keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we serve the first in the list: basically instead of pushing
 *   the new element inside the list we return it to the (first / oldest)
 *   blocking client, unblock the client, and remove it from the list.
 *
 * The above comment and the source code should be enough in order to
 * understand the implementation and modify / fix it later.
 */
5493 /* Set a client in blocking mode for the specified key, with the specified
5495 static void blockForKey(redisClient
*c
, robj
*key
, time_t timeout
) {
5499 c
->blockingkey
= key
;
5501 c
->blockingto
= timeout
;
5502 de
= dictFind(c
->db
->blockingkeys
,key
);
5506 /* We take a list of clients blocked for a given key */
5508 retval
= dictAdd(c
->db
->blockingkeys
,key
,l
);
5510 assert(retval
== DICT_OK
);
5512 l
= dictGetEntryVal(de
);
5514 /* Add this client to the list, and mark it as blocked */
5515 listAddNodeTail(l
,c
);
5516 c
->flags
|= REDIS_BLOCKED
;
5517 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
5518 server
.blockedclients
++;
5521 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
5522 static void unblockClient(redisClient
*c
) {
5526 /* Remove this client from the list of clients waiting for this key. */
5527 assert(c
->blockingkey
!= NULL
);
5528 de
= dictFind(c
->db
->blockingkeys
,c
->blockingkey
);
5530 l
= dictGetEntryVal(de
);
5531 listDelNode(l
,listSearchKey(l
,c
));
5532 /* If the list is empty we need to remove it to avoid wasting memory */
5533 if (listLength(l
) == 0)
5534 dictDelete(c
->db
->blockingkeys
,c
->blockingkey
);
5535 /* Finally set the right flags in the client structure */
5536 decrRefCount(c
->blockingkey
);
5537 c
->blockingkey
= NULL
;
5538 c
->flags
&= (~REDIS_BLOCKED
);
5539 server
.blockedclients
--;
5540 /* Ok now we are ready to get read events from socket, note that we
5541 * can't trap errors here as it's possible that unblockClients() is
5542 * called from freeClient() itself, and the only thing we can do
5543 * if we failed to register the READABLE event is to kill the client.
5544 * Still the following function should never fail in the real world as
5545 * we are sure the file descriptor is sane, and we exit on out of mem. */
5546 aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
, readQueryFromClient
, c
);
5547 /* As a final step we want to process data if there is some command waiting
5548 * in the input buffer. Note that this is safe even if unblockClient()
5549 * gets called from freeClient() because freeClient() will be smart
5550 * enough to call this function *after* c->querybuf was set to NULL. */
5551 if (c
->querybuf
&& sdslen(c
->querybuf
) > 0) processInputBuffer(c
);
5554 /* This should be called from any function PUSHing into lists.
5555 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5556 * 'ele' is the element pushed.
5558 * If the function returns 0 there was no client waiting for a list push
5561 * If the function returns 1 there was a client waiting for a list push
5562 * against this key, the element was passed to this client thus it's not
5563 * needed to actually add it to the list and the caller should return asap. */
5564 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
) {
5565 struct dictEntry
*de
;
5566 redisClient
*receiver
;
5570 de
= dictFind(c
->db
->blockingkeys
,key
);
5571 if (de
== NULL
) return 0;
5572 l
= dictGetEntryVal(de
);
5575 receiver
= ln
->value
;
5577 addReplyBulkLen(receiver
,ele
);
5578 addReply(receiver
,ele
);
5579 addReply(receiver
,shared
.crlf
);
5580 unblockClient(receiver
);
5584 /* Blocking RPOP/LPOP */
5585 static void blockingPopGenericCommand(redisClient
*c
, int where
) {
5589 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
5591 if (o
->type
!= REDIS_LIST
) {
5592 popGenericCommand(c
,where
);
5595 list
*list
= o
->ptr
;
5596 if (listLength(list
) != 0) {
5597 /* If the list contains elements fall back to the usual
5598 * non-blocking POP operation */
5599 popGenericCommand(c
,where
);
5604 /* If the list is empty or the key does not exists we must block */
5605 timeout
= strtol(c
->argv
[2]->ptr
,NULL
,10);
5606 if (timeout
> 0) timeout
+= time(NULL
);
5607 blockForKey(c
,c
->argv
[1],timeout
);
5610 static void blpopCommand(redisClient
*c
) {
5611 blockingPopGenericCommand(c
,REDIS_HEAD
);
5614 static void brpopCommand(redisClient
*c
) {
5615 blockingPopGenericCommand(c
,REDIS_TAIL
);
5618 /* =============================== Replication ============================= */
5620 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5621 ssize_t nwritten
, ret
= size
;
5622 time_t start
= time(NULL
);
5626 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
5627 nwritten
= write(fd
,ptr
,size
);
5628 if (nwritten
== -1) return -1;
5632 if ((time(NULL
)-start
) > timeout
) {
5640 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5641 ssize_t nread
, totread
= 0;
5642 time_t start
= time(NULL
);
5646 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
5647 nread
= read(fd
,ptr
,size
);
5648 if (nread
== -1) return -1;
5653 if ((time(NULL
)-start
) > timeout
) {
/* Read a line from 'fd' into 'ptr', one byte at a time through syncRead().
 *
 * Reading stops at the first '\n', which is not stored; a '\r' right
 * before it is stripped as well. The result is always NUL terminated and
 * never uses more than 'size' bytes of the buffer: when the line is longer
 * than size-1 characters the function returns what fits.
 *
 * Returns the number of characters stored before the newline was seen
 * (a stripped '\r' is still counted), or -1 if syncRead() fails. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    size--; /* Reserve room for the terminating NUL. */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        }
        *ptr++ = c;
        *ptr = '\0';
        nread++;
        /* Account for the byte just stored, so that the loop stops when
         * the buffer is full instead of writing past its end. */
        size--;
    }
    return nread;
}
5682 static void syncCommand(redisClient
*c
) {
5683 /* ignore SYNC if aleady slave or in monitor mode */
5684 if (c
->flags
& REDIS_SLAVE
) return;
5686 /* SYNC can't be issued when the server has pending data to send to
5687 * the client about already issued commands. We need a fresh reply
5688 * buffer registering the differences between the BGSAVE and the current
5689 * dataset, so that we can copy to other slaves if needed. */
5690 if (listLength(c
->reply
) != 0) {
5691 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5695 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
5696 /* Here we need to check if there is a background saving operation
5697 * in progress, or if it is required to start one */
5698 if (server
.bgsavechildpid
!= -1) {
5699 /* Ok a background save is in progress. Let's check if it is a good
5700 * one for replication, i.e. if there is another slave that is
5701 * registering differences since the server forked to save */
5705 listRewind(server
.slaves
);
5706 while((ln
= listYield(server
.slaves
))) {
5708 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
5711 /* Perfect, the server is already registering differences for
5712 * another slave. Set the right state, and copy the buffer. */
5713 listRelease(c
->reply
);
5714 c
->reply
= listDup(slave
->reply
);
5715 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5716 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
5718 /* No way, we need to wait for the next BGSAVE in order to
5719 * register differences */
5720 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5721 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
5724 /* Ok we don't have a BGSAVE in progress, let's start one */
5725 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
5726 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5727 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
5728 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
5731 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5734 c
->flags
|= REDIS_SLAVE
;
5736 listAddNodeTail(server
.slaves
,c
);
5740 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
5741 redisClient
*slave
= privdata
;
5743 REDIS_NOTUSED(mask
);
5744 char buf
[REDIS_IOBUF_LEN
];
5745 ssize_t nwritten
, buflen
;
5747 if (slave
->repldboff
== 0) {
5748 /* Write the bulk write count before to transfer the DB. In theory here
5749 * we don't know how much room there is in the output buffer of the
5750 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5751 * operations) will never be smaller than the few bytes we need. */
5754 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5756 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
5764 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
5765 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
5767 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
5768 (buflen
== 0) ? "premature EOF" : strerror(errno
));
5772 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
5773 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
5778 slave
->repldboff
+= nwritten
;
5779 if (slave
->repldboff
== slave
->repldbsize
) {
5780 close(slave
->repldbfd
);
5781 slave
->repldbfd
= -1;
5782 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5783 slave
->replstate
= REDIS_REPL_ONLINE
;
5784 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
5785 sendReplyToClient
, slave
) == AE_ERR
) {
5789 addReplySds(slave
,sdsempty());
5790 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
5794 /* This function is called at the end of every backgrond saving.
5795 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5796 * otherwise REDIS_ERR is passed to the function.
5798 * The goal of this function is to handle slaves waiting for a successful
5799 * background saving in order to perform non-blocking synchronization. */
5800 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
5802 int startbgsave
= 0;
5804 listRewind(server
.slaves
);
5805 while((ln
= listYield(server
.slaves
))) {
5806 redisClient
*slave
= ln
->value
;
5808 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
5810 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5811 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
5812 struct redis_stat buf
;
5814 if (bgsaveerr
!= REDIS_OK
) {
5816 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
5819 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
5820 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
5822 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
5825 slave
->repldboff
= 0;
5826 slave
->repldbsize
= buf
.st_size
;
5827 slave
->replstate
= REDIS_REPL_SEND_BULK
;
5828 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5829 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
5836 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5837 listRewind(server
.slaves
);
5838 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
5839 while((ln
= listYield(server
.slaves
))) {
5840 redisClient
*slave
= ln
->value
;
5842 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
5849 static int syncWithMaster(void) {
5850 char buf
[1024], tmpfile
[256], authcmd
[1024];
5852 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
5856 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
5861 /* AUTH with the master if required. */
5862 if(server
.masterauth
) {
5863 snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
);
5864 if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) {
5866 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",
5870 /* Read the AUTH result. */
5871 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5873 redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s",
5877 if (buf
[0] != '+') {
5879 redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?");
5884 /* Issue the SYNC command */
5885 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
5887 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
5891 /* Read the bulk write count */
5892 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5894 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
5898 if (buf
[0] != '$') {
5900 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5903 dumpsize
= atoi(buf
+1);
5904 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
5905 /* Read the bulk write data on a temp file */
5906 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
5907 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
5910 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
5914 int nread
, nwritten
;
5916 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
5918 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
5924 nwritten
= write(dfd
,buf
,nread
);
5925 if (nwritten
== -1) {
5926 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
5934 if (rename(tmpfile
,server
.dbfilename
) == -1) {
5935 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
5941 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
5942 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
5946 server
.master
= createClient(fd
);
5947 server
.master
->flags
|= REDIS_MASTER
;
5948 server
.master
->authenticated
= 1;
5949 server
.replstate
= REDIS_REPL_CONNECTED
;
5953 static void slaveofCommand(redisClient
*c
) {
5954 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
5955 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
5956 if (server
.masterhost
) {
5957 sdsfree(server
.masterhost
);
5958 server
.masterhost
= NULL
;
5959 if (server
.master
) freeClient(server
.master
);
5960 server
.replstate
= REDIS_REPL_NONE
;
5961 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
5964 sdsfree(server
.masterhost
);
5965 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
5966 server
.masterport
= atoi(c
->argv
[2]->ptr
);
5967 if (server
.master
) freeClient(server
.master
);
5968 server
.replstate
= REDIS_REPL_CONNECT
;
5969 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
5970 server
.masterhost
, server
.masterport
);
5972 addReply(c
,shared
.ok
);
5975 /* ============================ Maxmemory directive ======================== */
5977 /* This function gets called when 'maxmemory' is set on the config file to limit
5978 * the max memory used by the server, and we are out of memory.
5979 * This function will try to, in order:
5981 * - Free objects from the free list
5982 * - Try to remove keys with an EXPIRE set
5984 * If it is not possible to free enough memory to reach used-memory < maxmemory,
5985 * the server will start refusing commands that will enlarge even more the
5988 static void freeMemoryIfNeeded(void) {
5989 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
5990 if (listLength(server
.objfreelist
)) {
5993 listNode
*head
= listFirst(server
.objfreelist
);
5994 o
= listNodeValue(head
);
5995 listDelNode(server
.objfreelist
,head
);
5998 int j
, k
, freed
= 0;
6000 for (j
= 0; j
< server
.dbnum
; j
++) {
6002 robj
*minkey
= NULL
;
6003 struct dictEntry
*de
;
6005 if (dictSize(server
.db
[j
].expires
)) {
6007 /* From a sample of three keys drop the one nearest to
6008 * the natural expire */
6009 for (k
= 0; k
< 3; k
++) {
6012 de
= dictGetRandomKey(server
.db
[j
].expires
);
6013 t
= (time_t) dictGetEntryVal(de
);
6014 if (minttl
== -1 || t
< minttl
) {
6015 minkey
= dictGetEntryKey(de
);
6019 deleteKey(server
.db
+j
,minkey
);
6022 if (!freed
) return; /* nothing to free... */
6027 /* ============================== Append Only file ========================== */
6029 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
6030 sds buf
= sdsempty();
6036 /* The DB this command was targeting is not the same as the last command
6037 * we appended. A SELECT command needs to be issued first. */
6038 if (dictid
!= server
.appendseldb
) {
6041 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
6042 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
6043 (unsigned long)strlen(seldb
),seldb
);
6044 server
.appendseldb
= dictid
;
6047 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6048 * EXPIRE calls into EXPIREAT calls */
6049 if (cmd
->proc
== expireCommand
) {
6052 tmpargv
[0] = createStringObject("EXPIREAT",8);
6053 tmpargv
[1] = argv
[1];
6054 incrRefCount(argv
[1]);
6055 when
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10);
6056 tmpargv
[2] = createObject(REDIS_STRING
,
6057 sdscatprintf(sdsempty(),"%ld",when
));
6061 /* Append the actual command */
6062 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
6063 for (j
= 0; j
< argc
; j
++) {
6066 o
= getDecodedObject(o
);
6067 buf
= sdscatprintf(buf
,"$%lu\r\n",(unsigned long)sdslen(o
->ptr
));
6068 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
6069 buf
= sdscatlen(buf
,"\r\n",2);
6073 /* Free the objects from the modified argv for EXPIREAT */
6074 if (cmd
->proc
== expireCommand
) {
6075 for (j
= 0; j
< 3; j
++)
6076 decrRefCount(argv
[j
]);
6079 /* We want to perform a single write. This should be guaranteed atomic
6080 * at least if the filesystem we are writing is a real physical one.
6081 * While this will save us against the server being killed I don't think
6082 * there is much to do about the whole server stopping for power problems
6084 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
6085 if (nwritten
!= (signed)sdslen(buf
)) {
6086 /* Ooops, we are in trouble. The best thing to do for now is
6087 * to simply exit instead of giving the illusion that everything is
6088 * working as expected. */
6089 if (nwritten
== -1) {
6090 redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
));
6092 redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
));
6096 /* If a background append only file rewrite is in progress we want to
6097 * accumulate the differences between the child DB and the current one
6098 * in a buffer, so that when the child process has finished its work we
6099 * can append the differences to the new append only file. */
6100 if (server
.bgrewritechildpid
!= -1)
6101 server
.bgrewritebuf
= sdscatlen(server
.bgrewritebuf
,buf
,sdslen(buf
));
6105 if (server
.appendfsync
== APPENDFSYNC_ALWAYS
||
6106 (server
.appendfsync
== APPENDFSYNC_EVERYSEC
&&
6107 now
-server
.lastfsync
> 1))
6109 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
6110 server
.lastfsync
= now
;
6114 /* In Redis commands are always executed in the context of a client, so in
6115 * order to load the append only file we need to create a fake client. */
6116 static struct redisClient
*createFakeClient(void) {
6117 struct redisClient
*c
= zmalloc(sizeof(*c
));
6121 c
->querybuf
= sdsempty();
6125 /* We set the fake client as a slave waiting for the synchronization
6126 * so that Redis will not try to send replies to this client. */
6127 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
6128 c
->reply
= listCreate();
6129 listSetFreeMethod(c
->reply
,decrRefCount
);
6130 listSetDupMethod(c
->reply
,dupClientReplyValue
);
6134 static void freeFakeClient(struct redisClient
*c
) {
6135 sdsfree(c
->querybuf
);
6136 listRelease(c
->reply
);
6140 /* Replay the append log file. On success REDIS_OK is returned. On non fatal
6141 * error (the append only file is zero-length) REDIS_ERR is returned. On
6142 * fatal error an error message is logged and the program exits. */
6143 int loadAppendOnlyFile(char *filename
) {
6144 struct redisClient
*fakeClient
;
6145 FILE *fp
= fopen(filename
,"r");
6146 struct redis_stat sb
;
6148 if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size
== 0)
6152 redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
));
6156 fakeClient
= createFakeClient();
6163 struct redisCommand
*cmd
;
6165 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) {
6171 if (buf
[0] != '*') goto fmterr
;
6173 argv
= zmalloc(sizeof(robj
*)*argc
);
6174 for (j
= 0; j
< argc
; j
++) {
6175 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
;
6176 if (buf
[0] != '$') goto fmterr
;
6177 len
= strtol(buf
+1,NULL
,10);
6178 argsds
= sdsnewlen(NULL
,len
);
6179 if (len
&& fread(argsds
,len
,1,fp
) == 0) goto fmterr
;
6180 argv
[j
] = createObject(REDIS_STRING
,argsds
);
6181 if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */
6184 /* Command lookup */
6185 cmd
= lookupCommand(argv
[0]->ptr
);
6187 redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
);
6190 /* Try object sharing and encoding */
6191 if (server
.shareobjects
) {
6193 for(j
= 1; j
< argc
; j
++)
6194 argv
[j
] = tryObjectSharing(argv
[j
]);
6196 if (cmd
->flags
& REDIS_CMD_BULK
)
6197 tryObjectEncoding(argv
[argc
-1]);
6198 /* Run the command in the context of a fake client */
6199 fakeClient
->argc
= argc
;
6200 fakeClient
->argv
= argv
;
6201 cmd
->proc(fakeClient
);
6202 /* Discard the reply objects list from the fake client */
6203 while(listLength(fakeClient
->reply
))
6204 listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
));
6205 /* Clean up, ready for the next command */
6206 for (j
= 0; j
< argc
; j
++) decrRefCount(argv
[j
]);
6210 freeFakeClient(fakeClient
);
6215 redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file");
6217 redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
));
6221 redisLog(REDIS_WARNING
,"Bad file format reading the append only file");
6225 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6226 static int fwriteBulk(FILE *fp
, robj
*obj
) {
6228 obj
= getDecodedObject(obj
);
6229 snprintf(buf
,sizeof(buf
),"$%ld\r\n",(long)sdslen(obj
->ptr
));
6230 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) goto err
;
6231 if (sdslen(obj
->ptr
) && fwrite(obj
->ptr
,sdslen(obj
->ptr
),1,fp
) == 0)
6233 if (fwrite("\r\n",2,1,fp
) == 0) goto err
;
6241 /* Write a double value in bulk format $<count>\r\n<payload>\r\n */
6242 static int fwriteBulkDouble(FILE *fp
, double d
) {
6243 char buf
[128], dbuf
[128];
6245 snprintf(dbuf
,sizeof(dbuf
),"%.17g\r\n",d
);
6246 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(dbuf
)-2);
6247 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
6248 if (fwrite(dbuf
,strlen(dbuf
),1,fp
) == 0) return 0;
6252 /* Write a long value in bulk format $<count>\r\n<payload>\r\n */
6253 static int fwriteBulkLong(FILE *fp
, long l
) {
6254 char buf
[128], lbuf
[128];
6256 snprintf(lbuf
,sizeof(lbuf
),"%ld\r\n",l
);
6257 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(lbuf
)-2);
6258 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
6259 if (fwrite(lbuf
,strlen(lbuf
),1,fp
) == 0) return 0;
6263 /* Write a sequence of commands able to fully rebuild the dataset into
6264 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6265 static int rewriteAppendOnlyFile(char *filename
) {
6266 dictIterator
*di
= NULL
;
6271 time_t now
= time(NULL
);
6273 /* Note that we have to use a different temp name here compared to the
6274 * one used by rewriteAppendOnlyFileBackground() function. */
6275 snprintf(tmpfile
,256,"temp-rewriteaof-%d.aof", (int) getpid());
6276 fp
= fopen(tmpfile
,"w");
6278 redisLog(REDIS_WARNING
, "Failed rewriting the append only file: %s", strerror(errno
));
6281 for (j
= 0; j
< server
.dbnum
; j
++) {
6282 char selectcmd
[] = "*2\r\n$6\r\nSELECT\r\n";
6283 redisDb
*db
= server
.db
+j
;
6285 if (dictSize(d
) == 0) continue;
6286 di
= dictGetIterator(d
);
6292 /* SELECT the new DB */
6293 if (fwrite(selectcmd
,sizeof(selectcmd
)-1,1,fp
) == 0) goto werr
;
6294 if (fwriteBulkLong(fp
,j
) == 0) goto werr
;
6296 /* Iterate this DB writing every entry */
6297 while((de
= dictNext(di
)) != NULL
) {
6298 robj
*key
= dictGetEntryKey(de
);
6299 robj
*o
= dictGetEntryVal(de
);
6300 time_t expiretime
= getExpire(db
,key
);
6302 /* Save the key and associated value */
6303 if (o
->type
== REDIS_STRING
) {
6304 /* Emit a SET command */
6305 char cmd
[]="*3\r\n$3\r\nSET\r\n";
6306 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6308 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6309 if (fwriteBulk(fp
,o
) == 0) goto werr
;
6310 } else if (o
->type
== REDIS_LIST
) {
6311 /* Emit the RPUSHes needed to rebuild the list */
6312 list
*list
= o
->ptr
;
6316 while((ln
= listYield(list
))) {
6317 char cmd
[]="*3\r\n$5\r\nRPUSH\r\n";
6318 robj
*eleobj
= listNodeValue(ln
);
6320 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6321 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6322 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6324 } else if (o
->type
== REDIS_SET
) {
6325 /* Emit the SADDs needed to rebuild the set */
6327 dictIterator
*di
= dictGetIterator(set
);
6330 while((de
= dictNext(di
)) != NULL
) {
6331 char cmd
[]="*3\r\n$4\r\nSADD\r\n";
6332 robj
*eleobj
= dictGetEntryKey(de
);
6334 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6335 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6336 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6338 dictReleaseIterator(di
);
6339 } else if (o
->type
== REDIS_ZSET
) {
6340 /* Emit the ZADDs needed to rebuild the sorted set */
6342 dictIterator
*di
= dictGetIterator(zs
->dict
);
6345 while((de
= dictNext(di
)) != NULL
) {
6346 char cmd
[]="*4\r\n$4\r\nZADD\r\n";
6347 robj
*eleobj
= dictGetEntryKey(de
);
6348 double *score
= dictGetEntryVal(de
);
6350 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6351 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6352 if (fwriteBulkDouble(fp
,*score
) == 0) goto werr
;
6353 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6355 dictReleaseIterator(di
);
6357 redisAssert(0 != 0);
6359 /* Save the expire time */
6360 if (expiretime
!= -1) {
6361 char cmd
[]="*3\r\n$8\r\nEXPIREAT\r\n";
6362 /* If this key is already expired skip it */
6363 if (expiretime
< now
) continue;
6364 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6365 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6366 if (fwriteBulkLong(fp
,expiretime
) == 0) goto werr
;
6369 dictReleaseIterator(di
);
6372 /* Make sure data will not remain on the OS's output buffers */
6377 /* Use RENAME to make sure the DB file is changed atomically only
6378 * if the generated DB file is ok. */
6379 if (rename(tmpfile
,filename
) == -1) {
6380 redisLog(REDIS_WARNING
,"Error moving temp append only file on the final destination: %s", strerror(errno
));
6384 redisLog(REDIS_NOTICE
,"SYNC append only file rewrite performed");
6390 redisLog(REDIS_WARNING
,"Write error writing append only file on disk: %s", strerror(errno
));
6391 if (di
) dictReleaseIterator(di
);
6395 /* This is how rewriting of the append only file in background works:
6397 * 1) The user calls BGREWRITEAOF
6398 * 2) Redis calls this function, that forks():
6399 * 2a) the child rewrites the append only file in a temp file.
6400 * 2b) the parent accumulates differences in server.bgrewritebuf.
6401 * 3) When the child has finished '2a' it exits.
6402 * 4) The parent will trap the exit code and, if it's OK, will append the
6403 * data accumulated into server.bgrewritebuf to the temp file, and
6404 * finally will rename(2) the temp file to the actual file name.
6405 * Then the new file is reopened as the new append only file. Profit!
6407 static int rewriteAppendOnlyFileBackground(void) {
6410 if (server
.bgrewritechildpid
!= -1) return REDIS_ERR
;
6411 if ((childpid
= fork()) == 0) {
6416 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6417 if (rewriteAppendOnlyFile(tmpfile
) == REDIS_OK
) {
6424 if (childpid
== -1) {
6425 redisLog(REDIS_WARNING
,
6426 "Can't rewrite append only file in background: fork: %s",
6430 redisLog(REDIS_NOTICE
,
6431 "Background append only file rewriting started by pid %d",childpid
);
6432 server
.bgrewritechildpid
= childpid
;
6433 /* We set appendseldb to -1 in order to force the next call to the
6434 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6435 * accumulated by the parent into server.bgrewritebuf will start
6436 * with a SELECT statement and it will be safe to merge. */
6437 server
.appendseldb
= -1;
6440 return REDIS_OK
; /* unreached */
6443 static void bgrewriteaofCommand(redisClient
*c
) {
6444 if (server
.bgrewritechildpid
!= -1) {
6445 addReplySds(c
,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6448 if (rewriteAppendOnlyFileBackground() == REDIS_OK
) {
6449 char *status
= "+Background append only file rewriting started\r\n";
6450 addReplySds(c
,sdsnew(status
));
6452 addReply(c
,shared
.err
);
6456 static void aofRemoveTempFile(pid_t childpid
) {
6459 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) childpid
);
6463 /* ================================= Debugging ============================== */
6465 static void debugCommand(redisClient
*c
) {
6466 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
6468 } else if (!strcasecmp(c
->argv
[1]->ptr
,"reload")) {
6469 if (rdbSave(server
.dbfilename
) != REDIS_OK
) {
6470 addReply(c
,shared
.err
);
6474 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6475 addReply(c
,shared
.err
);
6478 redisLog(REDIS_WARNING
,"DB reloaded by DEBUG RELOAD");
6479 addReply(c
,shared
.ok
);
6480 } else if (!strcasecmp(c
->argv
[1]->ptr
,"loadaof")) {
6482 if (loadAppendOnlyFile(server
.appendfilename
) != REDIS_OK
) {
6483 addReply(c
,shared
.err
);
6486 redisLog(REDIS_WARNING
,"Append Only File loaded by DEBUG LOADAOF");
6487 addReply(c
,shared
.ok
);
6488 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
6489 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
6493 addReply(c
,shared
.nokeyerr
);
6496 key
= dictGetEntryKey(de
);
6497 val
= dictGetEntryVal(de
);
6498 addReplySds(c
,sdscatprintf(sdsempty(),
6499 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6500 (void*)key
, key
->refcount
, (void*)val
, val
->refcount
,
6503 addReplySds(c
,sdsnew(
6504 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6508 static void _redisAssert(char *estr
) {
6509 redisLog(REDIS_WARNING
,"=== ASSERTION FAILED ===");
6510 redisLog(REDIS_WARNING
,"==> %s\n",estr
);
6511 #ifdef HAVE_BACKTRACE
6512 redisLog(REDIS_WARNING
,"(forcing SIGSEGV in order to print the stack trace)");
6517 /* =================================== Main! ================================ */
6520 int linuxOvercommitMemoryValue(void) {
6521 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
6525 if (fgets(buf
,64,fp
) == NULL
) {
6534 void linuxOvercommitMemoryWarning(void) {
6535 if (linuxOvercommitMemoryValue() == 0) {
6536 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6539 #endif /* __linux__ */
6541 static void daemonize(void) {
6545 if (fork() != 0) exit(0); /* parent exits */
6546 printf("New pid: %d\n", getpid());
6547 setsid(); /* create a new session */
6549 /* Every output goes to /dev/null. If Redis is daemonized but
6550 * the 'logfile' is set to 'stdout' in the configuration file
6551 * it will not log at all. */
6552 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
6553 dup2(fd
, STDIN_FILENO
);
6554 dup2(fd
, STDOUT_FILENO
);
6555 dup2(fd
, STDERR_FILENO
);
6556 if (fd
> STDERR_FILENO
) close(fd
);
6558 /* Try to write the pid file */
6559 fp
= fopen(server
.pidfile
,"w");
6561 fprintf(fp
,"%d\n",getpid());
6566 int main(int argc
, char **argv
) {
6569 resetServerSaveParams();
6570 loadServerConfig(argv
[1]);
6571 } else if (argc
> 2) {
6572 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
6575 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6577 if (server
.daemonize
) daemonize();
6579 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
6581 linuxOvercommitMemoryWarning();
6583 if (server
.appendonly
) {
6584 if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
)
6585 redisLog(REDIS_NOTICE
,"DB loaded from append only file");
6587 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
6588 redisLog(REDIS_NOTICE
,"DB loaded from disk");
6590 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
6591 acceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
6592 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
6594 aeDeleteEventLoop(server
.el
);
6598 /* ============================= Backtrace support ========================= */
6600 #ifdef HAVE_BACKTRACE
6601 static char *findFuncName(void *pointer
, unsigned long *offset
);
6603 static void *getMcontextEip(ucontext_t
*uc
) {
6604 #if defined(__FreeBSD__)
6605 return (void*) uc
->uc_mcontext
.mc_eip
;
6606 #elif defined(__dietlibc__)
6607 return (void*) uc
->uc_mcontext
.eip
;
6608 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6610 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6612 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6614 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6615 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6616 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6618 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6620 #elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
6621 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
]; /* Linux 32/64 bit */
6622 #elif defined(__ia64__) /* Linux IA64 */
6623 return (void*) uc
->uc_mcontext
.sc_ip
;
6629 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
6631 char **messages
= NULL
;
6632 int i
, trace_size
= 0;
6633 unsigned long offset
=0;
6634 ucontext_t
*uc
= (ucontext_t
*) secret
;
6636 REDIS_NOTUSED(info
);
6638 redisLog(REDIS_WARNING
,
6639 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
6640 infostring
= genRedisInfoString();
6641 redisLog(REDIS_WARNING
, "%s",infostring
);
6642 /* It's not safe to sdsfree() the returned string under memory
6643 * corruption conditions. Let it leak as we are going to abort */
6645 trace_size
= backtrace(trace
, 100);
6646 /* overwrite sigaction with caller's address */
6647 if (getMcontextEip(uc
) != NULL
) {
6648 trace
[1] = getMcontextEip(uc
);
6650 messages
= backtrace_symbols(trace
, trace_size
);
6652 for (i
=1; i
<trace_size
; ++i
) {
6653 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
6655 p
= strchr(messages
[i
],'+');
6656 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
6657 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
6659 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
6662 // free(messages); Don't call free() with possibly corrupted memory.
6666 static void setupSigSegvAction(void) {
6667 struct sigaction act
;
6669 sigemptyset (&act
.sa_mask
);
6670 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6671 * is used. Otherwise, sa_handler is used */
6672 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
6673 act
.sa_sigaction
= segvHandler
;
6674 sigaction (SIGSEGV
, &act
, NULL
);
6675 sigaction (SIGBUS
, &act
, NULL
);
6676 sigaction (SIGFPE
, &act
, NULL
);
6677 sigaction (SIGILL
, &act
, NULL
);
6678 sigaction (SIGBUS
, &act
, NULL
);
6682 #include "staticsymbols.h"
6683 /* This function tries to convert a pointer into a function name. It's used in
6684 * order to provide a backtrace under segmentation fault that's able to
6685 * display functions declared as static (otherwise the backtrace is useless). */
6686 static char *findFuncName(void *pointer
, unsigned long *offset
){
6688 unsigned long off
, minoff
= 0;
6690 /* Try to match against the Symbol with the smallest offset */
6691 for (i
=0; symsTable
[i
].pointer
; i
++) {
6692 unsigned long lp
= (unsigned long) pointer
;
6694 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
6695 off
=lp
-symsTable
[i
].pointer
;
6696 if (ret
< 0 || off
< minoff
) {
6702 if (ret
== -1) return NULL
;
6704 return symsTable
[ret
].name
;
6706 #else /* HAVE_BACKTRACE */
6707 static void setupSigSegvAction(void) {
6709 #endif /* HAVE_BACKTRACE */