2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.3.0"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
64 #include "solarisfixes.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
95 /* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
107 this flag will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short these commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
113 #define REDIS_STRING 0
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
128 /* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpret the length:
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
139 * Lengths up to 63 are stored using a single byte, most DB keys, and many
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining six bits specify a special encoding for the object
149 * according to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
156 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157 #define REDIS_SLAVE 2 /* This client is a slave server */
158 #define REDIS_MASTER 4 /* This client is a master server */
159 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
160 #define REDIS_MULTI 16 /* This client is in a MULTI context */
161 #define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */
163 /* Slave replication state - slave side */
164 #define REDIS_REPL_NONE 0 /* No active replication */
165 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
166 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
168 /* Slave replication state - from the point of view of master
169 * Note that in SEND_BULK and ONLINE state the slave receives new updates
170 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
171 * to start the next background saving in order to send updates to it. */
172 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
173 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
174 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
175 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
177 /* List related stuff */
181 /* Sort operations */
182 #define REDIS_SORT_GET 0
183 #define REDIS_SORT_ASC 1
184 #define REDIS_SORT_DESC 2
185 #define REDIS_SORTKEY_MAX 1024
188 #define REDIS_DEBUG 0
189 #define REDIS_NOTICE 1
190 #define REDIS_WARNING 2
/* Anti-warning macro: evaluates V and discards the result so the compiler
 * does not report the variable/parameter as unused. The argument is wrapped
 * in parentheses so that a compound expression (e.g. a+b) is cast to void
 * as a whole instead of only its first operand. */
