2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.3.2"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
64 #include "solarisfixes.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
95 /* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
107 this flag will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short these commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
113 #define REDIS_STRING 0
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
128 /* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpret the length:
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
139 * Lengths up to 63 are stored using a single byte, most DB keys, and many
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining six bits specify a special encoding for the object
149 * according to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
156 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157 #define REDIS_SLAVE 2 /* This client is a slave server */
158 #define REDIS_MASTER 4 /* This client is a master server */
159 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
160 #define REDIS_MULTI 16 /* This client is in a MULTI context */
161 #define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */
163 /* Slave replication state - slave side */
164 #define REDIS_REPL_NONE 0 /* No active replication */
165 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
166 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
168 /* Slave replication state - from the point of view of master
169 * Note that in SEND_BULK and ONLINE state the slave receives new updates
170 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
171 * to start the next background saving in order to send updates to it. */
172 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
173 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
174 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
175 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
177 /* List related stuff */
181 /* Sort operations */
182 #define REDIS_SORT_GET 0
183 #define REDIS_SORT_ASC 1
184 #define REDIS_SORT_DESC 2
185 #define REDIS_SORTKEY_MAX 1024
188 #define REDIS_DEBUG 0
189 #define REDIS_NOTICE 1
190 #define REDIS_WARNING 2
192 /* Anti-warning macro... */
193 #define REDIS_NOTUSED(V) ((void) V)
195 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
196 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
198 /* Append only defines */
199 #define APPENDFSYNC_NO 0
200 #define APPENDFSYNC_ALWAYS 1
201 #define APPENDFSYNC_EVERYSEC 2
203 /* We can print the stacktrace, so our assert is defined this way: */
204 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
205 static void _redisAssert(char *estr
);
207 /*================================= Data types ============================== */
209 /* A redis object, that is a type able to hold a string / list / set */
210 typedef struct redisObject
{
213 unsigned char encoding
;
214 unsigned char notused
[2];
218 /* Macro used to initialize a Redis object allocated on the stack.
219 * Note that this macro is kept near the structure definition to make sure
220 * we'll update it when the structure is changed, to avoid bugs like
221 * bug #85 introduced exactly in this way. */
222 #define initStaticStringObject(_var,_ptr) do { \
224 _var.type = REDIS_STRING; \
225 _var.encoding = REDIS_ENCODING_RAW; \
229 typedef struct redisDb
{
230 dict
*dict
; /* The keyspace for this DB */
231 dict
*expires
; /* Timeout of keys with a timeout set */
232 dict
*blockingkeys
; /* Keys with clients waiting for data (BLPOP) */
236 /* Client MULTI/EXEC state */
237 typedef struct multiCmd
{
240 struct redisCommand
*cmd
;
243 typedef struct multiState
{
244 multiCmd
*commands
; /* Array of MULTI commands */
245 int count
; /* Total number of MULTI commands */
248 /* With multiplexing we need to keep per-client state.
249 * Clients are kept in a linked list. */
250 typedef struct redisClient
{
255 robj
**argv
, **mbargv
;
257 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
258 int multibulk
; /* multi bulk command format active */
261 time_t lastinteraction
; /* time of the last interaction, used for timeout */
262 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
264 int slaveseldb
; /* slave selected db, if this client is a slave */
265 int authenticated
; /* when requirepass is non-NULL */
266 int replstate
; /* replication state if this is a slave */
267 int repldbfd
; /* replication DB file descriptor */
268 long repldboff
; /* replication DB file offset */
269 off_t repldbsize
; /* replication DB file size */
270 multiState mstate
; /* MULTI/EXEC state */
271 robj
**blockingkeys
; /* The key we waiting to terminate a blocking
272 * operation such as BLPOP. Otherwise NULL. */
273 int blockingkeysnum
; /* Number of blocking keys */
274 time_t blockingto
; /* Blocking operation timeout. If UNIX current time
275 * is >= blockingto then the operation timed out. */
283 /* Global server state structure */
288 dict
*sharingpool
; /* Pool used for object sharing */
289 unsigned int sharingpoolsize
;
290 long long dirty
; /* changes to DB from the last save */
292 list
*slaves
, *monitors
;
293 char neterr
[ANET_ERR_LEN
];
295 int cronloops
; /* number of times the cron function run */
296 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
297 time_t lastsave
; /* Unix time of the last successful save */
298 size_t usedmemory
; /* Used memory in bytes, refreshed from zmalloc_used_memory() */
299 /* Fields used only for stats */
300 time_t stat_starttime
; /* server start time */
301 long long stat_numcommands
; /* number of processed commands */
302 long long stat_numconnections
; /* number of connections received */
315 pid_t bgsavechildpid
;
316 pid_t bgrewritechildpid
;
317 sds bgrewritebuf
; /* buffer taken by parent during append only rewrite */
318 struct saveparam
*saveparams
;
323 char *appendfilename
;
327 /* Replication related */
332 redisClient
*master
; /* client that is master for this slave */
334 unsigned int maxclients
;
335 unsigned long maxmemory
;
336 unsigned int blockedclients
;
337 /* Sort parameters - qsort_r() is only available under BSD so we
338 * have to take this state global, in order to pass it to sortCompare() */
344 typedef void redisCommandProc(redisClient
*c
);
345 struct redisCommand
{
347 redisCommandProc
*proc
;
352 struct redisFunctionSym
{
354 unsigned long pointer
;
357 typedef struct _redisSortObject
{
365 typedef struct _redisSortOperation
{
368 } redisSortOperation
;
370 /* ZSETs use a specialized version of Skiplists */
372 typedef struct zskiplistNode
{
373 struct zskiplistNode
**forward
;
374 struct zskiplistNode
*backward
;
379 typedef struct zskiplist
{
380 struct zskiplistNode
*header
, *tail
;
381 unsigned long length
;
385 typedef struct zset
{
390 /* Our shared "common" objects */
392 struct sharedObjectsStruct
{
393 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
394 *colon
, *nullbulk
, *nullmultibulk
, *queued
,
395 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
396 *outofrangeerr
, *plus
,
397 *select0
, *select1
, *select2
, *select3
, *select4
,
398 *select5
, *select6
, *select7
, *select8
, *select9
;
401 /* Global vars that are actually used as constants. The following double
402 * values are used for double on-disk serialization, and are initialized
403 * at runtime to avoid strange compiler optimizations. */
405 static double R_Zero
, R_PosInf
, R_NegInf
, R_Nan
;
407 /*================================ Prototypes =============================== */
409 static void freeStringObject(robj
*o
);
410 static void freeListObject(robj
*o
);
411 static void freeSetObject(robj
*o
);
412 static void decrRefCount(void *o
);
413 static robj
*createObject(int type
, void *ptr
);
414 static void freeClient(redisClient
*c
);
415 static int rdbLoad(char *filename
);
416 static void addReply(redisClient
*c
, robj
*obj
);
417 static void addReplySds(redisClient
*c
, sds s
);
418 static void incrRefCount(robj
*o
);
419 static int rdbSaveBackground(char *filename
);
420 static robj
*createStringObject(char *ptr
, size_t len
);
421 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
422 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
423 static int syncWithMaster(void);
424 static robj
*tryObjectSharing(robj
*o
);
425 static int tryObjectEncoding(robj
*o
);
426 static robj
*getDecodedObject(robj
*o
);
427 static int removeExpire(redisDb
*db
, robj
*key
);
428 static int expireIfNeeded(redisDb
*db
, robj
*key
);
429 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
430 static int deleteKey(redisDb
*db
, robj
*key
);
431 static time_t getExpire(redisDb
*db
, robj
*key
);
432 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
433 static void updateSlavesWaitingBgsave(int bgsaveerr
);
434 static void freeMemoryIfNeeded(void);
435 static int processCommand(redisClient
*c
);
436 static void setupSigSegvAction(void);
437 static void rdbRemoveTempFile(pid_t childpid
);
438 static void aofRemoveTempFile(pid_t childpid
);
439 static size_t stringObjectLen(robj
*o
);
440 static void processInputBuffer(redisClient
*c
);
441 static zskiplist
*zslCreate(void);
442 static void zslFree(zskiplist
*zsl
);
443 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
444 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
445 static void initClientMultiState(redisClient
*c
);
446 static void freeClientMultiState(redisClient
*c
);
447 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
);
448 static void unblockClient(redisClient
*c
);
449 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
);
451 static void authCommand(redisClient
*c
);
452 static void pingCommand(redisClient
*c
);
453 static void echoCommand(redisClient
*c
);
454 static void setCommand(redisClient
*c
);
455 static void setnxCommand(redisClient
*c
);
456 static void getCommand(redisClient
*c
);
457 static void delCommand(redisClient
*c
);
458 static void existsCommand(redisClient
*c
);
459 static void incrCommand(redisClient
*c
);
460 static void decrCommand(redisClient
*c
);
461 static void incrbyCommand(redisClient
*c
);
462 static void decrbyCommand(redisClient
*c
);
463 static void selectCommand(redisClient
*c
);
464 static void randomkeyCommand(redisClient
*c
);
465 static void keysCommand(redisClient
*c
);
466 static void dbsizeCommand(redisClient
*c
);
467 static void lastsaveCommand(redisClient
*c
);
468 static void saveCommand(redisClient
*c
);
469 static void bgsaveCommand(redisClient
*c
);
470 static void bgrewriteaofCommand(redisClient
*c
);
471 static void shutdownCommand(redisClient
*c
);
472 static void moveCommand(redisClient
*c
);
473 static void renameCommand(redisClient
*c
);
474 static void renamenxCommand(redisClient
*c
);
475 static void lpushCommand(redisClient
*c
);
476 static void rpushCommand(redisClient
*c
);
477 static void lpopCommand(redisClient
*c
);
478 static void rpopCommand(redisClient
*c
);
479 static void llenCommand(redisClient
*c
);
480 static void lindexCommand(redisClient
*c
);
481 static void lrangeCommand(redisClient
*c
);
482 static void ltrimCommand(redisClient
*c
);
483 static void typeCommand(redisClient
*c
);
484 static void lsetCommand(redisClient
*c
);
485 static void saddCommand(redisClient
*c
);
486 static void sremCommand(redisClient
*c
);
487 static void smoveCommand(redisClient
*c
);
488 static void sismemberCommand(redisClient
*c
);
489 static void scardCommand(redisClient
*c
);
490 static void spopCommand(redisClient
*c
);
491 static void srandmemberCommand(redisClient
*c
);
492 static void sinterCommand(redisClient
*c
);
493 static void sinterstoreCommand(redisClient
*c
);
494 static void sunionCommand(redisClient
*c
);
495 static void sunionstoreCommand(redisClient
*c
);
496 static void sdiffCommand(redisClient
*c
);
497 static void sdiffstoreCommand(redisClient
*c
);
498 static void syncCommand(redisClient
*c
);
499 static void flushdbCommand(redisClient
*c
);
500 static void flushallCommand(redisClient
*c
);
501 static void sortCommand(redisClient
*c
);
502 static void lremCommand(redisClient
*c
);
503 static void rpoplpushcommand(redisClient
*c
);
504 static void infoCommand(redisClient
*c
);
505 static void mgetCommand(redisClient
*c
);
506 static void monitorCommand(redisClient
*c
);
507 static void expireCommand(redisClient
*c
);
508 static void expireatCommand(redisClient
*c
);
509 static void getsetCommand(redisClient
*c
);
510 static void ttlCommand(redisClient
*c
);
511 static void slaveofCommand(redisClient
*c
);
512 static void debugCommand(redisClient
*c
);
513 static void msetCommand(redisClient
*c
);
514 static void msetnxCommand(redisClient
*c
);
515 static void zaddCommand(redisClient
*c
);
516 static void zincrbyCommand(redisClient
*c
);
517 static void zrangeCommand(redisClient
*c
);
518 static void zrangebyscoreCommand(redisClient
*c
);
519 static void zrevrangeCommand(redisClient
*c
);
520 static void zcardCommand(redisClient
*c
);
521 static void zremCommand(redisClient
*c
);
522 static void zscoreCommand(redisClient
*c
);
523 static void zremrangebyscoreCommand(redisClient
*c
);
524 static void multiCommand(redisClient
*c
);
525 static void execCommand(redisClient
*c
);
526 static void blpopCommand(redisClient
*c
);
527 static void brpopCommand(redisClient
*c
);
529 /*================================= Globals ================================= */
532 static struct redisServer server
; /* server global state */
533 static struct redisCommand cmdTable
[] = {
534 {"get",getCommand
,2,REDIS_CMD_INLINE
},
535 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
536 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
537 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
538 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
539 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
540 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
541 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
542 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
543 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
544 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
545 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
546 {"brpop",brpopCommand
,-3,REDIS_CMD_INLINE
},
547 {"blpop",blpopCommand
,-3,REDIS_CMD_INLINE
},
548 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
549 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
550 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
551 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
552 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
553 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
554 {"rpoplpush",rpoplpushcommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
555 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
556 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
557 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
558 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
559 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
560 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
561 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
562 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
563 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
564 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
565 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
566 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
567 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
568 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
569 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
570 {"zincrby",zincrbyCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
571 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
572 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
573 {"zrange",zrangeCommand
,-4,REDIS_CMD_INLINE
},
574 {"zrangebyscore",zrangebyscoreCommand
,-4,REDIS_CMD_INLINE
},
575 {"zrevrange",zrevrangeCommand
,-4,REDIS_CMD_INLINE
},
576 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
577 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
578 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
579 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
580 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
581 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
582 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
583 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
584 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
585 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
586 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
587 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
588 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
589 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
590 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
591 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
592 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
593 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
594 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
595 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
596 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
597 {"bgrewriteaof",bgrewriteaofCommand
,1,REDIS_CMD_INLINE
},
598 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
599 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
600 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
601 {"multi",multiCommand
,1,REDIS_CMD_INLINE
},
602 {"exec",execCommand
,1,REDIS_CMD_INLINE
},
603 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
604 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
605 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
606 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
607 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
608 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
609 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
610 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
611 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
615 /*============================ Utility functions ============================ */
617 /* Glob-style pattern matching. */
618 int stringmatchlen(const char *pattern
, int patternLen
,
619 const char *string
, int stringLen
, int nocase
)
624 while (pattern
[1] == '*') {
629 return 1; /* match */
631 if (stringmatchlen(pattern
+1, patternLen
-1,
632 string
, stringLen
, nocase
))
633 return 1; /* match */
637 return 0; /* no match */
641 return 0; /* no match */
651 not = pattern
[0] == '^';
658 if (pattern
[0] == '\\') {
661 if (pattern
[0] == string
[0])
663 } else if (pattern
[0] == ']') {
665 } else if (patternLen
== 0) {
669 } else if (pattern
[1] == '-' && patternLen
>= 3) {
670 int start
= pattern
[0];
671 int end
= pattern
[2];
679 start
= tolower(start
);
685 if (c
>= start
&& c
<= end
)
689 if (pattern
[0] == string
[0])
692 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
702 return 0; /* no match */
708 if (patternLen
>= 2) {
715 if (pattern
[0] != string
[0])
716 return 0; /* no match */
718 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
719 return 0; /* no match */
727 if (stringLen
== 0) {
728 while(*pattern
== '*') {
735 if (patternLen
== 0 && stringLen
== 0)
740 static void redisLog(int level
, const char *fmt
, ...) {
744 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
748 if (level
>= server
.verbosity
) {
754 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
755 fprintf(fp
,"%s %c ",buf
,c
[level
]);
756 vfprintf(fp
, fmt
, ap
);
762 if (server
.logfile
) fclose(fp
);
765 /*====================== Hash table type implementation ==================== */
767 /* This is a hash table type that uses the SDS dynamic strings library as
768 * keys and redis objects as values (objects can hold SDS strings,
771 static void dictVanillaFree(void *privdata
, void *val
)
773 DICT_NOTUSED(privdata
);
777 static void dictListDestructor(void *privdata
, void *val
)
779 DICT_NOTUSED(privdata
);
780 listRelease((list
*)val
);
783 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
787 DICT_NOTUSED(privdata
);
789 l1
= sdslen((sds
)key1
);
790 l2
= sdslen((sds
)key2
);
791 if (l1
!= l2
) return 0;
792 return memcmp(key1
, key2
, l1
) == 0;
795 static void dictRedisObjectDestructor(void *privdata
, void *val
)
797 DICT_NOTUSED(privdata
);
802 static int dictObjKeyCompare(void *privdata
, const void *key1
,
805 const robj
*o1
= key1
, *o2
= key2
;
806 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
809 static unsigned int dictObjHash(const void *key
) {
811 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
814 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
817 robj
*o1
= (robj
*) key1
, *o2
= (robj
*) key2
;
820 o1
= getDecodedObject(o1
);
821 o2
= getDecodedObject(o2
);
822 cmp
= sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
828 static unsigned int dictEncObjHash(const void *key
) {
829 robj
*o
= (robj
*) key
;
831 o
= getDecodedObject(o
);
832 unsigned int hash
= dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
837 static dictType setDictType
= {
838 dictEncObjHash
, /* hash function */
841 dictEncObjKeyCompare
, /* key compare */
842 dictRedisObjectDestructor
, /* key destructor */
843 NULL
/* val destructor */
846 static dictType zsetDictType
= {
847 dictEncObjHash
, /* hash function */
850 dictEncObjKeyCompare
, /* key compare */
851 dictRedisObjectDestructor
, /* key destructor */
852 dictVanillaFree
/* val destructor of malloc(sizeof(double)) */
855 static dictType hashDictType
= {
856 dictObjHash
, /* hash function */
859 dictObjKeyCompare
, /* key compare */
860 dictRedisObjectDestructor
, /* key destructor */
861 dictRedisObjectDestructor
/* val destructor */
864 /* Keylist hash table type has unencoded redis objects as keys and
865 * lists as values. It's used for blocking operations (BLPOP) */
866 static dictType keylistDictType
= {
867 dictObjHash
, /* hash function */
870 dictObjKeyCompare
, /* key compare */
871 dictRedisObjectDestructor
, /* key destructor */
872 dictListDestructor
/* val destructor */
875 /* ========================= Random utility functions ======================= */
877 /* Redis generally does not try to recover from out of memory conditions
878 * when allocating objects or strings, it is not clear if it will be possible
879 * to report this condition to the client since the networking layer itself
880 * is based on heap allocation for send buffers, so we simply abort.
881 * At least the code will be simpler to read... */
882 static void oom(const char *msg
) {
883 redisLog(REDIS_WARNING
, "%s: Out of memory\n",msg
);
888 /* ====================== Redis server networking stuff ===================== */
889 static void closeTimedoutClients(void) {
892 time_t now
= time(NULL
);
894 listRewind(server
.clients
);
895 while ((ln
= listYield(server
.clients
)) != NULL
) {
896 c
= listNodeValue(ln
);
897 if (server
.maxidletime
&&
898 !(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
899 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
900 (now
- c
->lastinteraction
> server
.maxidletime
))
902 redisLog(REDIS_DEBUG
,"Closing idle client");
904 } else if (c
->flags
& REDIS_BLOCKED
) {
905 if (c
->blockingto
!= 0 && c
->blockingto
< now
) {
906 addReply(c
,shared
.nullmultibulk
);
913 static int htNeedsResize(dict
*dict
) {
914 long long size
, used
;
916 size
= dictSlots(dict
);
917 used
= dictSize(dict
);
918 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
919 (used
*100/size
< REDIS_HT_MINFILL
));
922 /* If the percentage of used slots in the HT falls below REDIS_HT_MINFILL
923 * we resize the hash table to save memory */
924 static void tryResizeHashTables(void) {
927 for (j
= 0; j
< server
.dbnum
; j
++) {
928 if (htNeedsResize(server
.db
[j
].dict
)) {
929 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
930 dictResize(server
.db
[j
].dict
);
931 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
933 if (htNeedsResize(server
.db
[j
].expires
))
934 dictResize(server
.db
[j
].expires
);
938 /* A background saving child (BGSAVE) terminated its work. Handle this. */
939 void backgroundSaveDoneHandler(int statloc
) {
940 int exitcode
= WEXITSTATUS(statloc
);
941 int bysignal
= WIFSIGNALED(statloc
);
943 if (!bysignal
&& exitcode
== 0) {
944 redisLog(REDIS_NOTICE
,
945 "Background saving terminated with success");
947 server
.lastsave
= time(NULL
);
948 } else if (!bysignal
&& exitcode
!= 0) {
949 redisLog(REDIS_WARNING
, "Background saving error");
951 redisLog(REDIS_WARNING
,
952 "Background saving terminated by signal");
953 rdbRemoveTempFile(server
.bgsavechildpid
);
955 server
.bgsavechildpid
= -1;
956 /* Possibly there are slaves waiting for a BGSAVE in order to be served
957 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
958 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
961 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
963 void backgroundRewriteDoneHandler(int statloc
) {
964 int exitcode
= WEXITSTATUS(statloc
);
965 int bysignal
= WIFSIGNALED(statloc
);
967 if (!bysignal
&& exitcode
== 0) {
971 redisLog(REDIS_NOTICE
,
972 "Background append only file rewriting terminated with success");
973 /* Now it's time to flush the differences accumulated by the parent */
974 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) server
.bgrewritechildpid
);
975 fd
= open(tmpfile
,O_WRONLY
|O_APPEND
);
977 redisLog(REDIS_WARNING
, "Not able to open the temp append only file produced by the child: %s", strerror(errno
));
980 /* Flush our data... */
981 if (write(fd
,server
.bgrewritebuf
,sdslen(server
.bgrewritebuf
)) !=
982 (signed) sdslen(server
.bgrewritebuf
)) {
983 redisLog(REDIS_WARNING
, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno
));
987 redisLog(REDIS_NOTICE
,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server
.bgrewritebuf
));
988 /* Now our work is to rename the temp file into the stable file. And
989 * switch the file descriptor used by the server for append only. */
990 if (rename(tmpfile
,server
.appendfilename
) == -1) {
991 redisLog(REDIS_WARNING
,"Can't rename the temp append only file into the stable one: %s", strerror(errno
));
995 /* Mission completed... almost */
996 redisLog(REDIS_NOTICE
,"Append only file successfully rewritten.");
997 if (server
.appendfd
!= -1) {
998 /* If append only is actually enabled... */
999 close(server
.appendfd
);
1000 server
.appendfd
= fd
;
1002 server
.appendseldb
= -1; /* Make sure it will issue SELECT */
1003 redisLog(REDIS_NOTICE
,"The new append only file was selected for future appends.");
1005 /* If append only is disabled we just generate a dump in this
1006 * format. Why not? */
1009 } else if (!bysignal
&& exitcode
!= 0) {
1010 redisLog(REDIS_WARNING
, "Background append only file rewriting error");
1012 redisLog(REDIS_WARNING
,
1013 "Background append only file rewriting terminated by signal");
1016 sdsfree(server
.bgrewritebuf
);
1017 server
.bgrewritebuf
= sdsempty();
1018 aofRemoveTempFile(server
.bgrewritechildpid
);
1019 server
.bgrewritechildpid
= -1;
/* Periodic housekeeping callback; initServer() registers it as a time event
 * with aeCreateTimeEvent(). Each run increments server.cronloops and then,
 * in order:
 *  - refreshes server.usedmemory from zmalloc_used_memory();
 *  - every 5 loops logs, at debug level, the key counts of non-empty DBs;
 *  - calls tryResizeHashTables(), but only while no background save child
 *    is running;
 *  - logs the number of connected clients and slaves;
 *  - closes timed out clients (every 10 loops when maxidletime is set, or
 *    whenever there are blocked clients);
 *  - reaps a finished background save / AOF rewrite child with wait3() and
 *    dispatches to the matching done handler; when no child is running it
 *    checks the configured save rules and may start rdbSaveBackground();
 *  - samples db->expires with dictGetRandomKey() to delete expired keys;
 *  - connects to the master when replstate is REDIS_REPL_CONNECT.
 *
 * NOTE(review): a number of lines of this function (local declarations,
 * closing braces, the return statement) are absent from the text as
 * extracted, so the value handed back to the event loop is not visible
 * here -- confirm against the canonical file. */
1022 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
1023 int j
, loops
= server
.cronloops
++;
1024 REDIS_NOTUSED(eventLoop
);
1026 REDIS_NOTUSED(clientData
);
1028 /* Update the global state with the amount of used memory */
1029 server
.usedmemory
= zmalloc_used_memory();
1031 /* Show some info about non-empty databases */
1032 for (j
= 0; j
< server
.dbnum
; j
++) {
1033 long long size
, used
, vkeys
;
1035 size
= dictSlots(server
.db
[j
].dict
);
1036 used
= dictSize(server
.db
[j
].dict
);
1037 vkeys
= dictSize(server
.db
[j
].expires
);
1038 if (!(loops
% 5) && (used
|| vkeys
)) {
1039 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
1040 /* dictPrintStats(server.dict); */
1044 /* We don't want to resize the hash tables while a background saving
1045 * is in progress: the saving child is created using fork() that is
1046 * implemented with a copy-on-write semantic in most modern systems, so
1047 * if we resize the HT while there is the saving child at work actually
1048 * a lot of memory movements in the parent will cause a lot of pages
1050 if (server
.bgsavechildpid
== -1) tryResizeHashTables();
1052 /* Show information about connected clients */
1054 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1055 listLength(server
.clients
)-listLength(server
.slaves
),
1056 listLength(server
.slaves
),
1058 dictSize(server
.sharingpool
));
1061 /* Close connections of timedout clients */
1062 if ((server
.maxidletime
&& !(loops
% 10)) || server
.blockedclients
)
1063 closeTimedoutClients();
1065 /* Check if a background saving or AOF rewrite in progress terminated */
1066 if (server
.bgsavechildpid
!= -1 || server
.bgrewritechildpid
!= -1) {
1070 if ((pid
= wait3(&statloc
,WNOHANG
,NULL
)) != 0) {
1071 if (pid
== server
.bgsavechildpid
) {
1072 backgroundSaveDoneHandler(statloc
);
1074 backgroundRewriteDoneHandler(statloc
);
1078 /* If there is not a background saving in progress check if
1079 * we have to save now */
1080 time_t now
= time(NULL
);
1081 for (j
= 0; j
< server
.saveparamslen
; j
++) {
1082 struct saveparam
*sp
= server
.saveparams
+j
;
1084 if (server
.dirty
>= sp
->changes
&&
1085 now
-server
.lastsave
> sp
->seconds
) {
1086 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
1087 sp
->changes
, sp
->seconds
);
1088 rdbSaveBackground(server
.dbfilename
);
1094 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1095 * will use few CPU cycles if there are few expiring keys, otherwise
1096 * it will get more aggressive to avoid that too much memory is used by
1097 * keys that can be removed from the keyspace. */
1098 for (j
= 0; j
< server
.dbnum
; j
++) {
1100 redisDb
*db
= server
.db
+j
;
1102 /* Continue to expire if at the end of the cycle more than 25%
1103 * of the keys were expired. */
1105 int num
= dictSize(db
->expires
);
1106 time_t now
= time(NULL
);
1109 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
1110 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
1115 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
1116 t
= (time_t) dictGetEntryVal(de
);
1118 deleteKey(db
,dictGetEntryKey(de
));
1122 } while (expired
> REDIS_EXPIRELOOKUPS_PER_CRON
/4);
1125 /* Check if we should connect to a MASTER */
1126 if (server
.replstate
== REDIS_REPL_CONNECT
) {
1127 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
1128 if (syncWithMaster() == REDIS_OK
) {
1129 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
1135 static void createSharedObjects(void) {
1136 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
1137 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
1138 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
1139 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
1140 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
1141 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
1142 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
1143 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
1144 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
1145 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
1146 shared
.queued
= createObject(REDIS_STRING
,sdsnew("+QUEUED\r\n"));
1147 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
1148 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1149 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
1150 "-ERR no such key\r\n"));
1151 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
1152 "-ERR syntax error\r\n"));
1153 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
1154 "-ERR source and destination objects are the same\r\n"));
1155 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
1156 "-ERR index out of range\r\n"));
1157 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
1158 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
1159 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
1160 shared
.select0
= createStringObject("select 0\r\n",10);
1161 shared
.select1
= createStringObject("select 1\r\n",10);
1162 shared
.select2
= createStringObject("select 2\r\n",10);
1163 shared
.select3
= createStringObject("select 3\r\n",10);
1164 shared
.select4
= createStringObject("select 4\r\n",10);
1165 shared
.select5
= createStringObject("select 5\r\n",10);
1166 shared
.select6
= createStringObject("select 6\r\n",10);
1167 shared
.select7
= createStringObject("select 7\r\n",10);
1168 shared
.select8
= createStringObject("select 8\r\n",10);
1169 shared
.select9
= createStringObject("select 9\r\n",10);
1172 static void appendServerSaveParams(time_t seconds
, int changes
) {
1173 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1174 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1175 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1176 server
.saveparamslen
++;
1179 static void resetServerSaveParams() {
1180 zfree(server
.saveparams
);
1181 server
.saveparams
= NULL
;
1182 server
.saveparamslen
= 0;
/* Fill the global `server` structure with the built-in defaults: database
 * count, TCP port, log verbosity, client idle timeout, append only file
 * settings, file names, memory and client limits, the default save rules
 * (1 hour / 1 change, 5 minutes / 100 changes, 1 minute / 10000 changes)
 * and the replication state. It also computes the R_PosInf, R_NegInf and
 * R_Nan double constants from R_Zero.
 *
 * NOTE(review): presumably runs before loadServerConfig() so that the
 * configuration file can override these values -- the call site is not in
 * this chunk. Some lines of the body are absent from the extracted text. */
1185 static void initServerConfig() {
1186 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1187 server
.port
= REDIS_SERVERPORT
;
1188 server
.verbosity
= REDIS_DEBUG
;
1189 server
.maxidletime
= REDIS_MAXIDLETIME
;
1190 server
.saveparams
= NULL
;
1191 server
.logfile
= NULL
; /* NULL = log on standard output */
1192 server
.bindaddr
= NULL
;
1193 server
.glueoutputbuf
= 1;
1194 server
.daemonize
= 0;
1195 server
.appendonly
= 0;
1196 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1197 server
.lastfsync
= time(NULL
);
1198 server
.appendfd
= -1;
1199 server
.appendseldb
= -1; /* Make sure the first time will not match */
1200 server
.pidfile
= "/var/run/redis.pid";
1201 server
.dbfilename
= "dump.rdb";
1202 server
.appendfilename
= "appendonly.aof";
1203 server
.requirepass
= NULL
;
1204 server
.shareobjects
= 0;
1205 server
.rdbcompression
= 1;
1206 server
.sharingpoolsize
= 1024;
1207 server
.maxclients
= 0;
1208 server
.blockedclients
= 0;
1209 server
.maxmemory
= 0;
1210 resetServerSaveParams();
1212 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1213 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1214 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1215 /* Replication related */
1217 server
.masterauth
= NULL
;
1218 server
.masterhost
= NULL
;
1219 server
.masterport
= 6379;
1220 server
.master
= NULL
;
1221 server
.replstate
= REDIS_REPL_NONE
;
1223 /* Double constants initialization */
1225 R_PosInf
= 1.0/R_Zero
;
1226 R_NegInf
= -1.0/R_Zero
;
1227 R_Nan
= R_Zero
/R_Zero
;
/* Create the runtime state of the server from the configuration stored in
 * `server`: ignore SIGHUP and SIGPIPE, call setupSigSegvAction(), create
 * the client/slave/monitor/free-object lists, the shared objects, the event
 * loop, the array of server.dbnum databases with their dictionaries, and
 * the listening TCP socket (a warning is logged when it cannot be opened).
 * It then resets counters and timestamps, registers serverCron() as a time
 * event and, when append only mode is enabled, opens the append only file
 * with O_WRONLY|O_APPEND|O_CREAT.
 *
 * NOTE(review): the lines that follow each failure log are absent from the
 * extracted text, so how those failures are handled cannot be stated here. */
1230 static void initServer() {
1233 signal(SIGHUP
, SIG_IGN
);
1234 signal(SIGPIPE
, SIG_IGN
);
1235 setupSigSegvAction();
1237 server
.clients
= listCreate();
1238 server
.slaves
= listCreate();
1239 server
.monitors
= listCreate();
1240 server
.objfreelist
= listCreate();
1241 createSharedObjects();
1242 server
.el
= aeCreateEventLoop();
1243 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1244 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
1245 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1246 if (server
.fd
== -1) {
1247 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
1250 for (j
= 0; j
< server
.dbnum
; j
++) {
1251 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1252 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1253 server
.db
[j
].blockingkeys
= dictCreate(&keylistDictType
,NULL
);
1254 server
.db
[j
].id
= j
;
1256 server
.cronloops
= 0;
1257 server
.bgsavechildpid
= -1;
1258 server
.bgrewritechildpid
= -1;
1259 server
.bgrewritebuf
= sdsempty();
1260 server
.lastsave
= time(NULL
);
1262 server
.usedmemory
= 0;
1263 server
.stat_numcommands
= 0;
1264 server
.stat_numconnections
= 0;
1265 server
.stat_starttime
= time(NULL
);
1266 aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
);
1268 if (server
.appendonly
) {
1269 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644);
1270 if (server
.appendfd
== -1) {
1271 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1278 /* Empty the whole database */
1279 static long long emptyDb() {
1281 long long removed
= 0;
1283 for (j
= 0; j
< server
.dbnum
; j
++) {
1284 removed
+= dictSize(server
.db
[j
].dict
);
1285 dictEmpty(server
.db
[j
].dict
);
1286 dictEmpty(server
.db
[j
].expires
);
/* Convert a configuration boolean into an int.
 *
 * Returns 1 for "yes" and 0 for "no" (compared case-insensitively), and -1
 * for any other string so that the caller can reject the directive: the
 * configuration parser tests the result against -1. */
static int yesnotoi(char *s) {
    if (!strcasecmp(s,"yes")) return 1;
    else if (!strcasecmp(s,"no")) return 0;
    else return -1;
}
/* Parse the configuration file `filename` one line at a time (lines are
 * read with fgets() into a REDIS_CONFIGLINE_MAX+1 byte buffer). Each line is
 * trimmed of blanks and line terminators; empty lines and lines starting
 * with a hash sign are skipped; the rest is split on single spaces and the
 * first word, compared case-insensitively, selects the directive to apply
 * to the global `server` structure. A bad value or an unknown directive
 * jumps to the loaderr label; the fprintf() calls at the end of the
 * function report the offending line number and text on stderr.
 *
 * The file name made of a single dash is special-cased; since the stream is
 * only closed when it is not stdin, it presumably selects standard input.
 *
 * NOTE(review): many lines of the body are absent from the extracted text
 * (declarations, closing braces, what follows the error report). */
1297 /* I agree, this is a very rudimentary way to load a configuration...
1298 will improve later if the config gets more complex */
1299 static void loadServerConfig(char *filename
) {
1301 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1305 if (filename
[0] == '-' && filename
[1] == '\0')
1308 if ((fp
= fopen(filename
,"r")) == NULL
) {
1309 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1314 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1320 line
= sdstrim(line
," \t\r\n");
1322 /* Skip comments and blank lines*/
1323 if (line
[0] == '#' || line
[0] == '\0') {
1328 /* Split into arguments */
1329 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1330 sdstolower(argv
[0]);
1332 /* Execute config directives */
1333 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1334 server
.maxidletime
= atoi(argv
[1]);
1335 if (server
.maxidletime
< 0) {
1336 err
= "Invalid timeout value"; goto loaderr
;
1338 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1339 server
.port
= atoi(argv
[1]);
1340 if (server
.port
< 1 || server
.port
> 65535) {
1341 err
= "Invalid port"; goto loaderr
;
1343 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1344 server
.bindaddr
= zstrdup(argv
[1]);
1345 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1346 int seconds
= atoi(argv
[1]);
1347 int changes
= atoi(argv
[2]);
1348 if (seconds
< 1 || changes
< 0) {
1349 err
= "Invalid save parameters"; goto loaderr
;
1351 appendServerSaveParams(seconds
,changes
);
1352 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1353 if (chdir(argv
[1]) == -1) {
1354 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1355 argv
[1], strerror(errno
));
1358 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1359 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1360 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1361 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1363 err
= "Invalid log level. Must be one of debug, notice, warning";
1366 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1369 server
.logfile
= zstrdup(argv
[1]);
1370 if (!strcasecmp(server
.logfile
,"stdout")) {
1371 zfree(server
.logfile
);
1372 server
.logfile
= NULL
;
1374 if (server
.logfile
) {
1375 /* Test if we are able to open the file. The server will not
1376 * be able to abort just for this problem later... */
1377 logfp
= fopen(server
.logfile
,"a");
1378 if (logfp
== NULL
) {
1379 err
= sdscatprintf(sdsempty(),
1380 "Can't open the log file: %s", strerror(errno
));
1385 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1386 server
.dbnum
= atoi(argv
[1]);
1387 if (server
.dbnum
< 1) {
1388 err
= "Invalid number of databases"; goto loaderr
;
1390 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1391 server
.maxclients
= atoi(argv
[1]);
1392 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1393 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1394 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1395 server
.masterhost
= sdsnew(argv
[1]);
1396 server
.masterport
= atoi(argv
[2]);
1397 server
.replstate
= REDIS_REPL_CONNECT
;
1398 } else if (!strcasecmp(argv
[0],"masterauth") && argc
== 2) {
1399 server
.masterauth
= zstrdup(argv
[1]);
1400 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1401 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1402 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1404 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1405 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1406 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1408 } else if (!strcasecmp(argv
[0],"rdbcompression") && argc
== 2) {
1409 if ((server
.rdbcompression
= yesnotoi(argv
[1])) == -1) {
1410 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1412 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1413 server
.sharingpoolsize
= atoi(argv
[1]);
1414 if (server
.sharingpoolsize
< 1) {
1415 err
= "invalid object sharing pool size"; goto loaderr
;
1417 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1418 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1419 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1421 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1422 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1423 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1425 } else if (!strcasecmp(argv
[0],"appendfsync") && argc
== 2) {
1426 if (!strcasecmp(argv
[1],"no")) {
1427 server
.appendfsync
= APPENDFSYNC_NO
;
1428 } else if (!strcasecmp(argv
[1],"always")) {
1429 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1430 } else if (!strcasecmp(argv
[1],"everysec")) {
1431 server
.appendfsync
= APPENDFSYNC_EVERYSEC
;
1433 err
= "argument must be 'no', 'always' or 'everysec'";
1436 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1437 server
.requirepass
= zstrdup(argv
[1]);
1438 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1439 server
.pidfile
= zstrdup(argv
[1]);
1440 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1441 server
.dbfilename
= zstrdup(argv
[1]);
1443 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1445 for (j
= 0; j
< argc
; j
++)
1450 if (fp
!= stdin
) fclose(fp
);
1454 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1455 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1456 fprintf(stderr
, ">>> '%s'\n", line
);
1457 fprintf(stderr
, "%s\n", err
);
/* Drop one reference (decrRefCount) to every object stored in
 * c->argv[0..argc-1] and in c->mbargv[0..mbargc-1].
 * NOTE(review): the tail of this function is absent from the extracted
 * text; whether the two counters are reset afterwards is not visible here. */
1461 static void freeClientArgv(redisClient
*c
) {
1464 for (j
= 0; j
< c
->argc
; j
++)
1465 decrRefCount(c
->argv
[j
]);
1466 for (j
= 0; j
< c
->mbargc
; j
++)
1467 decrRefCount(c
->mbargv
[j
]);
/* Tear down client `c`: release its query buffer, remove its readable and
 * writable file events, release the reply list and unlink the client from
 * server.clients (the client must be found there, see the assertion).
 * A client flagged REDIS_SLAVE is also looked up in server.monitors (when
 * REDIS_MONITOR is set) or in server.slaves. Freeing the client flagged
 * REDIS_MASTER clears server.master and puts the replication state back to
 * REDIS_REPL_CONNECT. The MULTI state is released with
 * freeClientMultiState().
 * NOTE(review): several lines are absent from the extracted text, so this
 * list is not exhaustive. */
1472 static void freeClient(redisClient
*c
) {
1475 /* Note that if the client we are freeing is blocked into a blocking
1476 * call, we have to set querybuf to NULL *before* to call unblockClient()
1477 * to avoid processInputBuffer() will get called. Also it is important
1478 * to remove the file events after this, because this call adds
1479 * the READABLE event. */
1480 sdsfree(c
->querybuf
);
1482 if (c
->flags
& REDIS_BLOCKED
)
1485 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1486 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1487 listRelease(c
->reply
);
1490 ln
= listSearchKey(server
.clients
,c
);
1491 redisAssert(ln
!= NULL
);
1492 listDelNode(server
.clients
,ln
);
1493 if (c
->flags
& REDIS_SLAVE
) {
1494 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1496 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1497 ln
= listSearchKey(l
,c
);
1498 redisAssert(ln
!= NULL
);
1501 if (c
->flags
& REDIS_MASTER
) {
1502 server
.master
= NULL
;
1503 server
.replstate
= REDIS_REPL_CONNECT
;
1507 freeClientMultiState(c
);
/* glueReplyBuffersIfNeeded() walks the reply list of `c` and copies reply
 * payloads into a GLUEREPLY_UP_TO byte stack buffer while they fit. If
 * nothing was copied it returns; otherwise a new string object holding the
 * copied bytes is pushed at the head of the reply list.
 * NOTE(review): the loop structure is only partially present in the
 * extracted text -- which nodes are deleted, and when, should be confirmed
 * against the canonical file. */
1511 #define GLUEREPLY_UP_TO (1024)
1512 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1514 char buf
[GLUEREPLY_UP_TO
];
1518 listRewind(c
->reply
);
1519 while((ln
= listYield(c
->reply
))) {
1523 objlen
= sdslen(o
->ptr
);
1524 if (copylen
+ objlen
<= GLUEREPLY_UP_TO
) {
1525 memcpy(buf
+copylen
,o
->ptr
,objlen
);
1527 listDelNode(c
->reply
,ln
);
1529 if (copylen
== 0) return;
1533 /* Now the output buffer is empty, add the new single element */
1534 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,copylen
));
1535 listAddNodeHead(c
->reply
,o
);
/* Writable-event handler installed by addReply(): sends the queued replies
 * of the client passed as `privdata` to `fd`.
 *
 * When output gluing is disabled, the reply list is longer than
 * REDIS_WRITEV_THRESHOLD and the client is not flagged REDIS_MASTER, the
 * work is delegated to sendReplyToClientWritev(). Otherwise the object at
 * the head of the list is written starting at offset c->sentlen and is
 * removed once fully sent. Replies queued for a REDIS_MASTER client are
 * not written: they are accounted as if sent. The loop stops once more
 * than REDIS_MAX_WRITE_PER_EVENT bytes were written in this call. When the
 * list becomes empty the writable event is removed.
 * NOTE(review): some lines are absent from the extracted text. */
1538 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1539 redisClient
*c
= privdata
;
1540 int nwritten
= 0, totwritten
= 0, objlen
;
1543 REDIS_NOTUSED(mask
);
1545 /* Use writev() if we have enough buffers to send */
1546 if (!server
.glueoutputbuf
&&
1547 listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD
&&
1548 !(c
->flags
& REDIS_MASTER
))
1550 sendReplyToClientWritev(el
, fd
, privdata
, mask
);
1554 while(listLength(c
->reply
)) {
1555 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1556 glueReplyBuffersIfNeeded(c
);
1558 o
= listNodeValue(listFirst(c
->reply
));
1559 objlen
= sdslen(o
->ptr
);
1562 listDelNode(c
->reply
,listFirst(c
->reply
));
1566 if (c
->flags
& REDIS_MASTER
) {
1567 /* Don't reply to a master */
1568 nwritten
= objlen
- c
->sentlen
;
1570 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1571 if (nwritten
<= 0) break;
1573 c
->sentlen
+= nwritten
;
1574 totwritten
+= nwritten
;
1575 /* If we fully sent the object on head go to the next one */
1576 if (c
->sentlen
== objlen
) {
1577 listDelNode(c
->reply
,listFirst(c
->reply
));
1580 /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT
1581 * bytes, in a single threaded server it's a good idea to serve
1582 * other clients as well, even if a very large request comes from
1583 * super fast link that is always able to accept data (in real world
1584 * scenario think about 'KEYS *' against the loopback interface) */
1585 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1587 if (nwritten
== -1) {
1588 if (errno
== EAGAIN
) {
1591 redisLog(REDIS_DEBUG
,
1592 "Error writing to client: %s", strerror(errno
));
1597 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1598 if (listLength(c
->reply
) == 0) {
1600 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
/* writev() based variant of sendReplyToClient(). For each round it fills
 * iov[] with up to REDIS_WRITEV_IOVEC_COUNT entries taken from the head of
 * the reply list (the first entry starts at offset c->sentlen), sends them
 * with a single writev() call, then removes the fully written objects from
 * the list and adds the remainder to c->sentlen. Errors other than EAGAIN
 * are logged at debug level. When the list becomes empty the writable
 * event is removed.
 * NOTE(review): some lines are absent from the extracted text. */
1604 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
)
1606 redisClient
*c
= privdata
;
1607 int nwritten
= 0, totwritten
= 0, objlen
, willwrite
;
1609 struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
];
1610 int offset
, ion
= 0;
1612 REDIS_NOTUSED(mask
);
1615 while (listLength(c
->reply
)) {
1616 offset
= c
->sentlen
;
1620 /* fill-in the iov[] array */
1621 for(node
= listFirst(c
->reply
); node
; node
= listNextNode(node
)) {
1622 o
= listNodeValue(node
);
1623 objlen
= sdslen(o
->ptr
);
1625 if (totwritten
+ objlen
- offset
> REDIS_MAX_WRITE_PER_EVENT
)
1628 if(ion
== REDIS_WRITEV_IOVEC_COUNT
)
1629 break; /* no more iovecs */
1631 iov
[ion
].iov_base
= ((char*)o
->ptr
) + offset
;
1632 iov
[ion
].iov_len
= objlen
- offset
;
1633 willwrite
+= objlen
- offset
;
1634 offset
= 0; /* just for the first item */
1641 /* write all collected blocks at once */
1642 if((nwritten
= writev(fd
, iov
, ion
)) < 0) {
1643 if (errno
!= EAGAIN
) {
1644 redisLog(REDIS_DEBUG
,
1645 "Error writing to client: %s", strerror(errno
));
1652 totwritten
+= nwritten
;
1653 offset
= c
->sentlen
;
1655 /* remove written robjs from c->reply */
1656 while (nwritten
&& listLength(c
->reply
)) {
1657 o
= listNodeValue(listFirst(c
->reply
));
1658 objlen
= sdslen(o
->ptr
);
1660 if(nwritten
>= objlen
- offset
) {
1661 listDelNode(c
->reply
, listFirst(c
->reply
));
1662 nwritten
-= objlen
- offset
;
1666 c
->sentlen
+= nwritten
;
1674 c
->lastinteraction
= time(NULL
);
1676 if (listLength(c
->reply
) == 0) {
1678 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1682 static struct redisCommand
*lookupCommand(char *name
) {
1684 while(cmdTable
[j
].name
!= NULL
) {
1685 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1691 /* resetClient prepares the client to process the next command.
 * NOTE(review): the body of this function is absent from the extracted
 * text, so what exactly is reset cannot be documented from here. */
1692 static void resetClient(redisClient
*c
) {
1698 /* Call() is the core of Redis execution of a command */
1699 static void call(redisClient
*c
, struct redisCommand
*cmd
) {
1702 dirty
= server
.dirty
;
1704 if (server
.appendonly
&& server
.dirty
-dirty
)
1705 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1706 if (server
.dirty
-dirty
&& listLength(server
.slaves
))
1707 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1708 if (listLength(server
.monitors
))
1709 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1710 server
.stat_numcommands
++;
1713 /* If this function gets called we already read a whole
1714 * command, arguments are in the client argv/argc fields.
1715 * processCommand() execute the command or prepare the
1716 * server for a bulk read from the client.
1718 * If 1 is returned the client is still alive and valid and
1719 * other operations can be performed by the caller. Otherwise
1720 * if 0 is returned the client was destroyed (i.e. after QUIT).
 *
 * Visible steps, in order: free memory when maxmemory is set; handle the
 * multi bulk protocol (a first argument starting with an asterisk gives
 * the argument count, each following dollar-prefixed line gives a bulk
 * length); special-case QUIT; look the command up and validate its arity;
 * refuse REDIS_CMD_DENYOOM commands above maxmemory; for REDIS_CMD_BULK
 * commands parse the bulk length and, when the payload is already in the
 * query buffer, take it as last argument; optionally share and encode the
 * arguments; check authentication; then either queue the command (MULTI)
 * or execute it.
 * NOTE(review): many lines are absent from the extracted text. */
1721 static int processCommand(redisClient
*c
) {
1722 struct redisCommand
*cmd
;
1724 /* Free some memory if needed (maxmemory setting) */
1725 if (server
.maxmemory
) freeMemoryIfNeeded();
1727 /* Handle the multi bulk command type. This is an alternative protocol
1728 * supported by Redis in order to receive commands that are composed of
1729 * multiple binary-safe "bulk" arguments. The latency of processing is
1730 * a bit higher but this allows things like multi-sets, so if this
1731 * protocol is used only for MSET and similar commands this is a big win. */
1732 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1733 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1734 if (c
->multibulk
<= 0) {
1738 decrRefCount(c
->argv
[c
->argc
-1]);
1742 } else if (c
->multibulk
) {
1743 if (c
->bulklen
== -1) {
1744 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1745 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1749 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1750 decrRefCount(c
->argv
[0]);
1751 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1753 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1758 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1762 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1763 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1767 if (c
->multibulk
== 0) {
1771 /* Here we need to swap the multi-bulk argc/argv with the
1772 * normal argc/argv of the client structure. */
1774 c
->argv
= c
->mbargv
;
1775 c
->mbargv
= auxargv
;
1778 c
->argc
= c
->mbargc
;
1779 c
->mbargc
= auxargc
;
1781 /* We need to set bulklen to something different than -1
1782 * in order for the code below to process the command without
1783 * to try to read the last argument of a bulk command as
1784 * a special argument. */
1786 /* continue below and process the command */
1793 /* -- end of multi bulk commands processing -- */
1795 /* The QUIT command is handled as a special case. Normal command
1796 * procs are unable to close the client connection safely */
1797 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1801 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1804 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1805 (char*)c
->argv
[0]->ptr
));
1808 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1809 (c
->argc
< -cmd
->arity
)) {
1811 sdscatprintf(sdsempty(),
1812 "-ERR wrong number of arguments for '%s' command\r\n",
1816 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1817 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1820 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1821 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1823 decrRefCount(c
->argv
[c
->argc
-1]);
1824 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1826 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1831 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1832 /* It is possible that the bulk read is already in the
1833 * buffer. Check this condition and handle it accordingly.
1834 * This is just a fast path, alternative to call processInputBuffer().
1835 * It's a good idea since the code is small and this condition
1836 * happens most of the times. */
1837 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1838 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1840 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1845 /* Let's try to share objects on the command arguments vector */
1846 if (server
.shareobjects
) {
1848 for(j
= 1; j
< c
->argc
; j
++)
1849 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1851 /* Let's try to encode the bulk object to save space. */
1852 if (cmd
->flags
& REDIS_CMD_BULK
)
1853 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1855 /* Check if the user is authenticated */
1856 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1857 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1862 /* Exec the command */
1863 if (c
->flags
& REDIS_MULTI
&& cmd
->proc
!= execCommand
) {
1864 queueMultiCommand(c
,cmd
);
1865 addReply(c
,shared
.queued
);
1870 /* Prepare the client for the next command */
1871 if (c
->flags
& REDIS_CLOSE
) {
/* Send the command `cmd` with arguments argv[0..argc-1], executed against
 * database `dictid`, to every client in the list `slaves` (call() uses it
 * for both server.slaves and server.monitors).
 *
 * The command is flattened into an array of objects: arguments separated
 * by shared.space and terminated by shared.crlf; for REDIS_CMD_BULK
 * commands a length object is emitted where the last argument is reached.
 * The array uses a static buffer when argc is at most REDIS_STATIC_ARGS
 * and is heap allocated otherwise. All the objects get their reference
 * count incremented before the clients are visited and decremented at the
 * end. Clients still in REDIS_REPL_WAIT_BGSAVE_START are skipped; a client
 * whose slaveseldb differs from dictid first receives a select command
 * (a shared object for databases 0..9, a temporary one otherwise).
 * NOTE(review): some lines are absent from the extracted text; the exact
 * bulk layout should be confirmed against the canonical file. */
1879 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1883 /* (args*2)+1 is enough room for args, spaces, newlines */
1884 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1886 if (argc
<= REDIS_STATIC_ARGS
) {
1889 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1892 for (j
= 0; j
< argc
; j
++) {
1893 if (j
!= 0) outv
[outc
++] = shared
.space
;
1894 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1897 lenobj
= createObject(REDIS_STRING
,
1898 sdscatprintf(sdsempty(),"%lu\r\n",
1899 (unsigned long) stringObjectLen(argv
[j
])));
1900 lenobj
->refcount
= 0;
1901 outv
[outc
++] = lenobj
;
1903 outv
[outc
++] = argv
[j
];
1905 outv
[outc
++] = shared
.crlf
;
1907 /* Increment all the refcounts at start and decrement at end in order to
1908 * be sure to free objects if there is no slave in a replication state
1909 * able to be feed with commands */
1910 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1912 while((ln
= listYield(slaves
))) {
1913 redisClient
*slave
= ln
->value
;
1915 /* Don't feed slaves that are still waiting for BGSAVE to start */
1916 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1918 /* Feed all the other slaves, MONITORs and so on */
1919 if (slave
->slaveseldb
!= dictid
) {
1923 case 0: selectcmd
= shared
.select0
; break;
1924 case 1: selectcmd
= shared
.select1
; break;
1925 case 2: selectcmd
= shared
.select2
; break;
1926 case 3: selectcmd
= shared
.select3
; break;
1927 case 4: selectcmd
= shared
.select4
; break;
1928 case 5: selectcmd
= shared
.select5
; break;
1929 case 6: selectcmd
= shared
.select6
; break;
1930 case 7: selectcmd
= shared
.select7
; break;
1931 case 8: selectcmd
= shared
.select8
; break;
1932 case 9: selectcmd
= shared
.select9
; break;
1934 selectcmd
= createObject(REDIS_STRING
,
1935 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1936 selectcmd
->refcount
= 0;
1939 addReply(slave
,selectcmd
);
1940 slave
->slaveseldb
= dictid
;
1942 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1944 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1945 if (outv
!= static_outv
) zfree(outv
);
/* Parse as many commands as possible from c->querybuf and execute them.
 *
 * With c->bulklen == -1 the function looks for a complete line: the line
 * is detached from the buffer, CR/LF are stripped, the text is split on
 * spaces into c->argv (a token is turned into an argument only when it is
 * not empty) and processCommand() is called. A buffer that reaches
 * REDIS_REQUEST_MAX_SIZE is logged as a protocol error. Otherwise
 * c->bulklen bytes of bulk payload are awaited; once available the payload
 * minus its trailing CRLF becomes the last argument and the command is
 * processed. Parsing restarts (goto again) while processCommand() reports
 * the client as valid and data remains, and returns at once when the
 * client is flagged REDIS_BLOCKED.
 * NOTE(review): some lines are absent from the extracted text. */
1948 static void processInputBuffer(redisClient
*c
) {
1950 /* Before to process the input buffer, make sure the client is not
1951 * waiting for a blocking operation such as BLPOP. Note that the first
1952 * iteration the client is never blocked, otherwise the processInputBuffer
1953 * would not be called at all, but after the execution of the first commands
1954 * in the input buffer the client may be blocked, and the "goto again"
1955 * will try to reiterate. The following line will make it return asap. */
1956 if (c
->flags
& REDIS_BLOCKED
) return;
1957 if (c
->bulklen
== -1) {
1958 /* Read the first line of the query */
1959 char *p
= strchr(c
->querybuf
,'\n');
1966 query
= c
->querybuf
;
1967 c
->querybuf
= sdsempty();
1968 querylen
= 1+(p
-(query
));
1969 if (sdslen(query
) > querylen
) {
1970 /* leave data after the first line of the query in the buffer */
1971 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1973 *p
= '\0'; /* remove "\n" */
1974 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1975 sdsupdatelen(query
);
1977 /* Now we can split the query in arguments */
1978 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1981 if (c
->argv
) zfree(c
->argv
);
1982 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1984 for (j
= 0; j
< argc
; j
++) {
1985 if (sdslen(argv
[j
])) {
1986 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1994 /* Execute the command. If the client is still valid
1995 * after processCommand() return and there is something
1996 * on the query buffer try to process the next command. */
1997 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1999 /* Nothing to process, argc == 0. Just process the query
2000 * buffer if it's not empty or return to the caller */
2001 if (sdslen(c
->querybuf
)) goto again
;
2004 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
2005 redisLog(REDIS_DEBUG
, "Client protocol error");
2010 /* Bulk read handling. Note that if we are at this point
2011 the client already sent a command terminated with a newline,
2012 we are reading the bulk data that is actually the last
2013 argument of the command. */
2014 int qbl
= sdslen(c
->querybuf
);
2016 if (c
->bulklen
<= qbl
) {
2017 /* Copy everything but the final CRLF as final argument */
2018 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
2020 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
2021 /* Process the command. If the client is still valid after
2022 * the processing and there is more data in the buffer
2023 * try to parse it. */
2024 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
/* Readable-event handler (registered by createClient()): reads up to
 * REDIS_IOBUF_LEN bytes from `fd`, appends them to the query buffer of the
 * client passed as `privdata`, refreshes c->lastinteraction and calls
 * processInputBuffer(). A failed read is checked for EAGAIN and otherwise
 * logged at debug level; a zero-length read is logged as the client having
 * closed the connection.
 * NOTE(review): the lines that follow those log calls are absent from the
 * extracted text. */
2030 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2031 redisClient
*c
= (redisClient
*) privdata
;
2032 char buf
[REDIS_IOBUF_LEN
];
2035 REDIS_NOTUSED(mask
);
2037 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
2039 if (errno
== EAGAIN
) {
2042 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
2046 } else if (nread
== 0) {
2047 redisLog(REDIS_DEBUG
, "Client closed connection");
2052 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
2053 c
->lastinteraction
= time(NULL
);
2057 processInputBuffer(c
);
2060 static int selectDb(redisClient
*c
, int id
) {
2061 if (id
< 0 || id
>= server
.dbnum
)
2063 c
->db
= &server
.db
[id
];
2067 static void *dupClientReplyValue(void *o
) {
2068 incrRefCount((robj
*)o
);
/* Allocate and initialise the client state for the connected socket `fd`.
 * anetNonBlock() and anetTcpNoDelay() are called on the socket; NULL is
 * returned when the allocation failed. The new client starts with an empty
 * query buffer, is not authenticated, has replication state
 * REDIS_REPL_NONE and an empty reply list whose free method is
 * decrRefCount and whose dup method is dupClientReplyValue.
 * readQueryFromClient() is registered as readable-event handler, the
 * client is appended to server.clients and its MULTI state is initialised.
 * NOTE(review): several field initialisations and the handling of a failed
 * event registration are absent from the extracted text. */
2072 static redisClient
*createClient(int fd
) {
2073 redisClient
*c
= zmalloc(sizeof(*c
));
2075 anetNonBlock(NULL
,fd
);
2076 anetTcpNoDelay(NULL
,fd
);
2077 if (!c
) return NULL
;
2080 c
->querybuf
= sdsempty();
2089 c
->lastinteraction
= time(NULL
);
2090 c
->authenticated
= 0;
2091 c
->replstate
= REDIS_REPL_NONE
;
2092 c
->reply
= listCreate();
2093 c
->blockingkeys
= NULL
;
2094 c
->blockingkeysnum
= 0;
2095 listSetFreeMethod(c
->reply
,decrRefCount
);
2096 listSetDupMethod(c
->reply
,dupClientReplyValue
);
2097 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
2098 readQueryFromClient
, c
) == AE_ERR
) {
2102 listAddNodeTail(server
.clients
,c
);
2103 initClientMultiState(c
);
2107 static void addReply(redisClient
*c
, robj
*obj
) {
2108 if (listLength(c
->reply
) == 0 &&
2109 (c
->replstate
== REDIS_REPL_NONE
||
2110 c
->replstate
== REDIS_REPL_ONLINE
) &&
2111 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
2112 sendReplyToClient
, c
) == AE_ERR
) return;
2113 listAddNodeTail(c
->reply
,getDecodedObject(obj
));
2116 static void addReplySds(redisClient
*c
, sds s
) {
2117 robj
*o
= createObject(REDIS_STRING
,s
);
2122 static void addReplyDouble(redisClient
*c
, double d
) {
2125 snprintf(buf
,sizeof(buf
),"%.17g",d
);
2126 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2127 (unsigned long) strlen(buf
),buf
));
2130 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
2133 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
2134 len
= sdslen(obj
->ptr
);
2136 long n
= (long)obj
->ptr
;
2138 /* Compute how many bytes will take this integer as a radix 10 string */
2144 while((n
= n
/10) != 0) {
2148 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len
));
2151 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2156 REDIS_NOTUSED(mask
);
2157 REDIS_NOTUSED(privdata
);
2159 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
2160 if (cfd
== AE_ERR
) {
2161 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
2164 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
2165 if ((c
= createClient(cfd
)) == NULL
) {
2166 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
2167 close(cfd
); /* May be already closed, just ingore errors */
2170 /* If maxclient directive is set and this is one client more... close the
2171 * connection. Note that we create the client instead to check before
2172 * for this condition, since now the socket is already set in nonblocking
2173 * mode and we can send an error for free using the Kernel I/O */
2174 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
2175 char *err
= "-ERR max number of clients reached\r\n";
2177 /* That's a best effort error message, don't check write errors */
2178 if (write(c
->fd
,err
,strlen(err
)) == -1) {
2179 /* Nothing to do, Just to avoid the warning... */
2184 server
.stat_numconnections
++;
2187 /* ======================= Redis objects implementation ===================== */
2189 static robj
*createObject(int type
, void *ptr
) {
2192 if (listLength(server
.objfreelist
)) {
2193 listNode
*head
= listFirst(server
.objfreelist
);
2194 o
= listNodeValue(head
);
2195 listDelNode(server
.objfreelist
,head
);
2197 o
= zmalloc(sizeof(*o
));
2200 o
->encoding
= REDIS_ENCODING_RAW
;
2206 static robj
*createStringObject(char *ptr
, size_t len
) {
2207 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
2210 static robj
*createListObject(void) {
2211 list
*l
= listCreate();
2213 listSetFreeMethod(l
,decrRefCount
);
2214 return createObject(REDIS_LIST
,l
);
2217 static robj
*createSetObject(void) {
2218 dict
*d
= dictCreate(&setDictType
,NULL
);
2219 return createObject(REDIS_SET
,d
);
2222 static robj
*createZsetObject(void) {
2223 zset
*zs
= zmalloc(sizeof(*zs
));
2225 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
2226 zs
->zsl
= zslCreate();
2227 return createObject(REDIS_ZSET
,zs
);
2230 static void freeStringObject(robj
*o
) {
2231 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2236 static void freeListObject(robj
*o
) {
2237 listRelease((list
*) o
->ptr
);
2240 static void freeSetObject(robj
*o
) {
2241 dictRelease((dict
*) o
->ptr
);
2244 static void freeZsetObject(robj
*o
) {
2247 dictRelease(zs
->dict
);
2252 static void freeHashObject(robj
*o
) {
2253 dictRelease((dict
*) o
->ptr
);
2256 static void incrRefCount(robj
*o
) {
2258 #ifdef DEBUG_REFCOUNT
2259 if (o
->type
== REDIS_STRING
)
2260 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
2264 static void decrRefCount(void *obj
) {
2267 #ifdef DEBUG_REFCOUNT
2268 if (o
->type
== REDIS_STRING
)
2269 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
2271 if (--(o
->refcount
) == 0) {
2273 case REDIS_STRING
: freeStringObject(o
); break;
2274 case REDIS_LIST
: freeListObject(o
); break;
2275 case REDIS_SET
: freeSetObject(o
); break;
2276 case REDIS_ZSET
: freeZsetObject(o
); break;
2277 case REDIS_HASH
: freeHashObject(o
); break;
2278 default: redisAssert(0 != 0); break;
2280 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2281 !listAddNodeHead(server
.objfreelist
,o
))
2286 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2287 dictEntry
*de
= dictFind(db
->dict
,key
);
2288 return de
? dictGetEntryVal(de
) : NULL
;
2291 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2292 expireIfNeeded(db
,key
);
2293 return lookupKey(db
,key
);
2296 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2297 deleteIfVolatile(db
,key
);
2298 return lookupKey(db
,key
);
2301 static int deleteKey(redisDb
*db
, robj
*key
) {
2304 /* We need to protect key from destruction: after the first dictDelete()
2305 * it may happen that 'key' is no longer valid if we don't increment
2306 * it's count. This may happen when we get the object reference directly
2307 * from the hash table with dictRandomKey() or dict iterators */
2309 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2310 retval
= dictDelete(db
->dict
,key
);
2313 return retval
== DICT_OK
;
2316 /* Try to share an object against the shared objects pool */
2317 static robj
*tryObjectSharing(robj
*o
) {
2318 struct dictEntry
*de
;
2321 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2323 redisAssert(o
->type
== REDIS_STRING
);
2324 de
= dictFind(server
.sharingpool
,o
);
2326 robj
*shared
= dictGetEntryKey(de
);
2328 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2329 dictGetEntryVal(de
) = (void*) c
;
2330 incrRefCount(shared
);
2334 /* Here we are using a stream algorihtm: Every time an object is
2335 * shared we increment its count, everytime there is a miss we
2336 * recrement the counter of a random object. If this object reaches
2337 * zero we remove the object and put the current object instead. */
2338 if (dictSize(server
.sharingpool
) >=
2339 server
.sharingpoolsize
) {
2340 de
= dictGetRandomKey(server
.sharingpool
);
2341 redisAssert(de
!= NULL
);
2342 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2343 dictGetEntryVal(de
) = (void*) c
;
2345 dictDelete(server
.sharingpool
,de
->key
);
2348 c
= 0; /* If the pool is empty we want to add this object */
2353 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2354 redisAssert(retval
== DICT_OK
);
2361 /* Check if the nul-terminated string 's' can be represented by a long
2362 * (that is, is a number that fits into long without any other space or
2363 * character before or after the digits).
2365 * If so, the function returns REDIS_OK and *longval is set to the value
2366 * of the number. Otherwise REDIS_ERR is returned */
2367 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2368 char buf
[32], *endptr
;
2372 value
= strtol(s
, &endptr
, 10);
2373 if (endptr
[0] != '\0') return REDIS_ERR
;
2374 slen
= snprintf(buf
,32,"%ld",value
);
2376 /* If the number converted back into a string is not identical
2377 * then it's not possible to encode the string as integer */
2378 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2379 if (longval
) *longval
= value
;
2383 /* Try to encode a string object in order to save space */
2384 static int tryObjectEncoding(robj
*o
) {
2388 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2389 return REDIS_ERR
; /* Already encoded */
2391 /* It's not save to encode shared objects: shared objects can be shared
2392 * everywhere in the "object space" of Redis. Encoded objects can only
2393 * appear as "values" (and not, for instance, as keys) */
2394 if (o
->refcount
> 1) return REDIS_ERR
;
2396 /* Currently we try to encode only strings */
2397 redisAssert(o
->type
== REDIS_STRING
);
2399 /* Check if we can represent this string as a long integer */
2400 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2402 /* Ok, this object can be encoded */
2403 o
->encoding
= REDIS_ENCODING_INT
;
2405 o
->ptr
= (void*) value
;
2409 /* Get a decoded version of an encoded object (returned as a new object).
2410 * If the object is already raw-encoded just increment the ref count. */
2411 static robj
*getDecodedObject(robj
*o
) {
2414 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2418 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2421 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2422 dec
= createStringObject(buf
,strlen(buf
));
2425 redisAssert(1 != 1);
2429 /* Compare two string objects via strcmp() or alike.
2430 * Note that the objects may be integer-encoded. In such a case we
2431 * use snprintf() to get a string representation of the numbers on the stack
2432 * and compare the strings, it's much faster than calling getDecodedObject().
2434 * Important note: if objects are not integer encoded, but binary-safe strings,
2435 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2437 static int compareStringObjects(robj
*a
, robj
*b
) {
2438 redisAssert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2439 char bufa
[128], bufb
[128], *astr
, *bstr
;
2442 if (a
== b
) return 0;
2443 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2444 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2450 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2451 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2457 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2460 static size_t stringObjectLen(robj
*o
) {
2461 redisAssert(o
->type
== REDIS_STRING
);
2462 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2463 return sdslen(o
->ptr
);
2467 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2471 /*============================ RDB saving/loading =========================== */
/* Write the one byte 'type' on 'fp'. Returns 0 on success, -1 if nothing
 * could be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
/* Write the time 't' on 'fp' as a 4 byte int32_t in host byte order.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    return (fwrite(&t32,4,1,fp) == 0) ? -1 : 0;
}
2484 /* check rdbLoadLen() comments for more info */
2485 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2486 unsigned char buf
[2];
2489 /* Save a 6 bit len */
2490 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2491 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2492 } else if (len
< (1<<14)) {
2493 /* Save a 14 bit len */
2494 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2496 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2498 /* Save a 32 bit len */
2499 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2500 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2502 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2507 /* String objects in the form "2391" "-100" without any space and with a
2508 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2509 * encoded as integers to save space */
2510 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2512 char *endptr
, buf
[32];
2514 /* Check if it's possible to encode this value as a number */
2515 value
= strtoll(s
, &endptr
, 10);
2516 if (endptr
[0] != '\0') return 0;
2517 snprintf(buf
,32,"%lld",value
);
2519 /* If the number converted back into a string is not identical
2520 * then it's not possible to encode the string as integer */
2521 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2523 /* Finally check if it fits in our ranges */
2524 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2525 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2526 enc
[1] = value
&0xFF;
2528 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2529 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2530 enc
[1] = value
&0xFF;
2531 enc
[2] = (value
>>8)&0xFF;
2533 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2534 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2535 enc
[1] = value
&0xFF;
2536 enc
[2] = (value
>>8)&0xFF;
2537 enc
[3] = (value
>>16)&0xFF;
2538 enc
[4] = (value
>>24)&0xFF;
2545 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2546 unsigned int comprlen
, outlen
;
2550 /* We require at least four bytes compression for this to be worth it */
2551 outlen
= sdslen(obj
->ptr
)-4;
2552 if (outlen
<= 0) return 0;
2553 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2554 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2555 if (comprlen
== 0) {
2559 /* Data compressed! Let's save it on disk */
2560 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2561 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2562 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2563 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2564 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2573 /* Save a string objet as [len][data] on disk. If the object is a string
2574 * representation of an integer value we try to safe it in a special form */
2575 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2579 len
= sdslen(obj
->ptr
);
2581 /* Try integer encoding */
2583 unsigned char buf
[5];
2584 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2585 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2590 /* Try LZF compression - under 20 bytes it's unable to compress even
2591 * aaaaaaaaaaaaaaaaaa so skip it */
2592 if (server
.rdbcompression
&& len
> 20) {
2595 retval
= rdbSaveLzfStringObject(fp
,obj
);
2596 if (retval
== -1) return -1;
2597 if (retval
> 0) return 0;
2598 /* retval == 0 means data can't be compressed, save the old way */
2601 /* Store verbatim */
2602 if (rdbSaveLen(fp
,len
) == -1) return -1;
2603 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2607 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2608 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2611 obj
= getDecodedObject(obj
);
2612 retval
= rdbSaveStringObjectRaw(fp
,obj
);
/* Save a double value. Doubles are saved as strings prefixed by an unsigned
 * 8 bit integer specifying the length of the representation.
 * This 8 bit integer has special values in order to specify the following
 * conditions:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 *
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len = 1; /* the special values are a single byte */

    if (isnan(val)) {
        buf[0] = 253;
    } else if (!isfinite(val)) {
        if (val < 0)
            buf[0] = 255;
        else
            buf[0] = 254;
    } else {
        char *repr = (char*)buf+1;

        snprintf(repr,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen(repr);
        len = buf[0]+1;
    }
    if (fwrite(buf,len,1,fp) == 0) return -1;
    return 0;
}
2644 /* Save a Redis object. */
2645 static int rdbSaveObject(FILE *fp
, robj
*o
) {
2646 if (o
->type
== REDIS_STRING
) {
2647 /* Save a string value */
2648 if (rdbSaveStringObject(fp
,o
) == -1) return -1;
2649 } else if (o
->type
== REDIS_LIST
) {
2650 /* Save a list value */
2651 list
*list
= o
->ptr
;
2655 if (rdbSaveLen(fp
,listLength(list
)) == -1) return -1;
2656 while((ln
= listYield(list
))) {
2657 robj
*eleobj
= listNodeValue(ln
);
2659 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2661 } else if (o
->type
== REDIS_SET
) {
2662 /* Save a set value */
2664 dictIterator
*di
= dictGetIterator(set
);
2667 if (rdbSaveLen(fp
,dictSize(set
)) == -1) return -1;
2668 while((de
= dictNext(di
)) != NULL
) {
2669 robj
*eleobj
= dictGetEntryKey(de
);
2671 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2673 dictReleaseIterator(di
);
2674 } else if (o
->type
== REDIS_ZSET
) {
2675 /* Save a set value */
2677 dictIterator
*di
= dictGetIterator(zs
->dict
);
2680 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) return -1;
2681 while((de
= dictNext(di
)) != NULL
) {
2682 robj
*eleobj
= dictGetEntryKey(de
);
2683 double *score
= dictGetEntryVal(de
);
2685 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2686 if (rdbSaveDoubleValue(fp
,*score
) == -1) return -1;
2688 dictReleaseIterator(di
);
2690 redisAssert(0 != 0);
2695 /* Return the length the object will have on disk if saved with
2696 * the rdbSaveObject() function. Currently we use a trick to get
2697 * this length with very little changes to the code. In the future
2698 * we could switch to a faster solution. */
2699 static off_t
rdbSavedObjectLen(robj
*o
) {
2700 static FILE *fp
= NULL
;
2702 if (fp
== NULL
) fp
= fopen("/dev/null","w");
2706 assert(rdbSaveObject(fp
,o
) != 1);
2710 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2711 static int rdbSave(char *filename
) {
2712 dictIterator
*di
= NULL
;
2717 time_t now
= time(NULL
);
2719 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2720 fp
= fopen(tmpfile
,"w");
2722 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2725 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2726 for (j
= 0; j
< server
.dbnum
; j
++) {
2727 redisDb
*db
= server
.db
+j
;
2729 if (dictSize(d
) == 0) continue;
2730 di
= dictGetIterator(d
);
2736 /* Write the SELECT DB opcode */
2737 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2738 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2740 /* Iterate this DB writing every entry */
2741 while((de
= dictNext(di
)) != NULL
) {
2742 robj
*key
= dictGetEntryKey(de
);
2743 robj
*o
= dictGetEntryVal(de
);
2744 time_t expiretime
= getExpire(db
,key
);
2746 /* Save the expire time */
2747 if (expiretime
!= -1) {
2748 /* If this key is already expired skip it */
2749 if (expiretime
< now
) continue;
2750 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2751 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2753 /* Save the key and associated value */
2754 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2755 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2756 /* Save the actual value */
2757 if (rdbSaveObject(fp
,o
) == -1) goto werr
;
2759 dictReleaseIterator(di
);
2762 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2764 /* Make sure data will not remain on the OS's output buffers */
2769 /* Use RENAME to make sure the DB file is changed atomically only
2770 * if the generate DB file is ok. */
2771 if (rename(tmpfile
,filename
) == -1) {
2772 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
2776 redisLog(REDIS_NOTICE
,"DB saved on disk");
2778 server
.lastsave
= time(NULL
);
2784 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2785 if (di
) dictReleaseIterator(di
);
2789 static int rdbSaveBackground(char *filename
) {
2792 if (server
.bgsavechildpid
!= -1) return REDIS_ERR
;
2793 if ((childpid
= fork()) == 0) {
2796 if (rdbSave(filename
) == REDIS_OK
) {
2803 if (childpid
== -1) {
2804 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2808 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2809 server
.bgsavechildpid
= childpid
;
2812 return REDIS_OK
; /* unreached */
/* Remove the temp file "temp-<pid>.rdb" that a saving process with pid
 * 'childpid' writes to (same naming scheme used by rdbSave()). */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,256,"temp-%d.rdb", (int) childpid);
    unlink(path);
}
/* Read a one byte type from 'fp'. Returns the byte value (0-255), or -1 if
 * nothing could be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    return (fread(&type,1,1,fp) == 0) ? -1 : type;
}
/* Read a time saved by rdbSaveTime(): a 4 byte int32_t in host byte order.
 * Returns -1 if the read fails. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t t32;

    if (fread(&t32,4,1,fp) != 0) return (time_t) t32;
    return -1;
}
2834 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2835 * of this file for a description of how this are stored on disk.
2837 * isencoded is set to 1 if the readed length is not actually a length but
2838 * an "encoding type", check the above comments for more info */
2839 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2840 unsigned char buf
[2];
2843 if (isencoded
) *isencoded
= 0;
2845 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2850 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2851 type
= (buf
[0]&0xC0)>>6;
2852 if (type
== REDIS_RDB_6BITLEN
) {
2853 /* Read a 6 bit len */
2855 } else if (type
== REDIS_RDB_ENCVAL
) {
2856 /* Read a 6 bit len encoding type */
2857 if (isencoded
) *isencoded
= 1;
2859 } else if (type
== REDIS_RDB_14BITLEN
) {
2860 /* Read a 14 bit len */
2861 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2862 return ((buf
[0]&0x3F)<<8)|buf
[1];
2864 /* Read a 32 bit len */
2865 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2871 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2872 unsigned char enc
[4];
2875 if (enctype
== REDIS_RDB_ENC_INT8
) {
2876 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2877 val
= (signed char)enc
[0];
2878 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2880 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2881 v
= enc
[0]|(enc
[1]<<8);
2883 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2885 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2886 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2889 val
= 0; /* anti-warning */
2892 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2895 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2896 unsigned int len
, clen
;
2897 unsigned char *c
= NULL
;
2900 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2901 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2902 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2903 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2904 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2905 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2907 return createObject(REDIS_STRING
,val
);
2914 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2919 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2922 case REDIS_RDB_ENC_INT8
:
2923 case REDIS_RDB_ENC_INT16
:
2924 case REDIS_RDB_ENC_INT32
:
2925 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2926 case REDIS_RDB_ENC_LZF
:
2927 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2933 if (len
== REDIS_RDB_LENERR
) return NULL
;
2934 val
= sdsnewlen(NULL
,len
);
2935 if (len
&& fread(val
,len
,1,fp
) == 0) {
2939 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2942 /* For information about double serialization check rdbSaveDoubleValue() */
2943 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
2947 if (fread(&len
,1,1,fp
) == 0) return -1;
2949 case 255: *val
= R_NegInf
; return 0;
2950 case 254: *val
= R_PosInf
; return 0;
2951 case 253: *val
= R_Nan
; return 0;
2953 if (fread(buf
,len
,1,fp
) == 0) return -1;
2955 sscanf(buf
, "%lg", val
);
2960 static int rdbLoad(char *filename
) {
2962 robj
*keyobj
= NULL
;
2964 int type
, retval
, rdbver
;
2965 dict
*d
= server
.db
[0].dict
;
2966 redisDb
*db
= server
.db
+0;
2968 time_t expiretime
= -1, now
= time(NULL
);
2970 fp
= fopen(filename
,"r");
2971 if (!fp
) return REDIS_ERR
;
2972 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2974 if (memcmp(buf
,"REDIS",5) != 0) {
2976 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2979 rdbver
= atoi(buf
+5);
2982 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2989 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2990 if (type
== REDIS_EXPIRETIME
) {
2991 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2992 /* We read the time so we need to read the object type again */
2993 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2995 if (type
== REDIS_EOF
) break;
2996 /* Handle SELECT DB opcode as a special case */
2997 if (type
== REDIS_SELECTDB
) {
2998 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
3000 if (dbid
>= (unsigned)server
.dbnum
) {
3001 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
3004 db
= server
.db
+dbid
;
3009 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
3011 if (type
== REDIS_STRING
) {
3012 /* Read string value */
3013 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
3014 tryObjectEncoding(o
);
3015 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
3016 /* Read list/set value */
3019 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
3021 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
3022 /* Load every single element of the list/set */
3026 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
3027 tryObjectEncoding(ele
);
3028 if (type
== REDIS_LIST
) {
3029 listAddNodeTail((list
*)o
->ptr
,ele
);
3031 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
3034 } else if (type
== REDIS_ZSET
) {
3035 /* Read list/set value */
3039 if ((zsetlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
3041 o
= createZsetObject();
3043 /* Load every single element of the list/set */
3046 double *score
= zmalloc(sizeof(double));
3048 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
3049 tryObjectEncoding(ele
);
3050 if (rdbLoadDoubleValue(fp
,score
) == -1) goto eoferr
;
3051 dictAdd(zs
->dict
,ele
,score
);
3052 zslInsert(zs
->zsl
,*score
,ele
);
3053 incrRefCount(ele
); /* added to skiplist */
3056 redisAssert(0 != 0);
3058 /* Add the new object in the hash table */
3059 retval
= dictAdd(d
,keyobj
,o
);
3060 if (retval
== DICT_ERR
) {
3061 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
3064 /* Set the expire time if needed */
3065 if (expiretime
!= -1) {
3066 setExpire(db
,keyobj
,expiretime
);
3067 /* Delete this key if already expired */
3068 if (expiretime
< now
) deleteKey(db
,keyobj
);
3076 eoferr
: /* unexpected end of file is handled here with a fatal exit */
3077 if (keyobj
) decrRefCount(keyobj
);
3078 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
3080 return REDIS_ERR
; /* Just to avoid warning */
3083 /*================================== Commands =============================== */
3085 static void authCommand(redisClient
*c
) {
3086 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
3087 c
->authenticated
= 1;
3088 addReply(c
,shared
.ok
);
3090 c
->authenticated
= 0;
3091 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3095 static void pingCommand(redisClient
*c
) {
3096 addReply(c
,shared
.pong
);
3099 static void echoCommand(redisClient
*c
) {
3100 addReplyBulkLen(c
,c
->argv
[1]);
3101 addReply(c
,c
->argv
[1]);
3102 addReply(c
,shared
.crlf
);
3105 /*=================================== Strings =============================== */
3107 static void setGenericCommand(redisClient
*c
, int nx
) {
3110 if (nx
) deleteIfVolatile(c
->db
,c
->argv
[1]);
3111 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3112 if (retval
== DICT_ERR
) {
3114 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3115 incrRefCount(c
->argv
[2]);
3117 addReply(c
,shared
.czero
);
3121 incrRefCount(c
->argv
[1]);
3122 incrRefCount(c
->argv
[2]);
3125 removeExpire(c
->db
,c
->argv
[1]);
3126 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3129 static void setCommand(redisClient
*c
) {
3130 setGenericCommand(c
,0);
3133 static void setnxCommand(redisClient
*c
) {
3134 setGenericCommand(c
,1);
3137 static int getGenericCommand(redisClient
*c
) {
3138 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3141 addReply(c
,shared
.nullbulk
);
3144 if (o
->type
!= REDIS_STRING
) {
3145 addReply(c
,shared
.wrongtypeerr
);
3148 addReplyBulkLen(c
,o
);
3150 addReply(c
,shared
.crlf
);
3156 static void getCommand(redisClient
*c
) {
3157 getGenericCommand(c
);
3160 static void getsetCommand(redisClient
*c
) {
3161 if (getGenericCommand(c
) == REDIS_ERR
) return;
3162 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
3163 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3165 incrRefCount(c
->argv
[1]);
3167 incrRefCount(c
->argv
[2]);
3169 removeExpire(c
->db
,c
->argv
[1]);
3172 static void mgetCommand(redisClient
*c
) {
3175 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
3176 for (j
= 1; j
< c
->argc
; j
++) {
3177 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
3179 addReply(c
,shared
.nullbulk
);
3181 if (o
->type
!= REDIS_STRING
) {
3182 addReply(c
,shared
.nullbulk
);
3184 addReplyBulkLen(c
,o
);
3186 addReply(c
,shared
.crlf
);
3192 static void msetGenericCommand(redisClient
*c
, int nx
) {
3193 int j
, busykeys
= 0;
3195 if ((c
->argc
% 2) == 0) {
3196 addReplySds(c
,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3199 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3200 * set nothing at all if at least one already key exists. */
3202 for (j
= 1; j
< c
->argc
; j
+= 2) {
3203 if (lookupKeyWrite(c
->db
,c
->argv
[j
]) != NULL
) {
3209 addReply(c
, shared
.czero
);
3213 for (j
= 1; j
< c
->argc
; j
+= 2) {
3216 tryObjectEncoding(c
->argv
[j
+1]);
3217 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3218 if (retval
== DICT_ERR
) {
3219 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3220 incrRefCount(c
->argv
[j
+1]);
3222 incrRefCount(c
->argv
[j
]);
3223 incrRefCount(c
->argv
[j
+1]);
3225 removeExpire(c
->db
,c
->argv
[j
]);
3227 server
.dirty
+= (c
->argc
-1)/2;
3228 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3231 static void msetCommand(redisClient
*c
) {
3232 msetGenericCommand(c
,0);
3235 static void msetnxCommand(redisClient
*c
) {
3236 msetGenericCommand(c
,1);
3239 static void incrDecrCommand(redisClient
*c
, long long incr
) {
3244 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3248 if (o
->type
!= REDIS_STRING
) {
3253 if (o
->encoding
== REDIS_ENCODING_RAW
)
3254 value
= strtoll(o
->ptr
, &eptr
, 10);
3255 else if (o
->encoding
== REDIS_ENCODING_INT
)
3256 value
= (long)o
->ptr
;
3258 redisAssert(1 != 1);
3263 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
3264 tryObjectEncoding(o
);
3265 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
3266 if (retval
== DICT_ERR
) {
3267 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
3268 removeExpire(c
->db
,c
->argv
[1]);
3270 incrRefCount(c
->argv
[1]);
3273 addReply(c
,shared
.colon
);
3275 addReply(c
,shared
.crlf
);
3278 static void incrCommand(redisClient
*c
) {
3279 incrDecrCommand(c
,1);
3282 static void decrCommand(redisClient
*c
) {
3283 incrDecrCommand(c
,-1);
3286 static void incrbyCommand(redisClient
*c
) {
3287 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3288 incrDecrCommand(c
,incr
);
3291 static void decrbyCommand(redisClient
*c
) {
3292 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3293 incrDecrCommand(c
,-incr
);
3296 /* ========================= Type agnostic commands ========================= */
3298 static void delCommand(redisClient
*c
) {
3301 for (j
= 1; j
< c
->argc
; j
++) {
3302 if (deleteKey(c
->db
,c
->argv
[j
])) {
3309 addReply(c
,shared
.czero
);
3312 addReply(c
,shared
.cone
);
3315 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
3320 static void existsCommand(redisClient
*c
) {
3321 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
3324 static void selectCommand(redisClient
*c
) {
3325 int id
= atoi(c
->argv
[1]->ptr
);
3327 if (selectDb(c
,id
) == REDIS_ERR
) {
3328 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
3330 addReply(c
,shared
.ok
);
3334 static void randomkeyCommand(redisClient
*c
) {
3338 de
= dictGetRandomKey(c
->db
->dict
);
3339 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
3342 addReply(c
,shared
.plus
);
3343 addReply(c
,shared
.crlf
);
3345 addReply(c
,shared
.plus
);
3346 addReply(c
,dictGetEntryKey(de
));
3347 addReply(c
,shared
.crlf
);
3351 static void keysCommand(redisClient
*c
) {
3354 sds pattern
= c
->argv
[1]->ptr
;
3355 int plen
= sdslen(pattern
);
3356 unsigned long numkeys
= 0, keyslen
= 0;
3357 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3359 di
= dictGetIterator(c
->db
->dict
);
3361 decrRefCount(lenobj
);
3362 while((de
= dictNext(di
)) != NULL
) {
3363 robj
*keyobj
= dictGetEntryKey(de
);
3365 sds key
= keyobj
->ptr
;
3366 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3367 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3368 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3370 addReply(c
,shared
.space
);
3373 keyslen
+= sdslen(key
);
3377 dictReleaseIterator(di
);
3378 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3379 addReply(c
,shared
.crlf
);
3382 static void dbsizeCommand(redisClient
*c
) {
3384 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
3387 static void lastsaveCommand(redisClient
*c
) {
3389 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
3392 static void typeCommand(redisClient
*c
) {
3396 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3401 case REDIS_STRING
: type
= "+string"; break;
3402 case REDIS_LIST
: type
= "+list"; break;
3403 case REDIS_SET
: type
= "+set"; break;
3404 case REDIS_ZSET
: type
= "+zset"; break;
3405 default: type
= "unknown"; break;
3408 addReplySds(c
,sdsnew(type
));
3409 addReply(c
,shared
.crlf
);
3412 static void saveCommand(redisClient
*c
) {
3413 if (server
.bgsavechildpid
!= -1) {
3414 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3417 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3418 addReply(c
,shared
.ok
);
3420 addReply(c
,shared
.err
);
3424 static void bgsaveCommand(redisClient
*c
) {
3425 if (server
.bgsavechildpid
!= -1) {
3426 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3429 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3430 char *status
= "+Background saving started\r\n";
3431 addReplySds(c
,sdsnew(status
));
3433 addReply(c
,shared
.err
);
3437 static void shutdownCommand(redisClient
*c
) {
3438 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
3439 /* Kill the saving child if there is a background saving in progress.
3440 We want to avoid race conditions, for instance our saving child may
3441 overwrite the synchronous saving done by SHUTDOWN. */
3442 if (server
.bgsavechildpid
!= -1) {
3443 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3444 kill(server
.bgsavechildpid
,SIGKILL
);
3445 rdbRemoveTempFile(server
.bgsavechildpid
);
3447 if (server
.appendonly
) {
3448 /* Append only file: fsync() the AOF and exit */
3449 fsync(server
.appendfd
);
3452 /* Snapshotting. Perform a SYNC SAVE and exit */
3453 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3454 if (server
.daemonize
)
3455 unlink(server
.pidfile
);
3456 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3457 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3460 /* Ooops.. error saving! The best we can do is to continue operating.
3461 * Note that if there was a background saving process, in the next
3462 * cron() Redis will be notified that the background saving aborted,
3463 * handling special stuff like slaves pending for synchronization... */
3464 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3465 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3470 static void renameGenericCommand(redisClient
*c
, int nx
) {
3473 /* To use the same key as src and dst is probably an error */
3474 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3475 addReply(c
,shared
.sameobjecterr
);
3479 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3481 addReply(c
,shared
.nokeyerr
);
3485 deleteIfVolatile(c
->db
,c
->argv
[2]);
3486 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3489 addReply(c
,shared
.czero
);
3492 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3494 incrRefCount(c
->argv
[2]);
3496 deleteKey(c
->db
,c
->argv
[1]);
3498 addReply(c
,nx
? shared
.cone
: shared
.ok
);
3501 static void renameCommand(redisClient
*c
) {
3502 renameGenericCommand(c
,0);
3505 static void renamenxCommand(redisClient
*c
) {
3506 renameGenericCommand(c
,1);
3509 static void moveCommand(redisClient
*c
) {
3514 /* Obtain source and target DB pointers */
3517 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3518 addReply(c
,shared
.outofrangeerr
);
3522 selectDb(c
,srcid
); /* Back to the source DB */
3524 /* If the user is moving using as target the same
3525 * DB as the source DB it is probably an error. */
3527 addReply(c
,shared
.sameobjecterr
);
3531 /* Check if the element exists and get a reference */
3532 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3534 addReply(c
,shared
.czero
);
3538 /* Try to add the element to the target DB */
3539 deleteIfVolatile(dst
,c
->argv
[1]);
3540 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3541 addReply(c
,shared
.czero
);
3544 incrRefCount(c
->argv
[1]);
3547 /* OK! key moved, free the entry in the source DB */
3548 deleteKey(src
,c
->argv
[1]);
3550 addReply(c
,shared
.cone
);
3553 /* =================================== Lists ================================ */
3554 static void pushGenericCommand(redisClient
*c
, int where
) {
3558 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3560 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3561 addReply(c
,shared
.ok
);
3564 lobj
= createListObject();
3566 if (where
== REDIS_HEAD
) {
3567 listAddNodeHead(list
,c
->argv
[2]);
3569 listAddNodeTail(list
,c
->argv
[2]);
3571 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
3572 incrRefCount(c
->argv
[1]);
3573 incrRefCount(c
->argv
[2]);
3575 if (lobj
->type
!= REDIS_LIST
) {
3576 addReply(c
,shared
.wrongtypeerr
);
3579 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3580 addReply(c
,shared
.ok
);
3584 if (where
== REDIS_HEAD
) {
3585 listAddNodeHead(list
,c
->argv
[2]);
3587 listAddNodeTail(list
,c
->argv
[2]);
3589 incrRefCount(c
->argv
[2]);
3592 addReply(c
,shared
.ok
);
3595 static void lpushCommand(redisClient
*c
) {
3596 pushGenericCommand(c
,REDIS_HEAD
);
3599 static void rpushCommand(redisClient
*c
) {
3600 pushGenericCommand(c
,REDIS_TAIL
);
3603 static void llenCommand(redisClient
*c
) {
3607 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3609 addReply(c
,shared
.czero
);
3612 if (o
->type
!= REDIS_LIST
) {
3613 addReply(c
,shared
.wrongtypeerr
);
3616 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3621 static void lindexCommand(redisClient
*c
) {
3623 int index
= atoi(c
->argv
[2]->ptr
);
3625 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3627 addReply(c
,shared
.nullbulk
);
3629 if (o
->type
!= REDIS_LIST
) {
3630 addReply(c
,shared
.wrongtypeerr
);
3632 list
*list
= o
->ptr
;
3635 ln
= listIndex(list
, index
);
3637 addReply(c
,shared
.nullbulk
);
3639 robj
*ele
= listNodeValue(ln
);
3640 addReplyBulkLen(c
,ele
);
3642 addReply(c
,shared
.crlf
);
3648 static void lsetCommand(redisClient
*c
) {
3650 int index
= atoi(c
->argv
[2]->ptr
);
3652 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3654 addReply(c
,shared
.nokeyerr
);
3656 if (o
->type
!= REDIS_LIST
) {
3657 addReply(c
,shared
.wrongtypeerr
);
3659 list
*list
= o
->ptr
;
3662 ln
= listIndex(list
, index
);
3664 addReply(c
,shared
.outofrangeerr
);
3666 robj
*ele
= listNodeValue(ln
);
3669 listNodeValue(ln
) = c
->argv
[3];
3670 incrRefCount(c
->argv
[3]);
3671 addReply(c
,shared
.ok
);
3678 static void popGenericCommand(redisClient
*c
, int where
) {
3681 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3683 addReply(c
,shared
.nullbulk
);
3685 if (o
->type
!= REDIS_LIST
) {
3686 addReply(c
,shared
.wrongtypeerr
);
3688 list
*list
= o
->ptr
;
3691 if (where
== REDIS_HEAD
)
3692 ln
= listFirst(list
);
3694 ln
= listLast(list
);
3697 addReply(c
,shared
.nullbulk
);
3699 robj
*ele
= listNodeValue(ln
);
3700 addReplyBulkLen(c
,ele
);
3702 addReply(c
,shared
.crlf
);
3703 listDelNode(list
,ln
);
3710 static void lpopCommand(redisClient
*c
) {
3711 popGenericCommand(c
,REDIS_HEAD
);
3714 static void rpopCommand(redisClient
*c
) {
3715 popGenericCommand(c
,REDIS_TAIL
);
3718 static void lrangeCommand(redisClient
*c
) {
3720 int start
= atoi(c
->argv
[2]->ptr
);
3721 int end
= atoi(c
->argv
[3]->ptr
);
3723 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3725 addReply(c
,shared
.nullmultibulk
);
3727 if (o
->type
!= REDIS_LIST
) {
3728 addReply(c
,shared
.wrongtypeerr
);
3730 list
*list
= o
->ptr
;
3732 int llen
= listLength(list
);
3736 /* convert negative indexes */
3737 if (start
< 0) start
= llen
+start
;
3738 if (end
< 0) end
= llen
+end
;
3739 if (start
< 0) start
= 0;
3740 if (end
< 0) end
= 0;
3742 /* indexes sanity checks */
3743 if (start
> end
|| start
>= llen
) {
3744 /* Out of range start or start > end result in empty list */
3745 addReply(c
,shared
.emptymultibulk
);
3748 if (end
>= llen
) end
= llen
-1;
3749 rangelen
= (end
-start
)+1;
3751 /* Return the result in form of a multi-bulk reply */
3752 ln
= listIndex(list
, start
);
3753 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3754 for (j
= 0; j
< rangelen
; j
++) {
3755 ele
= listNodeValue(ln
);
3756 addReplyBulkLen(c
,ele
);
3758 addReply(c
,shared
.crlf
);
3765 static void ltrimCommand(redisClient
*c
) {
3767 int start
= atoi(c
->argv
[2]->ptr
);
3768 int end
= atoi(c
->argv
[3]->ptr
);
3770 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3772 addReply(c
,shared
.ok
);
3774 if (o
->type
!= REDIS_LIST
) {
3775 addReply(c
,shared
.wrongtypeerr
);
3777 list
*list
= o
->ptr
;
3779 int llen
= listLength(list
);
3780 int j
, ltrim
, rtrim
;
3782 /* convert negative indexes */
3783 if (start
< 0) start
= llen
+start
;
3784 if (end
< 0) end
= llen
+end
;
3785 if (start
< 0) start
= 0;
3786 if (end
< 0) end
= 0;
3788 /* indexes sanity checks */
3789 if (start
> end
|| start
>= llen
) {
3790 /* Out of range start or start > end result in empty list */
3794 if (end
>= llen
) end
= llen
-1;
3799 /* Remove list elements to perform the trim */
3800 for (j
= 0; j
< ltrim
; j
++) {
3801 ln
= listFirst(list
);
3802 listDelNode(list
,ln
);
3804 for (j
= 0; j
< rtrim
; j
++) {
3805 ln
= listLast(list
);
3806 listDelNode(list
,ln
);
3809 addReply(c
,shared
.ok
);
3814 static void lremCommand(redisClient
*c
) {
3817 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3819 addReply(c
,shared
.czero
);
3821 if (o
->type
!= REDIS_LIST
) {
3822 addReply(c
,shared
.wrongtypeerr
);
3824 list
*list
= o
->ptr
;
3825 listNode
*ln
, *next
;
3826 int toremove
= atoi(c
->argv
[2]->ptr
);
3831 toremove
= -toremove
;
3834 ln
= fromtail
? list
->tail
: list
->head
;
3836 robj
*ele
= listNodeValue(ln
);
3838 next
= fromtail
? ln
->prev
: ln
->next
;
3839 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3840 listDelNode(list
,ln
);
3843 if (toremove
&& removed
== toremove
) break;
3847 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3852 /* This is the semantic of this command:
3853 * RPOPLPUSH srclist dstlist:
3854 * IF LLEN(srclist) > 0
3855 * element = RPOP srclist
3856 * LPUSH dstlist element
3863 * The idea is to be able to get an element from a list in a reliable way
3864 * since the element is not just returned but pushed against another list
3865 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3867 static void rpoplpushcommand(redisClient
*c
) {
3870 sobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3872 addReply(c
,shared
.nullbulk
);
3874 if (sobj
->type
!= REDIS_LIST
) {
3875 addReply(c
,shared
.wrongtypeerr
);
3877 list
*srclist
= sobj
->ptr
;
3878 listNode
*ln
= listLast(srclist
);
3881 addReply(c
,shared
.nullbulk
);
3883 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3884 robj
*ele
= listNodeValue(ln
);
3887 if (dobj
&& dobj
->type
!= REDIS_LIST
) {
3888 addReply(c
,shared
.wrongtypeerr
);
3892 /* Add the element to the target list (unless it's directly
3893 * passed to some BLPOP-ing client) */
3894 if (!handleClientsWaitingListPush(c
,c
->argv
[2],ele
)) {
3896 /* Create the list if the key does not exist */
3897 dobj
= createListObject();
3898 dictAdd(c
->db
->dict
,c
->argv
[2],dobj
);
3899 incrRefCount(c
->argv
[2]);
3901 dstlist
= dobj
->ptr
;
3902 listAddNodeHead(dstlist
,ele
);
3906 /* Send the element to the client as reply as well */
3907 addReplyBulkLen(c
,ele
);
3909 addReply(c
,shared
.crlf
);
3911 /* Finally remove the element from the source list */
3912 listDelNode(srclist
,ln
);
3920 /* ==================================== Sets ================================ */
3922 static void saddCommand(redisClient
*c
) {
3925 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3927 set
= createSetObject();
3928 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3929 incrRefCount(c
->argv
[1]);
3931 if (set
->type
!= REDIS_SET
) {
3932 addReply(c
,shared
.wrongtypeerr
);
3936 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3937 incrRefCount(c
->argv
[2]);
3939 addReply(c
,shared
.cone
);
3941 addReply(c
,shared
.czero
);
3945 static void sremCommand(redisClient
*c
) {
3948 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3950 addReply(c
,shared
.czero
);
3952 if (set
->type
!= REDIS_SET
) {
3953 addReply(c
,shared
.wrongtypeerr
);
3956 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
3958 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3959 addReply(c
,shared
.cone
);
3961 addReply(c
,shared
.czero
);
3966 static void smoveCommand(redisClient
*c
) {
3967 robj
*srcset
, *dstset
;
3969 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3970 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3972 /* If the source key does not exist return 0, if it's of the wrong type
3974 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
3975 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
3978 /* Error if the destination key is not a set as well */
3979 if (dstset
&& dstset
->type
!= REDIS_SET
) {
3980 addReply(c
,shared
.wrongtypeerr
);
3983 /* Remove the element from the source set */
3984 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
3985 /* Key not found in the src set! return zero */
3986 addReply(c
,shared
.czero
);
3990 /* Add the element to the destination set */
3992 dstset
= createSetObject();
3993 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
3994 incrRefCount(c
->argv
[2]);
3996 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
3997 incrRefCount(c
->argv
[3]);
3998 addReply(c
,shared
.cone
);
4001 static void sismemberCommand(redisClient
*c
) {
4004 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4006 addReply(c
,shared
.czero
);
4008 if (set
->type
!= REDIS_SET
) {
4009 addReply(c
,shared
.wrongtypeerr
);
4012 if (dictFind(set
->ptr
,c
->argv
[2]))
4013 addReply(c
,shared
.cone
);
4015 addReply(c
,shared
.czero
);
4019 static void scardCommand(redisClient
*c
) {
4023 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4025 addReply(c
,shared
.czero
);
4028 if (o
->type
!= REDIS_SET
) {
4029 addReply(c
,shared
.wrongtypeerr
);
4032 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4038 static void spopCommand(redisClient
*c
) {
4042 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4044 addReply(c
,shared
.nullbulk
);
4046 if (set
->type
!= REDIS_SET
) {
4047 addReply(c
,shared
.wrongtypeerr
);
4050 de
= dictGetRandomKey(set
->ptr
);
4052 addReply(c
,shared
.nullbulk
);
4054 robj
*ele
= dictGetEntryKey(de
);
4056 addReplyBulkLen(c
,ele
);
4058 addReply(c
,shared
.crlf
);
4059 dictDelete(set
->ptr
,ele
);
4060 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
4066 static void srandmemberCommand(redisClient
*c
) {
4070 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4072 addReply(c
,shared
.nullbulk
);
4074 if (set
->type
!= REDIS_SET
) {
4075 addReply(c
,shared
.wrongtypeerr
);
4078 de
= dictGetRandomKey(set
->ptr
);
4080 addReply(c
,shared
.nullbulk
);
4082 robj
*ele
= dictGetEntryKey(de
);
4084 addReplyBulkLen(c
,ele
);
4086 addReply(c
,shared
.crlf
);
4091 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
4092 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
4094 return dictSize(*d1
)-dictSize(*d2
);
4097 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, unsigned long setsnum
, robj
*dstkey
) {
4098 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4101 robj
*lenobj
= NULL
, *dstset
= NULL
;
4102 unsigned long j
, cardinality
= 0;
4104 for (j
= 0; j
< setsnum
; j
++) {
4108 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4109 lookupKeyRead(c
->db
,setskeys
[j
]);
4113 if (deleteKey(c
->db
,dstkey
))
4115 addReply(c
,shared
.czero
);
4117 addReply(c
,shared
.nullmultibulk
);
4121 if (setobj
->type
!= REDIS_SET
) {
4123 addReply(c
,shared
.wrongtypeerr
);
4126 dv
[j
] = setobj
->ptr
;
4128 /* Sort sets from the smallest to largest, this will improve our
4129 * algorithm's performance */
4130 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
4132 /* The first thing we should output is the total number of elements...
4133 * since this is a multi-bulk write, but at this stage we don't know
4134 * the intersection set size, so we use a trick, append an empty object
4135 * to the output list and save the pointer to later modify it with the
4138 lenobj
= createObject(REDIS_STRING
,NULL
);
4140 decrRefCount(lenobj
);
4142 /* If we have a target key where to store the resulting set
4143 * create this key with an empty set inside */
4144 dstset
= createSetObject();
4147 /* Iterate all the elements of the first (smallest) set, and test
4148 * the element against all the other sets, if at least one set does
4149 * not include the element it is discarded */
4150 di
= dictGetIterator(dv
[0]);
4152 while((de
= dictNext(di
)) != NULL
) {
4155 for (j
= 1; j
< setsnum
; j
++)
4156 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
4158 continue; /* at least one set does not contain the member */
4159 ele
= dictGetEntryKey(de
);
4161 addReplyBulkLen(c
,ele
);
4163 addReply(c
,shared
.crlf
);
4166 dictAdd(dstset
->ptr
,ele
,NULL
);
4170 dictReleaseIterator(di
);
4173 /* Store the resulting set into the target */
4174 deleteKey(c
->db
,dstkey
);
4175 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4176 incrRefCount(dstkey
);
4180 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%lu\r\n",cardinality
);
4182 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4183 dictSize((dict
*)dstset
->ptr
)));
4189 static void sinterCommand(redisClient
*c
) {
4190 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
4193 static void sinterstoreCommand(redisClient
*c
) {
4194 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
4197 #define REDIS_OP_UNION 0
4198 #define REDIS_OP_DIFF 1
4200 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
4201 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4204 robj
*dstset
= NULL
;
4205 int j
, cardinality
= 0;
4207 for (j
= 0; j
< setsnum
; j
++) {
4211 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4212 lookupKeyRead(c
->db
,setskeys
[j
]);
4217 if (setobj
->type
!= REDIS_SET
) {
4219 addReply(c
,shared
.wrongtypeerr
);
4222 dv
[j
] = setobj
->ptr
;
4225 /* We need a temp set object to store our union. If the dstkey
4226 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4227 * this set object will be the resulting object to set into the target key*/
4228 dstset
= createSetObject();
4230 /* Iterate all the elements of all the sets, add every element a single
4231 * time to the result set */
4232 for (j
= 0; j
< setsnum
; j
++) {
4233 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
4234 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
4236 di
= dictGetIterator(dv
[j
]);
4238 while((de
= dictNext(di
)) != NULL
) {
4241 /* dictAdd will not add the same element multiple times */
4242 ele
= dictGetEntryKey(de
);
4243 if (op
== REDIS_OP_UNION
|| j
== 0) {
4244 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
4248 } else if (op
== REDIS_OP_DIFF
) {
4249 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
4254 dictReleaseIterator(di
);
4256 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
4259 /* Output the content of the resulting set, if not in STORE mode */
4261 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
4262 di
= dictGetIterator(dstset
->ptr
);
4263 while((de
= dictNext(di
)) != NULL
) {
4266 ele
= dictGetEntryKey(de
);
4267 addReplyBulkLen(c
,ele
);
4269 addReply(c
,shared
.crlf
);
4271 dictReleaseIterator(di
);
4273 /* If we have a target key where to store the resulting set
4274 * create this key with the result set inside */
4275 deleteKey(c
->db
,dstkey
);
4276 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4277 incrRefCount(dstkey
);
4282 decrRefCount(dstset
);
4284 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4285 dictSize((dict
*)dstset
->ptr
)));
4291 static void sunionCommand(redisClient
*c
) {
4292 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
4295 static void sunionstoreCommand(redisClient
*c
) {
4296 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
4299 static void sdiffCommand(redisClient
*c
) {
4300 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
4303 static void sdiffstoreCommand(redisClient
*c
) {
4304 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
4307 /* ==================================== ZSets =============================== */
4309 /* ZSETs are ordered sets using two data structures to hold the same elements
4310 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4313 * The elements are added to an hash table mapping Redis objects to scores.
4314 * At the same time the elements are added to a skip list mapping scores
4315 * to Redis objects (so objects are sorted by scores in this "view"). */
4317 /* This skiplist implementation is almost a C translation of the original
4318 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4319 * Alternative to Balanced Trees", modified in three ways:
4320 * a) this implementation allows for repeated values.
4321 * b) the comparison is not just by key (our 'score') but by satellite data.
4322 * c) there is a back pointer, so it's a doubly linked list with the back
4323 * pointers being only at "level 1". This allows to traverse the list
4324 * from tail to head, useful for ZREVRANGE. */
4326 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
4327 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
4329 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
4335 static zskiplist
*zslCreate(void) {
4339 zsl
= zmalloc(sizeof(*zsl
));
4342 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
4343 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
4344 zsl
->header
->forward
[j
] = NULL
;
4345 zsl
->header
->backward
= NULL
;
4350 static void zslFreeNode(zskiplistNode
*node
) {
4351 decrRefCount(node
->obj
);
4352 zfree(node
->forward
);
4356 static void zslFree(zskiplist
*zsl
) {
4357 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
4359 zfree(zsl
->header
->forward
);
4362 next
= node
->forward
[0];
4369 static int zslRandomLevel(void) {
4371 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
4376 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
4377 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4381 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4382 while (x
->forward
[i
] &&
4383 (x
->forward
[i
]->score
< score
||
4384 (x
->forward
[i
]->score
== score
&&
4385 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4389 /* we assume the key is not already inside, since we allow duplicated
4390 * scores, and the re-insertion of score and redis object should never
4391 * happen since the caller of zslInsert() should test in the hash table
4392 * if the element is already inside or not. */
4393 level
= zslRandomLevel();
4394 if (level
> zsl
->level
) {
4395 for (i
= zsl
->level
; i
< level
; i
++)
4396 update
[i
] = zsl
->header
;
4399 x
= zslCreateNode(level
,score
,obj
);
4400 for (i
= 0; i
< level
; i
++) {
4401 x
->forward
[i
] = update
[i
]->forward
[i
];
4402 update
[i
]->forward
[i
] = x
;
4404 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
4406 x
->forward
[0]->backward
= x
;
4412 /* Delete an element with matching score/object from the skiplist. */
4413 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
4414 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4418 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4419 while (x
->forward
[i
] &&
4420 (x
->forward
[i
]->score
< score
||
4421 (x
->forward
[i
]->score
== score
&&
4422 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4426 /* We may have multiple elements with the same score, what we need
4427 * is to find the element with both the right score and object. */
4429 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
4430 for (i
= 0; i
< zsl
->level
; i
++) {
4431 if (update
[i
]->forward
[i
] != x
) break;
4432 update
[i
]->forward
[i
] = x
->forward
[i
];
4434 if (x
->forward
[0]) {
4435 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4438 zsl
->tail
= x
->backward
;
4441 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4446 return 0; /* not found */
4448 return 0; /* not found */
4451 /* Delete all the elements with score between min and max from the skiplist.
4452 * Min and max are inclusive, so every element with min <= score <= max is deleted.
4453 * Note that this function takes the reference to the hash table view of the
4454 * sorted set, in order to remove the elements from the hash table too. */
4455 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4456 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4457 unsigned long removed
= 0;
4461 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4462 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
4466 /* We may have multiple elements with the same score, what we need
4467 * is to find the element with both the right score and object. */
4469 while (x
&& x
->score
<= max
) {
4470 zskiplistNode
*next
;
4472 for (i
= 0; i
< zsl
->level
; i
++) {
4473 if (update
[i
]->forward
[i
] != x
) break;
4474 update
[i
]->forward
[i
] = x
->forward
[i
];
4476 if (x
->forward
[0]) {
4477 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4480 zsl
->tail
= x
->backward
;
4482 next
= x
->forward
[0];
4483 dictDelete(dict
,x
->obj
);
4485 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4491 return removed
; /* not found */
4494 /* Find the first node having a score equal or greater than the specified one.
4495 * Returns NULL if there is no match. */
4496 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4501 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4502 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
4505 /* We may have multiple elements with the same score, what we need
4506 * is to find the element with both the right score and object. */
4507 return x
->forward
[0];
4510 /* The actual Z-commands implementations */
4512 /* This generic command implements both ZADD and ZINCRBY.
4513 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4514 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4515 static void zaddGenericCommand(redisClient
*c
, robj
*key
, robj
*ele
, double scoreval
, int doincrement
) {
4520 zsetobj
= lookupKeyWrite(c
->db
,key
);
4521 if (zsetobj
== NULL
) {
4522 zsetobj
= createZsetObject();
4523 dictAdd(c
->db
->dict
,key
,zsetobj
);
4526 if (zsetobj
->type
!= REDIS_ZSET
) {
4527 addReply(c
,shared
.wrongtypeerr
);
4533 /* Ok now since we implement both ZADD and ZINCRBY here the code
4534 * needs to handle the two different conditions. It's all about setting
4535 * '*score', that is, the new score to set, to the right value. */
4536 score
= zmalloc(sizeof(double));
4540 /* Read the old score. If the element was not present starts from 0 */
4541 de
= dictFind(zs
->dict
,ele
);
4543 double *oldscore
= dictGetEntryVal(de
);
4544 *score
= *oldscore
+ scoreval
;
4552 /* What follows is a simple remove and re-insert operation that is common
4553 * to both ZADD and ZINCRBY... */
4554 if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) {
4555 /* case 1: New element */
4556 incrRefCount(ele
); /* added to hash */
4557 zslInsert(zs
->zsl
,*score
,ele
);
4558 incrRefCount(ele
); /* added to skiplist */
4561 addReplyDouble(c
,*score
);
4563 addReply(c
,shared
.cone
);
4568 /* case 2: Score update operation */
4569 de
= dictFind(zs
->dict
,ele
);
4570 redisAssert(de
!= NULL
);
4571 oldscore
= dictGetEntryVal(de
);
4572 if (*score
!= *oldscore
) {
4575 /* Remove and insert the element in the skip list with new score */
4576 deleted
= zslDelete(zs
->zsl
,*oldscore
,ele
);
4577 redisAssert(deleted
!= 0);
4578 zslInsert(zs
->zsl
,*score
,ele
);
4580 /* Update the score in the hash table */
4581 dictReplace(zs
->dict
,ele
,score
);
4587 addReplyDouble(c
,*score
);
4589 addReply(c
,shared
.czero
);
4593 static void zaddCommand(redisClient
*c
) {
4596 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4597 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0);
4600 static void zincrbyCommand(redisClient
*c
) {
4603 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4604 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1);
4607 static void zremCommand(redisClient
*c
) {
4611 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4612 if (zsetobj
== NULL
) {
4613 addReply(c
,shared
.czero
);
4619 if (zsetobj
->type
!= REDIS_ZSET
) {
4620 addReply(c
,shared
.wrongtypeerr
);
4624 de
= dictFind(zs
->dict
,c
->argv
[2]);
4626 addReply(c
,shared
.czero
);
4629 /* Delete from the skiplist */
4630 oldscore
= dictGetEntryVal(de
);
4631 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
4632 redisAssert(deleted
!= 0);
4634 /* Delete from the hash table */
4635 dictDelete(zs
->dict
,c
->argv
[2]);
4636 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4638 addReply(c
,shared
.cone
);
4642 static void zremrangebyscoreCommand(redisClient
*c
) {
4643 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4644 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4648 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4649 if (zsetobj
== NULL
) {
4650 addReply(c
,shared
.czero
);
4654 if (zsetobj
->type
!= REDIS_ZSET
) {
4655 addReply(c
,shared
.wrongtypeerr
);
4659 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
4660 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4661 server
.dirty
+= deleted
;
4662 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
/* Shared implementation of ZRANGE and ZREVRANGE (zrangeCommand() passes
 * reverse=0, zrevrangeCommand() passes reverse=1).
 * argv[2] and argv[3] are parsed with atoi() as the start and end indexes.
 * A fifth argument equal to "withscores" (case insensitive) is accepted; any
 * other call with five or more arguments gets shared.syntaxerr.
 * Negative indexes are counted from the end of the skiplist and then clamped
 * to 0; an end index past the last element is clamped to llen-1.
 * Replies seen in this block: shared.nullmultibulk (presumably for a missing
 * key -- the test itself is not visible in this listing),
 * shared.wrongtypeerr for a non REDIS_ZSET value, shared.emptymultibulk when
 * start > end or start >= llen, otherwise a multi-bulk reply announcing
 * rangelen items (rangelen*2 when withscores is set).
 * When iterating, the next node is ln->backward if 'reverse' is set and
 * ln->forward[0] otherwise. */
4666 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
4668 int start
= atoi(c
->argv
[2]->ptr
);
4669 int end
= atoi(c
->argv
[3]->ptr
);
4672 if (c
->argc
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) {
4674 } else if (c
->argc
>= 5) {
4675 addReply(c
,shared
.syntaxerr
);
4679 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4681 addReply(c
,shared
.nullmultibulk
);
4683 if (o
->type
!= REDIS_ZSET
) {
4684 addReply(c
,shared
.wrongtypeerr
);
4686 zset
*zsetobj
= o
->ptr
;
4687 zskiplist
*zsl
= zsetobj
->zsl
;
4690 int llen
= zsl
->length
;
4694 /* convert negative indexes */
4695 if (start
< 0) start
= llen
+start
;
4696 if (end
< 0) end
= llen
+end
;
4697 if (start
< 0) start
= 0;
4698 if (end
< 0) end
= 0;
4700 /* indexes sanity checks */
4701 if (start
> end
|| start
>= llen
) {
4702 /* Out of range start or start > end result in empty list */
4703 addReply(c
,shared
.emptymultibulk
);
4706 if (end
>= llen
) end
= llen
-1;
4707 rangelen
= (end
-start
)+1;
4709 /* Return the result in form of a multi-bulk reply */
4715 ln
= zsl
->header
->forward
[0];
4717 ln
= ln
->forward
[0];
4720 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",
4721 withscores
? (rangelen
*2) : rangelen
));
4722 for (j
= 0; j
< rangelen
; j
++) {
4724 addReplyBulkLen(c
,ele
);
4726 addReply(c
,shared
.crlf
);
4728 addReplyDouble(c
,ln
->score
);
4729 ln
= reverse
? ln
->backward
: ln
->forward
[0];
4735 static void zrangeCommand(redisClient
*c
) {
4736 zrangeGenericCommand(c
,0);
4739 static void zrevrangeCommand(redisClient
*c
) {
4740 zrangeGenericCommand(c
,1);
/* ZRANGEBYSCORE handler.
 * argv[2] and argv[3] are parsed with strtod() as the min and max scores.
 * The command accepts exactly 4 or 7 arguments; with 7 arguments argv[4]
 * must be "limit" (case insensitive, otherwise shared.syntaxerr is sent) and
 * argv[5]/argv[6] are parsed with atoi() as offset and limit, a negative
 * offset being clamped to 0. limit stays -1 when no LIMIT clause is given.
 * The scan starts at zslFirstWithScore(zsl,min) and continues while the node
 * score is <= max; the loop stops when limit reaches 0.
 * The multi-bulk length is not known in advance, so a placeholder object
 * (lenobj) is created first and its ->ptr is filled with "*<rangelen>\r\n"
 * at the end.
 * NOTE(review): several original lines are missing from this listing,
 * including the end of the "We don't know in advance" comment below. */
4743 static void zrangebyscoreCommand(redisClient
*c
) {
4745 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4746 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4747 int offset
= 0, limit
= -1;
4749 if (c
->argc
!= 4 && c
->argc
!= 7) {
4751 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4753 } else if (c
->argc
== 7 && strcasecmp(c
->argv
[4]->ptr
,"limit")) {
4754 addReply(c
,shared
.syntaxerr
);
4756 } else if (c
->argc
== 7) {
4757 offset
= atoi(c
->argv
[5]->ptr
);
4758 limit
= atoi(c
->argv
[6]->ptr
);
4759 if (offset
< 0) offset
= 0;
4762 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4764 addReply(c
,shared
.nullmultibulk
);
4766 if (o
->type
!= REDIS_ZSET
) {
4767 addReply(c
,shared
.wrongtypeerr
);
4769 zset
*zsetobj
= o
->ptr
;
4770 zskiplist
*zsl
= zsetobj
->zsl
;
4773 unsigned int rangelen
= 0;
4775 /* Get the first node with the score >= min */
4776 ln
= zslFirstWithScore(zsl
,min
);
4778 /* No element matching the specified interval */
4779 addReply(c
,shared
.emptymultibulk
);
4783 /* We don't know in advance how many matching elements there
4784 * are in the list, so we push this object that will represent
4785 * the multi-bulk length in the output buffer, and will "fix"
4787 lenobj
= createObject(REDIS_STRING
,NULL
);
4789 decrRefCount(lenobj
);
4791 while(ln
&& ln
->score
<= max
) {
4794 ln
= ln
->forward
[0];
4797 if (limit
== 0) break;
4799 addReplyBulkLen(c
,ele
);
4801 addReply(c
,shared
.crlf
);
4802 ln
= ln
->forward
[0];
4804 if (limit
> 0) limit
--;
4806 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
/* ZCARD handler.
 * Looks up argv[1] with lookupKeyRead(). Replies seen in this block:
 * shared.czero (presumably for a missing key -- the test is not visible in
 * this listing), shared.wrongtypeerr for a non REDIS_ZSET value, otherwise an
 * integer reply carrying zs->zsl->length. */
4811 static void zcardCommand(redisClient
*c
) {
4815 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4817 addReply(c
,shared
.czero
);
4820 if (o
->type
!= REDIS_ZSET
) {
4821 addReply(c
,shared
.wrongtypeerr
);
4824 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",zs
->zsl
->length
));
/* ZSCORE handler.
 * Looks up argv[1] with lookupKeyRead(), then searches the member argv[2] in
 * zs->dict with dictFind(). Replies seen in this block: shared.nullbulk
 * (sent from two places; presumably for a missing key and for a missing
 * member -- the tests are not visible in this listing), shared.wrongtypeerr
 * for a non REDIS_ZSET value, otherwise the score stored as the dict entry
 * value, sent through addReplyDouble(). */
4829 static void zscoreCommand(redisClient
*c
) {
4833 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4835 addReply(c
,shared
.nullbulk
);
4838 if (o
->type
!= REDIS_ZSET
) {
4839 addReply(c
,shared
.wrongtypeerr
);
4844 de
= dictFind(zs
->dict
,c
->argv
[2]);
4846 addReply(c
,shared
.nullbulk
);
4848 double *score
= dictGetEntryVal(de
);
4850 addReplyDouble(c
,*score
);
4856 /* ========================= Non type-specific commands ==================== */
4858 static void flushdbCommand(redisClient
*c
) {
4859 server
.dirty
+= dictSize(c
->db
->dict
);
4860 dictEmpty(c
->db
->dict
);
4861 dictEmpty(c
->db
->expires
);
4862 addReply(c
,shared
.ok
);
/* FLUSHALL handler.
 * Adds the value returned by emptyDb() to server.dirty, replies shared.ok
 * and then calls rdbSave() on server.dbfilename.
 * NOTE(review): the embedded line numbers show that original lines following
 * the rdbSave() call are missing from this listing. */
4865 static void flushallCommand(redisClient
*c
) {
4866 server
.dirty
+= emptyDb();
4867 addReply(c
,shared
.ok
);
4868 rdbSave(server
.dbfilename
);
/* Allocate a redisSortOperation with zmalloc() and store 'pattern' in it.
 * NOTE(review): the lines that presumably store 'type' and return the new
 * object are not visible in this listing -- confirm against the full file. */
4872 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
4873 redisSortOperation
*so
= zmalloc(sizeof(*so
));
4875 so
->pattern
= pattern
;
4879 /* Return the value associated to the key with a name obtained
4880 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4881 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
4885 int prefixlen
, sublen
, postfixlen
;
4886 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4890 char buf
[REDIS_SORTKEY_MAX
+1];
4893 /* If the pattern is "#" return the substitution object itself in order
4894 * to implement the "SORT ... GET #" feature. */
4895 spat
= pattern
->ptr
;
4896 if (spat
[0] == '#' && spat
[1] == '\0') {
4900 /* The substitution object may be specially encoded. If so we create
4901 * a decoded object on the fly. Otherwise getDecodedObject will just
4902 * increment the ref count, that we'll decrement later. */
4903 subst
= getDecodedObject(subst
);
4906 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
4907 p
= strchr(spat
,'*');
4909 decrRefCount(subst
);
4914 sublen
= sdslen(ssub
);
4915 postfixlen
= sdslen(spat
)-(prefixlen
+1);
4916 memcpy(keyname
.buf
,spat
,prefixlen
);
4917 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
4918 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
4919 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
4920 keyname
.len
= prefixlen
+sublen
+postfixlen
;
4922 initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(long)*2))
4923 decrRefCount(subst
);
4925 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4926 return lookupKeyRead(db
,&keyobj
);
/* qsort()-style comparator over redisSortObject elements.
 * The behaviour is selected through the global 'server' structure:
 *  - server.sort_alpha == 0: the precomputed u.score values are compared;
 *  - server.sort_alpha != 0 and server.sort_bypattern != 0: the u.cmpobj
 *    objects are compared with strcoll(), with dedicated handling when at
 *    least one of them is NULL;
 *  - otherwise the elements themselves are decoded with getDecodedObject()
 *    and compared with strcoll().
 * The result is negated when server.sort_desc is set. */
4929 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4930 * the additional parameter is not standard but a BSD-specific we have to
4931 * pass sorting parameters via the global 'server' structure */
4932 static int sortCompare(const void *s1
, const void *s2
) {
4933 const redisSortObject
*so1
= s1
, *so2
= s2
;
4936 if (!server
.sort_alpha
) {
4937 /* Numeric sorting. Here it's trivial as we precomputed scores */
4938 if (so1
->u
.score
> so2
->u
.score
) {
4940 } else if (so1
->u
.score
< so2
->u
.score
) {
4946 /* Alphanumeric sorting */
4947 if (server
.sort_bypattern
) {
4948 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
4949 /* At least one compare object is NULL */
4950 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
4952 else if (so1
->u
.cmpobj
== NULL
)
4957 /* We have both the objects, use strcoll */
4958 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
4961 /* Compare elements directly */
4964 dec1
= getDecodedObject(so1
->obj
);
4965 dec2
= getDecodedObject(so2
->obj
);
4966 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
4971 return server
.sort_desc
? -cmp
: cmp
;
/* SORT handler. Phases visible in this block, in order:
 *  1. Look up argv[1]; reply shared.nullmultibulk when missing and
 *     shared.wrongtypeerr unless the value is a REDIS_SET, REDIS_LIST or
 *     REDIS_ZSET.
 *  2. Parse the options ASC, DESC, ALPHA, LIMIT <start> <count>,
 *     STORE <key>, BY <pattern> and GET <pattern> (all matched with
 *     strcasecmp). A BY pattern without '*' sets 'dontsort'. On the parse
 *     error path sortval and the operations list are released and
 *     shared.syntaxerr is sent.
 *  3. Allocate 'vector' (one redisSortObject per element) and fill it from
 *     the list or from a dict iterator.
 *  4. Unless 'dontsort' is set, load the sort weights: either from the keys
 *     obtained with lookupKeyByPattern() on the BY pattern or from the
 *     elements themselves (strtod() for raw strings, a cast for integer
 *     encoded objects).
 *  5. Clamp start/end from the LIMIT values, then sort with pqsort() when
 *     sorting by pattern over a sub-range, with qsort() otherwise; both use
 *     sortCompare(), configured through server.sort_desc, server.sort_alpha
 *     and server.sort_bypattern.
 *  6. Without STORE, emit a multi-bulk reply; with STORE, build a list
 *     object, install it with dictReplace(), add 1+outputlen to server.dirty
 *     and send an integer reply with outputlen.
 *  7. Cleanup: release sortval and the operations list and drop the
 *     references held in u.cmpobj when sorting by pattern in alpha mode.
 * NOTE(review): many original lines are missing from this listing (embedded
 * line numbers are not contiguous), so declarations, some branches and
 * closing braces cannot be seen here. */
4974 /* The SORT command is the most complex command in Redis. Warning: this code
4975 * is optimized for speed and a bit less for readability */
4976 static void sortCommand(redisClient
*c
) {
4979 int desc
= 0, alpha
= 0;
4980 int limit_start
= 0, limit_count
= -1, start
, end
;
4981 int j
, dontsort
= 0, vectorlen
;
4982 int getop
= 0; /* GET operation counter */
4983 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
4984 redisSortObject
*vector
; /* Resulting vector to sort */
4986 /* Lookup the key to sort. It must be of the right types */
4987 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
4988 if (sortval
== NULL
) {
4989 addReply(c
,shared
.nullmultibulk
);
4992 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
4993 sortval
->type
!= REDIS_ZSET
)
4995 addReply(c
,shared
.wrongtypeerr
);
4999 /* Create a list of operations to perform for every sorted element.
5000 * Operations can be GET/DEL/INCR/DECR */
5001 operations
= listCreate();
5002 listSetFreeMethod(operations
,zfree
);
5005 /* Now we need to protect sortval incrementing its count, in the future
5006 * SORT may have options able to overwrite/delete keys during the sorting
5007 * and the sorted key itself may get destroyed */
5008 incrRefCount(sortval
);
5010 /* The SORT command has an SQL-alike syntax, parse it */
5011 while(j
< c
->argc
) {
5012 int leftargs
= c
->argc
-j
-1;
5013 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
5015 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
5017 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
5019 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
5020 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
5021 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
5023 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
5024 storekey
= c
->argv
[j
+1];
5026 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
5027 sortby
= c
->argv
[j
+1];
5028 /* If the BY pattern does not contain '*', i.e. it is constant,
5029 * we don't need to sort nor to lookup the weight keys. */
5030 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
5032 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
5033 listAddNodeTail(operations
,createSortOperation(
5034 REDIS_SORT_GET
,c
->argv
[j
+1]));
5038 decrRefCount(sortval
);
5039 listRelease(operations
);
5040 addReply(c
,shared
.syntaxerr
);
5046 /* Load the sorting vector with all the objects to sort */
5047 switch(sortval
->type
) {
5048 case REDIS_LIST
: vectorlen
= listLength((list
*)sortval
->ptr
); break;
5049 case REDIS_SET
: vectorlen
= dictSize((dict
*)sortval
->ptr
); break;
5050 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
5051 default: vectorlen
= 0; redisAssert(0); /* Avoid GCC warning */
5053 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
5056 if (sortval
->type
== REDIS_LIST
) {
5057 list
*list
= sortval
->ptr
;
5061 while((ln
= listYield(list
))) {
5062 robj
*ele
= ln
->value
;
5063 vector
[j
].obj
= ele
;
5064 vector
[j
].u
.score
= 0;
5065 vector
[j
].u
.cmpobj
= NULL
;
5073 if (sortval
->type
== REDIS_SET
) {
5076 zset
*zs
= sortval
->ptr
;
5080 di
= dictGetIterator(set
);
5081 while((setele
= dictNext(di
)) != NULL
) {
5082 vector
[j
].obj
= dictGetEntryKey(setele
);
5083 vector
[j
].u
.score
= 0;
5084 vector
[j
].u
.cmpobj
= NULL
;
5087 dictReleaseIterator(di
);
5089 redisAssert(j
== vectorlen
);
5091 /* Now it's time to load the right scores in the sorting vector */
5092 if (dontsort
== 0) {
5093 for (j
= 0; j
< vectorlen
; j
++) {
5097 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
5098 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
5100 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
5102 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
5103 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
5105 /* Don't need to decode the object if it's
5106 * integer-encoded (the only encoding supported) so
5107 * far. We can just cast it */
5108 if (byval
->encoding
== REDIS_ENCODING_INT
) {
5109 vector
[j
].u
.score
= (long)byval
->ptr
;
5111 redisAssert(1 != 1);
5116 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
5117 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
5119 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
5120 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
5122 redisAssert(1 != 1);
5129 /* We are ready to sort the vector... perform a bit of sanity check
5130 * on the LIMIT option too. We'll use a partial version of quicksort. */
5131 start
= (limit_start
< 0) ? 0 : limit_start
;
5132 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
5133 if (start
>= vectorlen
) {
5134 start
= vectorlen
-1;
5137 if (end
>= vectorlen
) end
= vectorlen
-1;
5139 if (dontsort
== 0) {
5140 server
.sort_desc
= desc
;
5141 server
.sort_alpha
= alpha
;
5142 server
.sort_bypattern
= sortby
? 1 : 0;
5143 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
5144 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
5146 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
5149 /* Send command output to the output buffer, performing the specified
5150 * GET/DEL/INCR/DECR operations if any. */
5151 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
5152 if (storekey
== NULL
) {
5153 /* STORE option not specified, send the sorting result to client */
5154 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
5155 for (j
= start
; j
<= end
; j
++) {
5158 addReplyBulkLen(c
,vector
[j
].obj
);
5159 addReply(c
,vector
[j
].obj
);
5160 addReply(c
,shared
.crlf
);
5162 listRewind(operations
);
5163 while((ln
= listYield(operations
))) {
5164 redisSortOperation
*sop
= ln
->value
;
5165 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5168 if (sop
->type
== REDIS_SORT_GET
) {
5169 if (!val
|| val
->type
!= REDIS_STRING
) {
5170 addReply(c
,shared
.nullbulk
);
5172 addReplyBulkLen(c
,val
);
5174 addReply(c
,shared
.crlf
);
5177 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5182 robj
*listObject
= createListObject();
5183 list
*listPtr
= (list
*) listObject
->ptr
;
5185 /* STORE option specified, set the sorting result as a List object */
5186 for (j
= start
; j
<= end
; j
++) {
5189 listAddNodeTail(listPtr
,vector
[j
].obj
);
5190 incrRefCount(vector
[j
].obj
);
5192 listRewind(operations
);
5193 while((ln
= listYield(operations
))) {
5194 redisSortOperation
*sop
= ln
->value
;
5195 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5198 if (sop
->type
== REDIS_SORT_GET
) {
5199 if (!val
|| val
->type
!= REDIS_STRING
) {
5200 listAddNodeTail(listPtr
,createStringObject("",0));
5202 listAddNodeTail(listPtr
,val
);
5206 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5210 if (dictReplace(c
->db
->dict
,storekey
,listObject
)) {
5211 incrRefCount(storekey
);
5213 /* Note: we add 1 because the DB is dirty anyway since even if the
5214 * SORT result is empty a new key is set and maybe the old content
5216 server
.dirty
+= 1+outputlen
;
5217 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",outputlen
));
5221 decrRefCount(sortval
);
5222 listRelease(operations
);
5223 for (j
= 0; j
< vectorlen
; j
++) {
5224 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
5225 decrRefCount(vector
[j
].u
.cmpobj
);
/* Builds the INFO payload as an sds string with sdscatprintf():
 *  - a block of general fields (version, multiplexing API, uptime, client
 *    and slave counts, memory usage, persistence state, totals);
 *  - when server.masterhost is set, the master_host / master_port /
 *    master_link_status / master_last_io_seconds_ago fields (the last one
 *    is -1 when server.master is NULL);
 *  - one "dbN:keys=..,expires=.." line for every database whose main dict
 *    or expires dict is not empty.
 * connected_clients is computed as the length of server.clients minus the
 * length of server.slaves.
 * NOTE(review): several format lines and arguments are missing from this
 * listing, so the format string and its argument list cannot be matched up
 * from here. */
5230 /* Create the string returned by the INFO command. This is decoupled
5231 * by the INFO command itself as we need to report the same information
5232 * on memory corruption problems. */
5233 static sds
genRedisInfoString(void) {
5235 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5238 info
= sdscatprintf(sdsempty(),
5239 "redis_version:%s\r\n"
5241 "multiplexing_api:%s\r\n"
5242 "uptime_in_seconds:%ld\r\n"
5243 "uptime_in_days:%ld\r\n"
5244 "connected_clients:%d\r\n"
5245 "connected_slaves:%d\r\n"
5246 "blocked_clients:%d\r\n"
5247 "used_memory:%zu\r\n"
5248 "changes_since_last_save:%lld\r\n"
5249 "bgsave_in_progress:%d\r\n"
5250 "last_save_time:%ld\r\n"
5251 "bgrewriteaof_in_progress:%d\r\n"
5252 "total_connections_received:%lld\r\n"
5253 "total_commands_processed:%lld\r\n"
5256 (sizeof(long) == 8) ? "64" : "32",
5260 listLength(server
.clients
)-listLength(server
.slaves
),
5261 listLength(server
.slaves
),
5262 server
.blockedclients
,
5265 server
.bgsavechildpid
!= -1,
5267 server
.bgrewritechildpid
!= -1,
5268 server
.stat_numconnections
,
5269 server
.stat_numcommands
,
5270 server
.masterhost
== NULL
? "master" : "slave"
5272 if (server
.masterhost
) {
5273 info
= sdscatprintf(info
,
5274 "master_host:%s\r\n"
5275 "master_port:%d\r\n"
5276 "master_link_status:%s\r\n"
5277 "master_last_io_seconds_ago:%d\r\n"
5280 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
5282 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
5285 for (j
= 0; j
< server
.dbnum
; j
++) {
5286 long long keys
, vkeys
;
5288 keys
= dictSize(server
.db
[j
].dict
);
5289 vkeys
= dictSize(server
.db
[j
].expires
);
5290 if (keys
|| vkeys
) {
5291 info
= sdscatprintf(info
, "db%d:keys=%lld,expires=%lld\r\n",
5298 static void infoCommand(redisClient
*c
) {
5299 sds info
= genRedisInfoString();
5300 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
5301 (unsigned long)sdslen(info
)));
5302 addReplySds(c
,info
);
5303 addReply(c
,shared
.crlf
);
/* MONITOR handler.
 * Does nothing when the client already has the REDIS_SLAVE flag. Otherwise
 * the REDIS_SLAVE and REDIS_MONITOR flags are set, the client is appended to
 * server.monitors and shared.ok is sent.
 * NOTE(review): the embedded line numbers show an original line missing
 * between the flag update and listAddNodeTail() in this listing. */
5306 static void monitorCommand(redisClient
*c
) {
5307 /* ignore MONITOR if already slave or in monitor mode */
5308 if (c
->flags
& REDIS_SLAVE
) return;
5310 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
5312 listAddNodeTail(server
.monitors
,c
);
5313 addReply(c
,shared
.ok
);
5316 /* ================================= Expire ================================= */
/* Remove 'key' from the db->expires dictionary.
 * The visible test is dictDelete(db->expires,key) == DICT_OK.
 * NOTE(review): the return statements are not visible in this listing. */
5317 static int removeExpire(redisDb
*db
, robj
*key
) {
5318 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
/* Associate the expire time 'when' with 'key' by adding it to db->expires.
 * The time is stored directly in the dict value slot (cast to void*), not in
 * allocated memory. The visible test checks dictAdd() against DICT_ERR.
 * NOTE(review): the return statements are not visible in this listing. */
5325 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
5326 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
5334 /* Return the expire time of the specified key, or -1 if no expire
5335 * is associated with this key (i.e. the key is non volatile) */
5336 static time_t getExpire(redisDb
*db
, robj
*key
) {
5339 /* No expire? return ASAP */
5340 if (dictSize(db
->expires
) == 0 ||
5341 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
5343 return (time_t) dictGetEntryVal(de
);
5346 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
5350 /* No expire? return ASAP */
5351 if (dictSize(db
->expires
) == 0 ||
5352 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5354 /* Lookup the expire */
5355 when
= (time_t) dictGetEntryVal(de
);
5356 if (time(NULL
) <= when
) return 0;
5358 /* Delete the key */
5359 dictDelete(db
->expires
,key
);
5360 return dictDelete(db
->dict
,key
) == DICT_OK
;
/* Delete 'key' when it has an associated expire, whether or not the expire
 * time has been reached (no time comparison is made in the visible code).
 * Returns 0 when db->expires is empty or does not contain the key; otherwise
 * the key is removed from db->expires and db->dict and the result of
 * dictDelete(db->dict,key) == DICT_OK is returned.
 * NOTE(review): the embedded line numbers show an original line missing
 * right after the "Delete the key" comment in this listing. */
5363 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
5366 /* No expire? return ASAP */
5367 if (dictSize(db
->expires
) == 0 ||
5368 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5370 /* Delete the key */
5372 dictDelete(db
->expires
,key
);
5373 return dictDelete(db
->dict
,key
) == DICT_OK
;
/* Shared implementation of EXPIRE and EXPIREAT: both callers pass a number
 * of seconds relative to the current time.
 * The key is searched in c->db->dict. The visible code has two outcomes
 * besides the initial shared.czero reply:
 *  - a branch that calls deleteKey() (incrementing server.dirty when it
 *    succeeds) and replies shared.cone;
 *  - a branch that computes when = time(NULL)+seconds and calls setExpire(),
 *    replying shared.cone on success and shared.czero otherwise.
 * NOTE(review): the conditions selecting these branches are not visible in
 * this listing; presumably a missing key yields czero and a negative
 * 'seconds' selects the delete branch -- confirm against the full file. */
5376 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
5379 de
= dictFind(c
->db
->dict
,key
);
5381 addReply(c
,shared
.czero
);
5385 if (deleteKey(c
->db
,key
)) server
.dirty
++;
5386 addReply(c
, shared
.cone
);
5389 time_t when
= time(NULL
)+seconds
;
5390 if (setExpire(c
->db
,key
,when
)) {
5391 addReply(c
,shared
.cone
);
5394 addReply(c
,shared
.czero
);
5400 static void expireCommand(redisClient
*c
) {
5401 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
5404 static void expireatCommand(redisClient
*c
) {
5405 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
/* TTL handler.
 * Reads the expire time of argv[1] with getExpire(), computes
 * ttl = (int)(expire - time(NULL)), maps any negative result to -1 and sends
 * ttl as an integer reply.
 * NOTE(review): declarations and at least one conditional are missing from
 * this listing. */
5408 static void ttlCommand(redisClient
*c
) {
5412 expire
= getExpire(c
->db
,c
->argv
[1]);
5414 ttl
= (int) (expire
-time(NULL
));
5415 if (ttl
< 0) ttl
= -1;
5417 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
5420 /* ================================ MULTI/EXEC ============================== */
5422 /* Client state initialization for MULTI/EXEC */
5423 static void initClientMultiState(redisClient
*c
) {
5424 c
->mstate
.commands
= NULL
;
5425 c
->mstate
.count
= 0;
5428 /* Release all the resources associated with MULTI/EXEC state */
/* For each of the c->mstate.count queued multiCmd entries the reference
 * count of every argument object is decremented; finally the
 * c->mstate.commands array is released with zfree().
 * NOTE(review): some original lines of the outer loop body are missing from
 * this listing. */
5429 static void freeClientMultiState(redisClient
*c
) {
5432 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5434 multiCmd
*mc
= c
->mstate
.commands
+j
;
5436 for (i
= 0; i
< mc
->argc
; i
++)
5437 decrRefCount(mc
->argv
[i
]);
5440 zfree(c
->mstate
.commands
);
5443 /* Add a new command into the MULTI commands queue */
/* The c->mstate.commands array is grown by one multiCmd with zrealloc() and
 * the new slot is the one at index c->mstate.count. The slot receives its
 * own copy of the client's argument vector (zmalloc + memcpy of c->argc
 * pointers) and every argument object gets its reference count incremented,
 * so the queued command keeps its arguments alive.
 * NOTE(review): the lines storing 'cmd' and the argument count and the one
 * updating c->mstate.count are not visible in this listing. */
5444 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
) {
5448 c
->mstate
.commands
= zrealloc(c
->mstate
.commands
,
5449 sizeof(multiCmd
)*(c
->mstate
.count
+1));
5450 mc
= c
->mstate
.commands
+c
->mstate
.count
;
5453 mc
->argv
= zmalloc(sizeof(robj
*)*c
->argc
);
5454 memcpy(mc
->argv
,c
->argv
,sizeof(robj
*)*c
->argc
);
5455 for (j
= 0; j
< c
->argc
; j
++)
5456 incrRefCount(mc
->argv
[j
]);
5460 static void multiCommand(redisClient
*c
) {
5461 c
->flags
|= REDIS_MULTI
;
5462 addReply(c
,shared
.ok
);
/* EXEC handler.
 * Replies "-ERR EXEC without MULTI" when the client is not flagged
 * REDIS_MULTI. Otherwise a multi-bulk header announcing c->mstate.count
 * replies is sent and each queued command is run through call() after
 * temporarily replacing c->argc / c->argv with the queued ones. Afterwards
 * the original argc/argv are restored, the MULTI state is freed and
 * re-initialized and the REDIS_MULTI flag is cleared. */
5465 static void execCommand(redisClient
*c
) {
5470 if (!(c
->flags
& REDIS_MULTI
)) {
5471 addReplySds(c
,sdsnew("-ERR EXEC without MULTI\r\n"));
5475 orig_argv
= c
->argv
;
5476 orig_argc
= c
->argc
;
5477 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->mstate
.count
));
5478 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5479 c
->argc
= c
->mstate
.commands
[j
].argc
;
5480 c
->argv
= c
->mstate
.commands
[j
].argv
;
5481 call(c
,c
->mstate
.commands
[j
].cmd
);
5483 c
->argv
= orig_argv
;
5484 c
->argc
= orig_argc
;
5485 freeClientMultiState(c
);
5486 initClientMultiState(c
);
5487 c
->flags
&= (~REDIS_MULTI
);
5490 /* =========================== Blocking Operations ========================= */
5492 /* Currently Redis blocking operations support is limited to list POP ops,
5493 * so the current implementation is not fully generic, but it is also not
5494 * completely specific so it will not require a rewrite to support new
5495 * kind of blocking operations in the future.
5497 * Still it's important to note that list blocking operations can be already
5498 * used as a notification mechanism in order to implement other blocking
5499 * operations at application level, so there must be a very strong evidence
5500 * of usefulness and generality before new blocking operations are implemented.
5502 * This is how the current blocking POP works, we use BLPOP as example:
5503 * - If the user calls BLPOP and the key exists and contains a non empty list
5504 * then LPOP is called instead. So BLPOP is semantically the same as LPOP
5505 * if there is no need to block.
5506 * - If instead BLPOP is called and the key does not exist or the list is
5507 * empty we need to block. In order to do so we remove the notification for
5508 * new data to read in the client socket (so that we'll not serve new
5509 * requests if the blocking request is not served). Also we put the client
5510 * in a dictionary (db->blockingkeys) mapping keys to a list of clients
5511 * blocked on those keys.
5512 * - If a PUSH operation against a key with blocked clients waiting is
5513 * performed, we serve the first in the list: basically instead to push
5514 * the new element inside the list we return it to the (first / oldest)
5515 * blocking client, unblock the client, and remove it from the list.
5517 * The above comment and the source code should be enough in order to understand
5518 * the implementation and modify / fix it later.
/* Put client 'c' in blocked state on 'numkeys' keys.
 * The keys are recorded on both sides:
 *  - in the client: c->blockingkeys is an array of numkeys robj pointers
 *    (each key gets its reference count incremented), c->blockingkeysnum
 *    holds the count and c->blockingto the timeout;
 *  - in the database: c->db->blockingkeys maps each key to a list of
 *    clients, to which 'c' is appended. When a new entry is added for a key
 *    its reference count is incremented once more.
 * Finally the client is flagged REDIS_BLOCKED, its AE_READABLE file event is
 * removed and server.blockedclients is incremented.
 * NOTE(review): the original comment below is cut off in this listing and
 * some lines of the body (declarations, the list creation) are missing. */
5521 /* Set a client in blocking mode for the specified key, with the specified
5523 static void blockForKeys(redisClient
*c
, robj
**keys
, int numkeys
, time_t timeout
) {
5528 c
->blockingkeys
= zmalloc(sizeof(robj
*)*numkeys
);
5529 c
->blockingkeysnum
= numkeys
;
5530 c
->blockingto
= timeout
;
5531 for (j
= 0; j
< numkeys
; j
++) {
5532 /* Add the key in the client structure, to map clients -> keys */
5533 c
->blockingkeys
[j
] = keys
[j
];
5534 incrRefCount(keys
[j
]);
5536 /* And in the other "side", to map keys -> clients */
5537 de
= dictFind(c
->db
->blockingkeys
,keys
[j
]);
5541 /* For every key we take a list of clients blocked for it */
5543 retval
= dictAdd(c
->db
->blockingkeys
,keys
[j
],l
);
5544 incrRefCount(keys
[j
]);
5545 assert(retval
== DICT_OK
);
5547 l
= dictGetEntryVal(de
);
5549 listAddNodeTail(l
,c
);
5551 /* Mark the client as a blocked client */
5552 c
->flags
|= REDIS_BLOCKED
;
5553 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
5554 server
.blockedclients
++;
5557 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
/* Steps visible in this block:
 *  1. For every key in c->blockingkeys the client is removed from the list
 *     stored in c->db->blockingkeys; the dict entry is deleted when its list
 *     becomes empty, and the reference held on the key is released.
 *  2. c->blockingkeys is freed and reset to NULL, REDIS_BLOCKED is cleared
 *     and server.blockedclients is decremented.
 *  3. The AE_READABLE event with handler readQueryFromClient is installed
 *     again; its return value is deliberately not checked (see the comment
 *     in the body).
 *  4. Pending data in c->querybuf, if any, is processed immediately with
 *     processInputBuffer(). */
5558 static void unblockClient(redisClient
*c
) {
5563 assert(c
->blockingkeys
!= NULL
);
5564 /* The client may wait for multiple keys, so unblock it for every key. */
5565 for (j
= 0; j
< c
->blockingkeysnum
; j
++) {
5566 /* Remove this client from the list of clients waiting for this key. */
5567 de
= dictFind(c
->db
->blockingkeys
,c
->blockingkeys
[j
]);
5569 l
= dictGetEntryVal(de
);
5570 listDelNode(l
,listSearchKey(l
,c
));
5571 /* If the list is empty we need to remove it to avoid wasting memory */
5572 if (listLength(l
) == 0)
5573 dictDelete(c
->db
->blockingkeys
,c
->blockingkeys
[j
]);
5574 decrRefCount(c
->blockingkeys
[j
]);
5576 /* Cleanup the client structure */
5577 zfree(c
->blockingkeys
);
5578 c
->blockingkeys
= NULL
;
5579 c
->flags
&= (~REDIS_BLOCKED
);
5580 server
.blockedclients
--;
5581 /* Ok now we are ready to get read events from socket, note that we
5582 * can't trap errors here as it's possible that unblockClients() is
5583 * called from freeClient() itself, and the only thing we can do
5584 * if we failed to register the READABLE event is to kill the client.
5585 * Still the following function should never fail in the real world as
5586 * we are sure the file descriptor is sane, and we exit on out of mem. */
5587 aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
, readQueryFromClient
, c
);
5588 /* As a final step we want to process data if there is some command waiting
5589 * in the input buffer. Note that this is safe even if unblockClient()
5590 * gets called from freeClient() because freeClient() will be smart
5591 * enough to call this function *after* c->querybuf was set to NULL. */
5592 if (c
->querybuf
&& sdslen(c
->querybuf
) > 0) processInputBuffer(c
);
/* Summary of the visible code: 'key' is searched in c->db->blockingkeys and
 * 0 is returned when there is no entry. Otherwise a receiver client is taken
 * from the list of blocked clients, it is sent a two-element multi-bulk
 * reply made of the key name and the pushed element, and it is unblocked
 * with unblockClient().
 * NOTE(review): the selection of the list node ('ln') and the final return
 * are not visible in this listing, and the original comment below has lost
 * some of its lines. */
5595 /* This should be called from any function PUSHing into lists.
5596 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5597 * 'ele' is the element pushed.
5599 * If the function returns 0 there was no client waiting for a list push
5602 * If the function returns 1 there was a client waiting for a list push
5603 * against this key, the element was passed to this client thus it's not
5604 * needed to actually add it to the list and the caller should return asap. */
5605 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
) {
5606 struct dictEntry
*de
;
5607 redisClient
*receiver
;
5611 de
= dictFind(c
->db
->blockingkeys
,key
);
5612 if (de
== NULL
) return 0;
5613 l
= dictGetEntryVal(de
);
5616 receiver
= ln
->value
;
5618 addReplySds(receiver
,sdsnew("*2\r\n"));
5619 addReplyBulkLen(receiver
,key
);
5620 addReply(receiver
,key
);
5621 addReply(receiver
,shared
.crlf
);
5622 addReplyBulkLen(receiver
,ele
);
5623 addReply(receiver
,ele
);
5624 addReply(receiver
,shared
.crlf
);
5625 unblockClient(receiver
);
5629 /* Blocking RPOP/LPOP */
/* Shared implementation of BLPOP and BRPOP ('where' is REDIS_HEAD or
 * REDIS_TAIL, as passed by blpopCommand() and brpopCommand()).
 * argv[1] .. argv[argc-2] are keys and argv[argc-1] is the timeout.
 * The keys are scanned in order: a value that is not a REDIS_LIST gets
 * shared.wrongtypeerr; the first non-empty list is served immediately by
 * emitting a "*2" multi-bulk header plus the key name and then delegating to
 * popGenericCommand() with a temporarily replaced argument vector.
 * When no key can be served the timeout is parsed with strtol(); a value
 * greater than zero is converted to an absolute time by adding time(NULL),
 * and the client is blocked on all the keys with blockForKeys(). */
5630 static void blockingPopGenericCommand(redisClient
*c
, int where
) {
5635 for (j
= 1; j
< c
->argc
-1; j
++) {
5636 o
= lookupKeyWrite(c
->db
,c
->argv
[j
]);
5638 if (o
->type
!= REDIS_LIST
) {
5639 addReply(c
,shared
.wrongtypeerr
);
5642 list
*list
= o
->ptr
;
5643 if (listLength(list
) != 0) {
5644 /* If the list contains elements fall back to the usual
5645 * non-blocking POP operation */
5646 robj
*argv
[2], **orig_argv
;
5649 /* We need to alter the command arguments before to call
5650 * popGenericCommand() as the command takes a single key. */
5651 orig_argv
= c
->argv
;
5652 orig_argc
= c
->argc
;
5653 argv
[1] = c
->argv
[j
];
5657 /* Also the return value is different, we need to output
5658 * the multi bulk reply header and the key name. The
5659 * "real" command will add the last element (the value)
5660 * for us. If this sounds like a hack to you it's just
5661 * because it is... */
5662 addReplySds(c
,sdsnew("*2\r\n"));
5663 addReplyBulkLen(c
,argv
[1]);
5664 addReply(c
,argv
[1]);
5665 addReply(c
,shared
.crlf
);
5666 popGenericCommand(c
,where
);
5668 /* Fix the client structure with the original stuff */
5669 c
->argv
= orig_argv
;
5670 c
->argc
= orig_argc
;
5676 /* If the list is empty or the key does not exist we must block */
5677 timeout
= strtol(c
->argv
[c
->argc
-1]->ptr
,NULL
,10);
5678 if (timeout
> 0) timeout
+= time(NULL
);
5679 blockForKeys(c
,c
->argv
+1,c
->argc
-2,timeout
);
5682 static void blpopCommand(redisClient
*c
) {
5683 blockingPopGenericCommand(c
,REDIS_HEAD
);
5686 static void brpopCommand(redisClient
*c
) {
5687 blockingPopGenericCommand(c
,REDIS_TAIL
);
5690 /* =============================== Replication ============================= */
/* Write 'size' bytes from 'ptr' to 'fd', waiting for writability with
 * aeWait() in slices of 1000 (the third aeWait() argument) and giving up
 * once more than 'timeout' seconds have elapsed since the call started
 * (measured with time()). Returns -1 when write() fails; 'ret' is
 * initialized to 'size'.
 * NOTE(review): the loop header, the pointer/size bookkeeping and the final
 * returns are not visible in this listing. */
5692 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5693 ssize_t nwritten
, ret
= size
;
5694 time_t start
= time(NULL
);
5698 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
5699 nwritten
= write(fd
,ptr
,size
);
5700 if (nwritten
== -1) return -1;
5704 if ((time(NULL
)-start
) > timeout
) {
/* Read from 'fd' into 'ptr', waiting for readability with aeWait() in
 * slices of 1000 (the third aeWait() argument) and giving up once more than
 * 'timeout' seconds have elapsed since the call started (measured with
 * time()). Returns -1 when read() fails; 'totread' starts at 0.
 * NOTE(review): the loop header, the accumulation into 'totread' and the
 * final returns are not visible in this listing. */
5712 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5713 ssize_t nread
, totread
= 0;
5714 time_t start
= time(NULL
);
5718 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
5719 nread
= read(fd
,ptr
,size
);
5720 if (nread
== -1) return -1;
5725 if ((time(NULL
)-start
) > timeout
) {
/* Read a line from 'fd' into 'ptr' one byte at a time through syncRead(),
 * returning -1 as soon as a single-byte read fails. A '\r' stored just
 * before the current write position is replaced with a terminator, which
 * strips the carriage return of a CRLF line ending.
 * NOTE(review): the loop, the newline test and the returns are not visible
 * in this listing. */
5733 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5740 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
5743 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
/* SYNC handler (master side of replication).
 * Ignored when the client is already flagged REDIS_SLAVE, and refused with
 * an error when the client still has pending replies.
 * If a background save is running (server.bgsavechildpid != -1) the slave
 * list is scanned for a slave in REDIS_REPL_WAIT_BGSAVE_END state: when one
 * exists its reply buffer is duplicated into this client, which also enters
 * WAIT_BGSAVE_END; otherwise the client enters WAIT_BGSAVE_START.
 * If no background save is running one is started with rdbSaveBackground();
 * on failure an error is sent, on success the client enters
 * WAIT_BGSAVE_END.
 * Finally the client gets the REDIS_SLAVE flag and is appended to
 * server.slaves.
 * NOTE(review): several original lines around the final flag update are
 * missing from this listing. */
5754 static void syncCommand(redisClient
*c
) {
5755 /* ignore SYNC if already slave or in monitor mode */
5756 if (c
->flags
& REDIS_SLAVE
) return;
5758 /* SYNC can't be issued when the server has pending data to send to
5759 * the client about already issued commands. We need a fresh reply
5760 * buffer registering the differences between the BGSAVE and the current
5761 * dataset, so that we can copy to other slaves if needed. */
5762 if (listLength(c
->reply
) != 0) {
5763 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5767 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
5768 /* Here we need to check if there is a background saving operation
5769 * in progress, or if it is required to start one */
5770 if (server
.bgsavechildpid
!= -1) {
5771 /* Ok a background save is in progress. Let's check if it is a good
5772 * one for replication, i.e. if there is another slave that is
5773 * registering differences since the server forked to save */
5777 listRewind(server
.slaves
);
5778 while((ln
= listYield(server
.slaves
))) {
5780 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
5783 /* Perfect, the server is already registering differences for
5784 * another slave. Set the right state, and copy the buffer. */
5785 listRelease(c
->reply
);
5786 c
->reply
= listDup(slave
->reply
);
5787 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5788 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
5790 /* No way, we need to wait for the next BGSAVE in order to
5791 * register differences */
5792 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5793 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
5796 /* Ok we don't have a BGSAVE in progress, let's start one */
5797 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
5798 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5799 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
5800 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
5803 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5806 c
->flags
|= REDIS_SLAVE
;
5808 listAddNodeTail(server
.slaves
,c
);
/* File event handler that streams the saved DB file to a slave; 'privdata'
 * is the slave client and 'mask' is unused.
 * On the first call (slave->repldboff == 0) a "$<count>\r\n" bulk header is
 * written to the socket. Each call then seeks the DB file to
 * slave->repldboff, reads up to REDIS_IOBUF_LEN bytes and writes them to
 * 'fd', advancing repldboff by the number of bytes written.
 * When repldboff reaches repldbsize the DB file is closed, the slave moves
 * to REDIS_REPL_ONLINE and the writable event handler is replaced with
 * sendReplyToClient.
 * NOTE(review): the error-handling branches are only partially visible in
 * this listing. */
5812 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
5813 redisClient
*slave
= privdata
;
5815 REDIS_NOTUSED(mask
);
5816 char buf
[REDIS_IOBUF_LEN
];
5817 ssize_t nwritten
, buflen
;
5819 if (slave
->repldboff
== 0) {
5820 /* Write the bulk write count before to transfer the DB. In theory here
5821 * we don't know how much room there is in the output buffer of the
5822 * socket, but in practice SO_SNDLOWAT (the minimum count for output
5823 * operations) will never be smaller than the few bytes we need. */
5826 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5828 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
5836 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
5837 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
5839 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
5840 (buflen
== 0) ? "premature EOF" : strerror(errno
));
5844 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
5845 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
5850 slave
->repldboff
+= nwritten
;
5851 if (slave
->repldboff
== slave
->repldbsize
) {
5852 close(slave
->repldbfd
);
5853 slave
->repldbfd
= -1;
5854 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5855 slave
->replstate
= REDIS_REPL_ONLINE
;
5856 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
5857 sendReplyToClient
, slave
) == AE_ERR
) {
5861 addReplySds(slave
,sdsempty());
5862 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
5866 /* This function is called at the end of every background saving.
5867 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5868 * otherwise REDIS_ERR is passed to the function.
5870 * The goal of this function is to handle slaves waiting for a successful
5871 * background saving in order to perform non-blocking synchronization. */
/* Per-slave handling visible in this block:
 *  - REDIS_REPL_WAIT_BGSAVE_START: the slave is moved to WAIT_BGSAVE_END
 *    (a new background save is attempted after the loop, see below);
 *  - REDIS_REPL_WAIT_BGSAVE_END: on a failed save a warning is logged;
 *    otherwise the DB file is opened and stat'ed, repldboff/repldbsize are
 *    initialized, the slave moves to REDIS_REPL_SEND_BULK and
 *    sendBulkToSlave is installed as its AE_WRITABLE handler.
 * After the loop rdbSaveBackground() may be called; if it fails a warning
 * is logged and the slave list is walked again looking at slaves in
 * WAIT_BGSAVE_START state.
 * NOTE(review): the actions taken on the error paths and the use of
 * 'startbgsave' are not visible in this listing. */
5872 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
5874 int startbgsave
= 0;
5876 listRewind(server
.slaves
);
5877 while((ln
= listYield(server
.slaves
))) {
5878 redisClient
*slave
= ln
->value
;
5880 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
5882 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5883 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
5884 struct redis_stat buf
;
5886 if (bgsaveerr
!= REDIS_OK
) {
5888 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
5891 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
5892 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
5894 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
5897 slave
->repldboff
= 0;
5898 slave
->repldbsize
= buf
.st_size
;
5899 slave
->replstate
= REDIS_REPL_SEND_BULK
;
5900 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5901 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
5908 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5909 listRewind(server
.slaves
);
5910 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
5911 while((ln
= listYield(server
.slaves
))) {
5912 redisClient
*slave
= ln
->value
;
5914 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
/* Perform a blocking full synchronization with the configured master.
 *
 * Steps visible here: connect to server.masterhost:server.masterport, send
 * AUTH when server.masterauth is set and require a '+' status reply, send
 * SYNC, read the '$<count>' bulk header, copy the payload from the socket
 * into a temp file in chunks of at most 1024 bytes, rename the temp file to
 * server.dbfilename, load it with rdbLoad(), then wrap the socket in a
 * client flagged REDIS_MASTER and set server.replstate to
 * REDIS_REPL_CONNECTED. Each failing step logs a REDIS_WARNING message. */
5921 static int syncWithMaster(void) {
5922 char buf
[1024], tmpfile
[256], authcmd
[1024];
5924 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
5928 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
5933 /* AUTH with the master if required. */
5934 if(server
.masterauth
) {
/* NOTE(review): snprintf() truncates the command to 1024 bytes, but the
 * length handed to syncWrite() is derived from strlen(server.masterauth),
 * not from authcmd. A password longer than 1017 bytes would make the write
 * read past the end of authcmd. Confirm the password length is bounded
 * elsewhere. */
5935 snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
);
5936 if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) {
5938 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",
5942 /* Read the AUTH result. */
5943 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5945 redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s",
5949 if (buf
[0] != '+') {
5951 redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?");
5956 /* Issue the SYNC command */
5957 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
5959 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
5963 /* Read the bulk write count */
5964 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5966 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
5970 if (buf
[0] != '$') {
5972 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
/* NOTE(review): the dump size is parsed with atoi(), so a malformed or
 * negative count is not detected at this point. */
5975 dumpsize
= atoi(buf
+1);
5976 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
5977 /* Read the bulk write data on a temp file */
5978 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
5979 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
5982 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
5986 int nread
, nwritten
;
/* Never ask the socket for more than what is left of the dump. */
5988 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
5990 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
5996 nwritten
= write(dfd
,buf
,nread
);
5997 if (nwritten
== -1) {
5998 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
6006 if (rename(tmpfile
,server
.dbfilename
) == -1) {
6007 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
6013 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6014 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
/* The connection used for the transfer becomes the master client. */
6018 server
.master
= createClient(fd
);
6019 server
.master
->flags
|= REDIS_MASTER
;
6020 server
.master
->authenticated
= 1;
6021 server
.replstate
= REDIS_REPL_CONNECTED
;
/* SLAVEOF command handler.
 *
 * When argv[1] and argv[2] are "no" and "one" (case insensitive) and a
 * master host is configured, the host is freed, the master client (if any)
 * is released and server.replstate becomes REDIS_REPL_NONE.
 * Otherwise argv[1] is stored as the new master host and argv[2] as the
 * port, any current master client is released and server.replstate becomes
 * REDIS_REPL_CONNECT. The client receives shared.ok. */
6025 static void slaveofCommand(redisClient
*c
) {
6026 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
6027 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
6028 if (server
.masterhost
) {
6029 sdsfree(server
.masterhost
);
6030 server
.masterhost
= NULL
;
6031 if (server
.master
) freeClient(server
.master
);
6032 server
.replstate
= REDIS_REPL_NONE
;
6033 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
6036 sdsfree(server
.masterhost
);
6037 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
/* NOTE(review): the port is parsed with atoi(), so a non numeric argument is
 * not rejected here. */
6038 server
.masterport
= atoi(c
->argv
[2]->ptr
);
6039 if (server
.master
) freeClient(server
.master
);
6040 server
.replstate
= REDIS_REPL_CONNECT
;
6041 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
6042 server
.masterhost
, server
.masterport
);
6044 addReply(c
,shared
.ok
);
6047 /* ============================ Maxmemory directive ======================== */
6049 /* This function gets called when 'maxmemory' is set on the config file to limit
6050 * the max memory used by the server, and we are out of memory.
6051 * This function will try to, in order:
6053 * - Free objects from the free list
6054 * - Try to remove keys with an EXPIRE set
6056 * If it is not possible to free enough memory to reach used-memory < maxmemory
6057 * the server will start refusing commands that would enlarge memory usage
 * even more. */
/* Loop while server.maxmemory is non zero and zmalloc_used_memory() exceeds
 * it. On each pass, if server.objfreelist is not empty its first node is
 * detached; otherwise every DB with a non empty expires dict is sampled
 * three times with dictGetRandomKey() and the sampled key with the smallest
 * expire time is deleted. The function returns as soon as a pass frees
 * nothing. */
6060 static void freeMemoryIfNeeded(void) {
6061 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
6062 if (listLength(server
.objfreelist
)) {
6065 listNode
*head
= listFirst(server
.objfreelist
);
6066 o
= listNodeValue(head
);
6067 listDelNode(server
.objfreelist
,head
);
6070 int j
, k
, freed
= 0;
6072 for (j
= 0; j
< server
.dbnum
; j
++) {
6074 robj
*minkey
= NULL
;
6075 struct dictEntry
*de
;
6077 if (dictSize(server
.db
[j
].expires
)) {
6079 /* From a sample of three keys drop the one nearest to
6080 * the natural expire */
6081 for (k
= 0; k
< 3; k
++) {
6084 de
= dictGetRandomKey(server
.db
[j
].expires
);
/* The expires dict stores the expire time directly in the entry value. */
6085 t
= (time_t) dictGetEntryVal(de
);
6086 if (minttl
== -1 || t
< minttl
) {
6087 minkey
= dictGetEntryKey(de
);
6091 deleteKey(server
.db
+j
,minkey
);
6094 if (!freed
) return; /* nothing to free... */
6099 /* ============================== Append Only file ========================== */
/* Serialize one executed command and append it to the append only file.
 *
 * cmd    - command table entry, used here to recognise EXPIRE.
 * dictid - index of the DB the command ran against; when it differs from
 *          server.appendseldb a SELECT is emitted first.
 * argv   - command arguments as objects.
 * argc   - number of entries in argv.
 *
 * The command is encoded as "*<argc>" followed by one "$<len>" bulk per
 * argument and written with a single write(2) to server.appendfd. The same
 * bytes are also appended to server.bgrewritebuf while a background rewrite
 * child is running, and fsync(2) is issued according to server.appendfsync. */
6101 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
6102 sds buf
= sdsempty();
6108 /* The DB this command was targeting is not the same as the last command
6109 * we appended. Issuing a SELECT command is needed. */
6110 if (dictid
!= server
.appendseldb
) {
6113 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
6114 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
6115 (unsigned long)strlen(seldb
),seldb
);
6116 server
.appendseldb
= dictid
;
6119 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6120 * EXPIREs into EXPIREATs calls */
6121 if (cmd
->proc
== expireCommand
) {
6124 tmpargv
[0] = createStringObject("EXPIREAT",8);
6125 tmpargv
[1] = argv
[1];
6126 incrRefCount(argv
[1]);
/* Relative timeout converted into an absolute unix time. */
6127 when
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10);
6128 tmpargv
[2] = createObject(REDIS_STRING
,
6129 sdscatprintf(sdsempty(),"%ld",when
));
6133 /* Append the actual command */
6134 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
6135 for (j
= 0; j
< argc
; j
++) {
6138 o
= getDecodedObject(o
);
6139 buf
= sdscatprintf(buf
,"$%lu\r\n",(unsigned long)sdslen(o
->ptr
));
6140 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
6141 buf
= sdscatlen(buf
,"\r\n",2);
6145 /* Free the objects from the modified argv for EXPIREAT */
6146 if (cmd
->proc
== expireCommand
) {
6147 for (j
= 0; j
< 3; j
++)
6148 decrRefCount(argv
[j
]);
6151 /* We want to perform a single write. This should be guaranteed atomic
6152 * at least if the filesystem we are writing is a real physical one.
6153 * While this will save us against the server being killed I don't think
6154 * there is much to do about the whole server stopping for power problems
6156 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
6157 if (nwritten
!= (signed)sdslen(buf
)) {
6158 /* Ooops, we are in troubles. The best thing to do for now is
6159 * to simply exit instead to give the illusion that everything is
6160 * working as expected. */
6161 if (nwritten
== -1) {
6162 redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
));
/* NOTE(review): on a short write errno is not set by write(2), so the
 * strerror() text in the message below may be unrelated to the failure. */
6164 redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
));
6168 /* If a background append only file rewriting is in progress we want to
6169 * accumulate the differences between the child DB and the current one
6170 * in a buffer, so that when the child process will do its work we
6171 * can append the differences to the new append only file. */
6172 if (server
.bgrewritechildpid
!= -1)
6173 server
.bgrewritebuf
= sdscatlen(server
.bgrewritebuf
,buf
,sdslen(buf
));
/* fsync on every write with APPENDFSYNC_ALWAYS, or when more than one second
 * elapsed since server.lastfsync with APPENDFSYNC_EVERYSEC. */
6177 if (server
.appendfsync
== APPENDFSYNC_ALWAYS
||
6178 (server
.appendfsync
== APPENDFSYNC_EVERYSEC
&&
6179 now
-server
.lastfsync
> 1))
6181 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
6182 server
.lastfsync
= now
;
6186 /* In Redis commands are always executed in the context of a client, so in
6187 * order to load the append only file we need to create a fake client.
 *
 * The client is allocated with zmalloc() and gets an empty query buffer and
 * an empty reply list whose free and dup methods are decrRefCount and
 * dupClientReplyValue. */
6188 static struct redisClient
*createFakeClient(void) {
6189 struct redisClient
*c
= zmalloc(sizeof(*c
));
6193 c
->querybuf
= sdsempty();
6197 /* We set the fake client as a slave waiting for the synchronization
6198 * so that Redis will not try to send replies to this client. */
6199 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
6200 c
->reply
= listCreate();
6201 listSetFreeMethod(c
->reply
,decrRefCount
);
6202 listSetDupMethod(c
->reply
,dupClientReplyValue
);
/* Release the resources owned by a fake client: the statements visible here
 * free its query buffer and release its reply list. */
6206 static void freeFakeClient(struct redisClient
*c
) {
6207 sdsfree(c
->querybuf
);
6208 listRelease(c
->reply
);
6212 /* Replay the append log file. On success REDIS_OK is returned. On non fatal
6213 * error (the append only file is zero-length) REDIS_ERR is returned. On
6214 * fatal error an error message is logged and the program exits.
 *
 * Each command in the file is a "*<argc>" line followed by argc bulk
 * arguments ("$<len>" line, len payload bytes, CRLF). Every command is looked
 * up with lookupCommand() and executed against a fake client, whose reply
 * list is discarded after each command. */
6215 int loadAppendOnlyFile(char *filename
) {
6216 struct redisClient
*fakeClient
;
6217 FILE *fp
= fopen(filename
,"r");
6218 struct redis_stat sb
;
/* NOTE(review): fp is handed to fileno() here, while the message for a
 * failed fopen() only appears further down. If no NULL check precedes this
 * line in the full source, a missing file dereferences a NULL stream. */
6220 if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size
== 0)
6224 redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
));
6228 fakeClient
= createFakeClient();
6235 struct redisCommand
*cmd
;
6237 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) {
6243 if (buf
[0] != '*') goto fmterr
;
6245 argv
= zmalloc(sizeof(robj
*)*argc
);
6246 for (j
= 0; j
< argc
; j
++) {
6247 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
;
6248 if (buf
[0] != '$') goto fmterr
;
/* NOTE(review): the bulk length is taken from the file via strtol() without
 * a visible range check before being used as an allocation size. */
6249 len
= strtol(buf
+1,NULL
,10);
6250 argsds
= sdsnewlen(NULL
,len
);
6251 if (len
&& fread(argsds
,len
,1,fp
) == 0) goto fmterr
;
6252 argv
[j
] = createObject(REDIS_STRING
,argsds
);
6253 if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */
6256 /* Command lookup */
6257 cmd
= lookupCommand(argv
[0]->ptr
);
6259 redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
);
6262 /* Try object sharing and encoding */
6263 if (server
.shareobjects
) {
6265 for(j
= 1; j
< argc
; j
++)
6266 argv
[j
] = tryObjectSharing(argv
[j
]);
6268 if (cmd
->flags
& REDIS_CMD_BULK
)
6269 tryObjectEncoding(argv
[argc
-1]);
6270 /* Run the command in the context of a fake client */
6271 fakeClient
->argc
= argc
;
6272 fakeClient
->argv
= argv
;
6273 cmd
->proc(fakeClient
);
6274 /* Discard the reply objects list from the fake client */
6275 while(listLength(fakeClient
->reply
))
6276 listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
));
6277 /* Clean up, ready for the next command */
6278 for (j
= 0; j
< argc
; j
++) decrRefCount(argv
[j
]);
6282 freeFakeClient(fakeClient
);
6287 redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file");
6289 redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
));
6293 redisLog(REDIS_WARNING
,"Bad file format reading the append only file");
6297 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n
 *
 * The object is first converted with getDecodedObject() so that obj->ptr can
 * be measured with sdslen(). The length line, the payload (skipped when it is
 * empty) and the trailing CRLF are written with separate fwrite() calls; a
 * failing fwrite() jumps to the err label. */
6298 static int fwriteBulk(FILE *fp
, robj
*obj
) {
6300 obj
= getDecodedObject(obj
);
6301 snprintf(buf
,sizeof(buf
),"$%ld\r\n",(long)sdslen(obj
->ptr
));
6302 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) goto err
;
6303 if (sdslen(obj
->ptr
) && fwrite(obj
->ptr
,sdslen(obj
->ptr
),1,fp
) == 0)
6305 if (fwrite("\r\n",2,1,fp
) == 0) goto err
;
6313 /* Write a double value in bulk format $<count>\r\n<payload>\r\n
 *
 * The value is formatted with "%.17g" and already carries its trailing CRLF
 * in dbuf, which is why the announced count is strlen(dbuf)-2. Returns 0 as
 * soon as one of the two fwrite() calls fails. */
6314 static int fwriteBulkDouble(FILE *fp
, double d
) {
6315 char buf
[128], dbuf
[128];
6317 snprintf(dbuf
,sizeof(dbuf
),"%.17g\r\n",d
);
6318 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(dbuf
)-2);
6319 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
6320 if (fwrite(dbuf
,strlen(dbuf
),1,fp
) == 0) return 0;
6324 /* Write a long value in bulk format $<count>\r\n<payload>\r\n
 *
 * The value is formatted with "%ld" and already carries its trailing CRLF in
 * lbuf, which is why the announced count is strlen(lbuf)-2. Returns 0 as soon
 * as one of the two fwrite() calls fails. */
6325 static int fwriteBulkLong(FILE *fp
, long l
) {
6326 char buf
[128], lbuf
[128];
6328 snprintf(lbuf
,sizeof(lbuf
),"%ld\r\n",l
);
6329 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(lbuf
)-2);
6330 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
6331 if (fwrite(lbuf
,strlen(lbuf
),1,fp
) == 0) return 0;
6335 /* Write a sequence of commands able to fully rebuild the dataset into
6336 * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
 *
 * The commands are written to a temp file named after the current pid and
 * the file is renamed onto 'filename' at the end. For every non empty DB a
 * SELECT is emitted, then one SET per string key, one RPUSH per list
 * element, one SADD per set member and one ZADD per sorted set member,
 * followed by an EXPIREAT for keys that carry an expire time. Any failing
 * write jumps to the werr label, which logs the error and releases the DB
 * iterator. */
6337 static int rewriteAppendOnlyFile(char *filename
) {
6338 dictIterator
*di
= NULL
;
6343 time_t now
= time(NULL
);
6345 /* Note that we have to use a different temp name here compared to the
6346 * one used by rewriteAppendOnlyFileBackground() function. */
6347 snprintf(tmpfile
,256,"temp-rewriteaof-%d.aof", (int) getpid());
6348 fp
= fopen(tmpfile
,"w");
6350 redisLog(REDIS_WARNING
, "Failed rewriting the append only file: %s", strerror(errno
));
6353 for (j
= 0; j
< server
.dbnum
; j
++) {
6354 char selectcmd
[] = "*2\r\n$6\r\nSELECT\r\n";
6355 redisDb
*db
= server
.db
+j
;
6357 if (dictSize(d
) == 0) continue;
6358 di
= dictGetIterator(d
);
6364 /* SELECT the new DB */
6365 if (fwrite(selectcmd
,sizeof(selectcmd
)-1,1,fp
) == 0) goto werr
;
6366 if (fwriteBulkLong(fp
,j
) == 0) goto werr
;
6368 /* Iterate this DB writing every entry */
6369 while((de
= dictNext(di
)) != NULL
) {
6370 robj
*key
= dictGetEntryKey(de
);
6371 robj
*o
= dictGetEntryVal(de
);
6372 time_t expiretime
= getExpire(db
,key
);
6374 /* Save the key and associated value */
6375 if (o
->type
== REDIS_STRING
) {
6376 /* Emit a SET command */
6377 char cmd
[]="*3\r\n$3\r\nSET\r\n";
6378 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6380 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6381 if (fwriteBulk(fp
,o
) == 0) goto werr
;
6382 } else if (o
->type
== REDIS_LIST
) {
6383 /* Emit the RPUSHes needed to rebuild the list */
6384 list
*list
= o
->ptr
;
6388 while((ln
= listYield(list
))) {
6389 char cmd
[]="*3\r\n$5\r\nRPUSH\r\n";
6390 robj
*eleobj
= listNodeValue(ln
);
6392 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6393 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6394 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6396 } else if (o
->type
== REDIS_SET
) {
6397 /* Emit the SADDs needed to rebuild the set */
/* NOTE(review): this inner 'di' shadows the function level iterator. A
 * goto werr taken inside the loop below reaches a cleanup that can only see
 * the outer 'di', so the inner iterator looks leaked on that path. The same
 * applies to the ZSET branch. */
6399 dictIterator
*di
= dictGetIterator(set
);
6402 while((de
= dictNext(di
)) != NULL
) {
6403 char cmd
[]="*3\r\n$4\r\nSADD\r\n";
6404 robj
*eleobj
= dictGetEntryKey(de
);
6406 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6407 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6408 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6410 dictReleaseIterator(di
);
6411 } else if (o
->type
== REDIS_ZSET
) {
6412 /* Emit the ZADDs needed to rebuild the sorted set */
6414 dictIterator
*di
= dictGetIterator(zs
->dict
);
6417 while((de
= dictNext(di
)) != NULL
) {
6418 char cmd
[]="*4\r\n$4\r\nZADD\r\n";
6419 robj
*eleobj
= dictGetEntryKey(de
);
6420 double *score
= dictGetEntryVal(de
);
6422 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6423 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6424 if (fwriteBulkDouble(fp
,*score
) == 0) goto werr
;
6425 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6427 dictReleaseIterator(di
);
6429 redisAssert(0 != 0);
6431 /* Save the expire time */
6432 if (expiretime
!= -1) {
6433 char cmd
[]="*3\r\n$8\r\nEXPIREAT\r\n";
6434 /* If this key is already expired skip it */
/* NOTE(review): at this point the commands rebuilding the value were already
 * written above, so the continue only skips the EXPIREAT; the expired key
 * itself is still emitted without an expire. */
6435 if (expiretime
< now
) continue;
6436 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6437 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6438 if (fwriteBulkLong(fp
,expiretime
) == 0) goto werr
;
6441 dictReleaseIterator(di
);
6444 /* Make sure data will not remain on the OS's output buffers */
6449 /* Use RENAME to make sure the DB file is changed atomically only
6450 * if the generate DB file is ok. */
6451 if (rename(tmpfile
,filename
) == -1) {
6452 redisLog(REDIS_WARNING
,"Error moving temp append only file on the final destination: %s", strerror(errno
));
6456 redisLog(REDIS_NOTICE
,"SYNC append only file rewrite performed");
6462 redisLog(REDIS_WARNING
,"Write error writing append only file on disk: %s", strerror(errno
));
6463 if (di
) dictReleaseIterator(di
);
6467 /* This is how rewriting of the append only file in background works:
6469 * 1) The user calls BGREWRITEAOF
6470 * 2) Redis calls this function, that forks():
6471 * 2a) the child rewrites the append only file in a temp file.
6472 * 2b) the parent accumulates differences in server.bgrewritebuf.
6473 * 3) When the child finishes '2a' it exits.
6474 * 4) The parent will trap the exit code, if it's OK, will append the
6475 * data accumulated into server.bgrewritebuf into the temp file, and
6476 * finally will rename(2) the temp file in the actual file name.
6477 * Then the new file is reopened as the new append only file. Profit!
 */
/* Start a background rewrite of the append only file.
 *
 * Returns REDIS_ERR immediately when a rewrite child is already recorded in
 * server.bgrewritechildpid. Otherwise the process forks: the child runs
 * rewriteAppendOnlyFile() on a temp file named after its own pid, while the
 * parent logs the outcome of fork(), stores the child pid and resets
 * server.appendseldb. */
6479 static int rewriteAppendOnlyFileBackground(void) {
6482 if (server
.bgrewritechildpid
!= -1) return REDIS_ERR
;
6483 if ((childpid
= fork()) == 0) {
6488 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6489 if (rewriteAppendOnlyFile(tmpfile
) == REDIS_OK
) {
6496 if (childpid
== -1) {
6497 redisLog(REDIS_WARNING
,
6498 "Can't rewrite append only file in background: fork: %s",
6502 redisLog(REDIS_NOTICE
,
6503 "Background append only file rewriting started by pid %d",childpid
);
6504 server
.bgrewritechildpid
= childpid
;
6505 /* We set appendseldb to -1 in order to force the next call to the
6506 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6507 * accumulated by the parent into server.bgrewritebuf will start
6508 * with a SELECT statement and it will be safe to merge. */
6509 server
.appendseldb
= -1;
6512 return REDIS_OK
; /* unreached */
/* BGREWRITEAOF command handler. Replies with an -ERR line when a rewrite
 * child is already running, with a status line when
 * rewriteAppendOnlyFileBackground() returns REDIS_OK, and with shared.err
 * otherwise. */
6515 static void bgrewriteaofCommand(redisClient
*c
) {
6516 if (server
.bgrewritechildpid
!= -1) {
6517 addReplySds(c
,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6520 if (rewriteAppendOnlyFileBackground() == REDIS_OK
) {
6521 char *status
= "+Background append only file rewriting started\r\n";
6522 addReplySds(c
,sdsnew(status
));
6524 addReply(c
,shared
.err
);
/* Build the name of the temp file used by the background rewrite child with
 * pid 'childpid'. The pattern matches the one used in
 * rewriteAppendOnlyFileBackground(). The function name suggests the file is
 * then removed, but that statement is not visible in this view. */
6528 static void aofRemoveTempFile(pid_t childpid
) {
6531 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) childpid
);
6535 /* ================================= Debugging ============================== */
/* DEBUG command handler. argv[1] selects the sub command (case insensitive):
 *   segfault     - branch body not visible in this view.
 *   reload       - rdbSave() then rdbLoad() of server.dbfilename.
 *   loadaof      - loadAppendOnlyFile() of server.appendfilename.
 *   object <key> - reports addresses, refcounts, encoding and serialized
 *                  length of the key and value found in the current DB, or
 *                  shared.nokeyerr.
 * Anything else gets a syntax error reply.
 *
 * NOTE(review): the syntax error text does not list LOADAOF although the
 * sub command is handled above. */
6537 static void debugCommand(redisClient
*c
) {
6538 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
6540 } else if (!strcasecmp(c
->argv
[1]->ptr
,"reload")) {
6541 if (rdbSave(server
.dbfilename
) != REDIS_OK
) {
6542 addReply(c
,shared
.err
);
6546 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6547 addReply(c
,shared
.err
);
6550 redisLog(REDIS_WARNING
,"DB reloaded by DEBUG RELOAD");
6551 addReply(c
,shared
.ok
);
6552 } else if (!strcasecmp(c
->argv
[1]->ptr
,"loadaof")) {
6554 if (loadAppendOnlyFile(server
.appendfilename
) != REDIS_OK
) {
6555 addReply(c
,shared
.err
);
6558 redisLog(REDIS_WARNING
,"Append Only File loaded by DEBUG LOADAOF");
6559 addReply(c
,shared
.ok
);
6560 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
6561 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
6565 addReply(c
,shared
.nokeyerr
);
6568 key
= dictGetEntryKey(de
);
6569 val
= dictGetEntryVal(de
);
6570 addReplySds(c
,sdscatprintf(sdsempty(),
6571 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d serializedlength:%lld\r\n",
6572 (void*)key
, key
->refcount
, (void*)val
, val
->refcount
,
6573 val
->encoding
, rdbSavedObjectLen(val
)));
6575 addReplySds(c
,sdsnew(
6576 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
/* Log a failed assertion. 'estr' is the text of the failed expression. When
 * HAVE_BACKTRACE is defined an additional line announces that a SIGSEGV is
 * about to be forced to obtain a stack trace; the statement raising it is
 * not visible in this view. */
6580 static void _redisAssert(char *estr
) {
6581 redisLog(REDIS_WARNING
,"=== ASSERTION FAILED ===");
6582 redisLog(REDIS_WARNING
,"==> %s\n",estr
);
6583 #ifdef HAVE_BACKTRACE
6584 redisLog(REDIS_WARNING
,"(forcing SIGSEGV in order to print the stack trace)");
6589 /* =================================== Main! ================================ */
/* Read the first line (up to 63 characters) of
 * /proc/sys/vm/overcommit_memory. The caller below compares the result with
 * 0; the parsing and the error returns are not visible in this view. */
6592 int linuxOvercommitMemoryValue(void) {
6593 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
6597 if (fgets(buf
,64,fp
) == NULL
) {
/* Log a warning explaining how to change vm.overcommit_memory when
 * linuxOvercommitMemoryValue() reports 0. */
6606 void linuxOvercommitMemoryWarning(void) {
6607 if (linuxOvercommitMemoryValue() == 0) {
6608 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6611 #endif /* __linux__ */
/* Turn the process into a daemon: fork and let the parent exit, start a new
 * session with setsid(), redirect stdin, stdout and stderr to /dev/null, and
 * write the pid of the daemon into server.pidfile. */
6613 static void daemonize(void) {
6617 if (fork() != 0) exit(0); /* parent exits */
6618 printf("New pid: %d\n", getpid());
6619 setsid(); /* create a new session */
6621 /* Every output goes to /dev/null. If Redis is daemonized but
6622 * the 'logfile' is set to 'stdout' in the configuration file
6623 * it will not log at all. */
6624 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
6625 dup2(fd
, STDIN_FILENO
);
6626 dup2(fd
, STDOUT_FILENO
);
6627 dup2(fd
, STDERR_FILENO
);
/* Only close the descriptor when it is not one of the three standard ones. */
6628 if (fd
> STDERR_FILENO
) close(fd
);
6630 /* Try to write the pid file */
6631 fp
= fopen(server
.pidfile
,"w");
6633 fprintf(fp
,"%d\n",getpid());
/* Program entry point. Accepts at most one argument, the path of the config
 * file, and prints a usage line when more are given. After the optional
 * daemonize() it loads the dataset from the append only file when
 * server.appendonly is set, from server.dbfilename otherwise, registers
 * acceptHandler for readable events on the listening socket server.fd and
 * finally deletes the event loop. */
6638 int main(int argc
, char **argv
) {
6641 resetServerSaveParams();
6642 loadServerConfig(argv
[1]);
6643 } else if (argc
> 2) {
6644 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
6647 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6649 if (server
.daemonize
) daemonize();
6651 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
6653 linuxOvercommitMemoryWarning();
6655 if (server
.appendonly
) {
6656 if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
)
6657 redisLog(REDIS_NOTICE
,"DB loaded from append only file");
6659 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
6660 redisLog(REDIS_NOTICE
,"DB loaded from disk");
6662 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
6663 acceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
6664 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
6666 aeDeleteEventLoop(server
.el
);
6670 /* ============================= Backtrace support ========================= */
6672 #ifdef HAVE_BACKTRACE
6673 static char *findFuncName(void *pointer
, unsigned long *offset
);
/* Return the instruction pointer saved in the signal context 'uc'. The
 * field holding it differs per platform, hence one preprocessor branch per
 * supported system: FreeBSD, dietlibc, Mac OS X, Linux on x86 and Linux on
 * IA64. */
6675 static void *getMcontextEip(ucontext_t
*uc
) {
6676 #if defined(__FreeBSD__)
6677 return (void*) uc
->uc_mcontext
.mc_eip
;
6678 #elif defined(__dietlibc__)
6679 return (void*) uc
->uc_mcontext
.eip
;
6680 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6682 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6684 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6686 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6687 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6688 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6690 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
/* NOTE(review): this branch also matches 64 bit x86 builds but indexes gregs
 * with REG_EIP. glibc names the 64 bit register REG_RIP, so confirm this
 * builds and reads the right slot on x86_64. */
6692 #elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
6693 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
]; /* Linux 32/64 bit */
6694 #elif defined(__ia64__) /* Linux IA64 */
6695 return (void*) uc
->uc_mcontext
.sc_ip
;
/* SA_SIGINFO style handler for fatal signals. 'sig' is the signal number,
 * 'info' is unused and 'secret' is the ucontext_t of the interrupted code.
 *
 * It logs a banner with the Redis version and the signal number, the output
 * of genRedisInfoString(), and then a backtrace of up to 100 frames. For
 * each frame the symbol found by findFuncName() is printed, unless no symbol
 * is found or the offset parsed after the '+' in the backtrace_symbols()
 * line is smaller, in which case that line is printed unchanged. */
6701 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
6703 char **messages
= NULL
;
6704 int i
, trace_size
= 0;
6705 unsigned long offset
=0;
6706 ucontext_t
*uc
= (ucontext_t
*) secret
;
6708 REDIS_NOTUSED(info
);
6710 redisLog(REDIS_WARNING
,
6711 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
6712 infostring
= genRedisInfoString();
6713 redisLog(REDIS_WARNING
, "%s",infostring
);
6714 /* It's not safe to sdsfree() the returned string under memory
6715 * corruption conditions. Let it leak as we are going to abort */
6717 trace_size
= backtrace(trace
, 100);
6718 /* overwrite sigaction with caller's address */
6719 if (getMcontextEip(uc
) != NULL
) {
6720 trace
[1] = getMcontextEip(uc
);
6722 messages
= backtrace_symbols(trace
, trace_size
);
/* Frame 0 is skipped: the loop starts from index 1. */
6724 for (i
=1; i
<trace_size
; ++i
) {
6725 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
6727 p
= strchr(messages
[i
],'+');
6728 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
6729 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
6731 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
6734 /* free(messages); Don't call free() with possibly corrupted memory. */
6738 static void setupSigSegvAction(void) {
6739 struct sigaction act
;
6741 sigemptyset (&act
.sa_mask
);
6742 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6743 * is used. Otherwise, sa_handler is used */
6744 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
6745 act
.sa_sigaction
= segvHandler
;
6746 sigaction (SIGSEGV
, &act
, NULL
);
6747 sigaction (SIGBUS
, &act
, NULL
);
6748 sigaction (SIGFPE
, &act
, NULL
);
6749 sigaction (SIGILL
, &act
, NULL
);
6750 sigaction (SIGBUS
, &act
, NULL
);
6754 #include "staticsymbols.h"
6755 /* This function tries to convert a pointer into a function name. It's used in
6756 * order to provide a backtrace under segmentation fault that's able to
6757 * display functions declared as static (otherwise the backtrace is useless).
 *
 * symsTable is scanned up to its first entry with a zero pointer. Among the
 * entries whose address is not greater than 'pointer', the one with the
 * smallest distance is kept. Returns the name of that entry, or NULL when
 * no entry qualifies. */
6758 static char *findFuncName(void *pointer
, unsigned long *offset
){
6760 unsigned long off
, minoff
= 0;
6762 /* Try to match against the Symbol with the smallest offset */
6763 for (i
=0; symsTable
[i
].pointer
; i
++) {
6764 unsigned long lp
= (unsigned long) pointer
;
6766 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
6767 off
=lp
-symsTable
[i
].pointer
;
/* The first candidate is always accepted (ret < 0), later ones only when
 * they are closer. */
6768 if (ret
< 0 || off
< minoff
) {
6774 if (ret
== -1) return NULL
;
6776 return symsTable
[ret
].name
;
6778 #else /* HAVE_BACKTRACE */
/* Variant of setupSigSegvAction() compiled when HAVE_BACKTRACE is not
 * defined. Its body is not visible in this view. */
6779 static void setupSigSegvAction(void) {
6781 #endif /* HAVE_BACKTRACE */