#define REDIS_NOTUSED(V) ((void) (V))
195 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
196 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
198 /* Append only defines */
199 #define APPENDFSYNC_NO 0
200 #define APPENDFSYNC_ALWAYS 1
201 #define APPENDFSYNC_EVERYSEC 2
203 /* We can print the stacktrace, so our assert is defined this way: */
204 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
205 static void _redisAssert(char *estr
);
207 /*================================= Data types ============================== */
209 /* A redis object, that is a type able to hold a string / list / set */
210 typedef struct redisObject
{
213 unsigned char encoding
;
214 unsigned char notused
[2];
218 /* Macro used to initialize a Redis object allocated on the stack.
219 * Note that this macro is taken near the structure definition to make sure
220 * we'll update it when the structure is changed, to avoid bugs like
221 * bug #85 introduced exactly in this way. */
222 #define initStaticStringObject(_var,_ptr) do { \
224 _var.type = REDIS_STRING; \
225 _var.encoding = REDIS_ENCODING_RAW; \
229 typedef struct redisDb
{
230 dict
*dict
; /* The keyspace for this DB */
231 dict
*expires
; /* Timeout of keys with a timeout set */
232 dict
*blockingkeys
; /* Keys with clients waiting for data (BLPOP) */
236 /* Client MULTI/EXEC state */
237 typedef struct multiCmd
{
240 struct redisCommand
*cmd
;
243 typedef struct multiState
{
244 multiCmd
*commands
; /* Array of MULTI commands */
245 int count
; /* Total number of MULTI commands */
248 /* With multiplexing we need to keep per-client state.
249 * Clients are kept in a linked list. */
250 typedef struct redisClient
{
255 robj
**argv
, **mbargv
;
257 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
258 int multibulk
; /* multi bulk command format active */
261 time_t lastinteraction
; /* time of the last interaction, used for timeout */
262 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
264 int slaveseldb
; /* slave selected db, if this client is a slave */
265 int authenticated
; /* when requirepass is non-NULL */
266 int replstate
; /* replication state if this is a slave */
267 int repldbfd
; /* replication DB file descriptor */
268 long repldboff
; /* replication DB file offset */
269 off_t repldbsize
; /* replication DB file size */
270 multiState mstate
; /* MULTI/EXEC state */
271 robj
*blockingkey
; /* The key we waiting to terminate a blocking
272 * operation such as BLPOP. Otherwise NULL. */
273 time_t blockingto
; /* Blocking operation timeout. If UNIX current time
274 * is >= blockingto then the operation timed out. */
282 /* Global server state structure */
287 dict
*sharingpool
; /* Poll used for object sharing */
288 unsigned int sharingpoolsize
;
289 long long dirty
; /* changes to DB from the last save */
291 list
*slaves
, *monitors
;
292 char neterr
[ANET_ERR_LEN
];
294 int cronloops
; /* number of times the cron function run */
295 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
296 time_t lastsave
; /* Unix time of last save succeeede */
297 size_t usedmemory
; /* Used memory in megabytes */
298 /* Fields used only for stats */
299 time_t stat_starttime
; /* server start time */
300 long long stat_numcommands
; /* number of processed commands */
301 long long stat_numconnections
; /* number of connections received */
314 pid_t bgsavechildpid
;
315 pid_t bgrewritechildpid
;
316 sds bgrewritebuf
; /* buffer taken by parent during oppend only rewrite */
317 struct saveparam
*saveparams
;
322 char *appendfilename
;
326 /* Replication related */
331 redisClient
*master
; /* client that is master for this slave */
333 unsigned int maxclients
;
334 unsigned long maxmemory
;
335 /* Sort parameters - qsort_r() is only available under BSD so we
336 * have to take this state global, in order to pass it to sortCompare() */
342 typedef void redisCommandProc(redisClient
*c
);
343 struct redisCommand
{
345 redisCommandProc
*proc
;
350 struct redisFunctionSym
{
352 unsigned long pointer
;
355 typedef struct _redisSortObject
{
363 typedef struct _redisSortOperation
{
366 } redisSortOperation
;
368 /* ZSETs use a specialized version of Skiplists */
370 typedef struct zskiplistNode
{
371 struct zskiplistNode
**forward
;
372 struct zskiplistNode
*backward
;
377 typedef struct zskiplist
{
378 struct zskiplistNode
*header
, *tail
;
379 unsigned long length
;
383 typedef struct zset
{
388 /* Our shared "common" objects */
390 struct sharedObjectsStruct
{
391 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
392 *colon
, *nullbulk
, *nullmultibulk
, *queued
,
393 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
394 *outofrangeerr
, *plus
,
395 *select0
, *select1
, *select2
, *select3
, *select4
,
396 *select5
, *select6
, *select7
, *select8
, *select9
;
399 /* Global vars that are actually used as constants. The following double
400 * values are used for double on-disk serialization, and are initialized
401 * at runtime to avoid strange compiler optimizations. */
403 static double R_Zero
, R_PosInf
, R_NegInf
, R_Nan
;
405 /*================================ Prototypes =============================== */
407 static void freeStringObject(robj
*o
);
408 static void freeListObject(robj
*o
);
409 static void freeSetObject(robj
*o
);
410 static void decrRefCount(void *o
);
411 static robj
*createObject(int type
, void *ptr
);
412 static void freeClient(redisClient
*c
);
413 static int rdbLoad(char *filename
);
414 static void addReply(redisClient
*c
, robj
*obj
);
415 static void addReplySds(redisClient
*c
, sds s
);
416 static void incrRefCount(robj
*o
);
417 static int rdbSaveBackground(char *filename
);
418 static robj
*createStringObject(char *ptr
, size_t len
);
419 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
420 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
421 static int syncWithMaster(void);
422 static robj
*tryObjectSharing(robj
*o
);
423 static int tryObjectEncoding(robj
*o
);
424 static robj
*getDecodedObject(robj
*o
);
425 static int removeExpire(redisDb
*db
, robj
*key
);
426 static int expireIfNeeded(redisDb
*db
, robj
*key
);
427 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
428 static int deleteKey(redisDb
*db
, robj
*key
);
429 static time_t getExpire(redisDb
*db
, robj
*key
);
430 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
431 static void updateSlavesWaitingBgsave(int bgsaveerr
);
432 static void freeMemoryIfNeeded(void);
433 static int processCommand(redisClient
*c
);
434 static void setupSigSegvAction(void);
435 static void rdbRemoveTempFile(pid_t childpid
);
436 static void aofRemoveTempFile(pid_t childpid
);
437 static size_t stringObjectLen(robj
*o
);
438 static void processInputBuffer(redisClient
*c
);
439 static zskiplist
*zslCreate(void);
440 static void zslFree(zskiplist
*zsl
);
441 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
442 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
443 static void initClientMultiState(redisClient
*c
);
444 static void freeClientMultiState(redisClient
*c
);
445 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
);
446 static void unblockClient(redisClient
*c
);
447 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
);
449 static void authCommand(redisClient
*c
);
450 static void pingCommand(redisClient
*c
);
451 static void echoCommand(redisClient
*c
);
452 static void setCommand(redisClient
*c
);
453 static void setnxCommand(redisClient
*c
);
454 static void getCommand(redisClient
*c
);
455 static void delCommand(redisClient
*c
);
456 static void existsCommand(redisClient
*c
);
457 static void incrCommand(redisClient
*c
);
458 static void decrCommand(redisClient
*c
);
459 static void incrbyCommand(redisClient
*c
);
460 static void decrbyCommand(redisClient
*c
);
461 static void selectCommand(redisClient
*c
);
462 static void randomkeyCommand(redisClient
*c
);
463 static void keysCommand(redisClient
*c
);
464 static void dbsizeCommand(redisClient
*c
);
465 static void lastsaveCommand(redisClient
*c
);
466 static void saveCommand(redisClient
*c
);
467 static void bgsaveCommand(redisClient
*c
);
468 static void bgrewriteaofCommand(redisClient
*c
);
469 static void shutdownCommand(redisClient
*c
);
470 static void moveCommand(redisClient
*c
);
471 static void renameCommand(redisClient
*c
);
472 static void renamenxCommand(redisClient
*c
);
473 static void lpushCommand(redisClient
*c
);
474 static void rpushCommand(redisClient
*c
);
475 static void lpopCommand(redisClient
*c
);
476 static void rpopCommand(redisClient
*c
);
477 static void llenCommand(redisClient
*c
);
478 static void lindexCommand(redisClient
*c
);
479 static void lrangeCommand(redisClient
*c
);
480 static void ltrimCommand(redisClient
*c
);
481 static void typeCommand(redisClient
*c
);
482 static void lsetCommand(redisClient
*c
);
483 static void saddCommand(redisClient
*c
);
484 static void sremCommand(redisClient
*c
);
485 static void smoveCommand(redisClient
*c
);
486 static void sismemberCommand(redisClient
*c
);
487 static void scardCommand(redisClient
*c
);
488 static void spopCommand(redisClient
*c
);
489 static void srandmemberCommand(redisClient
*c
);
490 static void sinterCommand(redisClient
*c
);
491 static void sinterstoreCommand(redisClient
*c
);
492 static void sunionCommand(redisClient
*c
);
493 static void sunionstoreCommand(redisClient
*c
);
494 static void sdiffCommand(redisClient
*c
);
495 static void sdiffstoreCommand(redisClient
*c
);
496 static void syncCommand(redisClient
*c
);
497 static void flushdbCommand(redisClient
*c
);
498 static void flushallCommand(redisClient
*c
);
499 static void sortCommand(redisClient
*c
);
500 static void lremCommand(redisClient
*c
);
501 static void rpoplpushcommand(redisClient
*c
);
502 static void infoCommand(redisClient
*c
);
503 static void mgetCommand(redisClient
*c
);
504 static void monitorCommand(redisClient
*c
);
505 static void expireCommand(redisClient
*c
);
506 static void expireatCommand(redisClient
*c
);
507 static void getsetCommand(redisClient
*c
);
508 static void ttlCommand(redisClient
*c
);
509 static void slaveofCommand(redisClient
*c
);
510 static void debugCommand(redisClient
*c
);
511 static void msetCommand(redisClient
*c
);
512 static void msetnxCommand(redisClient
*c
);
513 static void zaddCommand(redisClient
*c
);
514 static void zincrbyCommand(redisClient
*c
);
515 static void zrangeCommand(redisClient
*c
);
516 static void zrangebyscoreCommand(redisClient
*c
);
517 static void zrevrangeCommand(redisClient
*c
);
518 static void zcardCommand(redisClient
*c
);
519 static void zremCommand(redisClient
*c
);
520 static void zscoreCommand(redisClient
*c
);
521 static void zremrangebyscoreCommand(redisClient
*c
);
522 static void multiCommand(redisClient
*c
);
523 static void execCommand(redisClient
*c
);
524 static void blpopCommand(redisClient
*c
);
525 static void brpopCommand(redisClient
*c
);
527 /*================================= Globals ================================= */
530 static struct redisServer server
; /* server global state */
531 static struct redisCommand cmdTable
[] = {
532 {"get",getCommand
,2,REDIS_CMD_INLINE
},
533 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
534 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
535 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
536 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
537 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
538 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
539 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
540 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
541 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
542 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
543 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
544 {"brpop",brpopCommand
,3,REDIS_CMD_INLINE
},
545 {"blpop",blpopCommand
,3,REDIS_CMD_INLINE
},
546 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
547 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
548 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
549 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
550 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
551 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
552 {"rpoplpush",rpoplpushcommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
553 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
554 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
555 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
556 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
557 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
558 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
559 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
560 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
561 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
562 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
563 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
564 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
565 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
566 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
567 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
568 {"zincrby",zincrbyCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
569 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
570 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
571 {"zrange",zrangeCommand
,-4,REDIS_CMD_INLINE
},
572 {"zrangebyscore",zrangebyscoreCommand
,-4,REDIS_CMD_INLINE
},
573 {"zrevrange",zrevrangeCommand
,-4,REDIS_CMD_INLINE
},
574 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
575 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
576 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
577 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
578 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
579 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
580 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
581 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
582 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
583 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
584 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
585 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
586 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
587 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
588 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
589 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
590 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
591 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
592 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
593 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
594 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
595 {"bgrewriteaof",bgrewriteaofCommand
,1,REDIS_CMD_INLINE
},
596 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
597 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
598 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
599 {"multi",multiCommand
,1,REDIS_CMD_INLINE
},
600 {"exec",execCommand
,1,REDIS_CMD_INLINE
},
601 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
602 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
603 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
604 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
605 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
606 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
607 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
608 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
609 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
613 /*============================ Utility functions ============================ */
615 /* Glob-style pattern matching. */
616 int stringmatchlen(const char *pattern
, int patternLen
,
617 const char *string
, int stringLen
, int nocase
)
622 while (pattern
[1] == '*') {
627 return 1; /* match */
629 if (stringmatchlen(pattern
+1, patternLen
-1,
630 string
, stringLen
, nocase
))
631 return 1; /* match */
635 return 0; /* no match */
639 return 0; /* no match */
649 not = pattern
[0] == '^';
656 if (pattern
[0] == '\\') {
659 if (pattern
[0] == string
[0])
661 } else if (pattern
[0] == ']') {
663 } else if (patternLen
== 0) {
667 } else if (pattern
[1] == '-' && patternLen
>= 3) {
668 int start
= pattern
[0];
669 int end
= pattern
[2];
677 start
= tolower(start
);
683 if (c
>= start
&& c
<= end
)
687 if (pattern
[0] == string
[0])
690 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
700 return 0; /* no match */
706 if (patternLen
>= 2) {
713 if (pattern
[0] != string
[0])
714 return 0; /* no match */
716 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
717 return 0; /* no match */
725 if (stringLen
== 0) {
726 while(*pattern
== '*') {
733 if (patternLen
== 0 && stringLen
== 0)
738 static void redisLog(int level
, const char *fmt
, ...) {
742 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
746 if (level
>= server
.verbosity
) {
752 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
753 fprintf(fp
,"%s %c ",buf
,c
[level
]);
754 vfprintf(fp
, fmt
, ap
);
760 if (server
.logfile
) fclose(fp
);
763 /*====================== Hash table type implementation ==================== */
765 /* This is a hash table type that uses the SDS dynamic strings library as
766 * keys and redis objects as values (objects can hold SDS strings,
769 static void dictVanillaFree(void *privdata
, void *val
)
771 DICT_NOTUSED(privdata
);
775 static void dictListDestructor(void *privdata
, void *val
)
777 DICT_NOTUSED(privdata
);
778 listRelease((list
*)val
);
781 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
785 DICT_NOTUSED(privdata
);
787 l1
= sdslen((sds
)key1
);
788 l2
= sdslen((sds
)key2
);
789 if (l1
!= l2
) return 0;
790 return memcmp(key1
, key2
, l1
) == 0;
793 static void dictRedisObjectDestructor(void *privdata
, void *val
)
795 DICT_NOTUSED(privdata
);
800 static int dictObjKeyCompare(void *privdata
, const void *key1
,
803 const robj
*o1
= key1
, *o2
= key2
;
804 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
807 static unsigned int dictObjHash(const void *key
) {
809 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
812 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
815 robj
*o1
= (robj
*) key1
, *o2
= (robj
*) key2
;
818 o1
= getDecodedObject(o1
);
819 o2
= getDecodedObject(o2
);
820 cmp
= sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
826 static unsigned int dictEncObjHash(const void *key
) {
827 robj
*o
= (robj
*) key
;
829 o
= getDecodedObject(o
);
830 unsigned int hash
= dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
835 static dictType setDictType
= {
836 dictEncObjHash
, /* hash function */
839 dictEncObjKeyCompare
, /* key compare */
840 dictRedisObjectDestructor
, /* key destructor */
841 NULL
/* val destructor */
844 static dictType zsetDictType
= {
845 dictEncObjHash
, /* hash function */
848 dictEncObjKeyCompare
, /* key compare */
849 dictRedisObjectDestructor
, /* key destructor */
850 dictVanillaFree
/* val destructor of malloc(sizeof(double)) */
853 static dictType hashDictType
= {
854 dictObjHash
, /* hash function */
857 dictObjKeyCompare
, /* key compare */
858 dictRedisObjectDestructor
, /* key destructor */
859 dictRedisObjectDestructor
/* val destructor */
862 /* Keylist hash table type has unencoded redis objects as keys and
863 * lists as values. It's used for blocking operations (BLPOP) */
864 static dictType keylistDictType
= {
865 dictObjHash
, /* hash function */
868 dictObjKeyCompare
, /* key compare */
869 dictRedisObjectDestructor
, /* key destructor */
870 dictListDestructor
/* val destructor */
873 /* ========================= Random utility functions ======================= */
875 /* Redis generally does not try to recover from out of memory conditions
876 * when allocating objects or strings, it is not clear if it will be possible
877 * to report this condition to the client since the networking layer itself
878 * is based on heap allocation for send buffers, so we simply abort.
879 * At least the code will be simpler to read... */
880 static void oom(const char *msg
) {
881 redisLog(REDIS_WARNING
, "%s: Out of memory\n",msg
);
886 /* ====================== Redis server networking stuff ===================== */
887 static void closeTimedoutClients(void) {
890 time_t now
= time(NULL
);
892 listRewind(server
.clients
);
893 while ((ln
= listYield(server
.clients
)) != NULL
) {
894 c
= listNodeValue(ln
);
895 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
896 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
897 (now
- c
->lastinteraction
> server
.maxidletime
)) {
898 redisLog(REDIS_DEBUG
,"Closing idle client");
904 static int htNeedsResize(dict
*dict
) {
905 long long size
, used
;
907 size
= dictSlots(dict
);
908 used
= dictSize(dict
);
909 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
910 (used
*100/size
< REDIS_HT_MINFILL
));
913 /* If the percentage of used slots in the HT drops below REDIS_HT_MINFILL
914 * we resize the hash table to save memory */
915 static void tryResizeHashTables(void) {
918 for (j
= 0; j
< server
.dbnum
; j
++) {
919 if (htNeedsResize(server
.db
[j
].dict
)) {
920 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
921 dictResize(server
.db
[j
].dict
);
922 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
924 if (htNeedsResize(server
.db
[j
].expires
))
925 dictResize(server
.db
[j
].expires
);
929 /* A background saving child (BGSAVE) terminated its work. Handle this. */
930 void backgroundSaveDoneHandler(int statloc
) {
931 int exitcode
= WEXITSTATUS(statloc
);
932 int bysignal
= WIFSIGNALED(statloc
);
934 if (!bysignal
&& exitcode
== 0) {
935 redisLog(REDIS_NOTICE
,
936 "Background saving terminated with success");
938 server
.lastsave
= time(NULL
);
939 } else if (!bysignal
&& exitcode
!= 0) {
940 redisLog(REDIS_WARNING
, "Background saving error");
942 redisLog(REDIS_WARNING
,
943 "Background saving terminated by signal");
944 rdbRemoveTempFile(server
.bgsavechildpid
);
946 server
.bgsavechildpid
= -1;
947 /* Possibly there are slaves waiting for a BGSAVE in order to be served
948 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
949 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
952 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
954 void backgroundRewriteDoneHandler(int statloc
) {
955 int exitcode
= WEXITSTATUS(statloc
);
956 int bysignal
= WIFSIGNALED(statloc
);
958 if (!bysignal
&& exitcode
== 0) {
962 redisLog(REDIS_NOTICE
,
963 "Background append only file rewriting terminated with success");
964 /* Now it's time to flush the differences accumulated by the parent */
965 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) server
.bgrewritechildpid
);
966 fd
= open(tmpfile
,O_WRONLY
|O_APPEND
);
968 redisLog(REDIS_WARNING
, "Not able to open the temp append only file produced by the child: %s", strerror(errno
));
971 /* Flush our data... */
972 if (write(fd
,server
.bgrewritebuf
,sdslen(server
.bgrewritebuf
)) !=
973 (signed) sdslen(server
.bgrewritebuf
)) {
974 redisLog(REDIS_WARNING
, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno
));
978 redisLog(REDIS_NOTICE
,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server
.bgrewritebuf
));
979 /* Now our work is to rename the temp file into the stable file. And
980 * switch the file descriptor used by the server for append only. */
981 if (rename(tmpfile
,server
.appendfilename
) == -1) {
982 redisLog(REDIS_WARNING
,"Can't rename the temp append only file into the stable one: %s", strerror(errno
));
986 /* Mission completed... almost */
987 redisLog(REDIS_NOTICE
,"Append only file successfully rewritten.");
988 if (server
.appendfd
!= -1) {
989 /* If append only is actually enabled... */
990 close(server
.appendfd
);
991 server
.appendfd
= fd
;
993 server
.appendseldb
= -1; /* Make sure it will issue SELECT */
994 redisLog(REDIS_NOTICE
,"The new append only file was selected for future appends.");
996 /* If append only is disabled we just generate a dump in this
997 * format. Why not? */
1000 } else if (!bysignal
&& exitcode
!= 0) {
1001 redisLog(REDIS_WARNING
, "Background append only file rewriting error");
1003 redisLog(REDIS_WARNING
,
1004 "Background append only file rewriting terminated by signal");
1007 sdsfree(server
.bgrewritebuf
);
1008 server
.bgrewritebuf
= sdsempty();
1009 aofRemoveTempFile(server
.bgrewritechildpid
);
1010 server
.bgrewritechildpid
= -1;
1013 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
1014 int j
, loops
= server
.cronloops
++;
1015 REDIS_NOTUSED(eventLoop
);
1017 REDIS_NOTUSED(clientData
);
1019 /* Update the global state with the amount of used memory */
1020 server
.usedmemory
= zmalloc_used_memory();
1022 /* Show some info about non-empty databases */
1023 for (j
= 0; j
< server
.dbnum
; j
++) {
1024 long long size
, used
, vkeys
;
1026 size
= dictSlots(server
.db
[j
].dict
);
1027 used
= dictSize(server
.db
[j
].dict
);
1028 vkeys
= dictSize(server
.db
[j
].expires
);
1029 if (!(loops
% 5) && (used
|| vkeys
)) {
1030 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
1031 /* dictPrintStats(server.dict); */
1035 /* We don't want to resize the hash tables while a background saving
1036 * is in progress: the saving child is created using fork() that is
1037 * implemented with a copy-on-write semantic in most modern systems, so
1038 * if we resize the HT while there is the saving child at work actually
1039 * a lot of memory movements in the parent will cause a lot of pages
1041 if (server
.bgsavechildpid
== -1) tryResizeHashTables();
1043 /* Show information about connected clients */
1045 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1046 listLength(server
.clients
)-listLength(server
.slaves
),
1047 listLength(server
.slaves
),
1049 dictSize(server
.sharingpool
));
1052 /* Close connections of timedout clients */
1053 if (server
.maxidletime
&& !(loops
% 10))
1054 closeTimedoutClients();
1056 /* Check if a background saving or AOF rewrite in progress terminated */
1057 if (server
.bgsavechildpid
!= -1 || server
.bgrewritechildpid
!= -1) {
1061 if ((pid
= wait3(&statloc
,WNOHANG
,NULL
)) != 0) {
1062 if (pid
== server
.bgsavechildpid
) {
1063 backgroundSaveDoneHandler(statloc
);
1065 backgroundRewriteDoneHandler(statloc
);
1069 /* If there is not a background saving in progress check if
1070 * we have to save now */
1071 time_t now
= time(NULL
);
1072 for (j
= 0; j
< server
.saveparamslen
; j
++) {
1073 struct saveparam
*sp
= server
.saveparams
+j
;
1075 if (server
.dirty
>= sp
->changes
&&
1076 now
-server
.lastsave
> sp
->seconds
) {
1077 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
1078 sp
->changes
, sp
->seconds
);
1079 rdbSaveBackground(server
.dbfilename
);
1085 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1086 * will use few CPU cycles if there are few expiring keys, otherwise
1087 * it will get more aggressive to avoid that too much memory is used by
1088 * keys that can be removed from the keyspace. */
1089 for (j
= 0; j
< server
.dbnum
; j
++) {
1091 redisDb
*db
= server
.db
+j
;
1093 /* Continue to expire if at the end of the cycle more than 25%
1094 * of the keys were expired. */
1096 int num
= dictSize(db
->expires
);
1097 time_t now
= time(NULL
);
1100 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
1101 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
1106 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
1107 t
= (time_t) dictGetEntryVal(de
);
1109 deleteKey(db
,dictGetEntryKey(de
));
1113 } while (expired
> REDIS_EXPIRELOOKUPS_PER_CRON
/4);
1116 /* Check if we should connect to a MASTER */
1117 if (server
.replstate
== REDIS_REPL_CONNECT
) {
1118 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
1119 if (syncWithMaster() == REDIS_OK
) {
1120 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
1126 static void createSharedObjects(void) {
1127 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
1128 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
1129 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
1130 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
1131 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
1132 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
1133 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
1134 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
1135 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
1136 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
1137 shared
.queued
= createObject(REDIS_STRING
,sdsnew("+QUEUED\r\n"));
1138 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
1139 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1140 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
1141 "-ERR no such key\r\n"));
1142 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
1143 "-ERR syntax error\r\n"));
1144 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
1145 "-ERR source and destination objects are the same\r\n"));
1146 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
1147 "-ERR index out of range\r\n"));
1148 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
1149 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
1150 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
1151 shared
.select0
= createStringObject("select 0\r\n",10);
1152 shared
.select1
= createStringObject("select 1\r\n",10);
1153 shared
.select2
= createStringObject("select 2\r\n",10);
1154 shared
.select3
= createStringObject("select 3\r\n",10);
1155 shared
.select4
= createStringObject("select 4\r\n",10);
1156 shared
.select5
= createStringObject("select 5\r\n",10);
1157 shared
.select6
= createStringObject("select 6\r\n",10);
1158 shared
.select7
= createStringObject("select 7\r\n",10);
1159 shared
.select8
= createStringObject("select 8\r\n",10);
1160 shared
.select9
= createStringObject("select 9\r\n",10);
1163 static void appendServerSaveParams(time_t seconds
, int changes
) {
1164 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1165 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1166 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1167 server
.saveparamslen
++;
1170 static void resetServerSaveParams() {
1171 zfree(server
.saveparams
);
1172 server
.saveparams
= NULL
;
1173 server
.saveparamslen
= 0;
1176 static void initServerConfig() {
1177 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1178 server
.port
= REDIS_SERVERPORT
;
1179 server
.verbosity
= REDIS_DEBUG
;
1180 server
.maxidletime
= REDIS_MAXIDLETIME
;
1181 server
.saveparams
= NULL
;
1182 server
.logfile
= NULL
; /* NULL = log on standard output */
1183 server
.bindaddr
= NULL
;
1184 server
.glueoutputbuf
= 1;
1185 server
.daemonize
= 0;
1186 server
.appendonly
= 0;
1187 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1188 server
.lastfsync
= time(NULL
);
1189 server
.appendfd
= -1;
1190 server
.appendseldb
= -1; /* Make sure the first time will not match */
1191 server
.pidfile
= "/var/run/redis.pid";
1192 server
.dbfilename
= "dump.rdb";
1193 server
.appendfilename
= "appendonly.aof";
1194 server
.requirepass
= NULL
;
1195 server
.shareobjects
= 0;
1196 server
.rdbcompression
= 1;
1197 server
.sharingpoolsize
= 1024;
1198 server
.maxclients
= 0;
1199 server
.maxmemory
= 0;
1200 resetServerSaveParams();
1202 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1203 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1204 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1205 /* Replication related */
1207 server
.masterauth
= NULL
;
1208 server
.masterhost
= NULL
;
1209 server
.masterport
= 6379;
1210 server
.master
= NULL
;
1211 server
.replstate
= REDIS_REPL_NONE
;
1213 /* Double constants initialization */
1215 R_PosInf
= 1.0/R_Zero
;
1216 R_NegInf
= -1.0/R_Zero
;
1217 R_Nan
= R_Zero
/R_Zero
;
1220 static void initServer() {
1223 signal(SIGHUP
, SIG_IGN
);
1224 signal(SIGPIPE
, SIG_IGN
);
1225 setupSigSegvAction();
1227 server
.clients
= listCreate();
1228 server
.slaves
= listCreate();
1229 server
.monitors
= listCreate();
1230 server
.objfreelist
= listCreate();
1231 createSharedObjects();
1232 server
.el
= aeCreateEventLoop();
1233 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1234 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
1235 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1236 if (server
.fd
== -1) {
1237 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
1240 for (j
= 0; j
< server
.dbnum
; j
++) {
1241 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1242 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1243 server
.db
[j
].blockingkeys
= dictCreate(&keylistDictType
,NULL
);
1244 server
.db
[j
].id
= j
;
1246 server
.cronloops
= 0;
1247 server
.bgsavechildpid
= -1;
1248 server
.bgrewritechildpid
= -1;
1249 server
.bgrewritebuf
= sdsempty();
1250 server
.lastsave
= time(NULL
);
1252 server
.usedmemory
= 0;
1253 server
.stat_numcommands
= 0;
1254 server
.stat_numconnections
= 0;
1255 server
.stat_starttime
= time(NULL
);
1256 aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
);
1258 if (server
.appendonly
) {
1259 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644);
1260 if (server
.appendfd
== -1) {
1261 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1268 /* Empty the whole database */
1269 static long long emptyDb() {
1271 long long removed
= 0;
1273 for (j
= 0; j
< server
.dbnum
; j
++) {
1274 removed
+= dictSize(server
.db
[j
].dict
);
1275 dictEmpty(server
.db
[j
].dict
);
1276 dictEmpty(server
.db
[j
].expires
);
/* Convert a "yes"/"no" configuration argument into an integer.
 *
 * s: NUL-terminated string, compared case-insensitively.
 *
 * Returns 1 for "yes", 0 for "no" and -1 for anything else. The -1 result
 * is what loadServerConfig() checks for in order to report
 * "argument must be 'yes' or 'no'". */
static int yesnotoi(char *s) {
    if (!strcasecmp(s,"yes")) return 1;
    if (!strcasecmp(s,"no")) return 0;
    return -1;
}
1287 /* I agree, this is a very rudimentary way to load a configuration...
1288 will improve later if the config gets more complex */
1289 static void loadServerConfig(char *filename
) {
1291 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1295 if (filename
[0] == '-' && filename
[1] == '\0')
1298 if ((fp
= fopen(filename
,"r")) == NULL
) {
1299 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1304 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1310 line
= sdstrim(line
," \t\r\n");
1312 /* Skip comments and blank lines*/
1313 if (line
[0] == '#' || line
[0] == '\0') {
1318 /* Split into arguments */
1319 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1320 sdstolower(argv
[0]);
1322 /* Execute config directives */
1323 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1324 server
.maxidletime
= atoi(argv
[1]);
1325 if (server
.maxidletime
< 0) {
1326 err
= "Invalid timeout value"; goto loaderr
;
1328 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1329 server
.port
= atoi(argv
[1]);
1330 if (server
.port
< 1 || server
.port
> 65535) {
1331 err
= "Invalid port"; goto loaderr
;
1333 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1334 server
.bindaddr
= zstrdup(argv
[1]);
1335 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1336 int seconds
= atoi(argv
[1]);
1337 int changes
= atoi(argv
[2]);
1338 if (seconds
< 1 || changes
< 0) {
1339 err
= "Invalid save parameters"; goto loaderr
;
1341 appendServerSaveParams(seconds
,changes
);
1342 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1343 if (chdir(argv
[1]) == -1) {
1344 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1345 argv
[1], strerror(errno
));
1348 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1349 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1350 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1351 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1353 err
= "Invalid log level. Must be one of debug, notice, warning";
1356 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1359 server
.logfile
= zstrdup(argv
[1]);
1360 if (!strcasecmp(server
.logfile
,"stdout")) {
1361 zfree(server
.logfile
);
1362 server
.logfile
= NULL
;
1364 if (server
.logfile
) {
1365 /* Test if we are able to open the file. The server will not
1366 * be able to abort just for this problem later... */
1367 logfp
= fopen(server
.logfile
,"a");
1368 if (logfp
== NULL
) {
1369 err
= sdscatprintf(sdsempty(),
1370 "Can't open the log file: %s", strerror(errno
));
1375 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1376 server
.dbnum
= atoi(argv
[1]);
1377 if (server
.dbnum
< 1) {
1378 err
= "Invalid number of databases"; goto loaderr
;
1380 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1381 server
.maxclients
= atoi(argv
[1]);
1382 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1383 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1384 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1385 server
.masterhost
= sdsnew(argv
[1]);
1386 server
.masterport
= atoi(argv
[2]);
1387 server
.replstate
= REDIS_REPL_CONNECT
;
1388 } else if (!strcasecmp(argv
[0],"masterauth") && argc
== 2) {
1389 server
.masterauth
= zstrdup(argv
[1]);
1390 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1391 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1392 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1394 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1395 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1396 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1398 } else if (!strcasecmp(argv
[0],"rdbcompression") && argc
== 2) {
1399 if ((server
.rdbcompression
= yesnotoi(argv
[1])) == -1) {
1400 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1402 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1403 server
.sharingpoolsize
= atoi(argv
[1]);
1404 if (server
.sharingpoolsize
< 1) {
1405 err
= "invalid object sharing pool size"; goto loaderr
;
1407 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1408 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1409 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1411 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1412 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1413 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1415 } else if (!strcasecmp(argv
[0],"appendfsync") && argc
== 2) {
1416 if (!strcasecmp(argv
[1],"no")) {
1417 server
.appendfsync
= APPENDFSYNC_NO
;
1418 } else if (!strcasecmp(argv
[1],"always")) {
1419 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1420 } else if (!strcasecmp(argv
[1],"everysec")) {
1421 server
.appendfsync
= APPENDFSYNC_EVERYSEC
;
1423 err
= "argument must be 'no', 'always' or 'everysec'";
1426 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1427 server
.requirepass
= zstrdup(argv
[1]);
1428 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1429 server
.pidfile
= zstrdup(argv
[1]);
1430 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1431 server
.dbfilename
= zstrdup(argv
[1]);
1433 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1435 for (j
= 0; j
< argc
; j
++)
1440 if (fp
!= stdin
) fclose(fp
);
1444 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1445 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1446 fprintf(stderr
, ">>> '%s'\n", line
);
1447 fprintf(stderr
, "%s\n", err
);
1451 static void freeClientArgv(redisClient
*c
) {
1454 for (j
= 0; j
< c
->argc
; j
++)
1455 decrRefCount(c
->argv
[j
]);
1456 for (j
= 0; j
< c
->mbargc
; j
++)
1457 decrRefCount(c
->mbargv
[j
]);
1462 static void freeClient(redisClient
*c
) {
1465 /* Note that if the client we are freeing is blocked into a blocking
1466 * call, we have to set querybuf to NULL *before* to call unblockClient()
1467 * to avoid processInputBuffer() will get called. Also it is important
1468 * to remove the file events after this, because this call adds
1469 * the READABLE event. */
1470 sdsfree(c
->querybuf
);
1472 if (c
->flags
& REDIS_BLOCKED
)
1475 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1476 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1477 listRelease(c
->reply
);
1480 ln
= listSearchKey(server
.clients
,c
);
1481 redisAssert(ln
!= NULL
);
1482 listDelNode(server
.clients
,ln
);
1483 if (c
->flags
& REDIS_SLAVE
) {
1484 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1486 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1487 ln
= listSearchKey(l
,c
);
1488 redisAssert(ln
!= NULL
);
1491 if (c
->flags
& REDIS_MASTER
) {
1492 server
.master
= NULL
;
1493 server
.replstate
= REDIS_REPL_CONNECT
;
1497 freeClientMultiState(c
);
1501 #define GLUEREPLY_UP_TO (1024)
1502 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1504 char buf
[GLUEREPLY_UP_TO
];
1508 listRewind(c
->reply
);
1509 while((ln
= listYield(c
->reply
))) {
1513 objlen
= sdslen(o
->ptr
);
1514 if (copylen
+ objlen
<= GLUEREPLY_UP_TO
) {
1515 memcpy(buf
+copylen
,o
->ptr
,objlen
);
1517 listDelNode(c
->reply
,ln
);
1519 if (copylen
== 0) return;
1523 /* Now the output buffer is empty, add the new single element */
1524 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,copylen
));
1525 listAddNodeHead(c
->reply
,o
);
1528 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1529 redisClient
*c
= privdata
;
1530 int nwritten
= 0, totwritten
= 0, objlen
;
1533 REDIS_NOTUSED(mask
);
1535 /* Use writev() if we have enough buffers to send */
1536 if (!server
.glueoutputbuf
&&
1537 listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD
&&
1538 !(c
->flags
& REDIS_MASTER
))
1540 sendReplyToClientWritev(el
, fd
, privdata
, mask
);
1544 while(listLength(c
->reply
)) {
1545 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1546 glueReplyBuffersIfNeeded(c
);
1548 o
= listNodeValue(listFirst(c
->reply
));
1549 objlen
= sdslen(o
->ptr
);
1552 listDelNode(c
->reply
,listFirst(c
->reply
));
1556 if (c
->flags
& REDIS_MASTER
) {
1557 /* Don't reply to a master */
1558 nwritten
= objlen
- c
->sentlen
;
1560 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1561 if (nwritten
<= 0) break;
1563 c
->sentlen
+= nwritten
;
1564 totwritten
+= nwritten
;
1565 /* If we fully sent the object on head go to the next one */
1566 if (c
->sentlen
== objlen
) {
1567 listDelNode(c
->reply
,listFirst(c
->reply
));
1570 /* Note that we avoid sending more than REDIS_MAX_WRITE_PER_EVENT
1571 * bytes: in a single threaded server it's a good idea to serve
1572 * other clients as well, even if a very large request comes from a
1573 * super fast link that is always able to accept data (in a real world
1574 * scenario think about 'KEYS *' against the loopback interface) */
1575 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1577 if (nwritten
== -1) {
1578 if (errno
== EAGAIN
) {
1581 redisLog(REDIS_DEBUG
,
1582 "Error writing to client: %s", strerror(errno
));
1587 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1588 if (listLength(c
->reply
) == 0) {
1590 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1594 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
)
1596 redisClient
*c
= privdata
;
1597 int nwritten
= 0, totwritten
= 0, objlen
, willwrite
;
1599 struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
];
1600 int offset
, ion
= 0;
1602 REDIS_NOTUSED(mask
);
1605 while (listLength(c
->reply
)) {
1606 offset
= c
->sentlen
;
1610 /* fill-in the iov[] array */
1611 for(node
= listFirst(c
->reply
); node
; node
= listNextNode(node
)) {
1612 o
= listNodeValue(node
);
1613 objlen
= sdslen(o
->ptr
);
1615 if (totwritten
+ objlen
- offset
> REDIS_MAX_WRITE_PER_EVENT
)
1618 if(ion
== REDIS_WRITEV_IOVEC_COUNT
)
1619 break; /* no more iovecs */
1621 iov
[ion
].iov_base
= ((char*)o
->ptr
) + offset
;
1622 iov
[ion
].iov_len
= objlen
- offset
;
1623 willwrite
+= objlen
- offset
;
1624 offset
= 0; /* just for the first item */
1631 /* write all collected blocks at once */
1632 if((nwritten
= writev(fd
, iov
, ion
)) < 0) {
1633 if (errno
!= EAGAIN
) {
1634 redisLog(REDIS_DEBUG
,
1635 "Error writing to client: %s", strerror(errno
));
1642 totwritten
+= nwritten
;
1643 offset
= c
->sentlen
;
1645 /* remove written robjs from c->reply */
1646 while (nwritten
&& listLength(c
->reply
)) {
1647 o
= listNodeValue(listFirst(c
->reply
));
1648 objlen
= sdslen(o
->ptr
);
1650 if(nwritten
>= objlen
- offset
) {
1651 listDelNode(c
->reply
, listFirst(c
->reply
));
1652 nwritten
-= objlen
- offset
;
1656 c
->sentlen
+= nwritten
;
1664 c
->lastinteraction
= time(NULL
);
1666 if (listLength(c
->reply
) == 0) {
1668 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1672 static struct redisCommand
*lookupCommand(char *name
) {
1674 while(cmdTable
[j
].name
!= NULL
) {
1675 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1681 /* resetClient prepares the client to process the next command */
1682 static void resetClient(redisClient
*c
) {
1688 /* Call() is the core of Redis execution of a command */
1689 static void call(redisClient
*c
, struct redisCommand
*cmd
) {
1692 dirty
= server
.dirty
;
1694 if (server
.appendonly
&& server
.dirty
-dirty
)
1695 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1696 if (server
.dirty
-dirty
&& listLength(server
.slaves
))
1697 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1698 if (listLength(server
.monitors
))
1699 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1700 server
.stat_numcommands
++;
1703 /* If this function gets called we already read a whole
1704 * command, arguments are in the client argv/argc fields.
1705 * processCommand() executes the command or prepares the
1706 * server for a bulk read from the client.
1708 * If 1 is returned the client is still alive and valid and
1709 * other operations can be performed by the caller. Otherwise
1710 * if 0 is returned the client was destroyed (i.e. after QUIT). */
1711 static int processCommand(redisClient
*c
) {
1712 struct redisCommand
*cmd
;
1714 /* Free some memory if needed (maxmemory setting) */
1715 if (server
.maxmemory
) freeMemoryIfNeeded();
1717 /* Handle the multi bulk command type. This is an alternative protocol
1718 * supported by Redis in order to receive commands that are composed of
1719 * multiple binary-safe "bulk" arguments. The latency of processing is
1720 * a bit higher but this allows things like multi-sets, so if this
1721 * protocol is used only for MSET and similar commands this is a big win. */
1722 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1723 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1724 if (c
->multibulk
<= 0) {
1728 decrRefCount(c
->argv
[c
->argc
-1]);
1732 } else if (c
->multibulk
) {
1733 if (c
->bulklen
== -1) {
1734 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1735 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1739 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1740 decrRefCount(c
->argv
[0]);
1741 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1743 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1748 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1752 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1753 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1757 if (c
->multibulk
== 0) {
1761 /* Here we need to swap the multi-bulk argc/argv with the
1762 * normal argc/argv of the client structure. */
1764 c
->argv
= c
->mbargv
;
1765 c
->mbargv
= auxargv
;
1768 c
->argc
= c
->mbargc
;
1769 c
->mbargc
= auxargc
;
1771 /* We need to set bulklen to something different than -1
1772 * in order for the code below to process the command without
1773 * to try to read the last argument of a bulk command as
1774 * a special argument. */
1776 /* continue below and process the command */
1783 /* -- end of multi bulk commands processing -- */
1785 /* The QUIT command is handled as a special case. Normal command
1786 * procs are unable to close the client connection safely */
1787 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1791 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1794 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1795 (char*)c
->argv
[0]->ptr
));
1798 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1799 (c
->argc
< -cmd
->arity
)) {
1801 sdscatprintf(sdsempty(),
1802 "-ERR wrong number of arguments for '%s' command\r\n",
1806 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1807 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1810 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1811 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1813 decrRefCount(c
->argv
[c
->argc
-1]);
1814 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1816 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1821 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1822 /* It is possible that the bulk read is already in the
1823 * buffer. Check this condition and handle it accordingly.
1824 * This is just a fast path, alternative to call processInputBuffer().
1825 * It's a good idea since the code is small and this condition
1826 * happens most of the times. */
1827 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1828 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1830 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1835 /* Let's try to share objects on the command arguments vector */
1836 if (server
.shareobjects
) {
1838 for(j
= 1; j
< c
->argc
; j
++)
1839 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1841 /* Let's try to encode the bulk object to save space. */
1842 if (cmd
->flags
& REDIS_CMD_BULK
)
1843 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1845 /* Check if the user is authenticated */
1846 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1847 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1852 /* Exec the command */
1853 if (c
->flags
& REDIS_MULTI
&& cmd
->proc
!= execCommand
) {
1854 queueMultiCommand(c
,cmd
);
1855 addReply(c
,shared
.queued
);
1860 /* Prepare the client for the next command */
1861 if (c
->flags
& REDIS_CLOSE
) {
1869 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1873 /* (args*2)+1 is enough room for args, spaces, newlines */
1874 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1876 if (argc
<= REDIS_STATIC_ARGS
) {
1879 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1882 for (j
= 0; j
< argc
; j
++) {
1883 if (j
!= 0) outv
[outc
++] = shared
.space
;
1884 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1887 lenobj
= createObject(REDIS_STRING
,
1888 sdscatprintf(sdsempty(),"%lu\r\n",
1889 (unsigned long) stringObjectLen(argv
[j
])));
1890 lenobj
->refcount
= 0;
1891 outv
[outc
++] = lenobj
;
1893 outv
[outc
++] = argv
[j
];
1895 outv
[outc
++] = shared
.crlf
;
1897 /* Increment all the refcounts at start and decrement at end in order to
1898 * be sure to free objects if there is no slave in a replication state
1899 * able to be feed with commands */
1900 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1902 while((ln
= listYield(slaves
))) {
1903 redisClient
*slave
= ln
->value
;
1905 /* Don't feed slaves that are still waiting for BGSAVE to start */
1906 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1908 /* Feed all the other slaves, MONITORs and so on */
1909 if (slave
->slaveseldb
!= dictid
) {
1913 case 0: selectcmd
= shared
.select0
; break;
1914 case 1: selectcmd
= shared
.select1
; break;
1915 case 2: selectcmd
= shared
.select2
; break;
1916 case 3: selectcmd
= shared
.select3
; break;
1917 case 4: selectcmd
= shared
.select4
; break;
1918 case 5: selectcmd
= shared
.select5
; break;
1919 case 6: selectcmd
= shared
.select6
; break;
1920 case 7: selectcmd
= shared
.select7
; break;
1921 case 8: selectcmd
= shared
.select8
; break;
1922 case 9: selectcmd
= shared
.select9
; break;
1924 selectcmd
= createObject(REDIS_STRING
,
1925 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1926 selectcmd
->refcount
= 0;
1929 addReply(slave
,selectcmd
);
1930 slave
->slaveseldb
= dictid
;
1932 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1934 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1935 if (outv
!= static_outv
) zfree(outv
);
1938 static void processInputBuffer(redisClient
*c
) {
1940 /* Before processing the input buffer, make sure the client is not
1941 * waiting for a blocking operation such as BLPOP. Note that on the first
1942 * iteration the client is never blocked, otherwise processInputBuffer
1943 * would not be called at all, but after the execution of the first commands
1944 * in the input buffer the client may be blocked, and the "goto again"
1945 * will try to reiterate. The following line will make it return asap. */
1946 if (c
->flags
& REDIS_BLOCKED
) return;
1947 if (c
->bulklen
== -1) {
1948 /* Read the first line of the query */
1949 char *p
= strchr(c
->querybuf
,'\n');
1956 query
= c
->querybuf
;
1957 c
->querybuf
= sdsempty();
1958 querylen
= 1+(p
-(query
));
1959 if (sdslen(query
) > querylen
) {
1960 /* leave data after the first line of the query in the buffer */
1961 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1963 *p
= '\0'; /* remove "\n" */
1964 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1965 sdsupdatelen(query
);
1967 /* Now we can split the query in arguments */
1968 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1971 if (c
->argv
) zfree(c
->argv
);
1972 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1974 for (j
= 0; j
< argc
; j
++) {
1975 if (sdslen(argv
[j
])) {
1976 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1984 /* Execute the command. If the client is still valid
1985 * after processCommand() return and there is something
1986 * on the query buffer try to process the next command. */
1987 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1989 /* Nothing to process, argc == 0. Just process the query
1990 * buffer if it's not empty or return to the caller */
1991 if (sdslen(c
->querybuf
)) goto again
;
1994 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
1995 redisLog(REDIS_DEBUG
, "Client protocol error");
2000 /* Bulk read handling. Note that if we are at this point
2001 the client already sent a command terminated with a newline,
2002 we are reading the bulk data that is actually the last
2003 argument of the command. */
2004 int qbl
= sdslen(c
->querybuf
);
2006 if (c
->bulklen
<= qbl
) {
2007 /* Copy everything but the final CRLF as final argument */
2008 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
2010 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
2011 /* Process the command. If the client is still valid after
2012 * the processing and there is more data in the buffer
2013 * try to parse it. */
2014 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
2020 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2021 redisClient
*c
= (redisClient
*) privdata
;
2022 char buf
[REDIS_IOBUF_LEN
];
2025 REDIS_NOTUSED(mask
);
2027 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
2029 if (errno
== EAGAIN
) {
2032 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
2036 } else if (nread
== 0) {
2037 redisLog(REDIS_DEBUG
, "Client closed connection");
2042 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
2043 c
->lastinteraction
= time(NULL
);
2047 processInputBuffer(c
);
2050 static int selectDb(redisClient
*c
, int id
) {
2051 if (id
< 0 || id
>= server
.dbnum
)
2053 c
->db
= &server
.db
[id
];
2057 static void *dupClientReplyValue(void *o
) {
2058 incrRefCount((robj
*)o
);
2062 static redisClient
*createClient(int fd
) {
2063 redisClient
*c
= zmalloc(sizeof(*c
));
2065 anetNonBlock(NULL
,fd
);
2066 anetTcpNoDelay(NULL
,fd
);
2067 if (!c
) return NULL
;
2070 c
->querybuf
= sdsempty();
2079 c
->lastinteraction
= time(NULL
);
2080 c
->authenticated
= 0;
2081 c
->replstate
= REDIS_REPL_NONE
;
2082 c
->reply
= listCreate();
2083 c
->blockingkey
= NULL
;
2084 listSetFreeMethod(c
->reply
,decrRefCount
);
2085 listSetDupMethod(c
->reply
,dupClientReplyValue
);
2086 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
2087 readQueryFromClient
, c
) == AE_ERR
) {
2091 listAddNodeTail(server
.clients
,c
);
2092 initClientMultiState(c
);
2096 static void addReply(redisClient
*c
, robj
*obj
) {
2097 if (listLength(c
->reply
) == 0 &&
2098 (c
->replstate
== REDIS_REPL_NONE
||
2099 c
->replstate
== REDIS_REPL_ONLINE
) &&
2100 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
2101 sendReplyToClient
, c
) == AE_ERR
) return;
2102 listAddNodeTail(c
->reply
,getDecodedObject(obj
));
2105 static void addReplySds(redisClient
*c
, sds s
) {
2106 robj
*o
= createObject(REDIS_STRING
,s
);
2111 static void addReplyDouble(redisClient
*c
, double d
) {
2114 snprintf(buf
,sizeof(buf
),"%.17g",d
);
2115 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2116 (unsigned long) strlen(buf
),buf
));
2119 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
2122 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
2123 len
= sdslen(obj
->ptr
);
2125 long n
= (long)obj
->ptr
;
2127 /* Compute how many bytes this integer will take as a radix 10 string */
2133 while((n
= n
/10) != 0) {
2137 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len
));
2140 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2145 REDIS_NOTUSED(mask
);
2146 REDIS_NOTUSED(privdata
);
2148 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
2149 if (cfd
== AE_ERR
) {
2150 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
2153 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
2154 if ((c
= createClient(cfd
)) == NULL
) {
2155 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
2156 close(cfd
); /* May be already closed, just ingore errors */
2159 /* If maxclient directive is set and this is one client more... close the
2160 * connection. Note that we create the client instead of checking before
2161 * for this condition, since now the socket is already set in nonblocking
2162 * mode and we can send an error for free using the Kernel I/O */
2163 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
2164 char *err
= "-ERR max number of clients reached\r\n";
2166 /* That's a best effort error message, don't check write errors */
2167 if (write(c
->fd
,err
,strlen(err
)) == -1) {
2168 /* Nothing to do, Just to avoid the warning... */
2173 server
.stat_numconnections
++;
2176 /* ======================= Redis objects implementation ===================== */
2178 static robj
*createObject(int type
, void *ptr
) {
2181 if (listLength(server
.objfreelist
)) {
2182 listNode
*head
= listFirst(server
.objfreelist
);
2183 o
= listNodeValue(head
);
2184 listDelNode(server
.objfreelist
,head
);
2186 o
= zmalloc(sizeof(*o
));
2189 o
->encoding
= REDIS_ENCODING_RAW
;
2195 static robj
*createStringObject(char *ptr
, size_t len
) {
2196 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
2199 static robj
*createListObject(void) {
2200 list
*l
= listCreate();
2202 listSetFreeMethod(l
,decrRefCount
);
2203 return createObject(REDIS_LIST
,l
);
2206 static robj
*createSetObject(void) {
2207 dict
*d
= dictCreate(&setDictType
,NULL
);
2208 return createObject(REDIS_SET
,d
);
2211 static robj
*createZsetObject(void) {
2212 zset
*zs
= zmalloc(sizeof(*zs
));
2214 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
2215 zs
->zsl
= zslCreate();
2216 return createObject(REDIS_ZSET
,zs
);
2219 static void freeStringObject(robj
*o
) {
2220 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2225 static void freeListObject(robj
*o
) {
2226 listRelease((list
*) o
->ptr
);
2229 static void freeSetObject(robj
*o
) {
2230 dictRelease((dict
*) o
->ptr
);
2233 static void freeZsetObject(robj
*o
) {
2236 dictRelease(zs
->dict
);
2241 static void freeHashObject(robj
*o
) {
2242 dictRelease((dict
*) o
->ptr
);
2245 static void incrRefCount(robj
*o
) {
2247 #ifdef DEBUG_REFCOUNT
2248 if (o
->type
== REDIS_STRING
)
2249 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
2253 static void decrRefCount(void *obj
) {
2256 #ifdef DEBUG_REFCOUNT
2257 if (o
->type
== REDIS_STRING
)
2258 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
2260 if (--(o
->refcount
) == 0) {
2262 case REDIS_STRING
: freeStringObject(o
); break;
2263 case REDIS_LIST
: freeListObject(o
); break;
2264 case REDIS_SET
: freeSetObject(o
); break;
2265 case REDIS_ZSET
: freeZsetObject(o
); break;
2266 case REDIS_HASH
: freeHashObject(o
); break;
2267 default: redisAssert(0 != 0); break;
2269 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2270 !listAddNodeHead(server
.objfreelist
,o
))
2275 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2276 dictEntry
*de
= dictFind(db
->dict
,key
);
2277 return de
? dictGetEntryVal(de
) : NULL
;
2280 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2281 expireIfNeeded(db
,key
);
2282 return lookupKey(db
,key
);
2285 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2286 deleteIfVolatile(db
,key
);
2287 return lookupKey(db
,key
);
2290 static int deleteKey(redisDb
*db
, robj
*key
) {
2293 /* We need to protect key from destruction: after the first dictDelete()
2294 * it may happen that 'key' is no longer valid if we don't increment
2295 * it's count. This may happen when we get the object reference directly
2296 * from the hash table with dictRandomKey() or dict iterators */
2298 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2299 retval
= dictDelete(db
->dict
,key
);
2302 return retval
== DICT_OK
;
2305 /* Try to share an object against the shared objects pool */
2306 static robj
*tryObjectSharing(robj
*o
) {
2307 struct dictEntry
*de
;
2310 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2312 redisAssert(o
->type
== REDIS_STRING
);
2313 de
= dictFind(server
.sharingpool
,o
);
2315 robj
*shared
= dictGetEntryKey(de
);
2317 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2318 dictGetEntryVal(de
) = (void*) c
;
2319 incrRefCount(shared
);
2323 /* Here we are using a stream algorithm: Every time an object is
2324 * shared we increment its count, every time there is a miss we
2325 * decrement the counter of a random object. If this object reaches
2326 * zero we remove the object and put the current object instead. */
2327 if (dictSize(server
.sharingpool
) >=
2328 server
.sharingpoolsize
) {
2329 de
= dictGetRandomKey(server
.sharingpool
);
2330 redisAssert(de
!= NULL
);
2331 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2332 dictGetEntryVal(de
) = (void*) c
;
2334 dictDelete(server
.sharingpool
,de
->key
);
2337 c
= 0; /* If the pool is empty we want to add this object */
2342 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2343 redisAssert(retval
== DICT_OK
);
2350 /* Check if the nul-terminated string 's' can be represented by a long
2351 * (that is, is a number that fits into long without any other space or
2352 * character before or after the digits).
2354 * If so, the function returns REDIS_OK and *longval is set to the value
2355 * of the number. Otherwise REDIS_ERR is returned */
2356 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2357 char buf
[32], *endptr
;
2361 value
= strtol(s
, &endptr
, 10);
2362 if (endptr
[0] != '\0') return REDIS_ERR
;
2363 slen
= snprintf(buf
,32,"%ld",value
);
2365 /* If the number converted back into a string is not identical
2366 * then it's not possible to encode the string as integer */
2367 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2368 if (longval
) *longval
= value
;
2372 /* Try to encode a string object in order to save space */
2373 static int tryObjectEncoding(robj
*o
) {
2377 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2378 return REDIS_ERR
; /* Already encoded */
2380 /* It's not safe to encode shared objects: shared objects can be shared
2381 * everywhere in the "object space" of Redis. Encoded objects can only
2382 * appear as "values" (and not, for instance, as keys) */
2383 if (o
->refcount
> 1) return REDIS_ERR
;
2385 /* Currently we try to encode only strings */
2386 redisAssert(o
->type
== REDIS_STRING
);
2388 /* Check if we can represent this string as a long integer */
2389 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2391 /* Ok, this object can be encoded */
2392 o
->encoding
= REDIS_ENCODING_INT
;
2394 o
->ptr
= (void*) value
;
2398 /* Get a decoded version of an encoded object (returned as a new object).
2399 * If the object is already raw-encoded just increment the ref count. */
2400 static robj
*getDecodedObject(robj
*o
) {
2403 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2407 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2410 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2411 dec
= createStringObject(buf
,strlen(buf
));
2414 redisAssert(1 != 1);
2418 /* Compare two string objects via strcmp() or alike.
2419 * Note that the objects may be integer-encoded. In such a case we
2420 * use snprintf() to get a string representation of the numbers on the stack
2421 * and compare the strings, it's much faster than calling getDecodedObject().
2423 * Important note: if objects are not integer encoded, but binary-safe strings,
2424 * sdscmp() from sds.c will apply memcmp() so this function can be considered
2426 static int compareStringObjects(robj
*a
, robj
*b
) {
2427 redisAssert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2428 char bufa
[128], bufb
[128], *astr
, *bstr
;
2431 if (a
== b
) return 0;
2432 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2433 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2439 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2440 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2446 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2449 static size_t stringObjectLen(robj
*o
) {
2450 redisAssert(o
->type
== REDIS_STRING
);
2451 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2452 return sdslen(o
->ptr
);
2456 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2460 /*============================ DB saving/loading ============================ */
2462 static int rdbSaveType(FILE *fp
, unsigned char type
) {
2463 if (fwrite(&type
,1,1,fp
) == 0) return -1;
2467 static int rdbSaveTime(FILE *fp
, time_t t
) {
2468 int32_t t32
= (int32_t) t
;
2469 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
2473 /* check rdbLoadLen() comments for more info */
2474 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2475 unsigned char buf
[2];
2478 /* Save a 6 bit len */
2479 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2480 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2481 } else if (len
< (1<<14)) {
2482 /* Save a 14 bit len */
2483 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2485 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2487 /* Save a 32 bit len */
2488 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2489 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2491 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2496 /* String objects in the form "2391" "-100" without any space and with a
2497 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2498 * encoded as integers to save space */
2499 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2501 char *endptr
, buf
[32];
2503 /* Check if it's possible to encode this value as a number */
2504 value
= strtoll(s
, &endptr
, 10);
2505 if (endptr
[0] != '\0') return 0;
2506 snprintf(buf
,32,"%lld",value
);
2508 /* If the number converted back into a string is not identical
2509 * then it's not possible to encode the string as integer */
2510 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2512 /* Finally check if it fits in our ranges */
2513 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2514 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2515 enc
[1] = value
&0xFF;
2517 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2518 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2519 enc
[1] = value
&0xFF;
2520 enc
[2] = (value
>>8)&0xFF;
2522 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2523 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2524 enc
[1] = value
&0xFF;
2525 enc
[2] = (value
>>8)&0xFF;
2526 enc
[3] = (value
>>16)&0xFF;
2527 enc
[4] = (value
>>24)&0xFF;
2534 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2535 unsigned int comprlen
, outlen
;
2539 /* We require at least four bytes compression for this to be worth it */
2540 outlen
= sdslen(obj
->ptr
)-4;
2541 if (outlen
<= 0) return 0;
2542 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2543 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2544 if (comprlen
== 0) {
2548 /* Data compressed! Let's save it on disk */
2549 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2550 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2551 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2552 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2553 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2562 /* Save a string object as [len][data] on disk. If the object is a string
2563 * representation of an integer value we try to save it in a special form */
2564 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2568 len
= sdslen(obj
->ptr
);
2570 /* Try integer encoding */
2572 unsigned char buf
[5];
2573 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2574 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2579 /* Try LZF compression - under 20 bytes it's unable to compress even
2580 * aaaaaaaaaaaaaaaaaa so skip it */
2581 if (server
.rdbcompression
&& len
> 20) {
2584 retval
= rdbSaveLzfStringObject(fp
,obj
);
2585 if (retval
== -1) return -1;
2586 if (retval
> 0) return 0;
2587 /* retval == 0 means data can't be compressed, save the old way */
2590 /* Store verbatim */
2591 if (rdbSaveLen(fp
,len
) == -1) return -1;
2592 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2596 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2597 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2600 obj
= getDecodedObject(obj
);
2601 retval
= rdbSaveStringObjectRaw(fp
,obj
);
2606 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
2607 * 8 bit integer specifying the length of the representation.
2608 * This 8 bit integer has special values in order to specify the following
2614 static int rdbSaveDoubleValue(FILE *fp
, double val
) {
2615 unsigned char buf
[128];
2621 } else if (!isfinite(val
)) {
2623 buf
[0] = (val
< 0) ? 255 : 254;
2625 snprintf((char*)buf
+1,sizeof(buf
)-1,"%.17g",val
);
2626 buf
[0] = strlen((char*)buf
+1);
2629 if (fwrite(buf
,len
,1,fp
) == 0) return -1;
2633 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2634 static int rdbSave(char *filename
) {
2635 dictIterator
*di
= NULL
;
2640 time_t now
= time(NULL
);
2642 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2643 fp
= fopen(tmpfile
,"w");
2645 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2648 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2649 for (j
= 0; j
< server
.dbnum
; j
++) {
2650 redisDb
*db
= server
.db
+j
;
2652 if (dictSize(d
) == 0) continue;
2653 di
= dictGetIterator(d
);
2659 /* Write the SELECT DB opcode */
2660 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2661 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2663 /* Iterate this DB writing every entry */
2664 while((de
= dictNext(di
)) != NULL
) {
2665 robj
*key
= dictGetEntryKey(de
);
2666 robj
*o
= dictGetEntryVal(de
);
2667 time_t expiretime
= getExpire(db
,key
);
2669 /* Save the expire time */
2670 if (expiretime
!= -1) {
2671 /* If this key is already expired skip it */
2672 if (expiretime
< now
) continue;
2673 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2674 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2676 /* Save the key and associated value */
2677 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2678 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2679 if (o
->type
== REDIS_STRING
) {
2680 /* Save a string value */
2681 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
2682 } else if (o
->type
== REDIS_LIST
) {
2683 /* Save a list value */
2684 list
*list
= o
->ptr
;
2688 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
2689 while((ln
= listYield(list
))) {
2690 robj
*eleobj
= listNodeValue(ln
);
2692 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2694 } else if (o
->type
== REDIS_SET
) {
2695 /* Save a set value */
2697 dictIterator
*di
= dictGetIterator(set
);
2700 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
2701 while((de
= dictNext(di
)) != NULL
) {
2702 robj
*eleobj
= dictGetEntryKey(de
);
2704 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2706 dictReleaseIterator(di
);
2707 } else if (o
->type
== REDIS_ZSET
) {
2708 /* Save a sorted set value */
2710 dictIterator
*di
= dictGetIterator(zs
->dict
);
2713 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) goto werr
;
2714 while((de
= dictNext(di
)) != NULL
) {
2715 robj
*eleobj
= dictGetEntryKey(de
);
2716 double *score
= dictGetEntryVal(de
);
2718 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2719 if (rdbSaveDoubleValue(fp
,*score
) == -1) goto werr
;
2721 dictReleaseIterator(di
);
2723 redisAssert(0 != 0);
2726 dictReleaseIterator(di
);
2729 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2731 /* Make sure data will not remain on the OS's output buffers */
2736 /* Use RENAME to make sure the DB file is changed atomically only
2737 * if the generated DB file is ok. */
2738 if (rename(tmpfile
,filename
) == -1) {
2739 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
2743 redisLog(REDIS_NOTICE
,"DB saved on disk");
2745 server
.lastsave
= time(NULL
);
2751 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2752 if (di
) dictReleaseIterator(di
);
2756 static int rdbSaveBackground(char *filename
) {
2759 if (server
.bgsavechildpid
!= -1) return REDIS_ERR
;
2760 if ((childpid
= fork()) == 0) {
2763 if (rdbSave(filename
) == REDIS_OK
) {
2770 if (childpid
== -1) {
2771 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2775 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2776 server
.bgsavechildpid
= childpid
;
2779 return REDIS_OK
; /* unreached */
2782 static void rdbRemoveTempFile(pid_t childpid
) {
2785 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
2789 static int rdbLoadType(FILE *fp
) {
2791 if (fread(&type
,1,1,fp
) == 0) return -1;
2795 static time_t rdbLoadTime(FILE *fp
) {
2797 if (fread(&t32
,4,1,fp
) == 0) return -1;
2798 return (time_t) t32
;
2801 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2802 * of this file for a description of how these are stored on disk.
2804 * isencoded is set to 1 if the length read is not actually a length but
2805 * an "encoding type", check the above comments for more info */
2806 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2807 unsigned char buf
[2];
2810 if (isencoded
) *isencoded
= 0;
2812 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2817 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2818 type
= (buf
[0]&0xC0)>>6;
2819 if (type
== REDIS_RDB_6BITLEN
) {
2820 /* Read a 6 bit len */
2822 } else if (type
== REDIS_RDB_ENCVAL
) {
2823 /* Read a 6 bit len encoding type */
2824 if (isencoded
) *isencoded
= 1;
2826 } else if (type
== REDIS_RDB_14BITLEN
) {
2827 /* Read a 14 bit len */
2828 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2829 return ((buf
[0]&0x3F)<<8)|buf
[1];
2831 /* Read a 32 bit len */
2832 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2838 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2839 unsigned char enc
[4];
2842 if (enctype
== REDIS_RDB_ENC_INT8
) {
2843 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2844 val
= (signed char)enc
[0];
2845 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2847 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2848 v
= enc
[0]|(enc
[1]<<8);
2850 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2852 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2853 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2856 val
= 0; /* anti-warning */
2859 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2862 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2863 unsigned int len
, clen
;
2864 unsigned char *c
= NULL
;
2867 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2868 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2869 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2870 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2871 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2872 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2874 return createObject(REDIS_STRING
,val
);
2881 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2886 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2889 case REDIS_RDB_ENC_INT8
:
2890 case REDIS_RDB_ENC_INT16
:
2891 case REDIS_RDB_ENC_INT32
:
2892 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2893 case REDIS_RDB_ENC_LZF
:
2894 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2900 if (len
== REDIS_RDB_LENERR
) return NULL
;
2901 val
= sdsnewlen(NULL
,len
);
2902 if (len
&& fread(val
,len
,1,fp
) == 0) {
2906 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2909 /* For information about double serialization check rdbSaveDoubleValue() */
2910 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
2914 if (fread(&len
,1,1,fp
) == 0) return -1;
2916 case 255: *val
= R_NegInf
; return 0;
2917 case 254: *val
= R_PosInf
; return 0;
2918 case 253: *val
= R_Nan
; return 0;
2920 if (fread(buf
,len
,1,fp
) == 0) return -1;
2922 sscanf(buf
, "%lg", val
);
2927 static int rdbLoad(char *filename
) {
2929 robj
*keyobj
= NULL
;
2931 int type
, retval
, rdbver
;
2932 dict
*d
= server
.db
[0].dict
;
2933 redisDb
*db
= server
.db
+0;
2935 time_t expiretime
= -1, now
= time(NULL
);
2937 fp
= fopen(filename
,"r");
2938 if (!fp
) return REDIS_ERR
;
2939 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2941 if (memcmp(buf
,"REDIS",5) != 0) {
2943 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2946 rdbver
= atoi(buf
+5);
2949 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2956 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2957 if (type
== REDIS_EXPIRETIME
) {
2958 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2959 /* We read the time so we need to read the object type again */
2960 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2962 if (type
== REDIS_EOF
) break;
2963 /* Handle SELECT DB opcode as a special case */
2964 if (type
== REDIS_SELECTDB
) {
2965 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2967 if (dbid
>= (unsigned)server
.dbnum
) {
2968 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2971 db
= server
.db
+dbid
;
2976 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2978 if (type
== REDIS_STRING
) {
2979 /* Read string value */
2980 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2981 tryObjectEncoding(o
);
2982 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2983 /* Read list/set value */
2986 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2988 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2989 /* Load every single element of the list/set */
2993 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2994 tryObjectEncoding(ele
);
2995 if (type
== REDIS_LIST
) {
2996 listAddNodeTail((list
*)o
->ptr
,ele
);
2998 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
3001 } else if (type
== REDIS_ZSET
) {
3002 /* Read sorted set value */
3006 if ((zsetlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
3008 o
= createZsetObject();
3010 /* Load every single element of the sorted set */
3013 double *score
= zmalloc(sizeof(double));
3015 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
3016 tryObjectEncoding(ele
);
3017 if (rdbLoadDoubleValue(fp
,score
) == -1) goto eoferr
;
3018 dictAdd(zs
->dict
,ele
,score
);
3019 zslInsert(zs
->zsl
,*score
,ele
);
3020 incrRefCount(ele
); /* added to skiplist */
3023 redisAssert(0 != 0);
3025 /* Add the new object in the hash table */
3026 retval
= dictAdd(d
,keyobj
,o
);
3027 if (retval
== DICT_ERR
) {
3028 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
3031 /* Set the expire time if needed */
3032 if (expiretime
!= -1) {
3033 setExpire(db
,keyobj
,expiretime
);
3034 /* Delete this key if already expired */
3035 if (expiretime
< now
) deleteKey(db
,keyobj
);
3043 eoferr
: /* unexpected end of file is handled here with a fatal exit */
3044 if (keyobj
) decrRefCount(keyobj
);
3045 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
3047 return REDIS_ERR
; /* Just to avoid warning */
3050 /*================================== Commands =============================== */
3052 static void authCommand(redisClient
*c
) {
3053 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
3054 c
->authenticated
= 1;
3055 addReply(c
,shared
.ok
);
3057 c
->authenticated
= 0;
3058 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3062 static void pingCommand(redisClient
*c
) {
3063 addReply(c
,shared
.pong
);
3066 static void echoCommand(redisClient
*c
) {
3067 addReplyBulkLen(c
,c
->argv
[1]);
3068 addReply(c
,c
->argv
[1]);
3069 addReply(c
,shared
.crlf
);
3072 /*=================================== Strings =============================== */
3074 static void setGenericCommand(redisClient
*c
, int nx
) {
3077 if (nx
) deleteIfVolatile(c
->db
,c
->argv
[1]);
3078 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3079 if (retval
== DICT_ERR
) {
3081 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3082 incrRefCount(c
->argv
[2]);
3084 addReply(c
,shared
.czero
);
3088 incrRefCount(c
->argv
[1]);
3089 incrRefCount(c
->argv
[2]);
3092 removeExpire(c
->db
,c
->argv
[1]);
3093 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3096 static void setCommand(redisClient
*c
) {
3097 setGenericCommand(c
,0);
3100 static void setnxCommand(redisClient
*c
) {
3101 setGenericCommand(c
,1);
3104 static int getGenericCommand(redisClient
*c
) {
3105 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3108 addReply(c
,shared
.nullbulk
);
3111 if (o
->type
!= REDIS_STRING
) {
3112 addReply(c
,shared
.wrongtypeerr
);
3115 addReplyBulkLen(c
,o
);
3117 addReply(c
,shared
.crlf
);
3123 static void getCommand(redisClient
*c
) {
3124 getGenericCommand(c
);
3127 static void getsetCommand(redisClient
*c
) {
3128 if (getGenericCommand(c
) == REDIS_ERR
) return;
3129 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
3130 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3132 incrRefCount(c
->argv
[1]);
3134 incrRefCount(c
->argv
[2]);
3136 removeExpire(c
->db
,c
->argv
[1]);
3139 static void mgetCommand(redisClient
*c
) {
3142 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
3143 for (j
= 1; j
< c
->argc
; j
++) {
3144 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
3146 addReply(c
,shared
.nullbulk
);
3148 if (o
->type
!= REDIS_STRING
) {
3149 addReply(c
,shared
.nullbulk
);
3151 addReplyBulkLen(c
,o
);
3153 addReply(c
,shared
.crlf
);
3159 static void msetGenericCommand(redisClient
*c
, int nx
) {
3160 int j
, busykeys
= 0;
3162 if ((c
->argc
% 2) == 0) {
3163 addReplySds(c
,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3166 /* Handle the NX flag. The MSETNX semantics are to return zero and
3167 * set nothing at all if at least one key already exists. */
3169 for (j
= 1; j
< c
->argc
; j
+= 2) {
3170 if (lookupKeyWrite(c
->db
,c
->argv
[j
]) != NULL
) {
3176 addReply(c
, shared
.czero
);
3180 for (j
= 1; j
< c
->argc
; j
+= 2) {
3183 tryObjectEncoding(c
->argv
[j
+1]);
3184 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3185 if (retval
== DICT_ERR
) {
3186 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3187 incrRefCount(c
->argv
[j
+1]);
3189 incrRefCount(c
->argv
[j
]);
3190 incrRefCount(c
->argv
[j
+1]);
3192 removeExpire(c
->db
,c
->argv
[j
]);
3194 server
.dirty
+= (c
->argc
-1)/2;
3195 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3198 static void msetCommand(redisClient
*c
) {
3199 msetGenericCommand(c
,0);
3202 static void msetnxCommand(redisClient
*c
) {
3203 msetGenericCommand(c
,1);
3206 static void incrDecrCommand(redisClient
*c
, long long incr
) {
3211 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3215 if (o
->type
!= REDIS_STRING
) {
3220 if (o
->encoding
== REDIS_ENCODING_RAW
)
3221 value
= strtoll(o
->ptr
, &eptr
, 10);
3222 else if (o
->encoding
== REDIS_ENCODING_INT
)
3223 value
= (long)o
->ptr
;
3225 redisAssert(1 != 1);
3230 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
3231 tryObjectEncoding(o
);
3232 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
3233 if (retval
== DICT_ERR
) {
3234 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
3235 removeExpire(c
->db
,c
->argv
[1]);
3237 incrRefCount(c
->argv
[1]);
3240 addReply(c
,shared
.colon
);
3242 addReply(c
,shared
.crlf
);
3245 static void incrCommand(redisClient
*c
) {
3246 incrDecrCommand(c
,1);
3249 static void decrCommand(redisClient
*c
) {
3250 incrDecrCommand(c
,-1);
3253 static void incrbyCommand(redisClient
*c
) {
3254 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3255 incrDecrCommand(c
,incr
);
3258 static void decrbyCommand(redisClient
*c
) {
3259 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3260 incrDecrCommand(c
,-incr
);
3263 /* ========================= Type agnostic commands ========================= */
3265 static void delCommand(redisClient
*c
) {
3268 for (j
= 1; j
< c
->argc
; j
++) {
3269 if (deleteKey(c
->db
,c
->argv
[j
])) {
3276 addReply(c
,shared
.czero
);
3279 addReply(c
,shared
.cone
);
3282 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
3287 static void existsCommand(redisClient
*c
) {
3288 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
3291 static void selectCommand(redisClient
*c
) {
3292 int id
= atoi(c
->argv
[1]->ptr
);
3294 if (selectDb(c
,id
) == REDIS_ERR
) {
3295 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
3297 addReply(c
,shared
.ok
);
3301 static void randomkeyCommand(redisClient
*c
) {
3305 de
= dictGetRandomKey(c
->db
->dict
);
3306 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
3309 addReply(c
,shared
.plus
);
3310 addReply(c
,shared
.crlf
);
3312 addReply(c
,shared
.plus
);
3313 addReply(c
,dictGetEntryKey(de
));
3314 addReply(c
,shared
.crlf
);
3318 static void keysCommand(redisClient
*c
) {
3321 sds pattern
= c
->argv
[1]->ptr
;
3322 int plen
= sdslen(pattern
);
3323 unsigned long numkeys
= 0, keyslen
= 0;
3324 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3326 di
= dictGetIterator(c
->db
->dict
);
3328 decrRefCount(lenobj
);
3329 while((de
= dictNext(di
)) != NULL
) {
3330 robj
*keyobj
= dictGetEntryKey(de
);
3332 sds key
= keyobj
->ptr
;
3333 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3334 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3335 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3337 addReply(c
,shared
.space
);
3340 keyslen
+= sdslen(key
);
3344 dictReleaseIterator(di
);
3345 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3346 addReply(c
,shared
.crlf
);
3349 static void dbsizeCommand(redisClient
*c
) {
3351 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
3354 static void lastsaveCommand(redisClient
*c
) {
3356 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
3359 static void typeCommand(redisClient
*c
) {
3363 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3368 case REDIS_STRING
: type
= "+string"; break;
3369 case REDIS_LIST
: type
= "+list"; break;
3370 case REDIS_SET
: type
= "+set"; break;
3371 case REDIS_ZSET
: type
= "+zset"; break;
3372 default: type
= "unknown"; break;
3375 addReplySds(c
,sdsnew(type
));
3376 addReply(c
,shared
.crlf
);
3379 static void saveCommand(redisClient
*c
) {
3380 if (server
.bgsavechildpid
!= -1) {
3381 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3384 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3385 addReply(c
,shared
.ok
);
3387 addReply(c
,shared
.err
);
3391 static void bgsaveCommand(redisClient
*c
) {
3392 if (server
.bgsavechildpid
!= -1) {
3393 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3396 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3397 char *status
= "+Background saving started\r\n";
3398 addReplySds(c
,sdsnew(status
));
3400 addReply(c
,shared
.err
);
3404 static void shutdownCommand(redisClient
*c
) {
3405 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
3406 /* Kill the saving child if there is a background saving in progress.
3407 We want to avoid race conditions, for instance our saving child may
3408 overwrite the synchronous saving done by SHUTDOWN. */
3409 if (server
.bgsavechildpid
!= -1) {
3410 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3411 kill(server
.bgsavechildpid
,SIGKILL
);
3412 rdbRemoveTempFile(server
.bgsavechildpid
);
3414 if (server
.appendonly
) {
3415 /* Append only file: fsync() the AOF and exit */
3416 fsync(server
.appendfd
);
3419 /* Snapshotting. Perform a SYNC SAVE and exit */
3420 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3421 if (server
.daemonize
)
3422 unlink(server
.pidfile
);
3423 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3424 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3427 /* Ooops.. error saving! The best we can do is to continue operating.
3428 * Note that if there was a background saving process, in the next
3429 * cron() Redis will be notified that the background saving aborted,
3430 * handling special stuff like slaves pending for synchronization... */
3431 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3432 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3437 static void renameGenericCommand(redisClient
*c
, int nx
) {
3440 /* To use the same key as src and dst is probably an error */
3441 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3442 addReply(c
,shared
.sameobjecterr
);
3446 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3448 addReply(c
,shared
.nokeyerr
);
3452 deleteIfVolatile(c
->db
,c
->argv
[2]);
3453 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3456 addReply(c
,shared
.czero
);
3459 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3461 incrRefCount(c
->argv
[2]);
3463 deleteKey(c
->db
,c
->argv
[1]);
3465 addReply(c
,nx
? shared
.cone
: shared
.ok
);
3468 static void renameCommand(redisClient
*c
) {
3469 renameGenericCommand(c
,0);
3472 static void renamenxCommand(redisClient
*c
) {
3473 renameGenericCommand(c
,1);
3476 static void moveCommand(redisClient
*c
) {
3481 /* Obtain source and target DB pointers */
3484 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3485 addReply(c
,shared
.outofrangeerr
);
3489 selectDb(c
,srcid
); /* Back to the source DB */
3491 /* If the user is moving using as target the same
3492 * DB as the source DB it is probably an error. */
3494 addReply(c
,shared
.sameobjecterr
);
3498 /* Check if the element exists and get a reference */
3499 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3501 addReply(c
,shared
.czero
);
3505 /* Try to add the element to the target DB */
3506 deleteIfVolatile(dst
,c
->argv
[1]);
3507 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3508 addReply(c
,shared
.czero
);
3511 incrRefCount(c
->argv
[1]);
3514 /* OK! key moved, free the entry in the source DB */
3515 deleteKey(src
,c
->argv
[1]);
3517 addReply(c
,shared
.cone
);
3520 /* =================================== Lists ================================ */
3521 static void pushGenericCommand(redisClient
*c
, int where
) {
3525 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3527 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) return;
3528 lobj
= createListObject();
3530 if (where
== REDIS_HEAD
) {
3531 listAddNodeHead(list
,c
->argv
[2]);
3533 listAddNodeTail(list
,c
->argv
[2]);
3535 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
3536 incrRefCount(c
->argv
[1]);
3537 incrRefCount(c
->argv
[2]);
3539 if (lobj
->type
!= REDIS_LIST
) {
3540 addReply(c
,shared
.wrongtypeerr
);
3543 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) return;
3545 if (where
== REDIS_HEAD
) {
3546 listAddNodeHead(list
,c
->argv
[2]);
3548 listAddNodeTail(list
,c
->argv
[2]);
3550 incrRefCount(c
->argv
[2]);
3553 addReply(c
,shared
.ok
);
3556 static void lpushCommand(redisClient
*c
) {
3557 pushGenericCommand(c
,REDIS_HEAD
);
3560 static void rpushCommand(redisClient
*c
) {
3561 pushGenericCommand(c
,REDIS_TAIL
);
3564 static void llenCommand(redisClient
*c
) {
3568 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3570 addReply(c
,shared
.czero
);
3573 if (o
->type
!= REDIS_LIST
) {
3574 addReply(c
,shared
.wrongtypeerr
);
3577 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3582 static void lindexCommand(redisClient
*c
) {
3584 int index
= atoi(c
->argv
[2]->ptr
);
3586 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3588 addReply(c
,shared
.nullbulk
);
3590 if (o
->type
!= REDIS_LIST
) {
3591 addReply(c
,shared
.wrongtypeerr
);
3593 list
*list
= o
->ptr
;
3596 ln
= listIndex(list
, index
);
3598 addReply(c
,shared
.nullbulk
);
3600 robj
*ele
= listNodeValue(ln
);
3601 addReplyBulkLen(c
,ele
);
3603 addReply(c
,shared
.crlf
);
3609 static void lsetCommand(redisClient
*c
) {
3611 int index
= atoi(c
->argv
[2]->ptr
);
3613 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3615 addReply(c
,shared
.nokeyerr
);
3617 if (o
->type
!= REDIS_LIST
) {
3618 addReply(c
,shared
.wrongtypeerr
);
3620 list
*list
= o
->ptr
;
3623 ln
= listIndex(list
, index
);
3625 addReply(c
,shared
.outofrangeerr
);
3627 robj
*ele
= listNodeValue(ln
);
3630 listNodeValue(ln
) = c
->argv
[3];
3631 incrRefCount(c
->argv
[3]);
3632 addReply(c
,shared
.ok
);
3639 static void popGenericCommand(redisClient
*c
, int where
) {
3642 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3644 addReply(c
,shared
.nullbulk
);
3646 if (o
->type
!= REDIS_LIST
) {
3647 addReply(c
,shared
.wrongtypeerr
);
3649 list
*list
= o
->ptr
;
3652 if (where
== REDIS_HEAD
)
3653 ln
= listFirst(list
);
3655 ln
= listLast(list
);
3658 addReply(c
,shared
.nullbulk
);
3660 robj
*ele
= listNodeValue(ln
);
3661 addReplyBulkLen(c
,ele
);
3663 addReply(c
,shared
.crlf
);
3664 listDelNode(list
,ln
);
3671 static void lpopCommand(redisClient
*c
) {
3672 popGenericCommand(c
,REDIS_HEAD
);
3675 static void rpopCommand(redisClient
*c
) {
3676 popGenericCommand(c
,REDIS_TAIL
);
3679 static void lrangeCommand(redisClient
*c
) {
3681 int start
= atoi(c
->argv
[2]->ptr
);
3682 int end
= atoi(c
->argv
[3]->ptr
);
3684 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3686 addReply(c
,shared
.nullmultibulk
);
3688 if (o
->type
!= REDIS_LIST
) {
3689 addReply(c
,shared
.wrongtypeerr
);
3691 list
*list
= o
->ptr
;
3693 int llen
= listLength(list
);
3697 /* convert negative indexes */
3698 if (start
< 0) start
= llen
+start
;
3699 if (end
< 0) end
= llen
+end
;
3700 if (start
< 0) start
= 0;
3701 if (end
< 0) end
= 0;
3703 /* indexes sanity checks */
3704 if (start
> end
|| start
>= llen
) {
3705 /* Out of range start or start > end result in empty list */
3706 addReply(c
,shared
.emptymultibulk
);
3709 if (end
>= llen
) end
= llen
-1;
3710 rangelen
= (end
-start
)+1;
3712 /* Return the result in form of a multi-bulk reply */
3713 ln
= listIndex(list
, start
);
3714 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3715 for (j
= 0; j
< rangelen
; j
++) {
3716 ele
= listNodeValue(ln
);
3717 addReplyBulkLen(c
,ele
);
3719 addReply(c
,shared
.crlf
);
3726 static void ltrimCommand(redisClient
*c
) {
3728 int start
= atoi(c
->argv
[2]->ptr
);
3729 int end
= atoi(c
->argv
[3]->ptr
);
3731 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3733 addReply(c
,shared
.ok
);
3735 if (o
->type
!= REDIS_LIST
) {
3736 addReply(c
,shared
.wrongtypeerr
);
3738 list
*list
= o
->ptr
;
3740 int llen
= listLength(list
);
3741 int j
, ltrim
, rtrim
;
3743 /* convert negative indexes */
3744 if (start
< 0) start
= llen
+start
;
3745 if (end
< 0) end
= llen
+end
;
3746 if (start
< 0) start
= 0;
3747 if (end
< 0) end
= 0;
3749 /* indexes sanity checks */
3750 if (start
> end
|| start
>= llen
) {
3751 /* Out of range start or start > end result in empty list */
3755 if (end
>= llen
) end
= llen
-1;
3760 /* Remove list elements to perform the trim */
3761 for (j
= 0; j
< ltrim
; j
++) {
3762 ln
= listFirst(list
);
3763 listDelNode(list
,ln
);
3765 for (j
= 0; j
< rtrim
; j
++) {
3766 ln
= listLast(list
);
3767 listDelNode(list
,ln
);
3770 addReply(c
,shared
.ok
);
3775 static void lremCommand(redisClient
*c
) {
3778 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3780 addReply(c
,shared
.czero
);
3782 if (o
->type
!= REDIS_LIST
) {
3783 addReply(c
,shared
.wrongtypeerr
);
3785 list
*list
= o
->ptr
;
3786 listNode
*ln
, *next
;
3787 int toremove
= atoi(c
->argv
[2]->ptr
);
3792 toremove
= -toremove
;
3795 ln
= fromtail
? list
->tail
: list
->head
;
3797 robj
*ele
= listNodeValue(ln
);
3799 next
= fromtail
? ln
->prev
: ln
->next
;
3800 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3801 listDelNode(list
,ln
);
3804 if (toremove
&& removed
== toremove
) break;
3808 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3813 /* This is the semantic of this command:
3814 * RPOPLPUSH srclist dstlist:
3815 * IF LLEN(srclist) > 0
3816 * element = RPOP srclist
3817 * LPUSH dstlist element
3824 * The idea is to be able to get an element from a list in a reliable way
3825 * since the element is not just returned but pushed against another list
3826 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3828 static void rpoplpushcommand(redisClient
*c
) {
3831 sobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3833 addReply(c
,shared
.nullbulk
);
3835 if (sobj
->type
!= REDIS_LIST
) {
3836 addReply(c
,shared
.wrongtypeerr
);
3838 list
*srclist
= sobj
->ptr
;
3839 listNode
*ln
= listLast(srclist
);
3842 addReply(c
,shared
.nullbulk
);
3844 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3845 robj
*ele
= listNodeValue(ln
);
3850 /* Create the list if the key does not exist */
3851 dobj
= createListObject();
3852 dictAdd(c
->db
->dict
,c
->argv
[2],dobj
);
3853 incrRefCount(c
->argv
[2]);
3854 } else if (dobj
->type
!= REDIS_LIST
) {
3855 addReply(c
,shared
.wrongtypeerr
);
3858 /* Add the element to the target list */
3859 dstlist
= dobj
->ptr
;
3860 listAddNodeHead(dstlist
,ele
);
3863 /* Send the element to the client as reply as well */
3864 addReplyBulkLen(c
,ele
);
3866 addReply(c
,shared
.crlf
);
3868 /* Finally remove the element from the source list */
3869 listDelNode(srclist
,ln
);
3877 /* ==================================== Sets ================================ */
3879 static void saddCommand(redisClient
*c
) {
3882 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3884 set
= createSetObject();
3885 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3886 incrRefCount(c
->argv
[1]);
3888 if (set
->type
!= REDIS_SET
) {
3889 addReply(c
,shared
.wrongtypeerr
);
3893 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3894 incrRefCount(c
->argv
[2]);
3896 addReply(c
,shared
.cone
);
3898 addReply(c
,shared
.czero
);
3902 static void sremCommand(redisClient
*c
) {
3905 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3907 addReply(c
,shared
.czero
);
3909 if (set
->type
!= REDIS_SET
) {
3910 addReply(c
,shared
.wrongtypeerr
);
3913 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
3915 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3916 addReply(c
,shared
.cone
);
3918 addReply(c
,shared
.czero
);
3923 static void smoveCommand(redisClient
*c
) {
3924 robj
*srcset
, *dstset
;
3926 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3927 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3929 /* If the source key does not exist return 0, if it's of the wrong type
3931 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
3932 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
3935 /* Error if the destination key is not a set as well */
3936 if (dstset
&& dstset
->type
!= REDIS_SET
) {
3937 addReply(c
,shared
.wrongtypeerr
);
3940 /* Remove the element from the source set */
3941 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
3942 /* Key not found in the src set! return zero */
3943 addReply(c
,shared
.czero
);
3947 /* Add the element to the destination set */
3949 dstset
= createSetObject();
3950 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
3951 incrRefCount(c
->argv
[2]);
3953 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
3954 incrRefCount(c
->argv
[3]);
3955 addReply(c
,shared
.cone
);
3958 static void sismemberCommand(redisClient
*c
) {
3961 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3963 addReply(c
,shared
.czero
);
3965 if (set
->type
!= REDIS_SET
) {
3966 addReply(c
,shared
.wrongtypeerr
);
3969 if (dictFind(set
->ptr
,c
->argv
[2]))
3970 addReply(c
,shared
.cone
);
3972 addReply(c
,shared
.czero
);
3976 static void scardCommand(redisClient
*c
) {
3980 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3982 addReply(c
,shared
.czero
);
3985 if (o
->type
!= REDIS_SET
) {
3986 addReply(c
,shared
.wrongtypeerr
);
3989 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
3995 static void spopCommand(redisClient
*c
) {
3999 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4001 addReply(c
,shared
.nullbulk
);
4003 if (set
->type
!= REDIS_SET
) {
4004 addReply(c
,shared
.wrongtypeerr
);
4007 de
= dictGetRandomKey(set
->ptr
);
4009 addReply(c
,shared
.nullbulk
);
4011 robj
*ele
= dictGetEntryKey(de
);
4013 addReplyBulkLen(c
,ele
);
4015 addReply(c
,shared
.crlf
);
4016 dictDelete(set
->ptr
,ele
);
4017 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
4023 static void srandmemberCommand(redisClient
*c
) {
4027 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4029 addReply(c
,shared
.nullbulk
);
4031 if (set
->type
!= REDIS_SET
) {
4032 addReply(c
,shared
.wrongtypeerr
);
4035 de
= dictGetRandomKey(set
->ptr
);
4037 addReply(c
,shared
.nullbulk
);
4039 robj
*ele
= dictGetEntryKey(de
);
4041 addReplyBulkLen(c
,ele
);
4043 addReply(c
,shared
.crlf
);
4048 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
4049 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
4051 return dictSize(*d1
)-dictSize(*d2
);
4054 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, unsigned long setsnum
, robj
*dstkey
) {
4055 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4058 robj
*lenobj
= NULL
, *dstset
= NULL
;
4059 unsigned long j
, cardinality
= 0;
4061 for (j
= 0; j
< setsnum
; j
++) {
4065 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4066 lookupKeyRead(c
->db
,setskeys
[j
]);
4070 if (deleteKey(c
->db
,dstkey
))
4072 addReply(c
,shared
.czero
);
4074 addReply(c
,shared
.nullmultibulk
);
4078 if (setobj
->type
!= REDIS_SET
) {
4080 addReply(c
,shared
.wrongtypeerr
);
4083 dv
[j
] = setobj
->ptr
;
4085 /* Sort sets from the smallest to largest, this will improve our
4086 * algorithm's performace */
4087 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
4089 /* The first thing we should output is the total number of elements...
4090 * since this is a multi-bulk write, but at this stage we don't know
4091 * the intersection set size, so we use a trick, append an empty object
4092 * to the output list and save the pointer to later modify it with the
4095 lenobj
= createObject(REDIS_STRING
,NULL
);
4097 decrRefCount(lenobj
);
4099 /* If we have a target key where to store the resulting set
4100 * create this key with an empty set inside */
4101 dstset
= createSetObject();
4104 /* Iterate all the elements of the first (smallest) set, and test
4105 * the element against all the other sets, if at least one set does
4106 * not include the element it is discarded */
4107 di
= dictGetIterator(dv
[0]);
4109 while((de
= dictNext(di
)) != NULL
) {
4112 for (j
= 1; j
< setsnum
; j
++)
4113 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
4115 continue; /* at least one set does not contain the member */
4116 ele
= dictGetEntryKey(de
);
4118 addReplyBulkLen(c
,ele
);
4120 addReply(c
,shared
.crlf
);
4123 dictAdd(dstset
->ptr
,ele
,NULL
);
4127 dictReleaseIterator(di
);
4130 /* Store the resulting set into the target */
4131 deleteKey(c
->db
,dstkey
);
4132 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4133 incrRefCount(dstkey
);
4137 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%lu\r\n",cardinality
);
4139 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4140 dictSize((dict
*)dstset
->ptr
)));
4146 static void sinterCommand(redisClient
*c
) {
4147 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
4150 static void sinterstoreCommand(redisClient
*c
) {
4151 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
4154 #define REDIS_OP_UNION 0
4155 #define REDIS_OP_DIFF 1
4157 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
4158 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4161 robj
*dstset
= NULL
;
4162 int j
, cardinality
= 0;
4164 for (j
= 0; j
< setsnum
; j
++) {
4168 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4169 lookupKeyRead(c
->db
,setskeys
[j
]);
4174 if (setobj
->type
!= REDIS_SET
) {
4176 addReply(c
,shared
.wrongtypeerr
);
4179 dv
[j
] = setobj
->ptr
;
4182 /* We need a temp set object to store our union. If the dstkey
4183 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4184 * this set object will be the resulting object to set into the target key*/
4185 dstset
= createSetObject();
4187 /* Iterate all the elements of all the sets, add every element a single
4188 * time to the result set */
4189 for (j
= 0; j
< setsnum
; j
++) {
4190 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
4191 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
4193 di
= dictGetIterator(dv
[j
]);
4195 while((de
= dictNext(di
)) != NULL
) {
4198 /* dictAdd will not add the same element multiple times */
4199 ele
= dictGetEntryKey(de
);
4200 if (op
== REDIS_OP_UNION
|| j
== 0) {
4201 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
4205 } else if (op
== REDIS_OP_DIFF
) {
4206 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
4211 dictReleaseIterator(di
);
4213 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
4216 /* Output the content of the resulting set, if not in STORE mode */
4218 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
4219 di
= dictGetIterator(dstset
->ptr
);
4220 while((de
= dictNext(di
)) != NULL
) {
4223 ele
= dictGetEntryKey(de
);
4224 addReplyBulkLen(c
,ele
);
4226 addReply(c
,shared
.crlf
);
4228 dictReleaseIterator(di
);
4230 /* If we have a target key where to store the resulting set
4231 * create this key with the result set inside */
4232 deleteKey(c
->db
,dstkey
);
4233 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4234 incrRefCount(dstkey
);
4239 decrRefCount(dstset
);
4241 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4242 dictSize((dict
*)dstset
->ptr
)));
4248 static void sunionCommand(redisClient
*c
) {
4249 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
4252 static void sunionstoreCommand(redisClient
*c
) {
4253 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
4256 static void sdiffCommand(redisClient
*c
) {
4257 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
4260 static void sdiffstoreCommand(redisClient
*c
) {
4261 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
4264 /* ==================================== ZSets =============================== */
4266 /* ZSETs are ordered sets using two data structures to hold the same elements
4267 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4270 * The elements are added to a hash table mapping Redis objects to scores.
4271 * At the same time the elements are added to a skip list mapping scores
4272 * to Redis objects (so objects are sorted by scores in this "view"). */
4274 /* This skiplist implementation is almost a C translation of the original
4275 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4276 * Alternative to Balanced Trees", modified in three ways:
4277 * a) this implementation allows for repeated values.
4278 * b) the comparison is not just by key (our 'score') but by satellite data.
4279 * c) there is a back pointer, so it's a doubly linked list with the back
4280 * pointers being only at "level 1". This allows to traverse the list
4281 * from tail to head, useful for ZREVRANGE. */
4283 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
4284 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
4286 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
4292 static zskiplist
*zslCreate(void) {
4296 zsl
= zmalloc(sizeof(*zsl
));
4299 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
4300 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
4301 zsl
->header
->forward
[j
] = NULL
;
4302 zsl
->header
->backward
= NULL
;
4307 static void zslFreeNode(zskiplistNode
*node
) {
4308 decrRefCount(node
->obj
);
4309 zfree(node
->forward
);
4313 static void zslFree(zskiplist
*zsl
) {
4314 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
4316 zfree(zsl
->header
->forward
);
4319 next
= node
->forward
[0];
4326 static int zslRandomLevel(void) {
4328 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
4333 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
4334 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4338 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4339 while (x
->forward
[i
] &&
4340 (x
->forward
[i
]->score
< score
||
4341 (x
->forward
[i
]->score
== score
&&
4342 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4346 /* we assume the key is not already inside, since we allow duplicated
4347 * scores, and the re-insertion of score and redis object should never
4348 * happen since the caller of zslInsert() should test in the hash table
4349 * if the element is already inside or not. */
4350 level
= zslRandomLevel();
4351 if (level
> zsl
->level
) {
4352 for (i
= zsl
->level
; i
< level
; i
++)
4353 update
[i
] = zsl
->header
;
4356 x
= zslCreateNode(level
,score
,obj
);
4357 for (i
= 0; i
< level
; i
++) {
4358 x
->forward
[i
] = update
[i
]->forward
[i
];
4359 update
[i
]->forward
[i
] = x
;
4361 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
4363 x
->forward
[0]->backward
= x
;
4369 /* Delete an element with matching score/object from the skiplist. */
4370 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
4371 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4375 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4376 while (x
->forward
[i
] &&
4377 (x
->forward
[i
]->score
< score
||
4378 (x
->forward
[i
]->score
== score
&&
4379 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4383 /* We may have multiple elements with the same score, what we need
4384 * is to find the element with both the right score and object. */
4386 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
4387 for (i
= 0; i
< zsl
->level
; i
++) {
4388 if (update
[i
]->forward
[i
] != x
) break;
4389 update
[i
]->forward
[i
] = x
->forward
[i
];
4391 if (x
->forward
[0]) {
4392 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4395 zsl
->tail
= x
->backward
;
4398 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4403 return 0; /* not found */
4405 return 0; /* not found */
4408 /* Delete all the elements with score between min and max from the skiplist.
4409 * Min and max are inclusive, so a score >= min && score <= max is deleted.
4410 * Note that this function takes the reference to the hash table view of the
4411 * sorted set, in order to remove the elements from the hash table too. */
4412 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4413 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4414 unsigned long removed
= 0;
4418 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4419 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
4423 /* We may have multiple elements with the same score, what we need
4424 * is to find the element with both the right score and object. */
4426 while (x
&& x
->score
<= max
) {
4427 zskiplistNode
*next
;
4429 for (i
= 0; i
< zsl
->level
; i
++) {
4430 if (update
[i
]->forward
[i
] != x
) break;
4431 update
[i
]->forward
[i
] = x
->forward
[i
];
4433 if (x
->forward
[0]) {
4434 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4437 zsl
->tail
= x
->backward
;
4439 next
= x
->forward
[0];
4440 dictDelete(dict
,x
->obj
);
4442 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4448 return removed
; /* not found */
4451 /* Find the first node having a score equal to or greater than the specified one.
4452 * Returns NULL if there is no match. */
4453 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4458 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4459 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
4462 /* We may have multiple elements with the same score, what we need
4463 * is to find the element with both the right score and object. */
4464 return x
->forward
[0];
4467 /* The actual Z-commands implementations */
4469 /* This generic command implements both ZADD and ZINCRBY.
4470 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4471 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4472 static void zaddGenericCommand(redisClient
*c
, robj
*key
, robj
*ele
, double scoreval
, int doincrement
) {
4477 zsetobj
= lookupKeyWrite(c
->db
,key
);
4478 if (zsetobj
== NULL
) {
4479 zsetobj
= createZsetObject();
4480 dictAdd(c
->db
->dict
,key
,zsetobj
);
4483 if (zsetobj
->type
!= REDIS_ZSET
) {
4484 addReply(c
,shared
.wrongtypeerr
);
4490 /* Ok now since we implement both ZADD and ZINCRBY here the code
4491 * needs to handle the two different conditions. It's all about setting
4492 * '*score', that is, the new score to set, to the right value. */
4493 score
= zmalloc(sizeof(double));
4497 /* Read the old score. If the element was not present starts from 0 */
4498 de
= dictFind(zs
->dict
,ele
);
4500 double *oldscore
= dictGetEntryVal(de
);
4501 *score
= *oldscore
+ scoreval
;
4509 /* What follows is a simple remove and re-insert operation that is common
4510 * to both ZADD and ZINCRBY... */
4511 if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) {
4512 /* case 1: New element */
4513 incrRefCount(ele
); /* added to hash */
4514 zslInsert(zs
->zsl
,*score
,ele
);
4515 incrRefCount(ele
); /* added to skiplist */
4518 addReplyDouble(c
,*score
);
4520 addReply(c
,shared
.cone
);
4525 /* case 2: Score update operation */
4526 de
= dictFind(zs
->dict
,ele
);
4527 redisAssert(de
!= NULL
);
4528 oldscore
= dictGetEntryVal(de
);
4529 if (*score
!= *oldscore
) {
4532 /* Remove and insert the element in the skip list with new score */
4533 deleted
= zslDelete(zs
->zsl
,*oldscore
,ele
);
4534 redisAssert(deleted
!= 0);
4535 zslInsert(zs
->zsl
,*score
,ele
);
4537 /* Update the score in the hash table */
4538 dictReplace(zs
->dict
,ele
,score
);
4544 addReplyDouble(c
,*score
);
4546 addReply(c
,shared
.czero
);
4550 static void zaddCommand(redisClient
*c
) {
4553 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4554 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0);
4557 static void zincrbyCommand(redisClient
*c
) {
4560 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4561 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1);
4564 static void zremCommand(redisClient
*c
) {
4568 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4569 if (zsetobj
== NULL
) {
4570 addReply(c
,shared
.czero
);
4576 if (zsetobj
->type
!= REDIS_ZSET
) {
4577 addReply(c
,shared
.wrongtypeerr
);
4581 de
= dictFind(zs
->dict
,c
->argv
[2]);
4583 addReply(c
,shared
.czero
);
4586 /* Delete from the skiplist */
4587 oldscore
= dictGetEntryVal(de
);
4588 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
4589 redisAssert(deleted
!= 0);
4591 /* Delete from the hash table */
4592 dictDelete(zs
->dict
,c
->argv
[2]);
4593 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4595 addReply(c
,shared
.cone
);
4599 static void zremrangebyscoreCommand(redisClient
*c
) {
4600 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4601 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4605 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4606 if (zsetobj
== NULL
) {
4607 addReply(c
,shared
.czero
);
4611 if (zsetobj
->type
!= REDIS_ZSET
) {
4612 addReply(c
,shared
.wrongtypeerr
);
4616 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
4617 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4618 server
.dirty
+= deleted
;
4619 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
4623 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
4625 int start
= atoi(c
->argv
[2]->ptr
);
4626 int end
= atoi(c
->argv
[3]->ptr
);
4629 if (c
->argc
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) {
4631 } else if (c
->argc
>= 5) {
4632 addReply(c
,shared
.syntaxerr
);
4636 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4638 addReply(c
,shared
.nullmultibulk
);
4640 if (o
->type
!= REDIS_ZSET
) {
4641 addReply(c
,shared
.wrongtypeerr
);
4643 zset
*zsetobj
= o
->ptr
;
4644 zskiplist
*zsl
= zsetobj
->zsl
;
4647 int llen
= zsl
->length
;
4651 /* convert negative indexes */
4652 if (start
< 0) start
= llen
+start
;
4653 if (end
< 0) end
= llen
+end
;
4654 if (start
< 0) start
= 0;
4655 if (end
< 0) end
= 0;
4657 /* indexes sanity checks */
4658 if (start
> end
|| start
>= llen
) {
4659 /* Out of range start or start > end result in empty list */
4660 addReply(c
,shared
.emptymultibulk
);
4663 if (end
>= llen
) end
= llen
-1;
4664 rangelen
= (end
-start
)+1;
4666 /* Return the result in form of a multi-bulk reply */
4672 ln
= zsl
->header
->forward
[0];
4674 ln
= ln
->forward
[0];
4677 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",
4678 withscores
? (rangelen
*2) : rangelen
));
4679 for (j
= 0; j
< rangelen
; j
++) {
4681 addReplyBulkLen(c
,ele
);
4683 addReply(c
,shared
.crlf
);
4685 addReplyDouble(c
,ln
->score
);
4686 ln
= reverse
? ln
->backward
: ln
->forward
[0];
4692 static void zrangeCommand(redisClient
*c
) {
4693 zrangeGenericCommand(c
,0);
4696 static void zrevrangeCommand(redisClient
*c
) {
4697 zrangeGenericCommand(c
,1);
4700 static void zrangebyscoreCommand(redisClient
*c
) {
4702 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4703 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4704 int offset
= 0, limit
= -1;
4706 if (c
->argc
!= 4 && c
->argc
!= 7) {
4708 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4710 } else if (c
->argc
== 7 && strcasecmp(c
->argv
[4]->ptr
,"limit")) {
4711 addReply(c
,shared
.syntaxerr
);
4713 } else if (c
->argc
== 7) {
4714 offset
= atoi(c
->argv
[5]->ptr
);
4715 limit
= atoi(c
->argv
[6]->ptr
);
4716 if (offset
< 0) offset
= 0;
4719 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4721 addReply(c
,shared
.nullmultibulk
);
4723 if (o
->type
!= REDIS_ZSET
) {
4724 addReply(c
,shared
.wrongtypeerr
);
4726 zset
*zsetobj
= o
->ptr
;
4727 zskiplist
*zsl
= zsetobj
->zsl
;
4730 unsigned int rangelen
= 0;
4732 /* Get the first node with the score >= min */
4733 ln
= zslFirstWithScore(zsl
,min
);
4735 /* No element matching the specified interval */
4736 addReply(c
,shared
.emptymultibulk
);
4740 /* We don't know in advance how many matching elements there
4741 * are in the list, so we push this object that will represent
4742 * the multi-bulk length in the output buffer, and will "fix"
4744 lenobj
= createObject(REDIS_STRING
,NULL
);
4746 decrRefCount(lenobj
);
4748 while(ln
&& ln
->score
<= max
) {
4751 ln
= ln
->forward
[0];
4754 if (limit
== 0) break;
4756 addReplyBulkLen(c
,ele
);
4758 addReply(c
,shared
.crlf
);
4759 ln
= ln
->forward
[0];
4761 if (limit
> 0) limit
--;
4763 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
4768 static void zcardCommand(redisClient
*c
) {
4772 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4774 addReply(c
,shared
.czero
);
4777 if (o
->type
!= REDIS_ZSET
) {
4778 addReply(c
,shared
.wrongtypeerr
);
4781 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",zs
->zsl
->length
));
4786 static void zscoreCommand(redisClient
*c
) {
4790 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4792 addReply(c
,shared
.nullbulk
);
4795 if (o
->type
!= REDIS_ZSET
) {
4796 addReply(c
,shared
.wrongtypeerr
);
4801 de
= dictFind(zs
->dict
,c
->argv
[2]);
4803 addReply(c
,shared
.nullbulk
);
4805 double *score
= dictGetEntryVal(de
);
4807 addReplyDouble(c
,*score
);
4813 /* ========================= Non type-specific commands ==================== */
4815 static void flushdbCommand(redisClient
*c
) {
4816 server
.dirty
+= dictSize(c
->db
->dict
);
4817 dictEmpty(c
->db
->dict
);
4818 dictEmpty(c
->db
->expires
);
4819 addReply(c
,shared
.ok
);
4822 static void flushallCommand(redisClient
*c
) {
4823 server
.dirty
+= emptyDb();
4824 addReply(c
,shared
.ok
);
4825 rdbSave(server
.dbfilename
);
4829 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
4830 redisSortOperation
*so
= zmalloc(sizeof(*so
));
4832 so
->pattern
= pattern
;
4836 /* Return the value associated to the key with a name obtained
4837 * substituting the first occurrence of '*' in 'pattern' with 'subst' */
4838 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
4842 int prefixlen
, sublen
, postfixlen
;
4843 /* Exploit the internal sds representation to create an sds string allocated on the stack in order to make this function faster */
4847 char buf
[REDIS_SORTKEY_MAX
+1];
4850 /* If the pattern is "#" return the substitution object itself in order
4851 * to implement the "SORT ... GET #" feature. */
4852 spat
= pattern
->ptr
;
4853 if (spat
[0] == '#' && spat
[1] == '\0') {
4857 /* The substitution object may be specially encoded. If so we create
4858 * a decoded object on the fly. Otherwise getDecodedObject will just
4859 * increment the ref count, that we'll decrement later. */
4860 subst
= getDecodedObject(subst
);
4863 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
4864 p
= strchr(spat
,'*');
4866 decrRefCount(subst
);
4871 sublen
= sdslen(ssub
);
4872 postfixlen
= sdslen(spat
)-(prefixlen
+1);
4873 memcpy(keyname
.buf
,spat
,prefixlen
);
4874 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
4875 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
4876 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
4877 keyname
.len
= prefixlen
+sublen
+postfixlen
;
4879 initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(long)*2))
4880 decrRefCount(subst
);
4882 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4883 return lookupKeyRead(db
,&keyobj
);
4886 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4887 * the additional parameter is not standard but BSD-specific, we have to
4888 * pass sorting parameters via the global 'server' structure */
4889 static int sortCompare(const void *s1
, const void *s2
) {
4890 const redisSortObject
*so1
= s1
, *so2
= s2
;
4893 if (!server
.sort_alpha
) {
4894 /* Numeric sorting. Here it's trivial as we precomputed scores */
4895 if (so1
->u
.score
> so2
->u
.score
) {
4897 } else if (so1
->u
.score
< so2
->u
.score
) {
4903 /* Alphanumeric sorting */
4904 if (server
.sort_bypattern
) {
4905 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
4906 /* At least one compare object is NULL */
4907 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
4909 else if (so1
->u
.cmpobj
== NULL
)
4914 /* We have both the objects, use strcoll */
4915 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
4918 /* Compare elements directly */
4921 dec1
= getDecodedObject(so1
->obj
);
4922 dec2
= getDecodedObject(so2
->obj
);
4923 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
4928 return server
.sort_desc
? -cmp
: cmp
;
4931 /* The SORT command is the most complex command in Redis. Warning: this code
4932 * is optimized for speed and a bit less for readability */
4933 static void sortCommand(redisClient
*c
) {
4936 int desc
= 0, alpha
= 0;
4937 int limit_start
= 0, limit_count
= -1, start
, end
;
4938 int j
, dontsort
= 0, vectorlen
;
4939 int getop
= 0; /* GET operation counter */
4940 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
4941 redisSortObject
*vector
; /* Resulting vector to sort */
4943 /* Lookup the key to sort. It must be of the right types */
4944 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
4945 if (sortval
== NULL
) {
4946 addReply(c
,shared
.nullmultibulk
);
4949 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
4950 sortval
->type
!= REDIS_ZSET
)
4952 addReply(c
,shared
.wrongtypeerr
);
4956 /* Create a list of operations to perform for every sorted element.
4957 * Operations can be GET/DEL/INCR/DECR */
4958 operations
= listCreate();
4959 listSetFreeMethod(operations
,zfree
);
4962 /* Now we need to protect sortval incrementing its count, in the future
4963 * SORT may have options able to overwrite/delete keys during the sorting
4964 * and the sorted key itself may get destroyed */
4965 incrRefCount(sortval
);
4967 /* The SORT command has an SQL-alike syntax, parse it */
4968 while(j
< c
->argc
) {
4969 int leftargs
= c
->argc
-j
-1;
4970 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
4972 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
4974 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
4976 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
4977 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
4978 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
4980 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
4981 storekey
= c
->argv
[j
+1];
4983 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
4984 sortby
= c
->argv
[j
+1];
4985 /* If the BY pattern does not contain '*', i.e. it is constant,
4986 * we don't need to sort nor to lookup the weight keys. */
4987 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
4989 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
4990 listAddNodeTail(operations
,createSortOperation(
4991 REDIS_SORT_GET
,c
->argv
[j
+1]));
4995 decrRefCount(sortval
);
4996 listRelease(operations
);
4997 addReply(c
,shared
.syntaxerr
);
5003 /* Load the sorting vector with all the objects to sort */
5004 switch(sortval
->type
) {
5005 case REDIS_LIST
: vectorlen
= listLength((list
*)sortval
->ptr
); break;
5006 case REDIS_SET
: vectorlen
= dictSize((dict
*)sortval
->ptr
); break;
5007 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
5008 default: vectorlen
= 0; redisAssert(0); /* Avoid GCC warning */
5010 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
5013 if (sortval
->type
== REDIS_LIST
) {
5014 list
*list
= sortval
->ptr
;
5018 while((ln
= listYield(list
))) {
5019 robj
*ele
= ln
->value
;
5020 vector
[j
].obj
= ele
;
5021 vector
[j
].u
.score
= 0;
5022 vector
[j
].u
.cmpobj
= NULL
;
5030 if (sortval
->type
== REDIS_SET
) {
5033 zset
*zs
= sortval
->ptr
;
5037 di
= dictGetIterator(set
);
5038 while((setele
= dictNext(di
)) != NULL
) {
5039 vector
[j
].obj
= dictGetEntryKey(setele
);
5040 vector
[j
].u
.score
= 0;
5041 vector
[j
].u
.cmpobj
= NULL
;
5044 dictReleaseIterator(di
);
5046 redisAssert(j
== vectorlen
);
5048 /* Now it's time to load the right scores in the sorting vector */
5049 if (dontsort
== 0) {
5050 for (j
= 0; j
< vectorlen
; j
++) {
5054 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
5055 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
5057 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
5059 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
5060 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
5062 /* Don't need to decode the object if it's
5063 * integer-encoded (the only encoding supported so
5064 * far). We can just cast it */
5065 if (byval
->encoding
== REDIS_ENCODING_INT
) {
5066 vector
[j
].u
.score
= (long)byval
->ptr
;
5068 redisAssert(1 != 1);
5073 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
5074 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
5076 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
5077 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
5079 redisAssert(1 != 1);
5086 /* We are ready to sort the vector... perform a bit of sanity check
5087 * on the LIMIT option too. We'll use a partial version of quicksort. */
5088 start
= (limit_start
< 0) ? 0 : limit_start
;
5089 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
5090 if (start
>= vectorlen
) {
5091 start
= vectorlen
-1;
5094 if (end
>= vectorlen
) end
= vectorlen
-1;
5096 if (dontsort
== 0) {
5097 server
.sort_desc
= desc
;
5098 server
.sort_alpha
= alpha
;
5099 server
.sort_bypattern
= sortby
? 1 : 0;
5100 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
5101 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
5103 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
5106 /* Send command output to the output buffer, performing the specified
5107 * GET/DEL/INCR/DECR operations if any. */
5108 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
5109 if (storekey
== NULL
) {
5110 /* STORE option not specified, send the sorting result to client */
5111 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
5112 for (j
= start
; j
<= end
; j
++) {
5115 addReplyBulkLen(c
,vector
[j
].obj
);
5116 addReply(c
,vector
[j
].obj
);
5117 addReply(c
,shared
.crlf
);
5119 listRewind(operations
);
5120 while((ln
= listYield(operations
))) {
5121 redisSortOperation
*sop
= ln
->value
;
5122 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5125 if (sop
->type
== REDIS_SORT_GET
) {
5126 if (!val
|| val
->type
!= REDIS_STRING
) {
5127 addReply(c
,shared
.nullbulk
);
5129 addReplyBulkLen(c
,val
);
5131 addReply(c
,shared
.crlf
);
5134 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5139 robj
*listObject
= createListObject();
5140 list
*listPtr
= (list
*) listObject
->ptr
;
5142 /* STORE option specified, set the sorting result as a List object */
5143 for (j
= start
; j
<= end
; j
++) {
5146 listAddNodeTail(listPtr
,vector
[j
].obj
);
5147 incrRefCount(vector
[j
].obj
);
5149 listRewind(operations
);
5150 while((ln
= listYield(operations
))) {
5151 redisSortOperation
*sop
= ln
->value
;
5152 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5155 if (sop
->type
== REDIS_SORT_GET
) {
5156 if (!val
|| val
->type
!= REDIS_STRING
) {
5157 listAddNodeTail(listPtr
,createStringObject("",0));
5159 listAddNodeTail(listPtr
,val
);
5163 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5167 if (dictReplace(c
->db
->dict
,storekey
,listObject
)) {
5168 incrRefCount(storekey
);
5170 /* Note: we add 1 because the DB is dirty anyway since even if the
5171 * SORT result is empty a new key is set and maybe the old content
5173 server
.dirty
+= 1+outputlen
;
5174 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",outputlen
));
5178 decrRefCount(sortval
);
5179 listRelease(operations
);
5180 for (j
= 0; j
< vectorlen
; j
++) {
5181 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
5182 decrRefCount(vector
[j
].u
.cmpobj
);
5187 /* Create the string returned by the INFO command. This is decoupled
5188 * from the INFO command itself as we need to report the same information
5189 * on memory corruption problems. */
5190 static sds
genRedisInfoString(void) {
5192 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5195 info
= sdscatprintf(sdsempty(),
5196 "redis_version:%s\r\n"
5198 "multiplexing_api:%s\r\n"
5199 "uptime_in_seconds:%ld\r\n"
5200 "uptime_in_days:%ld\r\n"
5201 "connected_clients:%d\r\n"
5202 "connected_slaves:%d\r\n"
5203 "used_memory:%zu\r\n"
5204 "changes_since_last_save:%lld\r\n"
5205 "bgsave_in_progress:%d\r\n"
5206 "last_save_time:%ld\r\n"
5207 "bgrewriteaof_in_progress:%d\r\n"
5208 "total_connections_received:%lld\r\n"
5209 "total_commands_processed:%lld\r\n"
5212 (sizeof(long) == 8) ? "64" : "32",
5216 listLength(server
.clients
)-listLength(server
.slaves
),
5217 listLength(server
.slaves
),
5220 server
.bgsavechildpid
!= -1,
5222 server
.bgrewritechildpid
!= -1,
5223 server
.stat_numconnections
,
5224 server
.stat_numcommands
,
5225 server
.masterhost
== NULL
? "master" : "slave"
5227 if (server
.masterhost
) {
5228 info
= sdscatprintf(info
,
5229 "master_host:%s\r\n"
5230 "master_port:%d\r\n"
5231 "master_link_status:%s\r\n"
5232 "master_last_io_seconds_ago:%d\r\n"
5235 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
5237 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
5240 for (j
= 0; j
< server
.dbnum
; j
++) {
5241 long long keys
, vkeys
;
5243 keys
= dictSize(server
.db
[j
].dict
);
5244 vkeys
= dictSize(server
.db
[j
].expires
);
5245 if (keys
|| vkeys
) {
5246 info
= sdscatprintf(info
, "db%d:keys=%lld,expires=%lld\r\n",
5253 static void infoCommand(redisClient
*c
) {
5254 sds info
= genRedisInfoString();
5255 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
5256 (unsigned long)sdslen(info
)));
5257 addReplySds(c
,info
);
5258 addReply(c
,shared
.crlf
);
5261 static void monitorCommand(redisClient
*c
) {
5262 /* ignore MONITOR if already slave or in monitor mode */
5263 if (c
->flags
& REDIS_SLAVE
) return;
5265 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
5267 listAddNodeTail(server
.monitors
,c
);
5268 addReply(c
,shared
.ok
);
5271 /* ================================= Expire ================================= */
5272 static int removeExpire(redisDb
*db
, robj
*key
) {
5273 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
5280 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
5281 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
5289 /* Return the expire time of the specified key, or -1 if no expire
5290 * is associated with this key (i.e. the key is non volatile) */
5291 static time_t getExpire(redisDb
*db
, robj
*key
) {
5294 /* No expire? return ASAP */
5295 if (dictSize(db
->expires
) == 0 ||
5296 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
5298 return (time_t) dictGetEntryVal(de
);
5301 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
5305 /* No expire? return ASAP */
5306 if (dictSize(db
->expires
) == 0 ||
5307 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5309 /* Lookup the expire */
5310 when
= (time_t) dictGetEntryVal(de
);
5311 if (time(NULL
) <= when
) return 0;
5313 /* Delete the key */
5314 dictDelete(db
->expires
,key
);
5315 return dictDelete(db
->dict
,key
) == DICT_OK
;
5318 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
5321 /* No expire? return ASAP */
5322 if (dictSize(db
->expires
) == 0 ||
5323 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5325 /* Delete the key */
5327 dictDelete(db
->expires
,key
);
5328 return dictDelete(db
->dict
,key
) == DICT_OK
;
5331 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
5334 de
= dictFind(c
->db
->dict
,key
);
5336 addReply(c
,shared
.czero
);
5340 if (deleteKey(c
->db
,key
)) server
.dirty
++;
5341 addReply(c
, shared
.cone
);
5344 time_t when
= time(NULL
)+seconds
;
5345 if (setExpire(c
->db
,key
,when
)) {
5346 addReply(c
,shared
.cone
);
5349 addReply(c
,shared
.czero
);
5355 static void expireCommand(redisClient
*c
) {
5356 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
5359 static void expireatCommand(redisClient
*c
) {
5360 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
5363 static void ttlCommand(redisClient
*c
) {
5367 expire
= getExpire(c
->db
,c
->argv
[1]);
5369 ttl
= (int) (expire
-time(NULL
));
5370 if (ttl
< 0) ttl
= -1;
5372 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
5375 /* ================================ MULTI/EXEC ============================== */
5377 /* Client state initialization for MULTI/EXEC */
5378 static void initClientMultiState(redisClient
*c
) {
5379 c
->mstate
.commands
= NULL
;
5380 c
->mstate
.count
= 0;
5383 /* Release all the resources associated with MULTI/EXEC state */
5384 static void freeClientMultiState(redisClient
*c
) {
5387 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5389 multiCmd
*mc
= c
->mstate
.commands
+j
;
5391 for (i
= 0; i
< mc
->argc
; i
++)
5392 decrRefCount(mc
->argv
[i
]);
5395 zfree(c
->mstate
.commands
);
5398 /* Add a new command into the MULTI commands queue */
5399 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
) {
5403 c
->mstate
.commands
= zrealloc(c
->mstate
.commands
,
5404 sizeof(multiCmd
)*(c
->mstate
.count
+1));
5405 mc
= c
->mstate
.commands
+c
->mstate
.count
;
5408 mc
->argv
= zmalloc(sizeof(robj
*)*c
->argc
);
5409 memcpy(mc
->argv
,c
->argv
,sizeof(robj
*)*c
->argc
);
5410 for (j
= 0; j
< c
->argc
; j
++)
5411 incrRefCount(mc
->argv
[j
]);
5415 static void multiCommand(redisClient
*c
) {
5416 c
->flags
|= REDIS_MULTI
;
5417 addReply(c
,shared
.ok
);
5420 static void execCommand(redisClient
*c
) {
5425 if (!(c
->flags
& REDIS_MULTI
)) {
5426 addReplySds(c
,sdsnew("-ERR EXEC without MULTI\r\n"));
5430 orig_argv
= c
->argv
;
5431 orig_argc
= c
->argc
;
5432 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->mstate
.count
));
5433 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5434 c
->argc
= c
->mstate
.commands
[j
].argc
;
5435 c
->argv
= c
->mstate
.commands
[j
].argv
;
5436 call(c
,c
->mstate
.commands
[j
].cmd
);
5438 c
->argv
= orig_argv
;
5439 c
->argc
= orig_argc
;
5440 freeClientMultiState(c
);
5441 initClientMultiState(c
);
5442 c
->flags
&= (~REDIS_MULTI
);
5445 /* =========================== Blocking Operations ========================= */
5447 /* Currently Redis blocking operations support is limited to list POP ops,
5448 * so the current implementation is not fully generic, but it is also not
5449 * completely specific so it will not require a rewrite to support new
5450 * kinds of blocking operations in the future.
5452 * Still it's important to note that list blocking operations can be already
5453 * used as a notification mechanism in order to implement other blocking
5454 * operations at application level, so there must be a very strong evidence
5455 * of usefulness and generality before new blocking operations are implemented.
5457 * This is how the current blocking POP works, we use BLPOP as example:
5458 * - If the user calls BLPOP and the key exists and contains a non empty list
5459 * then LPOP is called instead. So BLPOP is semantically the same as LPOP
5460 * if there is no need to block.
5461 * - If instead BLPOP is called and the key does not exist or the list is
5462 * empty we need to block. In order to do so we remove the notification for
5463 * new data to read in the client socket (so that we'll not serve new
5464 * requests if the blocking request is not served). Also we put the client
5465 * in a dictionary (server.blockingkeys) mapping keys to a list of clients
5466 * blocking for these keys.
5467 * - If a PUSH operation against a key with blocked clients waiting is
5468 * performed, we serve the first in the list: basically instead of pushing
5469 * the new element inside the list we return it to the (first / oldest)
5470 * blocking client, unblock the client, and remove it from the list.
5472 * The above comment and the source code should be enough in order to understand
5473 * the implementation and modify / fix it later.
5476 /* Set a client in blocking mode for the specified key, with the specified
5478 static void blockForKey(redisClient
*c
, robj
*key
, time_t timeout
) {
5482 c
->blockingkey
= key
;
5484 c
->blockingto
= timeout
;
5485 de
= dictFind(c
->db
->blockingkeys
,key
);
5490 retval
= dictAdd(c
->db
->blockingkeys
,key
,l
);
5491 assert(retval
== DICT_OK
);
5493 l
= dictGetEntryVal(de
);
5495 listAddNodeTail(l
,c
);
5496 c
->flags
|= REDIS_BLOCKED
;
5497 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
5500 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
5501 static void unblockClient(redisClient
*c
) {
5505 /* Remove this client from the list of clients waiting for this key. */
5506 assert(c
->blockingkey
!= NULL
);
5507 de
= dictFind(c
->db
->blockingkeys
,c
->blockingkey
);
5509 l
= dictGetEntryVal(de
);
5510 listDelNode(l
,listSearchKey(l
,c
));
5511 /* If the list is empty we need to remove it to avoid wasting memory */
5512 if (listLength(l
) == 0)
5513 dictDelete(c
->db
->blockingkeys
,c
->blockingkey
);
5514 /* Finally set the right flags in the client structure */
5515 decrRefCount(c
->blockingkey
);
5516 c
->blockingkey
= NULL
;
5517 c
->flags
&= (~REDIS_BLOCKED
);
5518 /* Ok now we are ready to get read events from socket, note that we
5519 * can't trap errors here as it's possible that unblockClient() is
5520 * called from freeClient() itself, and the only thing we can do
5521 * if we failed to register the READABLE event is to kill the client.
5522 * Still the following function should never fail in the real world as
5523 * we are sure the file descriptor is sane, and we exit on out of mem. */
5524 aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
, readQueryFromClient
, c
);
5525 /* As a final step we want to process data if there is some command waiting
5526 * in the input buffer. Note that this is safe even if unblockClient()
5527 * gets called from freeClient() because freeClient() will be smart
5528 * enough to call this function *after* c->querybuf was set to NULL. */
5529 if (c
->querybuf
&& sdslen(c
->querybuf
) > 0) processInputBuffer(c
);
5532 /* This should be called from any function PUSHing into lists.
5533 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5534 * 'ele' is the element pushed.
5536 * If the function returns 0 there was no client waiting for a list push
5539 * If the function returns 1 there was a client waiting for a list push
5540 * against this key, the element was passed to this client thus it's not
5541 * needed to actually add it to the list and the caller should return asap. */
5542 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
) {
5543 struct dictEntry
*de
;
5544 redisClient
*receiver
;
5548 de
= dictFind(c
->db
->blockingkeys
,key
);
5549 if (de
== NULL
) return 0;
5550 l
= dictGetEntryVal(de
);
5553 receiver
= ln
->value
;
5555 if (listLength(l
) == 0)
5556 dictDelete(c
->db
->blockingkeys
,key
);
5558 addReplyBulkLen(receiver
,ele
);
5559 addReply(receiver
,ele
);
5560 addReply(receiver
,shared
.crlf
);
5561 unblockClient(receiver
);
5565 /* Blocking RPOP/LPOP */
5566 static void blockingPopGenericCommand(redisClient
*c
, int where
) {
5570 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
5572 if (o
->type
!= REDIS_LIST
) {
5573 popGenericCommand(c
,where
);
5576 list
*list
= o
->ptr
;
5577 if (listLength(list
) != 0) {
5578 /* If the list contains elements fall back to the usual
5579 * non-blocking POP operation */
5580 popGenericCommand(c
,where
);
5585 /* If the list is empty or the key does not exist we must block */
5586 timeout
= strtol(c
->argv
[2]->ptr
,NULL
,10);
5587 if (timeout
> 0) timeout
+= time(NULL
);
5588 blockForKey(c
,c
->argv
[1],timeout
);
5591 static void blpopCommand(redisClient
*c
) {
5592 blockingPopGenericCommand(c
,REDIS_HEAD
);
5595 static void brpopCommand(redisClient
*c
) {
5596 blockingPopGenericCommand(c
,REDIS_TAIL
);
5599 /* =============================== Replication ============================= */
5601 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5602 ssize_t nwritten
, ret
= size
;
5603 time_t start
= time(NULL
);
5607 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
5608 nwritten
= write(fd
,ptr
,size
);
5609 if (nwritten
== -1) return -1;
5613 if ((time(NULL
)-start
) > timeout
) {
5621 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5622 ssize_t nread
, totread
= 0;
5623 time_t start
= time(NULL
);
5627 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
5628 nread
= read(fd
,ptr
,size
);
5629 if (nread
== -1) return -1;
5634 if ((time(NULL
)-start
) > timeout
) {
5642 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5649 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
5652 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
5663 static void syncCommand(redisClient
*c
) {
5664 /* ignore SYNC if already slave or in monitor mode */
5665 if (c
->flags
& REDIS_SLAVE
) return;
5667 /* SYNC can't be issued when the server has pending data to send to
5668 * the client about already issued commands. We need a fresh reply
5669 * buffer registering the differences between the BGSAVE and the current
5670 * dataset, so that we can copy to other slaves if needed. */
5671 if (listLength(c
->reply
) != 0) {
5672 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5676 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
5677 /* Here we need to check if there is a background saving operation
5678 * in progress, or if it is required to start one */
5679 if (server
.bgsavechildpid
!= -1) {
5680 /* Ok a background save is in progress. Let's check if it is a good
5681 * one for replication, i.e. if there is another slave that is
5682 * registering differences since the server forked to save */
5686 listRewind(server
.slaves
);
5687 while((ln
= listYield(server
.slaves
))) {
5689 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
5692 /* Perfect, the server is already registering differences for
5693 * another slave. Set the right state, and copy the buffer. */
5694 listRelease(c
->reply
);
5695 c
->reply
= listDup(slave
->reply
);
5696 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5697 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
5699 /* No way, we need to wait for the next BGSAVE in order to
5700 * register differences */
5701 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5702 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
5705 /* Ok we don't have a BGSAVE in progress, let's start one */
5706 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
5707 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5708 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
5709 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
5712 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5715 c
->flags
|= REDIS_SLAVE
;
5717 listAddNodeTail(server
.slaves
,c
);
5721 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
5722 redisClient
*slave
= privdata
;
5724 REDIS_NOTUSED(mask
);
5725 char buf
[REDIS_IOBUF_LEN
];
5726 ssize_t nwritten
, buflen
;
5728 if (slave
->repldboff
== 0) {
5729 /* Write the bulk write count before transferring the DB. In theory here
5730 * we don't know how much room there is in the output buffer of the
5731 * socket, but in practice SO_SNDLOWAT (the minimum count for output
5732 * operations) will never be smaller than the few bytes we need. */
5735 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5737 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
5745 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
5746 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
5748 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
5749 (buflen
== 0) ? "premature EOF" : strerror(errno
));
5753 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
5754 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
5759 slave
->repldboff
+= nwritten
;
5760 if (slave
->repldboff
== slave
->repldbsize
) {
5761 close(slave
->repldbfd
);
5762 slave
->repldbfd
= -1;
5763 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5764 slave
->replstate
= REDIS_REPL_ONLINE
;
5765 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
5766 sendReplyToClient
, slave
) == AE_ERR
) {
5770 addReplySds(slave
,sdsempty());
5771 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
5775 /* This function is called at the end of every background saving.
5776 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5777 * otherwise REDIS_ERR is passed to the function.
5779 * The goal of this function is to handle slaves waiting for a successful
5780 * background saving in order to perform non-blocking synchronization. */
5781 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
5783 int startbgsave
= 0;
5785 listRewind(server
.slaves
);
5786 while((ln
= listYield(server
.slaves
))) {
5787 redisClient
*slave
= ln
->value
;
5789 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
5791 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5792 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
5793 struct redis_stat buf
;
5795 if (bgsaveerr
!= REDIS_OK
) {
5797 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
5800 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
5801 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
5803 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
5806 slave
->repldboff
= 0;
5807 slave
->repldbsize
= buf
.st_size
;
5808 slave
->replstate
= REDIS_REPL_SEND_BULK
;
5809 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5810 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
5817 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5818 listRewind(server
.slaves
);
5819 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
5820 while((ln
= listYield(server
.slaves
))) {
5821 redisClient
*slave
= ln
->value
;
5823 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
5830 static int syncWithMaster(void) {
5831 char buf
[1024], tmpfile
[256], authcmd
[1024];
5833 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
5837 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
5842 /* AUTH with the master if required. */
5843 if(server
.masterauth
) {
5844 snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
);
5845 if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) {
5847 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",
5851 /* Read the AUTH result. */
5852 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5854 redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s",
5858 if (buf
[0] != '+') {
5860 redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?");
5865 /* Issue the SYNC command */
5866 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
5868 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
5872 /* Read the bulk write count */
5873 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5875 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
5879 if (buf
[0] != '$') {
5881 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5884 dumpsize
= atoi(buf
+1);
5885 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
5886 /* Read the bulk write data on a temp file */
5887 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
5888 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
5891 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
5895 int nread
, nwritten
;
5897 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
5899 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
5905 nwritten
= write(dfd
,buf
,nread
);
5906 if (nwritten
== -1) {
5907 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
5915 if (rename(tmpfile
,server
.dbfilename
) == -1) {
5916 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
5922 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
5923 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
5927 server
.master
= createClient(fd
);
5928 server
.master
->flags
|= REDIS_MASTER
;
5929 server
.master
->authenticated
= 1;
5930 server
.replstate
= REDIS_REPL_CONNECTED
;
5934 static void slaveofCommand(redisClient
*c
) {
5935 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
5936 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
5937 if (server
.masterhost
) {
5938 sdsfree(server
.masterhost
);
5939 server
.masterhost
= NULL
;
5940 if (server
.master
) freeClient(server
.master
);
5941 server
.replstate
= REDIS_REPL_NONE
;
5942 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
5945 sdsfree(server
.masterhost
);
5946 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
5947 server
.masterport
= atoi(c
->argv
[2]->ptr
);
5948 if (server
.master
) freeClient(server
.master
);
5949 server
.replstate
= REDIS_REPL_CONNECT
;
5950 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
5951 server
.masterhost
, server
.masterport
);
5953 addReply(c
,shared
.ok
);
5956 /* ============================ Maxmemory directive ======================== */
5958 /* This function gets called when 'maxmemory' is set on the config file to limit
5959 * the max memory used by the server, and we are out of memory.
5960 * This function will try to, in order:
5962 * - Free objects from the free list
5963 * - Try to remove keys with an EXPIRE set
5965 * If it is not possible to free enough memory to reach used-memory < maxmemory,
5966 * the server will start refusing commands that will enlarge even more the
5969 static void freeMemoryIfNeeded(void) {
5970 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
5971 if (listLength(server
.objfreelist
)) {
5974 listNode
*head
= listFirst(server
.objfreelist
);
5975 o
= listNodeValue(head
);
5976 listDelNode(server
.objfreelist
,head
);
5979 int j
, k
, freed
= 0;
5981 for (j
= 0; j
< server
.dbnum
; j
++) {
5983 robj
*minkey
= NULL
;
5984 struct dictEntry
*de
;
5986 if (dictSize(server
.db
[j
].expires
)) {
5988 /* From a sample of three keys drop the one nearest to
5989 * the natural expire */
5990 for (k
= 0; k
< 3; k
++) {
5993 de
= dictGetRandomKey(server
.db
[j
].expires
);
5994 t
= (time_t) dictGetEntryVal(de
);
5995 if (minttl
== -1 || t
< minttl
) {
5996 minkey
= dictGetEntryKey(de
);
6000 deleteKey(server
.db
+j
,minkey
);
6003 if (!freed
) return; /* nothing to free... */
6008 /* ============================== Append Only file ========================== */
6010 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
6011 sds buf
= sdsempty();
6017 /* The DB this command was targeting is not the same as the last command
6018 * we appended. We need to issue a SELECT command first. */
6019 if (dictid
!= server
.appendseldb
) {
6022 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
6023 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
6024 (unsigned long)strlen(seldb
),seldb
);
6025 server
.appendseldb
= dictid
;
6028 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6029 * EXPIREs into EXPIREATs calls */
6030 if (cmd
->proc
== expireCommand
) {
6033 tmpargv
[0] = createStringObject("EXPIREAT",8);
6034 tmpargv
[1] = argv
[1];
6035 incrRefCount(argv
[1]);
6036 when
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10);
6037 tmpargv
[2] = createObject(REDIS_STRING
,
6038 sdscatprintf(sdsempty(),"%ld",when
));
6042 /* Append the actual command */
6043 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
6044 for (j
= 0; j
< argc
; j
++) {
6047 o
= getDecodedObject(o
);
6048 buf
= sdscatprintf(buf
,"$%lu\r\n",(unsigned long)sdslen(o
->ptr
));
6049 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
6050 buf
= sdscatlen(buf
,"\r\n",2);
6054 /* Free the objects from the modified argv for EXPIREAT */
6055 if (cmd
->proc
== expireCommand
) {
6056 for (j
= 0; j
< 3; j
++)
6057 decrRefCount(argv
[j
]);
6060 /* We want to perform a single write. This should be guaranteed atomic
6061 * at least if the filesystem we are writing is a real physical one.
6062 * While this will save us against the server being killed I don't think
6063 * there is much to do about the whole server stopping for power problems
6065 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
6066 if (nwritten
!= (signed)sdslen(buf
)) {
6067 /* Ooops, we are in troubles. The best thing to do for now is
6068 * to simply exit instead to give the illusion that everything is
6069 * working as expected. */
6070 if (nwritten
== -1) {
6071 redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
));
6073 redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
));
6077 /* If a background append only file rewriting is in progress we want to
6078 * accumulate the differences between the child DB and the current one
6079 * in a buffer, so that when the child process will do its work we
6080 * can append the differences to the new append only file. */
6081 if (server
.bgrewritechildpid
!= -1)
6082 server
.bgrewritebuf
= sdscatlen(server
.bgrewritebuf
,buf
,sdslen(buf
));
6086 if (server
.appendfsync
== APPENDFSYNC_ALWAYS
||
6087 (server
.appendfsync
== APPENDFSYNC_EVERYSEC
&&
6088 now
-server
.lastfsync
> 1))
6090 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
6091 server
.lastfsync
= now
;
6095 /* In Redis commands are always executed in the context of a client, so in
6096 * order to load the append only file we need to create a fake client. */
6097 static struct redisClient
*createFakeClient(void) {
6098 struct redisClient
*c
= zmalloc(sizeof(*c
));
6102 c
->querybuf
= sdsempty();
6106 /* We set the fake client as a slave waiting for the synchronization
6107 * so that Redis will not try to send replies to this client. */
6108 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
6109 c
->reply
= listCreate();
6110 listSetFreeMethod(c
->reply
,decrRefCount
);
6111 listSetDupMethod(c
->reply
,dupClientReplyValue
);
6115 static void freeFakeClient(struct redisClient
*c
) {
6116 sdsfree(c
->querybuf
);
6117 listRelease(c
->reply
);
6121 /* Replay the append log file. On success REDIS_OK is returned. On non fatal
6122 * error (the append only file is zero-length) REDIS_ERR is returned. On
6123 * fatal error an error message is logged and the program exits. */
6124 int loadAppendOnlyFile(char *filename
) {
6125 struct redisClient
*fakeClient
;
6126 FILE *fp
= fopen(filename
,"r");
6127 struct redis_stat sb
;
6129 if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size
== 0)
6133 redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
));
6137 fakeClient
= createFakeClient();
6144 struct redisCommand
*cmd
;
6146 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) {
6152 if (buf
[0] != '*') goto fmterr
;
6154 argv
= zmalloc(sizeof(robj
*)*argc
);
6155 for (j
= 0; j
< argc
; j
++) {
6156 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
;
6157 if (buf
[0] != '$') goto fmterr
;
6158 len
= strtol(buf
+1,NULL
,10);
6159 argsds
= sdsnewlen(NULL
,len
);
6160 if (len
&& fread(argsds
,len
,1,fp
) == 0) goto fmterr
;
6161 argv
[j
] = createObject(REDIS_STRING
,argsds
);
6162 if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */
6165 /* Command lookup */
6166 cmd
= lookupCommand(argv
[0]->ptr
);
6168 redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
);
6171 /* Try object sharing and encoding */
6172 if (server
.shareobjects
) {
6174 for(j
= 1; j
< argc
; j
++)
6175 argv
[j
] = tryObjectSharing(argv
[j
]);
6177 if (cmd
->flags
& REDIS_CMD_BULK
)
6178 tryObjectEncoding(argv
[argc
-1]);
6179 /* Run the command in the context of a fake client */
6180 fakeClient
->argc
= argc
;
6181 fakeClient
->argv
= argv
;
6182 cmd
->proc(fakeClient
);
6183 /* Discard the reply objects list from the fake client */
6184 while(listLength(fakeClient
->reply
))
6185 listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
));
6186 /* Clean up, ready for the next command */
6187 for (j
= 0; j
< argc
; j
++) decrRefCount(argv
[j
]);
6191 freeFakeClient(fakeClient
);
6196 redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file");
6198 redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
));
6202 redisLog(REDIS_WARNING
,"Bad file format reading the append only file");
6206 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6207 static int fwriteBulk(FILE *fp
, robj
*obj
) {
6209 obj
= getDecodedObject(obj
);
6210 snprintf(buf
,sizeof(buf
),"$%ld\r\n",(long)sdslen(obj
->ptr
));
6211 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) goto err
;
6212 if (sdslen(obj
->ptr
) && fwrite(obj
->ptr
,sdslen(obj
->ptr
),1,fp
) == 0)
6214 if (fwrite("\r\n",2,1,fp
) == 0) goto err
;
/* Write a double value in bulk format $<count>\r\n<payload>\r\n
 *
 * The payload is 'd' formatted with "%.17g"; <count> is the payload length
 * without the trailing CRLF.
 *
 * Returns 1 on success, 0 if any of the fwrite() calls fails. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char buf[128], dbuf[128];

    snprintf(dbuf,sizeof(dbuf),"%.17g\r\n",d);
    /* The announced length must not include the CRLF terminator. */
    snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(dbuf)-2);
    if (fwrite(buf,strlen(buf),1,fp) == 0) return 0;
    if (fwrite(dbuf,strlen(dbuf),1,fp) == 0) return 0;
    return 1;
}
/* Write a long value in bulk format $<count>\r\n<payload>\r\n
 *
 * The payload is 'l' formatted in base 10; <count> is the payload length
 * without the trailing CRLF.
 *
 * Returns 1 on success, 0 if any of the fwrite() calls fails. */
static int fwriteBulkLong(FILE *fp, long l) {
    char buf[128], lbuf[128];

    snprintf(lbuf,sizeof(lbuf),"%ld\r\n",l);
    /* The announced length must not include the CRLF terminator. */
    snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(lbuf)-2);
    if (fwrite(buf,strlen(buf),1,fp) == 0) return 0;
    if (fwrite(lbuf,strlen(lbuf),1,fp) == 0) return 0;
    return 1;
}
6244 /* Write a sequence of commands able to fully rebuild the dataset into
6245 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6246 static int rewriteAppendOnlyFile(char *filename
) {
6247 dictIterator
*di
= NULL
;
6252 time_t now
= time(NULL
);
6254 /* Note that we have to use a different temp name here compared to the
6255 * one used by rewriteAppendOnlyFileBackground() function. */
6256 snprintf(tmpfile
,256,"temp-rewriteaof-%d.aof", (int) getpid());
6257 fp
= fopen(tmpfile
,"w");
6259 redisLog(REDIS_WARNING
, "Failed rewriting the append only file: %s", strerror(errno
));
6262 for (j
= 0; j
< server
.dbnum
; j
++) {
6263 char selectcmd
[] = "*2\r\n$6\r\nSELECT\r\n";
6264 redisDb
*db
= server
.db
+j
;
6266 if (dictSize(d
) == 0) continue;
6267 di
= dictGetIterator(d
);
6273 /* SELECT the new DB */
6274 if (fwrite(selectcmd
,sizeof(selectcmd
)-1,1,fp
) == 0) goto werr
;
6275 if (fwriteBulkLong(fp
,j
) == 0) goto werr
;
6277 /* Iterate this DB writing every entry */
6278 while((de
= dictNext(di
)) != NULL
) {
6279 robj
*key
= dictGetEntryKey(de
);
6280 robj
*o
= dictGetEntryVal(de
);
6281 time_t expiretime
= getExpire(db
,key
);
6283 /* Save the key and associated value */
6284 if (o
->type
== REDIS_STRING
) {
6285 /* Emit a SET command */
6286 char cmd
[]="*3\r\n$3\r\nSET\r\n";
6287 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6289 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6290 if (fwriteBulk(fp
,o
) == 0) goto werr
;
6291 } else if (o
->type
== REDIS_LIST
) {
6292 /* Emit the RPUSHes needed to rebuild the list */
6293 list
*list
= o
->ptr
;
6297 while((ln
= listYield(list
))) {
6298 char cmd
[]="*3\r\n$5\r\nRPUSH\r\n";
6299 robj
*eleobj
= listNodeValue(ln
);
6301 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6302 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6303 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6305 } else if (o
->type
== REDIS_SET
) {
6306 /* Emit the SADDs needed to rebuild the set */
6308 dictIterator
*di
= dictGetIterator(set
);
6311 while((de
= dictNext(di
)) != NULL
) {
6312 char cmd
[]="*3\r\n$4\r\nSADD\r\n";
6313 robj
*eleobj
= dictGetEntryKey(de
);
6315 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6316 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6317 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6319 dictReleaseIterator(di
);
6320 } else if (o
->type
== REDIS_ZSET
) {
6321 /* Emit the ZADDs needed to rebuild the sorted set */
6323 dictIterator
*di
= dictGetIterator(zs
->dict
);
6326 while((de
= dictNext(di
)) != NULL
) {
6327 char cmd
[]="*4\r\n$4\r\nZADD\r\n";
6328 robj
*eleobj
= dictGetEntryKey(de
);
6329 double *score
= dictGetEntryVal(de
);
6331 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6332 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6333 if (fwriteBulkDouble(fp
,*score
) == 0) goto werr
;
6334 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6336 dictReleaseIterator(di
);
6338 redisAssert(0 != 0);
6340 /* Save the expire time */
6341 if (expiretime
!= -1) {
6342 char cmd
[]="*3\r\n$8\r\nEXPIREAT\r\n";
6343 /* If this key is already expired skip it */
6344 if (expiretime
< now
) continue;
6345 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6346 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6347 if (fwriteBulkLong(fp
,expiretime
) == 0) goto werr
;
6350 dictReleaseIterator(di
);
6353 /* Make sure data will not remain on the OS's output buffers */
6358 /* Use RENAME to make sure the DB file is changed atomically only
6359 * if the generate DB file is ok. */
6360 if (rename(tmpfile
,filename
) == -1) {
6361 redisLog(REDIS_WARNING
,"Error moving temp append only file on the final destination: %s", strerror(errno
));
6365 redisLog(REDIS_NOTICE
,"SYNC append only file rewrite performed");
6371 redisLog(REDIS_WARNING
,"Write error writing append only file on disk: %s", strerror(errno
));
6372 if (di
) dictReleaseIterator(di
);
6376 /* This is how rewriting of the append only file in background works:
6378 * 1) The user calls BGREWRITEAOF
6379 * 2) Redis calls this function, that forks():
6380 * 2a) the child rewrite the append only file in a temp file.
6381 * 2b) the parent accumulates differences in server.bgrewritebuf.
6382 * 3) When the child finishes '2a' it exits.
6383 * 4) The parent will trap the exit code, if it's OK, will append the
6384 * data accumulated into server.bgrewritebuf into the temp file, and
6385 * finally will rename(2) the temp file in the actual file name.
6386 * Then the new file is reopened as the new append only file. Profit!
6388 static int rewriteAppendOnlyFileBackground(void) {
6391 if (server
.bgrewritechildpid
!= -1) return REDIS_ERR
;
6392 if ((childpid
= fork()) == 0) {
6397 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6398 if (rewriteAppendOnlyFile(tmpfile
) == REDIS_OK
) {
6405 if (childpid
== -1) {
6406 redisLog(REDIS_WARNING
,
6407 "Can't rewrite append only file in background: fork: %s",
6411 redisLog(REDIS_NOTICE
,
6412 "Background append only file rewriting started by pid %d",childpid
);
6413 server
.bgrewritechildpid
= childpid
;
6414 /* We set appendseldb to -1 in order to force the next call to the
6415 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6416 * accumulated by the parent into server.bgrewritebuf will start
6417 * with a SELECT statement and it will be safe to merge. */
6418 server
.appendseldb
= -1;
6421 return REDIS_OK
; /* unreached */
6424 static void bgrewriteaofCommand(redisClient
*c
) {
6425 if (server
.bgrewritechildpid
!= -1) {
6426 addReplySds(c
,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6429 if (rewriteAppendOnlyFileBackground() == REDIS_OK
) {
6430 char *status
= "+Background append only file rewriting started\r\n";
6431 addReplySds(c
,sdsnew(status
));
6433 addReply(c
,shared
.err
);
6437 static void aofRemoveTempFile(pid_t childpid
) {
6440 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) childpid
);
6444 /* ================================= Debugging ============================== */
6446 static void debugCommand(redisClient
*c
) {
6447 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
6449 } else if (!strcasecmp(c
->argv
[1]->ptr
,"reload")) {
6450 if (rdbSave(server
.dbfilename
) != REDIS_OK
) {
6451 addReply(c
,shared
.err
);
6455 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6456 addReply(c
,shared
.err
);
6459 redisLog(REDIS_WARNING
,"DB reloaded by DEBUG RELOAD");
6460 addReply(c
,shared
.ok
);
6461 } else if (!strcasecmp(c
->argv
[1]->ptr
,"loadaof")) {
6463 if (loadAppendOnlyFile(server
.appendfilename
) != REDIS_OK
) {
6464 addReply(c
,shared
.err
);
6467 redisLog(REDIS_WARNING
,"Append Only File loaded by DEBUG LOADAOF");
6468 addReply(c
,shared
.ok
);
6469 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
6470 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
6474 addReply(c
,shared
.nokeyerr
);
6477 key
= dictGetEntryKey(de
);
6478 val
= dictGetEntryVal(de
);
6479 addReplySds(c
,sdscatprintf(sdsempty(),
6480 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6481 (void*)key
, key
->refcount
, (void*)val
, val
->refcount
,
6484 addReplySds(c
,sdsnew(
6485 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6489 static void _redisAssert(char *estr
) {
6490 redisLog(REDIS_WARNING
,"=== ASSERTION FAILED ===");
6491 redisLog(REDIS_WARNING
,"==> %s\n",estr
);
6492 #ifdef HAVE_BACKTRACE
6493 redisLog(REDIS_WARNING
,"(forcing SIGSEGV in order to print the stack trace)");
6498 /* =================================== Main! ================================ */
6501 int linuxOvercommitMemoryValue(void) {
6502 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
6506 if (fgets(buf
,64,fp
) == NULL
) {
6515 void linuxOvercommitMemoryWarning(void) {
6516 if (linuxOvercommitMemoryValue() == 0) {
6517 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6520 #endif /* __linux__ */
6522 static void daemonize(void) {
6526 if (fork() != 0) exit(0); /* parent exits */
6527 printf("New pid: %d\n", getpid());
6528 setsid(); /* create a new session */
6530 /* Every output goes to /dev/null. If Redis is daemonized but
6531 * the 'logfile' is set to 'stdout' in the configuration file
6532 * it will not log at all. */
6533 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
6534 dup2(fd
, STDIN_FILENO
);
6535 dup2(fd
, STDOUT_FILENO
);
6536 dup2(fd
, STDERR_FILENO
);
6537 if (fd
> STDERR_FILENO
) close(fd
);
6539 /* Try to write the pid file */
6540 fp
= fopen(server
.pidfile
,"w");
6542 fprintf(fp
,"%d\n",getpid());
6547 int main(int argc
, char **argv
) {
6550 resetServerSaveParams();
6551 loadServerConfig(argv
[1]);
6552 } else if (argc
> 2) {
6553 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
6556 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6558 if (server
.daemonize
) daemonize();
6560 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
6562 linuxOvercommitMemoryWarning();
6564 if (server
.appendonly
) {
6565 if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
)
6566 redisLog(REDIS_NOTICE
,"DB loaded from append only file");
6568 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
6569 redisLog(REDIS_NOTICE
,"DB loaded from disk");
6571 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
6572 acceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
6573 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
6575 aeDeleteEventLoop(server
.el
);
6579 /* ============================= Backtrace support ========================= */
6581 #ifdef HAVE_BACKTRACE
6582 static char *findFuncName(void *pointer
, unsigned long *offset
);
6584 static void *getMcontextEip(ucontext_t
*uc
) {
6585 #if defined(__FreeBSD__)
6586 return (void*) uc
->uc_mcontext
.mc_eip
;
6587 #elif defined(__dietlibc__)
6588 return (void*) uc
->uc_mcontext
.eip
;
6589 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6591 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6593 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6595 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6596 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6597 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6599 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6601 #elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
6602 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
]; /* Linux 32/64 bit */
6603 #elif defined(__ia64__) /* Linux IA64 */
6604 return (void*) uc
->uc_mcontext
.sc_ip
;
6610 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
6612 char **messages
= NULL
;
6613 int i
, trace_size
= 0;
6614 unsigned long offset
=0;
6615 ucontext_t
*uc
= (ucontext_t
*) secret
;
6617 REDIS_NOTUSED(info
);
6619 redisLog(REDIS_WARNING
,
6620 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
6621 infostring
= genRedisInfoString();
6622 redisLog(REDIS_WARNING
, "%s",infostring
);
6623 /* It's not safe to sdsfree() the returned string under memory
6624 * corruption conditions. Let it leak as we are going to abort */
6626 trace_size
= backtrace(trace
, 100);
6627 /* overwrite sigaction with caller's address */
6628 if (getMcontextEip(uc
) != NULL
) {
6629 trace
[1] = getMcontextEip(uc
);
6631 messages
= backtrace_symbols(trace
, trace_size
);
6633 for (i
=1; i
<trace_size
; ++i
) {
6634 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
6636 p
= strchr(messages
[i
],'+');
6637 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
6638 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
6640 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
6643 // free(messages); Don't call free() with possibly corrupted memory.
6647 static void setupSigSegvAction(void) {
6648 struct sigaction act
;
6650 sigemptyset (&act
.sa_mask
);
6651 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6652 * is used. Otherwise, sa_handler is used */
6653 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
6654 act
.sa_sigaction
= segvHandler
;
6655 sigaction (SIGSEGV
, &act
, NULL
);
6656 sigaction (SIGBUS
, &act
, NULL
);
6657 sigaction (SIGFPE
, &act
, NULL
);
6658 sigaction (SIGILL
, &act
, NULL
);
6659 sigaction (SIGBUS
, &act
, NULL
);
6663 #include "staticsymbols.h"
6664 /* This function tries to convert a pointer into a function name. It's used in
6665 * order to provide a backtrace under segmentation fault that's able to
6666 * display functions declared as static (otherwise the backtrace is useless). */
6667 static char *findFuncName(void *pointer
, unsigned long *offset
){
6669 unsigned long off
, minoff
= 0;
6671 /* Try to match against the Symbol with the smallest offset */
6672 for (i
=0; symsTable
[i
].pointer
; i
++) {
6673 unsigned long lp
= (unsigned long) pointer
;
6675 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
6676 off
=lp
-symsTable
[i
].pointer
;
6677 if (ret
< 0 || off
< minoff
) {
6683 if (ret
== -1) return NULL
;
6685 return symsTable
[ret
].name
;
6687 #else /* HAVE_BACKTRACE */
6688 static void setupSigSegvAction(void) {
6690 #endif /* HAVE_BACKTRACE */