2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.3.2"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
64 #include "solarisfixes.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
95 /* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
107 this flag will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short these commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
113 #define REDIS_STRING 0
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
128 /* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpret the length:
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
139 * Lengths up to 63 are stored using a single byte, most DB keys, and many
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining six bits specify a special encoding for the object
149 * according to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
155 /* Virtual memory object->where field. */
156 #define REDIS_VM_MEMORY 0 /* The object is on memory */
157 #define REDIS_VM_SWAPPED 1 /* The object is on disk */
158 #define REDIS_VM_SWAPPING 2 /* Redis is swapping this object on disk */
159 #define REDIS_VM_LOADING 3 /* Redis is loading this object from disk */
161 /* Virtual memory static configuration stuff.
162 * Check vmFindContiguousPages() to know more about these magic numbers. */
163 #define REDIS_VM_MAX_NEAR_PAGES 65536
164 #define REDIS_VM_MAX_RANDOM_JUMP 4096
/* Client flags: single-bit values that are tested with & against a client's
 * flags field (e.g. c->flags & REDIS_SLAVE). */
167 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
168 #define REDIS_SLAVE 2 /* This client is a slave server */
169 #define REDIS_MASTER 4 /* This client is a master server */
170 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
171 #define REDIS_MULTI 16 /* This client is in a MULTI context */
172 #define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */
174 /* Slave replication state - slave side */
175 #define REDIS_REPL_NONE 0 /* No active replication */
176 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
177 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
179 /* Slave replication state - from the point of view of master
180 * Note that in SEND_BULK and ONLINE state the slave receives new updates
181 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
182 * to start the next background saving in order to send updates to it. */
183 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
184 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
185 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
186 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
188 /* List related stuff */
192 /* Sort operations */
193 #define REDIS_SORT_GET 0
194 #define REDIS_SORT_ASC 1
195 #define REDIS_SORT_DESC 2
196 #define REDIS_SORTKEY_MAX 1024
/* Log levels passed as the first argument of redisLog(); redisLog() compares
 * the level against server.verbosity using >=. */
199 #define REDIS_DEBUG 0
200 #define REDIS_NOTICE 1
201 #define REDIS_WARNING 2
203 /* Anti-warning macro... */
204 #define REDIS_NOTUSED(V) ((void) V)
206 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
207 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
209 /* Append only defines */
210 #define APPENDFSYNC_NO 0
211 #define APPENDFSYNC_ALWAYS 1
212 #define APPENDFSYNC_EVERYSEC 2
214 /* We can print the stacktrace, so our assert is defined this way:
 * redisAssert(_e) evaluates _e once; when it is false the stringified
 * expression is handed to _redisAssert() and the process then calls exit(1).
 * The macro expands to an expression, so it can be used wherever an
 * expression statement is allowed. */
215 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
216 static void _redisAssert(char *estr
);
218 /*================================= Data types ============================== */
220 /* A redis object, that is a type able to hold a string / list / set */
222 /* The VM object structure */
223 struct redisObjectVM
{
224 off_t page
; /* the page at which the object is stored on disk */
225 off_t usedpages
; /* number of pages used on disk */
226 time_t atime
; /* Last access time */
229 /* The actual Redis Object */
230 typedef struct redisObject
{
233 unsigned char encoding
;
234 unsigned char storage
; /* where? REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
235 unsigned char notused
;
237 /* VM fields, these are only allocated if VM is active, otherwise the
238 * object allocation function will just allocate
239 * sizeof(redisObject) minus sizeof(redisObjectVM), so using
240 * Redis without VM active will not have any overhead. */
241 struct redisObjectVM vm
;
244 /* Macro used to initialize a Redis object allocated on the stack.
245 * Note that this macro is kept near the structure definition to make sure
246 * we'll update it when the structure is changed, to avoid bugs like
247 * bug #85 introduced exactly in this way. */
248 #define initStaticStringObject(_var,_ptr) do { \
250 _var.type = REDIS_STRING; \
251 _var.encoding = REDIS_ENCODING_RAW; \
253 if (server.vm_enabled) _var.storage = REDIS_VM_MEMORY; \
256 typedef struct redisDb
{
257 dict
*dict
; /* The keyspace for this DB */
258 dict
*expires
; /* Timeout of keys with a timeout set */
259 dict
*blockingkeys
; /* Keys with clients waiting for data (BLPOP) */
263 /* Client MULTI/EXEC state */
264 typedef struct multiCmd
{
267 struct redisCommand
*cmd
;
270 typedef struct multiState
{
271 multiCmd
*commands
; /* Array of MULTI commands */
272 int count
; /* Total number of MULTI commands */
275 /* With multiplexing we need to take per-client state.
276 * Clients are taken in a linked list. */
277 typedef struct redisClient
{
282 robj
**argv
, **mbargv
;
284 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
285 int multibulk
; /* multi bulk command format active */
288 time_t lastinteraction
; /* time of the last interaction, used for timeout */
289 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
291 int slaveseldb
; /* slave selected db, if this client is a slave */
292 int authenticated
; /* when requirepass is non-NULL */
293 int replstate
; /* replication state if this is a slave */
294 int repldbfd
; /* replication DB file descriptor */
295 long repldboff
; /* replication DB file offset */
296 off_t repldbsize
; /* replication DB file size */
297 multiState mstate
; /* MULTI/EXEC state */
298 robj
**blockingkeys
; /* The key we waiting to terminate a blocking
299 * operation such as BLPOP. Otherwise NULL. */
300 int blockingkeysnum
; /* Number of blocking keys */
301 time_t blockingto
; /* Blocking operation timeout. If UNIX current time
302 * is >= blockingto then the operation timed out. */
310 /* Global server state structure */
315 dict
*sharingpool
; /* Pool used for object sharing */
316 unsigned int sharingpoolsize
;
317 long long dirty
; /* changes to DB from the last save */
319 list
*slaves
, *monitors
;
320 char neterr
[ANET_ERR_LEN
];
322 int cronloops
; /* number of times the cron function run */
323 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
324 time_t lastsave
; /* Unix time of last successful save */
325 size_t usedmemory
; /* Used memory in megabytes */
326 /* Fields used only for stats */
327 time_t stat_starttime
; /* server start time */
328 long long stat_numcommands
; /* number of processed commands */
329 long long stat_numconnections
; /* number of connections received */
342 pid_t bgsavechildpid
;
343 pid_t bgrewritechildpid
;
344 sds bgrewritebuf
; /* buffer taken by parent during append only rewrite */
345 struct saveparam
*saveparams
;
350 char *appendfilename
;
354 /* Replication related */
359 redisClient
*master
; /* client that is master for this slave */
361 unsigned int maxclients
;
362 unsigned long maxmemory
;
363 unsigned int blockedclients
;
364 /* Sort parameters - qsort_r() is only available under BSD so we
365 * have to take this state global, in order to pass it to sortCompare() */
369 /* Virtual memory configuration */
374 /* Virtual memory state */
377 off_t vm_next_page
; /* Next probably empty page */
378 off_t vm_near_pages
; /* Number of pages allocated sequentially */
379 unsigned char *vm_bitmap
; /* Bitmap of free/used pages */
380 time_t unixtime
; /* Unix time sampled every second. */
383 typedef void redisCommandProc(redisClient
*c
);
384 struct redisCommand
{
386 redisCommandProc
*proc
;
391 struct redisFunctionSym
{
393 unsigned long pointer
;
396 typedef struct _redisSortObject
{
404 typedef struct _redisSortOperation
{
407 } redisSortOperation
;
409 /* ZSETs use a specialized version of Skiplists */
411 typedef struct zskiplistNode
{
412 struct zskiplistNode
**forward
;
413 struct zskiplistNode
*backward
;
418 typedef struct zskiplist
{
419 struct zskiplistNode
*header
, *tail
;
420 unsigned long length
;
424 typedef struct zset
{
429 /* Our shared "common" objects */
431 struct sharedObjectsStruct
{
432 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
433 *colon
, *nullbulk
, *nullmultibulk
, *queued
,
434 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
435 *outofrangeerr
, *plus
,
436 *select0
, *select1
, *select2
, *select3
, *select4
,
437 *select5
, *select6
, *select7
, *select8
, *select9
;
440 /* Global vars that are actually used as constants. The following double
441 * values are used for double on-disk serialization, and are initialized
442 * at runtime to avoid strange compiler optimizations. */
444 static double R_Zero
, R_PosInf
, R_NegInf
, R_Nan
;
446 /*================================ Prototypes =============================== */
448 static void freeStringObject(robj
*o
);
449 static void freeListObject(robj
*o
);
450 static void freeSetObject(robj
*o
);
451 static void decrRefCount(void *o
);
452 static robj
*createObject(int type
, void *ptr
);
453 static void freeClient(redisClient
*c
);
454 static int rdbLoad(char *filename
);
455 static void addReply(redisClient
*c
, robj
*obj
);
456 static void addReplySds(redisClient
*c
, sds s
);
457 static void incrRefCount(robj
*o
);
458 static int rdbSaveBackground(char *filename
);
459 static robj
*createStringObject(char *ptr
, size_t len
);
460 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
461 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
462 static int syncWithMaster(void);
463 static robj
*tryObjectSharing(robj
*o
);
464 static int tryObjectEncoding(robj
*o
);
465 static robj
*getDecodedObject(robj
*o
);
466 static int removeExpire(redisDb
*db
, robj
*key
);
467 static int expireIfNeeded(redisDb
*db
, robj
*key
);
468 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
469 static int deleteKey(redisDb
*db
, robj
*key
);
470 static time_t getExpire(redisDb
*db
, robj
*key
);
471 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
472 static void updateSlavesWaitingBgsave(int bgsaveerr
);
473 static void freeMemoryIfNeeded(void);
474 static int processCommand(redisClient
*c
);
475 static void setupSigSegvAction(void);
476 static void rdbRemoveTempFile(pid_t childpid
);
477 static void aofRemoveTempFile(pid_t childpid
);
478 static size_t stringObjectLen(robj
*o
);
479 static void processInputBuffer(redisClient
*c
);
480 static zskiplist
*zslCreate(void);
481 static void zslFree(zskiplist
*zsl
);
482 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
483 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
484 static void initClientMultiState(redisClient
*c
);
485 static void freeClientMultiState(redisClient
*c
);
486 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
);
487 static void unblockClient(redisClient
*c
);
488 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
);
489 static void vmInit(void);
490 static void vmMarkPagesFree(off_t page
, off_t count
);
492 static void authCommand(redisClient
*c
);
493 static void pingCommand(redisClient
*c
);
494 static void echoCommand(redisClient
*c
);
495 static void setCommand(redisClient
*c
);
496 static void setnxCommand(redisClient
*c
);
497 static void getCommand(redisClient
*c
);
498 static void delCommand(redisClient
*c
);
499 static void existsCommand(redisClient
*c
);
500 static void incrCommand(redisClient
*c
);
501 static void decrCommand(redisClient
*c
);
502 static void incrbyCommand(redisClient
*c
);
503 static void decrbyCommand(redisClient
*c
);
504 static void selectCommand(redisClient
*c
);
505 static void randomkeyCommand(redisClient
*c
);
506 static void keysCommand(redisClient
*c
);
507 static void dbsizeCommand(redisClient
*c
);
508 static void lastsaveCommand(redisClient
*c
);
509 static void saveCommand(redisClient
*c
);
510 static void bgsaveCommand(redisClient
*c
);
511 static void bgrewriteaofCommand(redisClient
*c
);
512 static void shutdownCommand(redisClient
*c
);
513 static void moveCommand(redisClient
*c
);
514 static void renameCommand(redisClient
*c
);
515 static void renamenxCommand(redisClient
*c
);
516 static void lpushCommand(redisClient
*c
);
517 static void rpushCommand(redisClient
*c
);
518 static void lpopCommand(redisClient
*c
);
519 static void rpopCommand(redisClient
*c
);
520 static void llenCommand(redisClient
*c
);
521 static void lindexCommand(redisClient
*c
);
522 static void lrangeCommand(redisClient
*c
);
523 static void ltrimCommand(redisClient
*c
);
524 static void typeCommand(redisClient
*c
);
525 static void lsetCommand(redisClient
*c
);
526 static void saddCommand(redisClient
*c
);
527 static void sremCommand(redisClient
*c
);
528 static void smoveCommand(redisClient
*c
);
529 static void sismemberCommand(redisClient
*c
);
530 static void scardCommand(redisClient
*c
);
531 static void spopCommand(redisClient
*c
);
532 static void srandmemberCommand(redisClient
*c
);
533 static void sinterCommand(redisClient
*c
);
534 static void sinterstoreCommand(redisClient
*c
);
535 static void sunionCommand(redisClient
*c
);
536 static void sunionstoreCommand(redisClient
*c
);
537 static void sdiffCommand(redisClient
*c
);
538 static void sdiffstoreCommand(redisClient
*c
);
539 static void syncCommand(redisClient
*c
);
540 static void flushdbCommand(redisClient
*c
);
541 static void flushallCommand(redisClient
*c
);
542 static void sortCommand(redisClient
*c
);
543 static void lremCommand(redisClient
*c
);
544 static void rpoplpushcommand(redisClient
*c
);
545 static void infoCommand(redisClient
*c
);
546 static void mgetCommand(redisClient
*c
);
547 static void monitorCommand(redisClient
*c
);
548 static void expireCommand(redisClient
*c
);
549 static void expireatCommand(redisClient
*c
);
550 static void getsetCommand(redisClient
*c
);
551 static void ttlCommand(redisClient
*c
);
552 static void slaveofCommand(redisClient
*c
);
553 static void debugCommand(redisClient
*c
);
554 static void msetCommand(redisClient
*c
);
555 static void msetnxCommand(redisClient
*c
);
556 static void zaddCommand(redisClient
*c
);
557 static void zincrbyCommand(redisClient
*c
);
558 static void zrangeCommand(redisClient
*c
);
559 static void zrangebyscoreCommand(redisClient
*c
);
560 static void zrevrangeCommand(redisClient
*c
);
561 static void zcardCommand(redisClient
*c
);
562 static void zremCommand(redisClient
*c
);
563 static void zscoreCommand(redisClient
*c
);
564 static void zremrangebyscoreCommand(redisClient
*c
);
565 static void multiCommand(redisClient
*c
);
566 static void execCommand(redisClient
*c
);
567 static void blpopCommand(redisClient
*c
);
568 static void brpopCommand(redisClient
*c
);
570 /*================================= Globals ================================= */
573 static struct redisServer server
; /* server global state */
574 static struct redisCommand cmdTable
[] = {
575 {"get",getCommand
,2,REDIS_CMD_INLINE
},
576 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
577 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
578 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
579 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
580 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
581 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
582 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
583 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
584 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
585 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
586 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
587 {"brpop",brpopCommand
,-3,REDIS_CMD_INLINE
},
588 {"blpop",blpopCommand
,-3,REDIS_CMD_INLINE
},
589 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
590 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
591 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
592 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
593 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
594 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
595 {"rpoplpush",rpoplpushcommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
596 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
597 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
598 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
599 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
600 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
601 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
602 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
603 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
604 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
605 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
606 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
607 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
608 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
609 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
610 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
611 {"zincrby",zincrbyCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
612 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
613 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
614 {"zrange",zrangeCommand
,-4,REDIS_CMD_INLINE
},
615 {"zrangebyscore",zrangebyscoreCommand
,-4,REDIS_CMD_INLINE
},
616 {"zrevrange",zrevrangeCommand
,-4,REDIS_CMD_INLINE
},
617 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
618 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
619 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
620 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
621 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
622 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
623 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
624 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
625 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
626 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
627 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
628 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
629 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
630 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
631 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
632 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
633 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
634 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
635 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
636 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
637 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
638 {"bgrewriteaof",bgrewriteaofCommand
,1,REDIS_CMD_INLINE
},
639 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
640 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
641 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
642 {"multi",multiCommand
,1,REDIS_CMD_INLINE
},
643 {"exec",execCommand
,1,REDIS_CMD_INLINE
},
644 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
645 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
646 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
647 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
648 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
649 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
650 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
651 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
652 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
656 /*============================ Utility functions ============================ */
658 /* Glob-style pattern matching. */
659 int stringmatchlen(const char *pattern
, int patternLen
,
660 const char *string
, int stringLen
, int nocase
)
665 while (pattern
[1] == '*') {
670 return 1; /* match */
672 if (stringmatchlen(pattern
+1, patternLen
-1,
673 string
, stringLen
, nocase
))
674 return 1; /* match */
678 return 0; /* no match */
682 return 0; /* no match */
692 not = pattern
[0] == '^';
699 if (pattern
[0] == '\\') {
702 if (pattern
[0] == string
[0])
704 } else if (pattern
[0] == ']') {
706 } else if (patternLen
== 0) {
710 } else if (pattern
[1] == '-' && patternLen
>= 3) {
711 int start
= pattern
[0];
712 int end
= pattern
[2];
720 start
= tolower(start
);
726 if (c
>= start
&& c
<= end
)
730 if (pattern
[0] == string
[0])
733 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
743 return 0; /* no match */
749 if (patternLen
>= 2) {
756 if (pattern
[0] != string
[0])
757 return 0; /* no match */
759 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
760 return 0; /* no match */
768 if (stringLen
== 0) {
769 while(*pattern
== '*') {
776 if (patternLen
== 0 && stringLen
== 0)
781 static void redisLog(int level
, const char *fmt
, ...) {
785 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
789 if (level
>= server
.verbosity
) {
795 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
796 fprintf(fp
,"%s %c ",buf
,c
[level
]);
797 vfprintf(fp
, fmt
, ap
);
803 if (server
.logfile
) fclose(fp
);
806 /*====================== Hash table type implementation ==================== */
808 /* This is a hash table type that uses the SDS dynamic strings library as
809 * keys and redis objects as values (objects can hold SDS strings,
812 static void dictVanillaFree(void *privdata
, void *val
)
814 DICT_NOTUSED(privdata
);
818 static void dictListDestructor(void *privdata
, void *val
)
820 DICT_NOTUSED(privdata
);
821 listRelease((list
*)val
);
824 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
828 DICT_NOTUSED(privdata
);
830 l1
= sdslen((sds
)key1
);
831 l2
= sdslen((sds
)key2
);
832 if (l1
!= l2
) return 0;
833 return memcmp(key1
, key2
, l1
) == 0;
836 static void dictRedisObjectDestructor(void *privdata
, void *val
)
838 DICT_NOTUSED(privdata
);
840 if (val
== NULL
) return; /* Values of swapped out keys as set to NULL */
844 static int dictObjKeyCompare(void *privdata
, const void *key1
,
847 const robj
*o1
= key1
, *o2
= key2
;
848 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
851 static unsigned int dictObjHash(const void *key
) {
853 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
856 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
859 robj
*o1
= (robj
*) key1
, *o2
= (robj
*) key2
;
862 o1
= getDecodedObject(o1
);
863 o2
= getDecodedObject(o2
);
864 cmp
= sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
870 static unsigned int dictEncObjHash(const void *key
) {
871 robj
*o
= (robj
*) key
;
873 o
= getDecodedObject(o
);
874 unsigned int hash
= dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
879 static dictType setDictType
= {
880 dictEncObjHash
, /* hash function */
883 dictEncObjKeyCompare
, /* key compare */
884 dictRedisObjectDestructor
, /* key destructor */
885 NULL
/* val destructor */
888 static dictType zsetDictType
= {
889 dictEncObjHash
, /* hash function */
892 dictEncObjKeyCompare
, /* key compare */
893 dictRedisObjectDestructor
, /* key destructor */
894 dictVanillaFree
/* val destructor of malloc(sizeof(double)) */
897 static dictType hashDictType
= {
898 dictObjHash
, /* hash function */
901 dictObjKeyCompare
, /* key compare */
902 dictRedisObjectDestructor
, /* key destructor */
903 dictRedisObjectDestructor
/* val destructor */
906 /* Keylist hash table type has unencoded redis objects as keys and
907 * lists as values. It's used for blocking operations (BLPOP) */
908 static dictType keylistDictType
= {
909 dictObjHash
, /* hash function */
912 dictObjKeyCompare
, /* key compare */
913 dictRedisObjectDestructor
, /* key destructor */
914 dictListDestructor
/* val destructor */
917 /* ========================= Random utility functions ======================= */
919 /* Redis generally does not try to recover from out of memory conditions
920 * when allocating objects or strings, it is not clear if it will be possible
921 * to report this condition to the client since the networking layer itself
922 * is based on heap allocation for send buffers, so we simply abort.
923 * At least the code will be simpler to read... */
924 static void oom(const char *msg
) {
925 redisLog(REDIS_WARNING
, "%s: Out of memory\n",msg
);
930 /* ====================== Redis server networking stuff ===================== */
931 static void closeTimedoutClients(void) {
934 time_t now
= time(NULL
);
936 listRewind(server
.clients
);
937 while ((ln
= listYield(server
.clients
)) != NULL
) {
938 c
= listNodeValue(ln
);
939 if (server
.maxidletime
&&
940 !(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
941 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
942 (now
- c
->lastinteraction
> server
.maxidletime
))
944 redisLog(REDIS_DEBUG
,"Closing idle client");
946 } else if (c
->flags
& REDIS_BLOCKED
) {
947 if (c
->blockingto
!= 0 && c
->blockingto
< now
) {
948 addReply(c
,shared
.nullmultibulk
);
955 static int htNeedsResize(dict
*dict
) {
956 long long size
, used
;
958 size
= dictSlots(dict
);
959 used
= dictSize(dict
);
960 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
961 (used
*100/size
< REDIS_HT_MINFILL
));
964 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
965 * we resize the hash table to save memory */
966 static void tryResizeHashTables(void) {
969 for (j
= 0; j
< server
.dbnum
; j
++) {
970 if (htNeedsResize(server
.db
[j
].dict
)) {
971 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
972 dictResize(server
.db
[j
].dict
);
973 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
975 if (htNeedsResize(server
.db
[j
].expires
))
976 dictResize(server
.db
[j
].expires
);
980 /* A background saving child (BGSAVE) terminated its work. Handle this. */
981 void backgroundSaveDoneHandler(int statloc
) {
982 int exitcode
= WEXITSTATUS(statloc
);
983 int bysignal
= WIFSIGNALED(statloc
);
985 if (!bysignal
&& exitcode
== 0) {
986 redisLog(REDIS_NOTICE
,
987 "Background saving terminated with success");
989 server
.lastsave
= time(NULL
);
990 } else if (!bysignal
&& exitcode
!= 0) {
991 redisLog(REDIS_WARNING
, "Background saving error");
993 redisLog(REDIS_WARNING
,
994 "Background saving terminated by signal");
995 rdbRemoveTempFile(server
.bgsavechildpid
);
997 server
.bgsavechildpid
= -1;
998 /* Possibly there are slaves waiting for a BGSAVE in order to be served
999 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
1000 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
1003 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
1005 void backgroundRewriteDoneHandler(int statloc
) {
1006 int exitcode
= WEXITSTATUS(statloc
);
1007 int bysignal
= WIFSIGNALED(statloc
);
1009 if (!bysignal
&& exitcode
== 0) {
1013 redisLog(REDIS_NOTICE
,
1014 "Background append only file rewriting terminated with success");
1015 /* Now it's time to flush the differences accumulated by the parent */
1016 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) server
.bgrewritechildpid
);
1017 fd
= open(tmpfile
,O_WRONLY
|O_APPEND
);
1019 redisLog(REDIS_WARNING
, "Not able to open the temp append only file produced by the child: %s", strerror(errno
));
1022 /* Flush our data... */
1023 if (write(fd
,server
.bgrewritebuf
,sdslen(server
.bgrewritebuf
)) !=
1024 (signed) sdslen(server
.bgrewritebuf
)) {
1025 redisLog(REDIS_WARNING
, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno
));
1029 redisLog(REDIS_NOTICE
,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server
.bgrewritebuf
));
1030 /* Now our work is to rename the temp file into the stable file. And
1031 * switch the file descriptor used by the server for append only. */
1032 if (rename(tmpfile
,server
.appendfilename
) == -1) {
1033 redisLog(REDIS_WARNING
,"Can't rename the temp append only file into the stable one: %s", strerror(errno
));
1037 /* Mission completed... almost */
1038 redisLog(REDIS_NOTICE
,"Append only file successfully rewritten.");
1039 if (server
.appendfd
!= -1) {
1040 /* If append only is actually enabled... */
1041 close(server
.appendfd
);
1042 server
.appendfd
= fd
;
1044 server
.appendseldb
= -1; /* Make sure it will issue SELECT */
1045 redisLog(REDIS_NOTICE
,"The new append only file was selected for future appends.");
1047 /* If append only is disabled we just generate a dump in this
1048 * format. Why not? */
1051 } else if (!bysignal
&& exitcode
!= 0) {
1052 redisLog(REDIS_WARNING
, "Background append only file rewriting error");
1054 redisLog(REDIS_WARNING
,
1055 "Background append only file rewriting terminated by signal");
1058 sdsfree(server
.bgrewritebuf
);
1059 server
.bgrewritebuf
= sdsempty();
1060 aofRemoveTempFile(server
.bgrewritechildpid
);
1061 server
.bgrewritechildpid
= -1;
1064 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
1065 int j
, loops
= server
.cronloops
++;
1066 REDIS_NOTUSED(eventLoop
);
1068 REDIS_NOTUSED(clientData
);
1070 /* We take a cached value of the unix time in the global state because
1071 * with virtual memory and aging there is to store the current time
1072 * in objects at every object access, and accuracy is not needed.
1073 * To access a global var is faster than calling time(NULL) */
1074 server
.unixtime
= time(NULL
);
1076 /* Update the global state with the amount of used memory */
1077 server
.usedmemory
= zmalloc_used_memory();
1079 /* Show some info about non-empty databases */
1080 for (j
= 0; j
< server
.dbnum
; j
++) {
1081 long long size
, used
, vkeys
;
1083 size
= dictSlots(server
.db
[j
].dict
);
1084 used
= dictSize(server
.db
[j
].dict
);
1085 vkeys
= dictSize(server
.db
[j
].expires
);
1086 if (!(loops
% 5) && (used
|| vkeys
)) {
1087 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
1088 /* dictPrintStats(server.dict); */
1092 /* We don't want to resize the hash tables while a bacground saving
1093 * is in progress: the saving child is created using fork() that is
1094 * implemented with a copy-on-write semantic in most modern systems, so
1095 * if we resize the HT while there is the saving child at work actually
1096 * a lot of memory movements in the parent will cause a lot of pages
1098 if (server
.bgsavechildpid
== -1) tryResizeHashTables();
1100 /* Show information about connected clients */
1102 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1103 listLength(server
.clients
)-listLength(server
.slaves
),
1104 listLength(server
.slaves
),
1106 dictSize(server
.sharingpool
));
1109 /* Close connections of timedout clients */
1110 if ((server
.maxidletime
&& !(loops
% 10)) || server
.blockedclients
)
1111 closeTimedoutClients();
1113 /* Check if a background saving or AOF rewrite in progress terminated */
1114 if (server
.bgsavechildpid
!= -1 || server
.bgrewritechildpid
!= -1) {
1118 if ((pid
= wait3(&statloc
,WNOHANG
,NULL
)) != 0) {
1119 if (pid
== server
.bgsavechildpid
) {
1120 backgroundSaveDoneHandler(statloc
);
1122 backgroundRewriteDoneHandler(statloc
);
1126 /* If there is not a background saving in progress check if
1127 * we have to save now */
1128 time_t now
= time(NULL
);
1129 for (j
= 0; j
< server
.saveparamslen
; j
++) {
1130 struct saveparam
*sp
= server
.saveparams
+j
;
1132 if (server
.dirty
>= sp
->changes
&&
1133 now
-server
.lastsave
> sp
->seconds
) {
1134 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
1135 sp
->changes
, sp
->seconds
);
1136 rdbSaveBackground(server
.dbfilename
);
1142 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1143 * will use few CPU cycles if there are few expiring keys, otherwise
1144 * it will get more aggressive to avoid that too much memory is used by
1145 * keys that can be removed from the keyspace. */
1146 for (j
= 0; j
< server
.dbnum
; j
++) {
1148 redisDb
*db
= server
.db
+j
;
1150 /* Continue to expire if at the end of the cycle more than 25%
1151 * of the keys were expired. */
1153 int num
= dictSize(db
->expires
);
1154 time_t now
= time(NULL
);
1157 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
1158 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
1163 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
1164 t
= (time_t) dictGetEntryVal(de
);
1166 deleteKey(db
,dictGetEntryKey(de
));
1170 } while (expired
> REDIS_EXPIRELOOKUPS_PER_CRON
/4);
1173 /* Check if we should connect to a MASTER */
1174 if (server
.replstate
== REDIS_REPL_CONNECT
) {
1175 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
1176 if (syncWithMaster() == REDIS_OK
) {
1177 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
1183 static void createSharedObjects(void) {
1184 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
1185 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
1186 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
1187 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
1188 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
1189 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
1190 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
1191 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
1192 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
1193 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
1194 shared
.queued
= createObject(REDIS_STRING
,sdsnew("+QUEUED\r\n"));
1195 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
1196 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1197 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
1198 "-ERR no such key\r\n"));
1199 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
1200 "-ERR syntax error\r\n"));
1201 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
1202 "-ERR source and destination objects are the same\r\n"));
1203 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
1204 "-ERR index out of range\r\n"));
1205 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
1206 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
1207 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
1208 shared
.select0
= createStringObject("select 0\r\n",10);
1209 shared
.select1
= createStringObject("select 1\r\n",10);
1210 shared
.select2
= createStringObject("select 2\r\n",10);
1211 shared
.select3
= createStringObject("select 3\r\n",10);
1212 shared
.select4
= createStringObject("select 4\r\n",10);
1213 shared
.select5
= createStringObject("select 5\r\n",10);
1214 shared
.select6
= createStringObject("select 6\r\n",10);
1215 shared
.select7
= createStringObject("select 7\r\n",10);
1216 shared
.select8
= createStringObject("select 8\r\n",10);
1217 shared
.select9
= createStringObject("select 9\r\n",10);
1220 static void appendServerSaveParams(time_t seconds
, int changes
) {
1221 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1222 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1223 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1224 server
.saveparamslen
++;
1227 static void resetServerSaveParams() {
1228 zfree(server
.saveparams
);
1229 server
.saveparams
= NULL
;
1230 server
.saveparamslen
= 0;
1233 static void initServerConfig() {
1234 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1235 server
.port
= REDIS_SERVERPORT
;
1236 server
.verbosity
= REDIS_DEBUG
;
1237 server
.maxidletime
= REDIS_MAXIDLETIME
;
1238 server
.saveparams
= NULL
;
1239 server
.logfile
= NULL
; /* NULL = log on standard output */
1240 server
.bindaddr
= NULL
;
1241 server
.glueoutputbuf
= 1;
1242 server
.daemonize
= 0;
1243 server
.appendonly
= 0;
1244 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1245 server
.lastfsync
= time(NULL
);
1246 server
.appendfd
= -1;
1247 server
.appendseldb
= -1; /* Make sure the first time will not match */
1248 server
.pidfile
= "/var/run/redis.pid";
1249 server
.dbfilename
= "dump.rdb";
1250 server
.appendfilename
= "appendonly.aof";
1251 server
.requirepass
= NULL
;
1252 server
.shareobjects
= 0;
1253 server
.rdbcompression
= 1;
1254 server
.sharingpoolsize
= 1024;
1255 server
.maxclients
= 0;
1256 server
.blockedclients
= 0;
1257 server
.maxmemory
= 0;
1258 server
.vm_enabled
= 0;
1259 server
.vm_page_size
= 256; /* 256 bytes per page */
1260 server
.vm_pages
= 1024*1024*100; /* 104 millions of pages */
1261 server
.vm_max_memory
= 1024LL*1024*1024*1; /* 1 GB of RAM */
1263 resetServerSaveParams();
1265 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1266 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1267 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1268 /* Replication related */
1270 server
.masterauth
= NULL
;
1271 server
.masterhost
= NULL
;
1272 server
.masterport
= 6379;
1273 server
.master
= NULL
;
1274 server
.replstate
= REDIS_REPL_NONE
;
1276 /* Double constants initialization */
1278 R_PosInf
= 1.0/R_Zero
;
1279 R_NegInf
= -1.0/R_Zero
;
1280 R_Nan
= R_Zero
/R_Zero
;
1283 static void initServer() {
1286 signal(SIGHUP
, SIG_IGN
);
1287 signal(SIGPIPE
, SIG_IGN
);
1288 setupSigSegvAction();
1290 server
.clients
= listCreate();
1291 server
.slaves
= listCreate();
1292 server
.monitors
= listCreate();
1293 server
.objfreelist
= listCreate();
1294 createSharedObjects();
1295 server
.el
= aeCreateEventLoop();
1296 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1297 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
1298 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1299 if (server
.fd
== -1) {
1300 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
1303 for (j
= 0; j
< server
.dbnum
; j
++) {
1304 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1305 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1306 server
.db
[j
].blockingkeys
= dictCreate(&keylistDictType
,NULL
);
1307 server
.db
[j
].id
= j
;
1309 server
.cronloops
= 0;
1310 server
.bgsavechildpid
= -1;
1311 server
.bgrewritechildpid
= -1;
1312 server
.bgrewritebuf
= sdsempty();
1313 server
.lastsave
= time(NULL
);
1315 server
.usedmemory
= 0;
1316 server
.stat_numcommands
= 0;
1317 server
.stat_numconnections
= 0;
1318 server
.stat_starttime
= time(NULL
);
1319 server
.unixtime
= time(NULL
);
1320 aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
);
1322 if (server
.appendonly
) {
1323 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644);
1324 if (server
.appendfd
== -1) {
1325 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1331 if (server
.vm_enabled
) vmInit();
1334 /* Empty the whole database */
1335 static long long emptyDb() {
1337 long long removed
= 0;
1339 for (j
= 0; j
< server
.dbnum
; j
++) {
1340 removed
+= dictSize(server
.db
[j
].dict
);
1341 dictEmpty(server
.db
[j
].dict
);
1342 dictEmpty(server
.db
[j
].expires
);
/* Convert a "yes" / "no" string (compared ignoring case) to an integer.
 *
 * Returns 1 for "yes", 0 for "no" and -1 for any other string, so that
 * callers can detect an invalid argument by comparing against -1. */
static int yesnotoi(char *s) {
    if (!strcasecmp(s,"yes")) return 1;
    if (!strcasecmp(s,"no")) return 0;
    return -1;
}
1353 /* I agree, this is a very rudimental way to load a configuration...
1354 will improve later if the config gets more complex */
1355 static void loadServerConfig(char *filename
) {
1357 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1361 if (filename
[0] == '-' && filename
[1] == '\0')
1364 if ((fp
= fopen(filename
,"r")) == NULL
) {
1365 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1370 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1376 line
= sdstrim(line
," \t\r\n");
1378 /* Skip comments and blank lines*/
1379 if (line
[0] == '#' || line
[0] == '\0') {
1384 /* Split into arguments */
1385 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1386 sdstolower(argv
[0]);
1388 /* Execute config directives */
1389 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1390 server
.maxidletime
= atoi(argv
[1]);
1391 if (server
.maxidletime
< 0) {
1392 err
= "Invalid timeout value"; goto loaderr
;
1394 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1395 server
.port
= atoi(argv
[1]);
1396 if (server
.port
< 1 || server
.port
> 65535) {
1397 err
= "Invalid port"; goto loaderr
;
1399 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1400 server
.bindaddr
= zstrdup(argv
[1]);
1401 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1402 int seconds
= atoi(argv
[1]);
1403 int changes
= atoi(argv
[2]);
1404 if (seconds
< 1 || changes
< 0) {
1405 err
= "Invalid save parameters"; goto loaderr
;
1407 appendServerSaveParams(seconds
,changes
);
1408 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1409 if (chdir(argv
[1]) == -1) {
1410 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1411 argv
[1], strerror(errno
));
1414 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1415 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1416 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1417 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1419 err
= "Invalid log level. Must be one of debug, notice, warning";
1422 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1425 server
.logfile
= zstrdup(argv
[1]);
1426 if (!strcasecmp(server
.logfile
,"stdout")) {
1427 zfree(server
.logfile
);
1428 server
.logfile
= NULL
;
1430 if (server
.logfile
) {
1431 /* Test if we are able to open the file. The server will not
1432 * be able to abort just for this problem later... */
1433 logfp
= fopen(server
.logfile
,"a");
1434 if (logfp
== NULL
) {
1435 err
= sdscatprintf(sdsempty(),
1436 "Can't open the log file: %s", strerror(errno
));
1441 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1442 server
.dbnum
= atoi(argv
[1]);
1443 if (server
.dbnum
< 1) {
1444 err
= "Invalid number of databases"; goto loaderr
;
1446 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1447 server
.maxclients
= atoi(argv
[1]);
1448 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1449 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1450 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1451 server
.masterhost
= sdsnew(argv
[1]);
1452 server
.masterport
= atoi(argv
[2]);
1453 server
.replstate
= REDIS_REPL_CONNECT
;
1454 } else if (!strcasecmp(argv
[0],"masterauth") && argc
== 2) {
1455 server
.masterauth
= zstrdup(argv
[1]);
1456 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1457 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1458 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1460 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1461 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1462 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1464 } else if (!strcasecmp(argv
[0],"rdbcompression") && argc
== 2) {
1465 if ((server
.rdbcompression
= yesnotoi(argv
[1])) == -1) {
1466 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1468 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1469 server
.sharingpoolsize
= atoi(argv
[1]);
1470 if (server
.sharingpoolsize
< 1) {
1471 err
= "invalid object sharing pool size"; goto loaderr
;
1473 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1474 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1475 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1477 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1478 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1479 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1481 } else if (!strcasecmp(argv
[0],"appendfsync") && argc
== 2) {
1482 if (!strcasecmp(argv
[1],"no")) {
1483 server
.appendfsync
= APPENDFSYNC_NO
;
1484 } else if (!strcasecmp(argv
[1],"always")) {
1485 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1486 } else if (!strcasecmp(argv
[1],"everysec")) {
1487 server
.appendfsync
= APPENDFSYNC_EVERYSEC
;
1489 err
= "argument must be 'no', 'always' or 'everysec'";
1492 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1493 server
.requirepass
= zstrdup(argv
[1]);
1494 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1495 server
.pidfile
= zstrdup(argv
[1]);
1496 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1497 server
.dbfilename
= zstrdup(argv
[1]);
1498 } else if (!strcasecmp(argv
[0],"vm-enabled") && argc
== 2) {
1499 if ((server
.vm_enabled
= yesnotoi(argv
[1])) == -1) {
1500 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1503 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1505 for (j
= 0; j
< argc
; j
++)
1510 if (fp
!= stdin
) fclose(fp
);
1514 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1515 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1516 fprintf(stderr
, ">>> '%s'\n", line
);
1517 fprintf(stderr
, "%s\n", err
);
1521 static void freeClientArgv(redisClient
*c
) {
1524 for (j
= 0; j
< c
->argc
; j
++)
1525 decrRefCount(c
->argv
[j
]);
1526 for (j
= 0; j
< c
->mbargc
; j
++)
1527 decrRefCount(c
->mbargv
[j
]);
1532 static void freeClient(redisClient
*c
) {
1535 /* Note that if the client we are freeing is blocked into a blocking
1536 * call, we have to set querybuf to NULL *before* to call unblockClient()
1537 * to avoid processInputBuffer() will get called. Also it is important
1538 * to remove the file events after this, because this call adds
1539 * the READABLE event. */
1540 sdsfree(c
->querybuf
);
1542 if (c
->flags
& REDIS_BLOCKED
)
1545 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1546 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1547 listRelease(c
->reply
);
1550 ln
= listSearchKey(server
.clients
,c
);
1551 redisAssert(ln
!= NULL
);
1552 listDelNode(server
.clients
,ln
);
1553 if (c
->flags
& REDIS_SLAVE
) {
1554 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1556 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1557 ln
= listSearchKey(l
,c
);
1558 redisAssert(ln
!= NULL
);
1561 if (c
->flags
& REDIS_MASTER
) {
1562 server
.master
= NULL
;
1563 server
.replstate
= REDIS_REPL_CONNECT
;
1567 freeClientMultiState(c
);
1571 #define GLUEREPLY_UP_TO (1024)
1572 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1574 char buf
[GLUEREPLY_UP_TO
];
1578 listRewind(c
->reply
);
1579 while((ln
= listYield(c
->reply
))) {
1583 objlen
= sdslen(o
->ptr
);
1584 if (copylen
+ objlen
<= GLUEREPLY_UP_TO
) {
1585 memcpy(buf
+copylen
,o
->ptr
,objlen
);
1587 listDelNode(c
->reply
,ln
);
1589 if (copylen
== 0) return;
1593 /* Now the output buffer is empty, add the new single element */
1594 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,copylen
));
1595 listAddNodeHead(c
->reply
,o
);
1598 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1599 redisClient
*c
= privdata
;
1600 int nwritten
= 0, totwritten
= 0, objlen
;
1603 REDIS_NOTUSED(mask
);
1605 /* Use writev() if we have enough buffers to send */
1606 if (!server
.glueoutputbuf
&&
1607 listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD
&&
1608 !(c
->flags
& REDIS_MASTER
))
1610 sendReplyToClientWritev(el
, fd
, privdata
, mask
);
1614 while(listLength(c
->reply
)) {
1615 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1616 glueReplyBuffersIfNeeded(c
);
1618 o
= listNodeValue(listFirst(c
->reply
));
1619 objlen
= sdslen(o
->ptr
);
1622 listDelNode(c
->reply
,listFirst(c
->reply
));
1626 if (c
->flags
& REDIS_MASTER
) {
1627 /* Don't reply to a master */
1628 nwritten
= objlen
- c
->sentlen
;
1630 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1631 if (nwritten
<= 0) break;
1633 c
->sentlen
+= nwritten
;
1634 totwritten
+= nwritten
;
1635 /* If we fully sent the object on head go to the next one */
1636 if (c
->sentlen
== objlen
) {
1637 listDelNode(c
->reply
,listFirst(c
->reply
));
1640 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1641 * bytes, in a single threaded server it's a good idea to serve
1642 * other clients as well, even if a very large request comes from
1643 * super fast link that is always able to accept data (in real world
1644 * scenario think about 'KEYS *' against the loopback interfae) */
1645 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1647 if (nwritten
== -1) {
1648 if (errno
== EAGAIN
) {
1651 redisLog(REDIS_DEBUG
,
1652 "Error writing to client: %s", strerror(errno
));
1657 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1658 if (listLength(c
->reply
) == 0) {
1660 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1664 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
)
1666 redisClient
*c
= privdata
;
1667 int nwritten
= 0, totwritten
= 0, objlen
, willwrite
;
1669 struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
];
1670 int offset
, ion
= 0;
1672 REDIS_NOTUSED(mask
);
1675 while (listLength(c
->reply
)) {
1676 offset
= c
->sentlen
;
1680 /* fill-in the iov[] array */
1681 for(node
= listFirst(c
->reply
); node
; node
= listNextNode(node
)) {
1682 o
= listNodeValue(node
);
1683 objlen
= sdslen(o
->ptr
);
1685 if (totwritten
+ objlen
- offset
> REDIS_MAX_WRITE_PER_EVENT
)
1688 if(ion
== REDIS_WRITEV_IOVEC_COUNT
)
1689 break; /* no more iovecs */
1691 iov
[ion
].iov_base
= ((char*)o
->ptr
) + offset
;
1692 iov
[ion
].iov_len
= objlen
- offset
;
1693 willwrite
+= objlen
- offset
;
1694 offset
= 0; /* just for the first item */
1701 /* write all collected blocks at once */
1702 if((nwritten
= writev(fd
, iov
, ion
)) < 0) {
1703 if (errno
!= EAGAIN
) {
1704 redisLog(REDIS_DEBUG
,
1705 "Error writing to client: %s", strerror(errno
));
1712 totwritten
+= nwritten
;
1713 offset
= c
->sentlen
;
1715 /* remove written robjs from c->reply */
1716 while (nwritten
&& listLength(c
->reply
)) {
1717 o
= listNodeValue(listFirst(c
->reply
));
1718 objlen
= sdslen(o
->ptr
);
1720 if(nwritten
>= objlen
- offset
) {
1721 listDelNode(c
->reply
, listFirst(c
->reply
));
1722 nwritten
-= objlen
- offset
;
1726 c
->sentlen
+= nwritten
;
1734 c
->lastinteraction
= time(NULL
);
1736 if (listLength(c
->reply
) == 0) {
1738 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1742 static struct redisCommand
*lookupCommand(char *name
) {
1744 while(cmdTable
[j
].name
!= NULL
) {
1745 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1751 /* resetClient prepares the client to process the next command.
 * NOTE(review): the body of this function is not present in this view of
 * the file, so what exactly gets reset cannot be stated here -- confirm
 * against the complete source. */
1752 static void resetClient(redisClient
*c
) {
1758 /* Call() is the core of Redis execution of a command */
1759 static void call(redisClient
*c
, struct redisCommand
*cmd
) {
1762 dirty
= server
.dirty
;
1764 if (server
.appendonly
&& server
.dirty
-dirty
)
1765 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1766 if (server
.dirty
-dirty
&& listLength(server
.slaves
))
1767 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1768 if (listLength(server
.monitors
))
1769 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1770 server
.stat_numcommands
++;
1773 /* If this function gets called we already read a whole
1774 * command, argments are in the client argv/argc fields.
1775 * processCommand() execute the command or prepare the
1776 * server for a bulk read from the client.
1778 * If 1 is returned the client is still alive and valid and
1779 * and other operations can be performed by the caller. Otherwise
1780 * if 0 is returned the client was destroied (i.e. after QUIT). */
1781 static int processCommand(redisClient
*c
) {
1782 struct redisCommand
*cmd
;
1784 /* Free some memory if needed (maxmemory setting) */
1785 if (server
.maxmemory
) freeMemoryIfNeeded();
1787 /* Handle the multi bulk command type. This is an alternative protocol
1788 * supported by Redis in order to receive commands that are composed of
1789 * multiple binary-safe "bulk" arguments. The latency of processing is
1790 * a bit higher but this allows things like multi-sets, so if this
1791 * protocol is used only for MSET and similar commands this is a big win. */
1792 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1793 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1794 if (c
->multibulk
<= 0) {
1798 decrRefCount(c
->argv
[c
->argc
-1]);
1802 } else if (c
->multibulk
) {
1803 if (c
->bulklen
== -1) {
1804 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1805 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1809 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1810 decrRefCount(c
->argv
[0]);
1811 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1813 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1818 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1822 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1823 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1827 if (c
->multibulk
== 0) {
1831 /* Here we need to swap the multi-bulk argc/argv with the
1832 * normal argc/argv of the client structure. */
1834 c
->argv
= c
->mbargv
;
1835 c
->mbargv
= auxargv
;
1838 c
->argc
= c
->mbargc
;
1839 c
->mbargc
= auxargc
;
1841 /* We need to set bulklen to something different than -1
1842 * in order for the code below to process the command without
1843 * to try to read the last argument of a bulk command as
1844 * a special argument. */
1846 /* continue below and process the command */
1853 /* -- end of multi bulk commands processing -- */
1855 /* The QUIT command is handled as a special case. Normal command
1856 * procs are unable to close the client connection safely */
1857 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1861 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1864 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1865 (char*)c
->argv
[0]->ptr
));
1868 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1869 (c
->argc
< -cmd
->arity
)) {
1871 sdscatprintf(sdsempty(),
1872 "-ERR wrong number of arguments for '%s' command\r\n",
1876 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1877 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1880 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1881 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1883 decrRefCount(c
->argv
[c
->argc
-1]);
1884 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1886 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1891 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1892 /* It is possible that the bulk read is already in the
1893 * buffer. Check this condition and handle it accordingly.
1894 * This is just a fast path, alternative to call processInputBuffer().
1895 * It's a good idea since the code is small and this condition
1896 * happens most of the times. */
1897 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1898 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1900 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1905 /* Let's try to share objects on the command arguments vector */
1906 if (server
.shareobjects
) {
1908 for(j
= 1; j
< c
->argc
; j
++)
1909 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1911 /* Let's try to encode the bulk object to save space. */
1912 if (cmd
->flags
& REDIS_CMD_BULK
)
1913 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1915 /* Check if the user is authenticated */
1916 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1917 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1922 /* Exec the command */
1923 if (c
->flags
& REDIS_MULTI
&& cmd
->proc
!= execCommand
) {
1924 queueMultiCommand(c
,cmd
);
1925 addReply(c
,shared
.queued
);
1930 /* Prepare the client for the next command */
1931 if (c
->flags
& REDIS_CLOSE
) {
1939 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1943 /* (args*2)+1 is enough room for args, spaces, newlines */
1944 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1946 if (argc
<= REDIS_STATIC_ARGS
) {
1949 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1952 for (j
= 0; j
< argc
; j
++) {
1953 if (j
!= 0) outv
[outc
++] = shared
.space
;
1954 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1957 lenobj
= createObject(REDIS_STRING
,
1958 sdscatprintf(sdsempty(),"%lu\r\n",
1959 (unsigned long) stringObjectLen(argv
[j
])));
1960 lenobj
->refcount
= 0;
1961 outv
[outc
++] = lenobj
;
1963 outv
[outc
++] = argv
[j
];
1965 outv
[outc
++] = shared
.crlf
;
1967 /* Increment all the refcounts at start and decrement at end in order to
1968 * be sure to free objects if there is no slave in a replication state
1969 * able to be feed with commands */
1970 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1972 while((ln
= listYield(slaves
))) {
1973 redisClient
*slave
= ln
->value
;
1975 /* Don't feed slaves that are still waiting for BGSAVE to start */
1976 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1978 /* Feed all the other slaves, MONITORs and so on */
1979 if (slave
->slaveseldb
!= dictid
) {
1983 case 0: selectcmd
= shared
.select0
; break;
1984 case 1: selectcmd
= shared
.select1
; break;
1985 case 2: selectcmd
= shared
.select2
; break;
1986 case 3: selectcmd
= shared
.select3
; break;
1987 case 4: selectcmd
= shared
.select4
; break;
1988 case 5: selectcmd
= shared
.select5
; break;
1989 case 6: selectcmd
= shared
.select6
; break;
1990 case 7: selectcmd
= shared
.select7
; break;
1991 case 8: selectcmd
= shared
.select8
; break;
1992 case 9: selectcmd
= shared
.select9
; break;
1994 selectcmd
= createObject(REDIS_STRING
,
1995 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1996 selectcmd
->refcount
= 0;
1999 addReply(slave
,selectcmd
);
2000 slave
->slaveseldb
= dictid
;
2002 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
2004 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
2005 if (outv
!= static_outv
) zfree(outv
);
2008 static void processInputBuffer(redisClient
*c
) {
2010 /* Before to process the input buffer, make sure the client is not
2011 * waitig for a blocking operation such as BLPOP. Note that the first
2012 * iteration the client is never blocked, otherwise the processInputBuffer
2013 * would not be called at all, but after the execution of the first commands
2014 * in the input buffer the client may be blocked, and the "goto again"
2015 * will try to reiterate. The following line will make it return asap. */
2016 if (c
->flags
& REDIS_BLOCKED
) return;
2017 if (c
->bulklen
== -1) {
2018 /* Read the first line of the query */
2019 char *p
= strchr(c
->querybuf
,'\n');
2026 query
= c
->querybuf
;
2027 c
->querybuf
= sdsempty();
2028 querylen
= 1+(p
-(query
));
2029 if (sdslen(query
) > querylen
) {
2030 /* leave data after the first line of the query in the buffer */
2031 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
2033 *p
= '\0'; /* remove "\n" */
2034 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
2035 sdsupdatelen(query
);
2037 /* Now we can split the query in arguments */
2038 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
2041 if (c
->argv
) zfree(c
->argv
);
2042 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
2044 for (j
= 0; j
< argc
; j
++) {
2045 if (sdslen(argv
[j
])) {
2046 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
2054 /* Execute the command. If the client is still valid
2055 * after processCommand() return and there is something
2056 * on the query buffer try to process the next command. */
2057 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
2059 /* Nothing to process, argc == 0. Just process the query
2060 * buffer if it's not empty or return to the caller */
2061 if (sdslen(c
->querybuf
)) goto again
;
2064 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
2065 redisLog(REDIS_DEBUG
, "Client protocol error");
2070 /* Bulk read handling. Note that if we are at this point
2071 the client already sent a command terminated with a newline,
2072 we are reading the bulk data that is actually the last
2073 argument of the command. */
2074 int qbl
= sdslen(c
->querybuf
);
2076 if (c
->bulklen
<= qbl
) {
2077 /* Copy everything but the final CRLF as final argument */
2078 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
2080 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
2081 /* Process the command. If the client is still valid after
2082 * the processing and there is more data in the buffer
2083 * try to parse it. */
2084 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
2090 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2091 redisClient
*c
= (redisClient
*) privdata
;
2092 char buf
[REDIS_IOBUF_LEN
];
2095 REDIS_NOTUSED(mask
);
2097 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
2099 if (errno
== EAGAIN
) {
2102 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
2106 } else if (nread
== 0) {
2107 redisLog(REDIS_DEBUG
, "Client closed connection");
2112 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
2113 c
->lastinteraction
= time(NULL
);
2117 processInputBuffer(c
);
2120 static int selectDb(redisClient
*c
, int id
) {
2121 if (id
< 0 || id
>= server
.dbnum
)
2123 c
->db
= &server
.db
[id
];
2127 static void *dupClientReplyValue(void *o
) {
2128 incrRefCount((robj
*)o
);
2132 static redisClient
*createClient(int fd
) {
2133 redisClient
*c
= zmalloc(sizeof(*c
));
2135 anetNonBlock(NULL
,fd
);
2136 anetTcpNoDelay(NULL
,fd
);
2137 if (!c
) return NULL
;
2140 c
->querybuf
= sdsempty();
2149 c
->lastinteraction
= time(NULL
);
2150 c
->authenticated
= 0;
2151 c
->replstate
= REDIS_REPL_NONE
;
2152 c
->reply
= listCreate();
2153 c
->blockingkeys
= NULL
;
2154 c
->blockingkeysnum
= 0;
2155 listSetFreeMethod(c
->reply
,decrRefCount
);
2156 listSetDupMethod(c
->reply
,dupClientReplyValue
);
2157 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
2158 readQueryFromClient
, c
) == AE_ERR
) {
2162 listAddNodeTail(server
.clients
,c
);
2163 initClientMultiState(c
);
2167 static void addReply(redisClient
*c
, robj
*obj
) {
2168 if (listLength(c
->reply
) == 0 &&
2169 (c
->replstate
== REDIS_REPL_NONE
||
2170 c
->replstate
== REDIS_REPL_ONLINE
) &&
2171 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
2172 sendReplyToClient
, c
) == AE_ERR
) return;
2173 listAddNodeTail(c
->reply
,getDecodedObject(obj
));
2176 static void addReplySds(redisClient
*c
, sds s
) {
2177 robj
*o
= createObject(REDIS_STRING
,s
);
2182 static void addReplyDouble(redisClient
*c
, double d
) {
2185 snprintf(buf
,sizeof(buf
),"%.17g",d
);
2186 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2187 (unsigned long) strlen(buf
),buf
));
2190 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
2193 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
2194 len
= sdslen(obj
->ptr
);
2196 long n
= (long)obj
->ptr
;
2198 /* Compute how many bytes will take this integer as a radix 10 string */
2204 while((n
= n
/10) != 0) {
2208 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len
));
2211 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2216 REDIS_NOTUSED(mask
);
2217 REDIS_NOTUSED(privdata
);
2219 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
2220 if (cfd
== AE_ERR
) {
2221 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
2224 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
2225 if ((c
= createClient(cfd
)) == NULL
) {
2226 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
2227 close(cfd
); /* May be already closed, just ingore errors */
2230 /* If maxclient directive is set and this is one client more... close the
2231 * connection. Note that we create the client instead to check before
2232 * for this condition, since now the socket is already set in nonblocking
2233 * mode and we can send an error for free using the Kernel I/O */
2234 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
2235 char *err
= "-ERR max number of clients reached\r\n";
2237 /* That's a best effort error message, don't check write errors */
2238 if (write(c
->fd
,err
,strlen(err
)) == -1) {
2239 /* Nothing to do, Just to avoid the warning... */
2244 server
.stat_numconnections
++;
2247 /* ======================= Redis objects implementation ===================== */
2249 static robj
*createObject(int type
, void *ptr
) {
2252 if (listLength(server
.objfreelist
)) {
2253 listNode
*head
= listFirst(server
.objfreelist
);
2254 o
= listNodeValue(head
);
2255 listDelNode(server
.objfreelist
,head
);
2257 if (server
.vm_enabled
) {
2258 o
= zmalloc(sizeof(*o
));
2260 o
= zmalloc(sizeof(*o
)-sizeof(struct redisObjectVM
));
2264 o
->encoding
= REDIS_ENCODING_RAW
;
2267 if (server
.vm_enabled
) {
2268 o
->vm
.atime
= server
.unixtime
;
2269 o
->storage
= REDIS_VM_MEMORY
;
2274 static robj
*createStringObject(char *ptr
, size_t len
) {
2275 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
2278 static robj
*createListObject(void) {
2279 list
*l
= listCreate();
2281 listSetFreeMethod(l
,decrRefCount
);
2282 return createObject(REDIS_LIST
,l
);
2285 static robj
*createSetObject(void) {
2286 dict
*d
= dictCreate(&setDictType
,NULL
);
2287 return createObject(REDIS_SET
,d
);
2290 static robj
*createZsetObject(void) {
2291 zset
*zs
= zmalloc(sizeof(*zs
));
2293 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
2294 zs
->zsl
= zslCreate();
2295 return createObject(REDIS_ZSET
,zs
);
2298 static void freeStringObject(robj
*o
) {
2299 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2304 static void freeListObject(robj
*o
) {
2305 listRelease((list
*) o
->ptr
);
2308 static void freeSetObject(robj
*o
) {
2309 dictRelease((dict
*) o
->ptr
);
2312 static void freeZsetObject(robj
*o
) {
2315 dictRelease(zs
->dict
);
2320 static void freeHashObject(robj
*o
) {
2321 dictRelease((dict
*) o
->ptr
);
2324 static void incrRefCount(robj
*o
) {
2325 assert(!server
.vm_enabled
|| o
->storage
== REDIS_VM_MEMORY
);
2329 static void decrRefCount(void *obj
) {
2332 /* REDIS_VM_SWAPPED */
2333 if (server
.vm_enabled
&& o
->storage
== REDIS_VM_SWAPPED
) {
2334 assert(o
->refcount
== 1);
2335 assert(o
->type
== REDIS_STRING
);
2336 freeStringObject(o
);
2337 vmMarkPagesFree(o
->vm
.page
,o
->vm
.usedpages
);
2338 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2339 !listAddNodeHead(server
.objfreelist
,o
))
2343 /* REDIS_VM_MEMORY */
2344 if (--(o
->refcount
) == 0) {
2346 case REDIS_STRING
: freeStringObject(o
); break;
2347 case REDIS_LIST
: freeListObject(o
); break;
2348 case REDIS_SET
: freeSetObject(o
); break;
2349 case REDIS_ZSET
: freeZsetObject(o
); break;
2350 case REDIS_HASH
: freeHashObject(o
); break;
2351 default: redisAssert(0 != 0); break;
2353 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2354 !listAddNodeHead(server
.objfreelist
,o
))
2359 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2360 dictEntry
*de
= dictFind(db
->dict
,key
);
2362 robj
*o
= dictGetEntryVal(de
);
2364 /* Update the access time of the key for the aging algorithm. */
2365 if (server
.vm_enabled
) o
->vm
.atime
= server
.unixtime
;
2372 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2373 expireIfNeeded(db
,key
);
2374 return lookupKey(db
,key
);
2377 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2378 deleteIfVolatile(db
,key
);
2379 return lookupKey(db
,key
);
2382 static int deleteKey(redisDb
*db
, robj
*key
) {
2385 /* We need to protect key from destruction: after the first dictDelete()
2386 * it may happen that 'key' is no longer valid if we don't increment
2387 * it's count. This may happen when we get the object reference directly
2388 * from the hash table with dictRandomKey() or dict iterators */
2390 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2391 retval
= dictDelete(db
->dict
,key
);
2394 return retval
== DICT_OK
;
2397 /* Try to share an object against the shared objects pool */
2398 static robj
*tryObjectSharing(robj
*o
) {
2399 struct dictEntry
*de
;
2402 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2404 redisAssert(o
->type
== REDIS_STRING
);
2405 de
= dictFind(server
.sharingpool
,o
);
2407 robj
*shared
= dictGetEntryKey(de
);
2409 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2410 dictGetEntryVal(de
) = (void*) c
;
2411 incrRefCount(shared
);
2415 /* Here we are using a stream algorihtm: Every time an object is
2416 * shared we increment its count, everytime there is a miss we
2417 * recrement the counter of a random object. If this object reaches
2418 * zero we remove the object and put the current object instead. */
2419 if (dictSize(server
.sharingpool
) >=
2420 server
.sharingpoolsize
) {
2421 de
= dictGetRandomKey(server
.sharingpool
);
2422 redisAssert(de
!= NULL
);
2423 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2424 dictGetEntryVal(de
) = (void*) c
;
2426 dictDelete(server
.sharingpool
,de
->key
);
2429 c
= 0; /* If the pool is empty we want to add this object */
2434 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2435 redisAssert(retval
== DICT_OK
);
2442 /* Check if the nul-terminated string 's' can be represented by a long
2443 * (that is, is a number that fits into long without any other space or
2444 * character before or after the digits).
2446 * If so, the function returns REDIS_OK and *longval is set to the value
2447 * of the number. Otherwise REDIS_ERR is returned */
2448 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2449 char buf
[32], *endptr
;
2453 value
= strtol(s
, &endptr
, 10);
2454 if (endptr
[0] != '\0') return REDIS_ERR
;
2455 slen
= snprintf(buf
,32,"%ld",value
);
2457 /* If the number converted back into a string is not identical
2458 * then it's not possible to encode the string as integer */
2459 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2460 if (longval
) *longval
= value
;
2464 /* Try to encode a string object in order to save space */
2465 static int tryObjectEncoding(robj
*o
) {
2469 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2470 return REDIS_ERR
; /* Already encoded */
2472 /* It's not save to encode shared objects: shared objects can be shared
2473 * everywhere in the "object space" of Redis. Encoded objects can only
2474 * appear as "values" (and not, for instance, as keys) */
2475 if (o
->refcount
> 1) return REDIS_ERR
;
2477 /* Currently we try to encode only strings */
2478 redisAssert(o
->type
== REDIS_STRING
);
2480 /* Check if we can represent this string as a long integer */
2481 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2483 /* Ok, this object can be encoded */
2484 o
->encoding
= REDIS_ENCODING_INT
;
2486 o
->ptr
= (void*) value
;
2490 /* Get a decoded version of an encoded object (returned as a new object).
2491 * If the object is already raw-encoded just increment the ref count. */
2492 static robj
*getDecodedObject(robj
*o
) {
2495 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2499 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2502 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2503 dec
= createStringObject(buf
,strlen(buf
));
2506 redisAssert(1 != 1);
2510 /* Compare two string objects via strcmp() or alike.
2511 * Note that the objects may be integer-encoded. In such a case we
2512 * use snprintf() to get a string representation of the numbers on the stack
2513 * and compare the strings, it's much faster than calling getDecodedObject().
2515 * Important note: if objects are not integer encoded, but binary-safe strings,
2516 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2518 static int compareStringObjects(robj
*a
, robj
*b
) {
2519 redisAssert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2520 char bufa
[128], bufb
[128], *astr
, *bstr
;
2523 if (a
== b
) return 0;
2524 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2525 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2531 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2532 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2538 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2541 static size_t stringObjectLen(robj
*o
) {
2542 redisAssert(o
->type
== REDIS_STRING
);
2543 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2544 return sdslen(o
->ptr
);
2548 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2552 /*============================ RDB saving/loading =========================== */
/* Write the single byte 'type' to 'fp'.
 * Returns 0 on success, -1 if the byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
/* Write the time 't' to 'fp' as a 32 bit signed integer in host byte
 * order. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    if (fwrite(&stamp,4,1,fp) != 0) return 0;
    return -1;
}
2565 /* check rdbLoadLen() comments for more info */
2566 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2567 unsigned char buf
[2];
2570 /* Save a 6 bit len */
2571 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2572 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2573 } else if (len
< (1<<14)) {
2574 /* Save a 14 bit len */
2575 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2577 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2579 /* Save a 32 bit len */
2580 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2581 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2583 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2588 /* String objects in the form "2391" "-100" without any space and with a
2589 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2590 * encoded as integers to save space */
2591 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2593 char *endptr
, buf
[32];
2595 /* Check if it's possible to encode this value as a number */
2596 value
= strtoll(s
, &endptr
, 10);
2597 if (endptr
[0] != '\0') return 0;
2598 snprintf(buf
,32,"%lld",value
);
2600 /* If the number converted back into a string is not identical
2601 * then it's not possible to encode the string as integer */
2602 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2604 /* Finally check if it fits in our ranges */
2605 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2606 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2607 enc
[1] = value
&0xFF;
2609 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2610 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2611 enc
[1] = value
&0xFF;
2612 enc
[2] = (value
>>8)&0xFF;
2614 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2615 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2616 enc
[1] = value
&0xFF;
2617 enc
[2] = (value
>>8)&0xFF;
2618 enc
[3] = (value
>>16)&0xFF;
2619 enc
[4] = (value
>>24)&0xFF;
2626 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2627 unsigned int comprlen
, outlen
;
2631 /* We require at least four bytes compression for this to be worth it */
2632 outlen
= sdslen(obj
->ptr
)-4;
2633 if (outlen
<= 0) return 0;
2634 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2635 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2636 if (comprlen
== 0) {
2640 /* Data compressed! Let's save it on disk */
2641 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2642 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2643 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2644 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2645 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2654 /* Save a string objet as [len][data] on disk. If the object is a string
2655 * representation of an integer value we try to safe it in a special form */
2656 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2660 len
= sdslen(obj
->ptr
);
2662 /* Try integer encoding */
2664 unsigned char buf
[5];
2665 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2666 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2671 /* Try LZF compression - under 20 bytes it's unable to compress even
2672 * aaaaaaaaaaaaaaaaaa so skip it */
2673 if (server
.rdbcompression
&& len
> 20) {
2676 retval
= rdbSaveLzfStringObject(fp
,obj
);
2677 if (retval
== -1) return -1;
2678 if (retval
> 0) return 0;
2679 /* retval == 0 means data can't be compressed, save the old way */
2682 /* Store verbatim */
2683 if (rdbSaveLen(fp
,len
) == -1) return -1;
2684 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2688 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2689 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2692 obj
= getDecodedObject(obj
);
2693 retval
= rdbSaveStringObjectRaw(fp
,obj
);
/* Save a double value. Doubles are saved as strings prefixed by an
 * unsigned 8 bit integer specifying the length of the representation.
 * Three values of this 8 bit integer are special and are not followed by
 * any string:
 *
 * 253: not a number
 * 254: positive infinity
 * 255: negative infinity
 *
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len = 1; /* special values are a single byte */

    if (isnan(val)) {
        buf[0] = 253;
    } else if (!isfinite(val)) {
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        char *repr = (char*)buf+1;

        snprintf(repr,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen(repr);
        len = buf[0]+1;
    }
    if (fwrite(buf,len,1,fp) == 0) return -1;
    return 0;
}
2725 /* Save a Redis object. */
2726 static int rdbSaveObject(FILE *fp
, robj
*o
) {
2727 if (o
->type
== REDIS_STRING
) {
2728 /* Save a string value */
2729 if (rdbSaveStringObject(fp
,o
) == -1) return -1;
2730 } else if (o
->type
== REDIS_LIST
) {
2731 /* Save a list value */
2732 list
*list
= o
->ptr
;
2736 if (rdbSaveLen(fp
,listLength(list
)) == -1) return -1;
2737 while((ln
= listYield(list
))) {
2738 robj
*eleobj
= listNodeValue(ln
);
2740 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2742 } else if (o
->type
== REDIS_SET
) {
2743 /* Save a set value */
2745 dictIterator
*di
= dictGetIterator(set
);
2748 if (rdbSaveLen(fp
,dictSize(set
)) == -1) return -1;
2749 while((de
= dictNext(di
)) != NULL
) {
2750 robj
*eleobj
= dictGetEntryKey(de
);
2752 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2754 dictReleaseIterator(di
);
2755 } else if (o
->type
== REDIS_ZSET
) {
2756 /* Save a set value */
2758 dictIterator
*di
= dictGetIterator(zs
->dict
);
2761 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) return -1;
2762 while((de
= dictNext(di
)) != NULL
) {
2763 robj
*eleobj
= dictGetEntryKey(de
);
2764 double *score
= dictGetEntryVal(de
);
2766 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2767 if (rdbSaveDoubleValue(fp
,*score
) == -1) return -1;
2769 dictReleaseIterator(di
);
2771 redisAssert(0 != 0);
2776 /* Return the length the object will have on disk if saved with
2777 * the rdbSaveObject() function. Currently we use a trick to get
2778 * this length with very little changes to the code. In the future
2779 * we could switch to a faster solution. */
2780 static off_t
rdbSavedObjectLen(robj
*o
) {
2781 static FILE *fp
= NULL
;
2783 if (fp
== NULL
) fp
= fopen("/dev/null","w");
2787 assert(rdbSaveObject(fp
,o
) != 1);
2791 /* Return the number of pages required to save this object in the swap file */
2792 static off_t
rdbSavedObjectPages(robj
*o
) {
2793 off_t bytes
= rdbSavedObjectLen(o
);
2795 return (bytes
+(server
.vm_page_size
-1))/server
.vm_page_size
;
2798 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2799 static int rdbSave(char *filename
) {
2800 dictIterator
*di
= NULL
;
2805 time_t now
= time(NULL
);
2807 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2808 fp
= fopen(tmpfile
,"w");
2810 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2813 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2814 for (j
= 0; j
< server
.dbnum
; j
++) {
2815 redisDb
*db
= server
.db
+j
;
2817 if (dictSize(d
) == 0) continue;
2818 di
= dictGetIterator(d
);
2824 /* Write the SELECT DB opcode */
2825 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2826 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2828 /* Iterate this DB writing every entry */
2829 while((de
= dictNext(di
)) != NULL
) {
2830 robj
*key
= dictGetEntryKey(de
);
2831 robj
*o
= dictGetEntryVal(de
);
2832 time_t expiretime
= getExpire(db
,key
);
2834 /* Save the expire time */
2835 if (expiretime
!= -1) {
2836 /* If this key is already expired skip it */
2837 if (expiretime
< now
) continue;
2838 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2839 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2841 /* Save the key and associated value */
2842 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2843 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2844 /* Save the actual value */
2845 if (rdbSaveObject(fp
,o
) == -1) goto werr
;
2847 dictReleaseIterator(di
);
2850 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2852 /* Make sure data will not remain on the OS's output buffers */
2857 /* Use RENAME to make sure the DB file is changed atomically only
2858 * if the generate DB file is ok. */
2859 if (rename(tmpfile
,filename
) == -1) {
2860 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
2864 redisLog(REDIS_NOTICE
,"DB saved on disk");
2866 server
.lastsave
= time(NULL
);
2872 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2873 if (di
) dictReleaseIterator(di
);
2877 static int rdbSaveBackground(char *filename
) {
2880 if (server
.bgsavechildpid
!= -1) return REDIS_ERR
;
2881 if ((childpid
= fork()) == 0) {
2884 if (rdbSave(filename
) == REDIS_OK
) {
2891 if (childpid
== -1) {
2892 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2896 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2897 server
.bgsavechildpid
= childpid
;
2900 return REDIS_OK
; /* unreached */
/* Remove the temporary dump file "temp-<pid>.rdb" that a saving process
 * with pid 'childpid' writes to. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,256,"temp-%d.rdb", (int) childpid);
    unlink(path);
}
/* Read one byte from 'fp' and return it as an int in the 0..255 range.
 * Returns -1 if the byte could not be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;
    size_t nread = fread(&type,1,1,fp);

    return (nread == 0) ? -1 : type;
}
/* Read a 32 bit signed time value (host byte order) from 'fp'.
 * Returns -1 if the four bytes could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,4,1,fp) != 0)
        return (time_t) stamp;
    return -1;
}
2922 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2923 * of this file for a description of how this are stored on disk.
2925 * isencoded is set to 1 if the readed length is not actually a length but
2926 * an "encoding type", check the above comments for more info */
2927 static uint32_t rdbLoadLen(FILE *fp
, int *isencoded
) {
2928 unsigned char buf
[2];
2932 if (isencoded
) *isencoded
= 0;
2933 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2934 type
= (buf
[0]&0xC0)>>6;
2935 if (type
== REDIS_RDB_6BITLEN
) {
2936 /* Read a 6 bit len */
2938 } else if (type
== REDIS_RDB_ENCVAL
) {
2939 /* Read a 6 bit len encoding type */
2940 if (isencoded
) *isencoded
= 1;
2942 } else if (type
== REDIS_RDB_14BITLEN
) {
2943 /* Read a 14 bit len */
2944 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2945 return ((buf
[0]&0x3F)<<8)|buf
[1];
2947 /* Read a 32 bit len */
2948 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2953 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2954 unsigned char enc
[4];
2957 if (enctype
== REDIS_RDB_ENC_INT8
) {
2958 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2959 val
= (signed char)enc
[0];
2960 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2962 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2963 v
= enc
[0]|(enc
[1]<<8);
2965 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2967 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2968 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2971 val
= 0; /* anti-warning */
2974 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2977 static robj
*rdbLoadLzfStringObject(FILE*fp
) {
2978 unsigned int len
, clen
;
2979 unsigned char *c
= NULL
;
2982 if ((clen
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2983 if ((len
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2984 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2985 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2986 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2987 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2989 return createObject(REDIS_STRING
,val
);
2996 static robj
*rdbLoadStringObject(FILE*fp
) {
3001 len
= rdbLoadLen(fp
,&isencoded
);
3004 case REDIS_RDB_ENC_INT8
:
3005 case REDIS_RDB_ENC_INT16
:
3006 case REDIS_RDB_ENC_INT32
:
3007 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
3008 case REDIS_RDB_ENC_LZF
:
3009 return tryObjectSharing(rdbLoadLzfStringObject(fp
));
3015 if (len
== REDIS_RDB_LENERR
) return NULL
;
3016 val
= sdsnewlen(NULL
,len
);
3017 if (len
&& fread(val
,len
,1,fp
) == 0) {
3021 return tryObjectSharing(createObject(REDIS_STRING
,val
));
3024 /* For information about double serialization check rdbSaveDoubleValue() */
3025 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
3029 if (fread(&len
,1,1,fp
) == 0) return -1;
3031 case 255: *val
= R_NegInf
; return 0;
3032 case 254: *val
= R_PosInf
; return 0;
3033 case 253: *val
= R_Nan
; return 0;
3035 if (fread(buf
,len
,1,fp
) == 0) return -1;
3037 sscanf(buf
, "%lg", val
);
3042 /* Load a Redis object of the specified type from the specified file.
3043 * On success a newly allocated object is returned, otherwise NULL. */
3044 static robj
*rdbLoadObject(int type
, FILE *fp
) {
3047 if (type
== REDIS_STRING
) {
3048 /* Read string value */
3049 if ((o
= rdbLoadStringObject(fp
)) == NULL
) return NULL
;
3050 tryObjectEncoding(o
);
3051 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
3052 /* Read list/set value */
3055 if ((listlen
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3056 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
3057 /* Load every single element of the list/set */
3061 if ((ele
= rdbLoadStringObject(fp
)) == NULL
) return NULL
;
3062 tryObjectEncoding(ele
);
3063 if (type
== REDIS_LIST
) {
3064 listAddNodeTail((list
*)o
->ptr
,ele
);
3066 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
3069 } else if (type
== REDIS_ZSET
) {
3070 /* Read list/set value */
3074 if ((zsetlen
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3075 o
= createZsetObject();
3077 /* Load every single element of the list/set */
3080 double *score
= zmalloc(sizeof(double));
3082 if ((ele
= rdbLoadStringObject(fp
)) == NULL
) return NULL
;
3083 tryObjectEncoding(ele
);
3084 if (rdbLoadDoubleValue(fp
,score
) == -1) return NULL
;
3085 dictAdd(zs
->dict
,ele
,score
);
3086 zslInsert(zs
->zsl
,*score
,ele
);
3087 incrRefCount(ele
); /* added to skiplist */
3090 redisAssert(0 != 0);
3095 static int rdbLoad(char *filename
) {
3097 robj
*keyobj
= NULL
;
3099 int type
, retval
, rdbver
;
3100 dict
*d
= server
.db
[0].dict
;
3101 redisDb
*db
= server
.db
+0;
3103 time_t expiretime
= -1, now
= time(NULL
);
3105 fp
= fopen(filename
,"r");
3106 if (!fp
) return REDIS_ERR
;
3107 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
3109 if (memcmp(buf
,"REDIS",5) != 0) {
3111 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
3114 rdbver
= atoi(buf
+5);
3117 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
3124 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
3125 if (type
== REDIS_EXPIRETIME
) {
3126 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
3127 /* We read the time so we need to read the object type again */
3128 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
3130 if (type
== REDIS_EOF
) break;
3131 /* Handle SELECT DB opcode as a special case */
3132 if (type
== REDIS_SELECTDB
) {
3133 if ((dbid
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
)
3135 if (dbid
>= (unsigned)server
.dbnum
) {
3136 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
3139 db
= server
.db
+dbid
;
3144 if ((keyobj
= rdbLoadStringObject(fp
)) == NULL
) goto eoferr
;
3146 if ((o
= rdbLoadObject(type
,fp
)) == NULL
) goto eoferr
;
3147 /* Add the new object in the hash table */
3148 retval
= dictAdd(d
,keyobj
,o
);
3149 if (retval
== DICT_ERR
) {
3150 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
3153 /* Set the expire time if needed */
3154 if (expiretime
!= -1) {
3155 setExpire(db
,keyobj
,expiretime
);
3156 /* Delete this key if already expired */
3157 if (expiretime
< now
) deleteKey(db
,keyobj
);
3165 eoferr
: /* unexpected end of file is handled here with a fatal exit */
3166 if (keyobj
) decrRefCount(keyobj
);
3167 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
3169 return REDIS_ERR
; /* Just to avoid warning */
3172 /*================================== Commands =============================== */
3174 static void authCommand(redisClient
*c
) {
3175 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
3176 c
->authenticated
= 1;
3177 addReply(c
,shared
.ok
);
3179 c
->authenticated
= 0;
3180 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3184 static void pingCommand(redisClient
*c
) {
3185 addReply(c
,shared
.pong
);
3188 static void echoCommand(redisClient
*c
) {
3189 addReplyBulkLen(c
,c
->argv
[1]);
3190 addReply(c
,c
->argv
[1]);
3191 addReply(c
,shared
.crlf
);
3194 /*=================================== Strings =============================== */
3196 static void setGenericCommand(redisClient
*c
, int nx
) {
3199 if (nx
) deleteIfVolatile(c
->db
,c
->argv
[1]);
3200 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3201 if (retval
== DICT_ERR
) {
3203 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3204 incrRefCount(c
->argv
[2]);
3206 addReply(c
,shared
.czero
);
3210 incrRefCount(c
->argv
[1]);
3211 incrRefCount(c
->argv
[2]);
3214 removeExpire(c
->db
,c
->argv
[1]);
3215 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3218 static void setCommand(redisClient
*c
) {
3219 setGenericCommand(c
,0);
3222 static void setnxCommand(redisClient
*c
) {
3223 setGenericCommand(c
,1);
3226 static int getGenericCommand(redisClient
*c
) {
3227 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3230 addReply(c
,shared
.nullbulk
);
3233 if (o
->type
!= REDIS_STRING
) {
3234 addReply(c
,shared
.wrongtypeerr
);
3237 addReplyBulkLen(c
,o
);
3239 addReply(c
,shared
.crlf
);
3245 static void getCommand(redisClient
*c
) {
3246 getGenericCommand(c
);
3249 static void getsetCommand(redisClient
*c
) {
3250 if (getGenericCommand(c
) == REDIS_ERR
) return;
3251 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
3252 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3254 incrRefCount(c
->argv
[1]);
3256 incrRefCount(c
->argv
[2]);
3258 removeExpire(c
->db
,c
->argv
[1]);
3261 static void mgetCommand(redisClient
*c
) {
3264 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
3265 for (j
= 1; j
< c
->argc
; j
++) {
3266 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
3268 addReply(c
,shared
.nullbulk
);
3270 if (o
->type
!= REDIS_STRING
) {
3271 addReply(c
,shared
.nullbulk
);
3273 addReplyBulkLen(c
,o
);
3275 addReply(c
,shared
.crlf
);
3281 static void msetGenericCommand(redisClient
*c
, int nx
) {
3282 int j
, busykeys
= 0;
3284 if ((c
->argc
% 2) == 0) {
3285 addReplySds(c
,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3288 /* Handle the NX flag. The MSETNX semantic is to return zero and
3289 * set nothing at all if at least one key already exists. */
3291 for (j
= 1; j
< c
->argc
; j
+= 2) {
3292 if (lookupKeyWrite(c
->db
,c
->argv
[j
]) != NULL
) {
3298 addReply(c
, shared
.czero
);
3302 for (j
= 1; j
< c
->argc
; j
+= 2) {
3305 tryObjectEncoding(c
->argv
[j
+1]);
3306 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3307 if (retval
== DICT_ERR
) {
3308 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3309 incrRefCount(c
->argv
[j
+1]);
3311 incrRefCount(c
->argv
[j
]);
3312 incrRefCount(c
->argv
[j
+1]);
3314 removeExpire(c
->db
,c
->argv
[j
]);
3316 server
.dirty
+= (c
->argc
-1)/2;
3317 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3320 static void msetCommand(redisClient
*c
) {
3321 msetGenericCommand(c
,0);
3324 static void msetnxCommand(redisClient
*c
) {
3325 msetGenericCommand(c
,1);
3328 static void incrDecrCommand(redisClient
*c
, long long incr
) {
3333 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3337 if (o
->type
!= REDIS_STRING
) {
3342 if (o
->encoding
== REDIS_ENCODING_RAW
)
3343 value
= strtoll(o
->ptr
, &eptr
, 10);
3344 else if (o
->encoding
== REDIS_ENCODING_INT
)
3345 value
= (long)o
->ptr
;
3347 redisAssert(1 != 1);
3352 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
3353 tryObjectEncoding(o
);
3354 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
3355 if (retval
== DICT_ERR
) {
3356 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
3357 removeExpire(c
->db
,c
->argv
[1]);
3359 incrRefCount(c
->argv
[1]);
3362 addReply(c
,shared
.colon
);
3364 addReply(c
,shared
.crlf
);
3367 static void incrCommand(redisClient
*c
) {
3368 incrDecrCommand(c
,1);
3371 static void decrCommand(redisClient
*c
) {
3372 incrDecrCommand(c
,-1);
3375 static void incrbyCommand(redisClient
*c
) {
3376 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3377 incrDecrCommand(c
,incr
);
3380 static void decrbyCommand(redisClient
*c
) {
3381 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3382 incrDecrCommand(c
,-incr
);
3385 /* ========================= Type agnostic commands ========================= */
3387 static void delCommand(redisClient
*c
) {
3390 for (j
= 1; j
< c
->argc
; j
++) {
3391 if (deleteKey(c
->db
,c
->argv
[j
])) {
3398 addReply(c
,shared
.czero
);
3401 addReply(c
,shared
.cone
);
3404 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
3409 static void existsCommand(redisClient
*c
) {
3410 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
3413 static void selectCommand(redisClient
*c
) {
3414 int id
= atoi(c
->argv
[1]->ptr
);
3416 if (selectDb(c
,id
) == REDIS_ERR
) {
3417 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
3419 addReply(c
,shared
.ok
);
3423 static void randomkeyCommand(redisClient
*c
) {
3427 de
= dictGetRandomKey(c
->db
->dict
);
3428 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
3431 addReply(c
,shared
.plus
);
3432 addReply(c
,shared
.crlf
);
3434 addReply(c
,shared
.plus
);
3435 addReply(c
,dictGetEntryKey(de
));
3436 addReply(c
,shared
.crlf
);
3440 static void keysCommand(redisClient
*c
) {
3443 sds pattern
= c
->argv
[1]->ptr
;
3444 int plen
= sdslen(pattern
);
3445 unsigned long numkeys
= 0, keyslen
= 0;
3446 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3448 di
= dictGetIterator(c
->db
->dict
);
3450 decrRefCount(lenobj
);
3451 while((de
= dictNext(di
)) != NULL
) {
3452 robj
*keyobj
= dictGetEntryKey(de
);
3454 sds key
= keyobj
->ptr
;
3455 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3456 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3457 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3459 addReply(c
,shared
.space
);
3462 keyslen
+= sdslen(key
);
3466 dictReleaseIterator(di
);
3467 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3468 addReply(c
,shared
.crlf
);
3471 static void dbsizeCommand(redisClient
*c
) {
3473 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
3476 static void lastsaveCommand(redisClient
*c
) {
3478 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
3481 static void typeCommand(redisClient
*c
) {
3485 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3490 case REDIS_STRING
: type
= "+string"; break;
3491 case REDIS_LIST
: type
= "+list"; break;
3492 case REDIS_SET
: type
= "+set"; break;
3493 case REDIS_ZSET
: type
= "+zset"; break;
3494 default: type
= "unknown"; break;
3497 addReplySds(c
,sdsnew(type
));
3498 addReply(c
,shared
.crlf
);
3501 static void saveCommand(redisClient
*c
) {
3502 if (server
.bgsavechildpid
!= -1) {
3503 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3506 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3507 addReply(c
,shared
.ok
);
3509 addReply(c
,shared
.err
);
3513 static void bgsaveCommand(redisClient
*c
) {
3514 if (server
.bgsavechildpid
!= -1) {
3515 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3518 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3519 char *status
= "+Background saving started\r\n";
3520 addReplySds(c
,sdsnew(status
));
3522 addReply(c
,shared
.err
);
3526 static void shutdownCommand(redisClient
*c
) {
3527 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
3528 /* Kill the saving child if there is a background saving in progress.
3529 We want to avoid race conditions, for instance our saving child may
3530 overwrite the synchronous save done by SHUTDOWN. */
3531 if (server
.bgsavechildpid
!= -1) {
3532 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3533 kill(server
.bgsavechildpid
,SIGKILL
);
3534 rdbRemoveTempFile(server
.bgsavechildpid
);
3536 if (server
.appendonly
) {
3537 /* Append only file: fsync() the AOF and exit */
3538 fsync(server
.appendfd
);
3541 /* Snapshotting. Perform a SYNC SAVE and exit */
3542 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3543 if (server
.daemonize
)
3544 unlink(server
.pidfile
);
3545 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3546 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3549 /* Ooops.. error saving! The best we can do is to continue operating.
3550 * Note that if there was a background saving process, in the next
3551 * cron() Redis will be notified that the background saving aborted,
3552 * handling special stuff like slaves pending for synchronization... */
3553 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3554 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3559 static void renameGenericCommand(redisClient
*c
, int nx
) {
3562 /* To use the same key as src and dst is probably an error */
3563 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3564 addReply(c
,shared
.sameobjecterr
);
3568 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3570 addReply(c
,shared
.nokeyerr
);
3574 deleteIfVolatile(c
->db
,c
->argv
[2]);
3575 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3578 addReply(c
,shared
.czero
);
3581 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3583 incrRefCount(c
->argv
[2]);
3585 deleteKey(c
->db
,c
->argv
[1]);
3587 addReply(c
,nx
? shared
.cone
: shared
.ok
);
3590 static void renameCommand(redisClient
*c
) {
3591 renameGenericCommand(c
,0);
3594 static void renamenxCommand(redisClient
*c
) {
3595 renameGenericCommand(c
,1);
3598 static void moveCommand(redisClient
*c
) {
3603 /* Obtain source and target DB pointers */
3606 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3607 addReply(c
,shared
.outofrangeerr
);
3611 selectDb(c
,srcid
); /* Back to the source DB */
3613 /* If the target DB is the same as the source DB
3614 * it is probably an error. */
3616 addReply(c
,shared
.sameobjecterr
);
3620 /* Check if the element exists and get a reference */
3621 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3623 addReply(c
,shared
.czero
);
3627 /* Try to add the element to the target DB */
3628 deleteIfVolatile(dst
,c
->argv
[1]);
3629 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3630 addReply(c
,shared
.czero
);
3633 incrRefCount(c
->argv
[1]);
3636 /* OK! key moved, free the entry in the source DB */
3637 deleteKey(src
,c
->argv
[1]);
3639 addReply(c
,shared
.cone
);
3642 /* =================================== Lists ================================ */
3643 static void pushGenericCommand(redisClient
*c
, int where
) {
3647 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3649 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3650 addReply(c
,shared
.ok
);
3653 lobj
= createListObject();
3655 if (where
== REDIS_HEAD
) {
3656 listAddNodeHead(list
,c
->argv
[2]);
3658 listAddNodeTail(list
,c
->argv
[2]);
3660 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
3661 incrRefCount(c
->argv
[1]);
3662 incrRefCount(c
->argv
[2]);
3664 if (lobj
->type
!= REDIS_LIST
) {
3665 addReply(c
,shared
.wrongtypeerr
);
3668 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3669 addReply(c
,shared
.ok
);
3673 if (where
== REDIS_HEAD
) {
3674 listAddNodeHead(list
,c
->argv
[2]);
3676 listAddNodeTail(list
,c
->argv
[2]);
3678 incrRefCount(c
->argv
[2]);
3681 addReply(c
,shared
.ok
);
3684 static void lpushCommand(redisClient
*c
) {
3685 pushGenericCommand(c
,REDIS_HEAD
);
3688 static void rpushCommand(redisClient
*c
) {
3689 pushGenericCommand(c
,REDIS_TAIL
);
3692 static void llenCommand(redisClient
*c
) {
3696 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3698 addReply(c
,shared
.czero
);
3701 if (o
->type
!= REDIS_LIST
) {
3702 addReply(c
,shared
.wrongtypeerr
);
3705 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3710 static void lindexCommand(redisClient
*c
) {
3712 int index
= atoi(c
->argv
[2]->ptr
);
3714 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3716 addReply(c
,shared
.nullbulk
);
3718 if (o
->type
!= REDIS_LIST
) {
3719 addReply(c
,shared
.wrongtypeerr
);
3721 list
*list
= o
->ptr
;
3724 ln
= listIndex(list
, index
);
3726 addReply(c
,shared
.nullbulk
);
3728 robj
*ele
= listNodeValue(ln
);
3729 addReplyBulkLen(c
,ele
);
3731 addReply(c
,shared
.crlf
);
3737 static void lsetCommand(redisClient
*c
) {
3739 int index
= atoi(c
->argv
[2]->ptr
);
3741 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3743 addReply(c
,shared
.nokeyerr
);
3745 if (o
->type
!= REDIS_LIST
) {
3746 addReply(c
,shared
.wrongtypeerr
);
3748 list
*list
= o
->ptr
;
3751 ln
= listIndex(list
, index
);
3753 addReply(c
,shared
.outofrangeerr
);
3755 robj
*ele
= listNodeValue(ln
);
3758 listNodeValue(ln
) = c
->argv
[3];
3759 incrRefCount(c
->argv
[3]);
3760 addReply(c
,shared
.ok
);
3767 static void popGenericCommand(redisClient
*c
, int where
) {
3770 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3772 addReply(c
,shared
.nullbulk
);
3774 if (o
->type
!= REDIS_LIST
) {
3775 addReply(c
,shared
.wrongtypeerr
);
3777 list
*list
= o
->ptr
;
3780 if (where
== REDIS_HEAD
)
3781 ln
= listFirst(list
);
3783 ln
= listLast(list
);
3786 addReply(c
,shared
.nullbulk
);
3788 robj
*ele
= listNodeValue(ln
);
3789 addReplyBulkLen(c
,ele
);
3791 addReply(c
,shared
.crlf
);
3792 listDelNode(list
,ln
);
3799 static void lpopCommand(redisClient
*c
) {
3800 popGenericCommand(c
,REDIS_HEAD
);
3803 static void rpopCommand(redisClient
*c
) {
3804 popGenericCommand(c
,REDIS_TAIL
);
3807 static void lrangeCommand(redisClient
*c
) {
3809 int start
= atoi(c
->argv
[2]->ptr
);
3810 int end
= atoi(c
->argv
[3]->ptr
);
3812 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3814 addReply(c
,shared
.nullmultibulk
);
3816 if (o
->type
!= REDIS_LIST
) {
3817 addReply(c
,shared
.wrongtypeerr
);
3819 list
*list
= o
->ptr
;
3821 int llen
= listLength(list
);
3825 /* convert negative indexes */
3826 if (start
< 0) start
= llen
+start
;
3827 if (end
< 0) end
= llen
+end
;
3828 if (start
< 0) start
= 0;
3829 if (end
< 0) end
= 0;
3831 /* indexes sanity checks */
3832 if (start
> end
|| start
>= llen
) {
3833 /* Out of range start or start > end result in empty list */
3834 addReply(c
,shared
.emptymultibulk
);
3837 if (end
>= llen
) end
= llen
-1;
3838 rangelen
= (end
-start
)+1;
3840 /* Return the result in form of a multi-bulk reply */
3841 ln
= listIndex(list
, start
);
3842 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3843 for (j
= 0; j
< rangelen
; j
++) {
3844 ele
= listNodeValue(ln
);
3845 addReplyBulkLen(c
,ele
);
3847 addReply(c
,shared
.crlf
);
3854 static void ltrimCommand(redisClient
*c
) {
3856 int start
= atoi(c
->argv
[2]->ptr
);
3857 int end
= atoi(c
->argv
[3]->ptr
);
3859 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3861 addReply(c
,shared
.ok
);
3863 if (o
->type
!= REDIS_LIST
) {
3864 addReply(c
,shared
.wrongtypeerr
);
3866 list
*list
= o
->ptr
;
3868 int llen
= listLength(list
);
3869 int j
, ltrim
, rtrim
;
3871 /* convert negative indexes */
3872 if (start
< 0) start
= llen
+start
;
3873 if (end
< 0) end
= llen
+end
;
3874 if (start
< 0) start
= 0;
3875 if (end
< 0) end
= 0;
3877 /* indexes sanity checks */
3878 if (start
> end
|| start
>= llen
) {
3879 /* Out of range start or start > end result in empty list */
3883 if (end
>= llen
) end
= llen
-1;
3888 /* Remove list elements to perform the trim */
3889 for (j
= 0; j
< ltrim
; j
++) {
3890 ln
= listFirst(list
);
3891 listDelNode(list
,ln
);
3893 for (j
= 0; j
< rtrim
; j
++) {
3894 ln
= listLast(list
);
3895 listDelNode(list
,ln
);
3898 addReply(c
,shared
.ok
);
3903 static void lremCommand(redisClient
*c
) {
3906 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3908 addReply(c
,shared
.czero
);
3910 if (o
->type
!= REDIS_LIST
) {
3911 addReply(c
,shared
.wrongtypeerr
);
3913 list
*list
= o
->ptr
;
3914 listNode
*ln
, *next
;
3915 int toremove
= atoi(c
->argv
[2]->ptr
);
3920 toremove
= -toremove
;
3923 ln
= fromtail
? list
->tail
: list
->head
;
3925 robj
*ele
= listNodeValue(ln
);
3927 next
= fromtail
? ln
->prev
: ln
->next
;
3928 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3929 listDelNode(list
,ln
);
3932 if (toremove
&& removed
== toremove
) break;
3936 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3941 /* This is the semantic of this command:
3942 * RPOPLPUSH srclist dstlist:
3943 * IF LLEN(srclist) > 0
3944 * element = RPOP srclist
3945 * LPUSH dstlist element
3952 * The idea is to be able to get an element from a list in a reliable way
3953 * since the element is not just returned but pushed onto another list
3954 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3956 static void rpoplpushcommand(redisClient
*c
) {
3959 sobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3961 addReply(c
,shared
.nullbulk
);
3963 if (sobj
->type
!= REDIS_LIST
) {
3964 addReply(c
,shared
.wrongtypeerr
);
3966 list
*srclist
= sobj
->ptr
;
3967 listNode
*ln
= listLast(srclist
);
3970 addReply(c
,shared
.nullbulk
);
3972 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3973 robj
*ele
= listNodeValue(ln
);
3976 if (dobj
&& dobj
->type
!= REDIS_LIST
) {
3977 addReply(c
,shared
.wrongtypeerr
);
3981 /* Add the element to the target list (unless it's directly
3982 * passed to some BLPOP-ing client) */
3983 if (!handleClientsWaitingListPush(c
,c
->argv
[2],ele
)) {
3985 /* Create the list if the key does not exist */
3986 dobj
= createListObject();
3987 dictAdd(c
->db
->dict
,c
->argv
[2],dobj
);
3988 incrRefCount(c
->argv
[2]);
3990 dstlist
= dobj
->ptr
;
3991 listAddNodeHead(dstlist
,ele
);
3995 /* Send the element to the client as reply as well */
3996 addReplyBulkLen(c
,ele
);
3998 addReply(c
,shared
.crlf
);
4000 /* Finally remove the element from the source list */
4001 listDelNode(srclist
,ln
);
4009 /* ==================================== Sets ================================ */
4011 static void saddCommand(redisClient
*c
) {
4014 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4016 set
= createSetObject();
4017 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
4018 incrRefCount(c
->argv
[1]);
4020 if (set
->type
!= REDIS_SET
) {
4021 addReply(c
,shared
.wrongtypeerr
);
4025 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
4026 incrRefCount(c
->argv
[2]);
4028 addReply(c
,shared
.cone
);
4030 addReply(c
,shared
.czero
);
4034 static void sremCommand(redisClient
*c
) {
4037 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4039 addReply(c
,shared
.czero
);
4041 if (set
->type
!= REDIS_SET
) {
4042 addReply(c
,shared
.wrongtypeerr
);
4045 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
4047 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
4048 addReply(c
,shared
.cone
);
4050 addReply(c
,shared
.czero
);
4055 static void smoveCommand(redisClient
*c
) {
4056 robj
*srcset
, *dstset
;
4058 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4059 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
4061 /* If the source key does not exist return 0, if it's of the wrong type
4063 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
4064 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
4067 /* Error if the destination key is not a set as well */
4068 if (dstset
&& dstset
->type
!= REDIS_SET
) {
4069 addReply(c
,shared
.wrongtypeerr
);
4072 /* Remove the element from the source set */
4073 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
4074 /* Key not found in the src set! return zero */
4075 addReply(c
,shared
.czero
);
4079 /* Add the element to the destination set */
4081 dstset
= createSetObject();
4082 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
4083 incrRefCount(c
->argv
[2]);
4085 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
4086 incrRefCount(c
->argv
[3]);
4087 addReply(c
,shared
.cone
);
4090 static void sismemberCommand(redisClient
*c
) {
4093 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4095 addReply(c
,shared
.czero
);
4097 if (set
->type
!= REDIS_SET
) {
4098 addReply(c
,shared
.wrongtypeerr
);
4101 if (dictFind(set
->ptr
,c
->argv
[2]))
4102 addReply(c
,shared
.cone
);
4104 addReply(c
,shared
.czero
);
4108 static void scardCommand(redisClient
*c
) {
4112 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4114 addReply(c
,shared
.czero
);
4117 if (o
->type
!= REDIS_SET
) {
4118 addReply(c
,shared
.wrongtypeerr
);
4121 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4127 static void spopCommand(redisClient
*c
) {
4131 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4133 addReply(c
,shared
.nullbulk
);
4135 if (set
->type
!= REDIS_SET
) {
4136 addReply(c
,shared
.wrongtypeerr
);
4139 de
= dictGetRandomKey(set
->ptr
);
4141 addReply(c
,shared
.nullbulk
);
4143 robj
*ele
= dictGetEntryKey(de
);
4145 addReplyBulkLen(c
,ele
);
4147 addReply(c
,shared
.crlf
);
4148 dictDelete(set
->ptr
,ele
);
4149 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
4155 static void srandmemberCommand(redisClient
*c
) {
4159 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4161 addReply(c
,shared
.nullbulk
);
4163 if (set
->type
!= REDIS_SET
) {
4164 addReply(c
,shared
.wrongtypeerr
);
4167 de
= dictGetRandomKey(set
->ptr
);
4169 addReply(c
,shared
.nullbulk
);
4171 robj
*ele
= dictGetEntryKey(de
);
4173 addReplyBulkLen(c
,ele
);
4175 addReply(c
,shared
.crlf
);
4180 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
4181 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
4183 return dictSize(*d1
)-dictSize(*d2
);
4186 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, unsigned long setsnum
, robj
*dstkey
) {
4187 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4190 robj
*lenobj
= NULL
, *dstset
= NULL
;
4191 unsigned long j
, cardinality
= 0;
4193 for (j
= 0; j
< setsnum
; j
++) {
4197 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4198 lookupKeyRead(c
->db
,setskeys
[j
]);
4202 if (deleteKey(c
->db
,dstkey
))
4204 addReply(c
,shared
.czero
);
4206 addReply(c
,shared
.nullmultibulk
);
4210 if (setobj
->type
!= REDIS_SET
) {
4212 addReply(c
,shared
.wrongtypeerr
);
4215 dv
[j
] = setobj
->ptr
;
4217 /* Sort sets from the smallest to largest, this will improve our
4218 * algorithm's performance */
4219 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
4221 /* The first thing we should output is the total number of elements...
4222 * since this is a multi-bulk write, but at this stage we don't know
4223 * the intersection set size, so we use a trick, append an empty object
4224 * to the output list and save the pointer to later modify it with the
4227 lenobj
= createObject(REDIS_STRING
,NULL
);
4229 decrRefCount(lenobj
);
4231 /* If we have a target key where to store the resulting set
4232 * create this key with an empty set inside */
4233 dstset
= createSetObject();
4236 /* Iterate all the elements of the first (smallest) set, and test
4237 * the element against all the other sets, if at least one set does
4238 * not include the element it is discarded */
4239 di
= dictGetIterator(dv
[0]);
4241 while((de
= dictNext(di
)) != NULL
) {
4244 for (j
= 1; j
< setsnum
; j
++)
4245 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
4247 continue; /* at least one set does not contain the member */
4248 ele
= dictGetEntryKey(de
);
4250 addReplyBulkLen(c
,ele
);
4252 addReply(c
,shared
.crlf
);
4255 dictAdd(dstset
->ptr
,ele
,NULL
);
4259 dictReleaseIterator(di
);
4262 /* Store the resulting set into the target */
4263 deleteKey(c
->db
,dstkey
);
4264 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4265 incrRefCount(dstkey
);
4269 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%lu\r\n",cardinality
);
4271 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4272 dictSize((dict
*)dstset
->ptr
)));
4278 static void sinterCommand(redisClient
*c
) {
4279 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
4282 static void sinterstoreCommand(redisClient
*c
) {
4283 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
4286 #define REDIS_OP_UNION 0
4287 #define REDIS_OP_DIFF 1
4289 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
4290 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4293 robj
*dstset
= NULL
;
4294 int j
, cardinality
= 0;
4296 for (j
= 0; j
< setsnum
; j
++) {
4300 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4301 lookupKeyRead(c
->db
,setskeys
[j
]);
4306 if (setobj
->type
!= REDIS_SET
) {
4308 addReply(c
,shared
.wrongtypeerr
);
4311 dv
[j
] = setobj
->ptr
;
4314 /* We need a temp set object to store our union. If the dstkey
4315 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4316 * this set object will be the resulting object to set into the target key*/
4317 dstset
= createSetObject();
4319 /* Iterate all the elements of all the sets, add every element a single
4320 * time to the result set */
4321 for (j
= 0; j
< setsnum
; j
++) {
4322 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
4323 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
4325 di
= dictGetIterator(dv
[j
]);
4327 while((de
= dictNext(di
)) != NULL
) {
4330 /* dictAdd will not add the same element multiple times */
4331 ele
= dictGetEntryKey(de
);
4332 if (op
== REDIS_OP_UNION
|| j
== 0) {
4333 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
4337 } else if (op
== REDIS_OP_DIFF
) {
4338 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
4343 dictReleaseIterator(di
);
4345 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
4348 /* Output the content of the resulting set, if not in STORE mode */
4350 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
4351 di
= dictGetIterator(dstset
->ptr
);
4352 while((de
= dictNext(di
)) != NULL
) {
4355 ele
= dictGetEntryKey(de
);
4356 addReplyBulkLen(c
,ele
);
4358 addReply(c
,shared
.crlf
);
4360 dictReleaseIterator(di
);
4362 /* If we have a target key where to store the resulting set
4363 * create this key with the result set inside */
4364 deleteKey(c
->db
,dstkey
);
4365 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4366 incrRefCount(dstkey
);
4371 decrRefCount(dstset
);
4373 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4374 dictSize((dict
*)dstset
->ptr
)));
4380 static void sunionCommand(redisClient
*c
) {
4381 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
4384 static void sunionstoreCommand(redisClient
*c
) {
4385 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
4388 static void sdiffCommand(redisClient
*c
) {
4389 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
4392 static void sdiffstoreCommand(redisClient
*c
) {
4393 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
4396 /* ==================================== ZSets =============================== */
4398 /* ZSETs are ordered sets using two data structures to hold the same elements
4399 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4402 * The elements are added to an hash table mapping Redis objects to scores.
4403 * At the same time the elements are added to a skip list mapping scores
4404 * to Redis objects (so objects are sorted by scores in this "view"). */
4406 /* This skiplist implementation is almost a C translation of the original
4407 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4408 * Alternative to Balanced Trees", modified in three ways:
4409 * a) this implementation allows for repeated values.
4410 * b) the comparison is not just by key (our 'score') but by satellite data.
4411 * c) there is a back pointer, so it's a doubly linked list with the back
4412 * pointers being only at "level 1". This allows to traverse the list
4413 * from tail to head, useful for ZREVRANGE. */
4415 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
4416 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
4418 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
4424 static zskiplist
*zslCreate(void) {
4428 zsl
= zmalloc(sizeof(*zsl
));
4431 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
4432 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
4433 zsl
->header
->forward
[j
] = NULL
;
4434 zsl
->header
->backward
= NULL
;
4439 static void zslFreeNode(zskiplistNode
*node
) {
4440 decrRefCount(node
->obj
);
4441 zfree(node
->forward
);
4445 static void zslFree(zskiplist
*zsl
) {
4446 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
4448 zfree(zsl
->header
->forward
);
4451 next
= node
->forward
[0];
4458 static int zslRandomLevel(void) {
4460 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
4465 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
4466 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4470 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4471 while (x
->forward
[i
] &&
4472 (x
->forward
[i
]->score
< score
||
4473 (x
->forward
[i
]->score
== score
&&
4474 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4478 /* we assume the key is not already inside, since we allow duplicated
4479 * scores, and the re-insertion of score and redis object should never
4480 * happen since the caller of zslInsert() should test in the hash table
4481 * if the element is already inside or not. */
4482 level
= zslRandomLevel();
4483 if (level
> zsl
->level
) {
4484 for (i
= zsl
->level
; i
< level
; i
++)
4485 update
[i
] = zsl
->header
;
4488 x
= zslCreateNode(level
,score
,obj
);
4489 for (i
= 0; i
< level
; i
++) {
4490 x
->forward
[i
] = update
[i
]->forward
[i
];
4491 update
[i
]->forward
[i
] = x
;
4493 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
4495 x
->forward
[0]->backward
= x
;
4501 /* Delete an element with matching score/object from the skiplist. */
4502 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
4503 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4507 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4508 while (x
->forward
[i
] &&
4509 (x
->forward
[i
]->score
< score
||
4510 (x
->forward
[i
]->score
== score
&&
4511 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4515 /* We may have multiple elements with the same score, what we need
4516 * is to find the element with both the right score and object. */
4518 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
4519 for (i
= 0; i
< zsl
->level
; i
++) {
4520 if (update
[i
]->forward
[i
] != x
) break;
4521 update
[i
]->forward
[i
] = x
->forward
[i
];
4523 if (x
->forward
[0]) {
4524 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4527 zsl
->tail
= x
->backward
;
4530 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4535 return 0; /* not found */
4537 return 0; /* not found */
4540 /* Delete all the elements with score between min and max from the skiplist.
4541 * Min and max are inclusive, so an element with score >= min && score <= max is deleted.
4542 * Note that this function takes the reference to the hash table view of the
4543 * sorted set, in order to remove the elements from the hash table too. */
4544 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4545 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4546 unsigned long removed
= 0;
4550 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4551 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
4555 /* Starting from the first node whose score is not below min,
4556 * remove nodes one by one until the score exceeds max. */
4558 while (x
&& x
->score
<= max
) {
4559 zskiplistNode
*next
;
4561 for (i
= 0; i
< zsl
->level
; i
++) {
4562 if (update
[i
]->forward
[i
] != x
) break;
4563 update
[i
]->forward
[i
] = x
->forward
[i
];
4565 if (x
->forward
[0]) {
4566 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4569 zsl
->tail
= x
->backward
;
4571 next
= x
->forward
[0];
4572 dictDelete(dict
,x
->obj
);
4574 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4580 return removed
; /* not found */
4583 /* Find the first node having a score equal or greater than the specified one.
4584 * Returns NULL if there is no match. */
4585 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4590 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4591 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
4594 /* The level-0 successor of x is the first node with score >= the
4595 * requested one (NULL when no such node exists). */
4596 return x
->forward
[0];
4599 /* The actual Z-commands implementations */
4601 /* This generic command implements both ZADD and ZINCRBY.
4602 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4603 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4604 static void zaddGenericCommand(redisClient
*c
, robj
*key
, robj
*ele
, double scoreval
, int doincrement
) {
4609 zsetobj
= lookupKeyWrite(c
->db
,key
);
4610 if (zsetobj
== NULL
) {
4611 zsetobj
= createZsetObject();
4612 dictAdd(c
->db
->dict
,key
,zsetobj
);
4615 if (zsetobj
->type
!= REDIS_ZSET
) {
4616 addReply(c
,shared
.wrongtypeerr
);
4622 /* Ok now since we implement both ZADD and ZINCRBY here the code
4623 * needs to handle the two different conditions. It's all about setting
4624 * '*score', that is, the new score to set, to the right value. */
4625 score
= zmalloc(sizeof(double));
4629 /* Read the old score. If the element was not present starts from 0 */
4630 de
= dictFind(zs
->dict
,ele
);
4632 double *oldscore
= dictGetEntryVal(de
);
4633 *score
= *oldscore
+ scoreval
;
4641 /* What follows is a simple remove and re-insert operation that is common
4642 * to both ZADD and ZINCRBY... */
4643 if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) {
4644 /* case 1: New element */
4645 incrRefCount(ele
); /* added to hash */
4646 zslInsert(zs
->zsl
,*score
,ele
);
4647 incrRefCount(ele
); /* added to skiplist */
4650 addReplyDouble(c
,*score
);
4652 addReply(c
,shared
.cone
);
4657 /* case 2: Score update operation */
4658 de
= dictFind(zs
->dict
,ele
);
4659 redisAssert(de
!= NULL
);
4660 oldscore
= dictGetEntryVal(de
);
4661 if (*score
!= *oldscore
) {
4664 /* Remove and insert the element in the skip list with new score */
4665 deleted
= zslDelete(zs
->zsl
,*oldscore
,ele
);
4666 redisAssert(deleted
!= 0);
4667 zslInsert(zs
->zsl
,*score
,ele
);
4669 /* Update the score in the hash table */
4670 dictReplace(zs
->dict
,ele
,score
);
4676 addReplyDouble(c
,*score
);
4678 addReply(c
,shared
.czero
);
4682 static void zaddCommand(redisClient
*c
) {
4685 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4686 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0);
4689 static void zincrbyCommand(redisClient
*c
) {
4692 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4693 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1);
4696 static void zremCommand(redisClient
*c
) {
4700 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4701 if (zsetobj
== NULL
) {
4702 addReply(c
,shared
.czero
);
4708 if (zsetobj
->type
!= REDIS_ZSET
) {
4709 addReply(c
,shared
.wrongtypeerr
);
4713 de
= dictFind(zs
->dict
,c
->argv
[2]);
4715 addReply(c
,shared
.czero
);
4718 /* Delete from the skiplist */
4719 oldscore
= dictGetEntryVal(de
);
4720 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
4721 redisAssert(deleted
!= 0);
4723 /* Delete from the hash table */
4724 dictDelete(zs
->dict
,c
->argv
[2]);
4725 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4727 addReply(c
,shared
.cone
);
4731 static void zremrangebyscoreCommand(redisClient
*c
) {
4732 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4733 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4737 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4738 if (zsetobj
== NULL
) {
4739 addReply(c
,shared
.czero
);
4743 if (zsetobj
->type
!= REDIS_ZSET
) {
4744 addReply(c
,shared
.wrongtypeerr
);
4748 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
4749 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4750 server
.dirty
+= deleted
;
4751 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
4755 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
4757 int start
= atoi(c
->argv
[2]->ptr
);
4758 int end
= atoi(c
->argv
[3]->ptr
);
4761 if (c
->argc
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) {
4763 } else if (c
->argc
>= 5) {
4764 addReply(c
,shared
.syntaxerr
);
4768 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4770 addReply(c
,shared
.nullmultibulk
);
4772 if (o
->type
!= REDIS_ZSET
) {
4773 addReply(c
,shared
.wrongtypeerr
);
4775 zset
*zsetobj
= o
->ptr
;
4776 zskiplist
*zsl
= zsetobj
->zsl
;
4779 int llen
= zsl
->length
;
4783 /* convert negative indexes */
4784 if (start
< 0) start
= llen
+start
;
4785 if (end
< 0) end
= llen
+end
;
4786 if (start
< 0) start
= 0;
4787 if (end
< 0) end
= 0;
4789 /* indexes sanity checks */
4790 if (start
> end
|| start
>= llen
) {
4791 /* Out of range start or start > end result in empty list */
4792 addReply(c
,shared
.emptymultibulk
);
4795 if (end
>= llen
) end
= llen
-1;
4796 rangelen
= (end
-start
)+1;
4798 /* Return the result in form of a multi-bulk reply */
4804 ln
= zsl
->header
->forward
[0];
4806 ln
= ln
->forward
[0];
4809 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",
4810 withscores
? (rangelen
*2) : rangelen
));
4811 for (j
= 0; j
< rangelen
; j
++) {
4813 addReplyBulkLen(c
,ele
);
4815 addReply(c
,shared
.crlf
);
4817 addReplyDouble(c
,ln
->score
);
4818 ln
= reverse
? ln
->backward
: ln
->forward
[0];
4824 static void zrangeCommand(redisClient
*c
) {
4825 zrangeGenericCommand(c
,0);
4828 static void zrevrangeCommand(redisClient
*c
) {
4829 zrangeGenericCommand(c
,1);
4832 static void zrangebyscoreCommand(redisClient
*c
) {
4834 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4835 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4836 int offset
= 0, limit
= -1;
4838 if (c
->argc
!= 4 && c
->argc
!= 7) {
4840 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4842 } else if (c
->argc
== 7 && strcasecmp(c
->argv
[4]->ptr
,"limit")) {
4843 addReply(c
,shared
.syntaxerr
);
4845 } else if (c
->argc
== 7) {
4846 offset
= atoi(c
->argv
[5]->ptr
);
4847 limit
= atoi(c
->argv
[6]->ptr
);
4848 if (offset
< 0) offset
= 0;
4851 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4853 addReply(c
,shared
.nullmultibulk
);
4855 if (o
->type
!= REDIS_ZSET
) {
4856 addReply(c
,shared
.wrongtypeerr
);
4858 zset
*zsetobj
= o
->ptr
;
4859 zskiplist
*zsl
= zsetobj
->zsl
;
4862 unsigned int rangelen
= 0;
4864 /* Get the first node with the score >= min */
4865 ln
= zslFirstWithScore(zsl
,min
);
4867 /* No element matching the speciifed interval */
4868 addReply(c
,shared
.emptymultibulk
);
4872 /* We don't know in advance how many matching elements there
4873 * are in the list, so we push this object that will represent
4874 * the multi-bulk length in the output buffer, and will "fix"
4876 lenobj
= createObject(REDIS_STRING
,NULL
);
4878 decrRefCount(lenobj
);
4880 while(ln
&& ln
->score
<= max
) {
4883 ln
= ln
->forward
[0];
4886 if (limit
== 0) break;
4888 addReplyBulkLen(c
,ele
);
4890 addReply(c
,shared
.crlf
);
4891 ln
= ln
->forward
[0];
4893 if (limit
> 0) limit
--;
4895 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
4900 static void zcardCommand(redisClient
*c
) {
4904 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4906 addReply(c
,shared
.czero
);
4909 if (o
->type
!= REDIS_ZSET
) {
4910 addReply(c
,shared
.wrongtypeerr
);
4913 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",zs
->zsl
->length
));
4918 static void zscoreCommand(redisClient
*c
) {
4922 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4924 addReply(c
,shared
.nullbulk
);
4927 if (o
->type
!= REDIS_ZSET
) {
4928 addReply(c
,shared
.wrongtypeerr
);
4933 de
= dictFind(zs
->dict
,c
->argv
[2]);
4935 addReply(c
,shared
.nullbulk
);
4937 double *score
= dictGetEntryVal(de
);
4939 addReplyDouble(c
,*score
);
4945 /* ========================= Non type-specific commands ==================== */
4947 static void flushdbCommand(redisClient
*c
) {
4948 server
.dirty
+= dictSize(c
->db
->dict
);
4949 dictEmpty(c
->db
->dict
);
4950 dictEmpty(c
->db
->expires
);
4951 addReply(c
,shared
.ok
);
4954 static void flushallCommand(redisClient
*c
) {
4955 server
.dirty
+= emptyDb();
4956 addReply(c
,shared
.ok
);
4957 rdbSave(server
.dbfilename
);
4961 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
4962 redisSortOperation
*so
= zmalloc(sizeof(*so
));
4964 so
->pattern
= pattern
;
4968 /* Return the value associated to the key with a name obtained
4969 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4970 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
4974 int prefixlen
, sublen
, postfixlen
;
4975 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4979 char buf
[REDIS_SORTKEY_MAX
+1];
4982 /* If the pattern is "#" return the substitution object itself in order
4983 * to implement the "SORT ... GET #" feature. */
4984 spat
= pattern
->ptr
;
4985 if (spat
[0] == '#' && spat
[1] == '\0') {
4989 /* The substitution object may be specially encoded. If so we create
4990 * a decoded object on the fly. Otherwise getDecodedObject will just
4991 * increment the ref count, that we'll decrement later. */
4992 subst
= getDecodedObject(subst
);
4995 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
4996 p
= strchr(spat
,'*');
4998 decrRefCount(subst
);
5003 sublen
= sdslen(ssub
);
5004 postfixlen
= sdslen(spat
)-(prefixlen
+1);
5005 memcpy(keyname
.buf
,spat
,prefixlen
);
5006 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
5007 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
5008 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
5009 keyname
.len
= prefixlen
+sublen
+postfixlen
;
5011 initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(long)*2))
5012 decrRefCount(subst
);
5014 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
5015 return lookupKeyRead(db
,&keyobj
);
5018 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
5019 * the additional parameter is not standard but a BSD-specific we have to
5020 * pass sorting parameters via the global 'server' structure */
5021 static int sortCompare(const void *s1
, const void *s2
) {
5022 const redisSortObject
*so1
= s1
, *so2
= s2
;
5025 if (!server
.sort_alpha
) {
5026 /* Numeric sorting. Here it's trivial as we precomputed scores */
5027 if (so1
->u
.score
> so2
->u
.score
) {
5029 } else if (so1
->u
.score
< so2
->u
.score
) {
5035 /* Alphanumeric sorting */
5036 if (server
.sort_bypattern
) {
5037 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
5038 /* At least one compare object is NULL */
5039 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
5041 else if (so1
->u
.cmpobj
== NULL
)
5046 /* We have both the objects, use strcoll */
5047 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
5050 /* Compare elements directly */
5053 dec1
= getDecodedObject(so1
->obj
);
5054 dec2
= getDecodedObject(so2
->obj
);
5055 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
5060 return server
.sort_desc
? -cmp
: cmp
;
5063 /* The SORT command is the most complex command in Redis. Warning: this code
5064 * is optimized for speed and a bit less for readability */
5065 static void sortCommand(redisClient
*c
) {
5068 int desc
= 0, alpha
= 0;
5069 int limit_start
= 0, limit_count
= -1, start
, end
;
5070 int j
, dontsort
= 0, vectorlen
;
5071 int getop
= 0; /* GET operation counter */
5072 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
5073 redisSortObject
*vector
; /* Resulting vector to sort */
5075 /* Lookup the key to sort. It must be of the right types */
5076 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
5077 if (sortval
== NULL
) {
5078 addReply(c
,shared
.nullmultibulk
);
5081 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
5082 sortval
->type
!= REDIS_ZSET
)
5084 addReply(c
,shared
.wrongtypeerr
);
5088 /* Create a list of operations to perform for every sorted element.
5089 * Operations can be GET/DEL/INCR/DECR */
5090 operations
= listCreate();
5091 listSetFreeMethod(operations
,zfree
);
5094 /* Now we need to protect sortval incrementing its count, in the future
5095 * SORT may have options able to overwrite/delete keys during the sorting
5096 * and the sorted key itself may get destroied */
5097 incrRefCount(sortval
);
5099 /* The SORT command has an SQL-alike syntax, parse it */
5100 while(j
< c
->argc
) {
5101 int leftargs
= c
->argc
-j
-1;
5102 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
5104 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
5106 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
5108 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
5109 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
5110 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
5112 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
5113 storekey
= c
->argv
[j
+1];
5115 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
5116 sortby
= c
->argv
[j
+1];
5117 /* If the BY pattern does not contain '*', i.e. it is constant,
5118 * we don't need to sort nor to lookup the weight keys. */
5119 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
5121 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
5122 listAddNodeTail(operations
,createSortOperation(
5123 REDIS_SORT_GET
,c
->argv
[j
+1]));
5127 decrRefCount(sortval
);
5128 listRelease(operations
);
5129 addReply(c
,shared
.syntaxerr
);
5135 /* Load the sorting vector with all the objects to sort */
5136 switch(sortval
->type
) {
5137 case REDIS_LIST
: vectorlen
= listLength((list
*)sortval
->ptr
); break;
5138 case REDIS_SET
: vectorlen
= dictSize((dict
*)sortval
->ptr
); break;
5139 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
5140 default: vectorlen
= 0; redisAssert(0); /* Avoid GCC warning */
5142 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
5145 if (sortval
->type
== REDIS_LIST
) {
5146 list
*list
= sortval
->ptr
;
5150 while((ln
= listYield(list
))) {
5151 robj
*ele
= ln
->value
;
5152 vector
[j
].obj
= ele
;
5153 vector
[j
].u
.score
= 0;
5154 vector
[j
].u
.cmpobj
= NULL
;
5162 if (sortval
->type
== REDIS_SET
) {
5165 zset
*zs
= sortval
->ptr
;
5169 di
= dictGetIterator(set
);
5170 while((setele
= dictNext(di
)) != NULL
) {
5171 vector
[j
].obj
= dictGetEntryKey(setele
);
5172 vector
[j
].u
.score
= 0;
5173 vector
[j
].u
.cmpobj
= NULL
;
5176 dictReleaseIterator(di
);
5178 redisAssert(j
== vectorlen
);
5180 /* Now it's time to load the right scores in the sorting vector */
5181 if (dontsort
== 0) {
5182 for (j
= 0; j
< vectorlen
; j
++) {
5186 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
5187 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
5189 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
5191 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
5192 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
5194 /* Don't need to decode the object if it's
5195 * integer-encoded (the only encoding supported) so
5196 * far. We can just cast it */
5197 if (byval
->encoding
== REDIS_ENCODING_INT
) {
5198 vector
[j
].u
.score
= (long)byval
->ptr
;
5200 redisAssert(1 != 1);
5205 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
5206 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
5208 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
5209 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
5211 redisAssert(1 != 1);
5218 /* We are ready to sort the vector... perform a bit of sanity check
5219 * on the LIMIT option too. We'll use a partial version of quicksort. */
5220 start
= (limit_start
< 0) ? 0 : limit_start
;
5221 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
5222 if (start
>= vectorlen
) {
5223 start
= vectorlen
-1;
5226 if (end
>= vectorlen
) end
= vectorlen
-1;
5228 if (dontsort
== 0) {
5229 server
.sort_desc
= desc
;
5230 server
.sort_alpha
= alpha
;
5231 server
.sort_bypattern
= sortby
? 1 : 0;
5232 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
5233 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
5235 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
5238 /* Send command output to the output buffer, performing the specified
5239 * GET/DEL/INCR/DECR operations if any. */
5240 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
5241 if (storekey
== NULL
) {
5242 /* STORE option not specified, sent the sorting result to client */
5243 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
5244 for (j
= start
; j
<= end
; j
++) {
5247 addReplyBulkLen(c
,vector
[j
].obj
);
5248 addReply(c
,vector
[j
].obj
);
5249 addReply(c
,shared
.crlf
);
5251 listRewind(operations
);
5252 while((ln
= listYield(operations
))) {
5253 redisSortOperation
*sop
= ln
->value
;
5254 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5257 if (sop
->type
== REDIS_SORT_GET
) {
5258 if (!val
|| val
->type
!= REDIS_STRING
) {
5259 addReply(c
,shared
.nullbulk
);
5261 addReplyBulkLen(c
,val
);
5263 addReply(c
,shared
.crlf
);
5266 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5271 robj
*listObject
= createListObject();
5272 list
*listPtr
= (list
*) listObject
->ptr
;
5274 /* STORE option specified, set the sorting result as a List object */
5275 for (j
= start
; j
<= end
; j
++) {
5278 listAddNodeTail(listPtr
,vector
[j
].obj
);
5279 incrRefCount(vector
[j
].obj
);
5281 listRewind(operations
);
5282 while((ln
= listYield(operations
))) {
5283 redisSortOperation
*sop
= ln
->value
;
5284 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5287 if (sop
->type
== REDIS_SORT_GET
) {
5288 if (!val
|| val
->type
!= REDIS_STRING
) {
5289 listAddNodeTail(listPtr
,createStringObject("",0));
5291 listAddNodeTail(listPtr
,val
);
5295 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5299 if (dictReplace(c
->db
->dict
,storekey
,listObject
)) {
5300 incrRefCount(storekey
);
5302 /* Note: we add 1 because the DB is dirty anyway since even if the
5303 * SORT result is empty a new key is set and maybe the old content
5305 server
.dirty
+= 1+outputlen
;
5306 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",outputlen
));
5310 decrRefCount(sortval
);
5311 listRelease(operations
);
5312 for (j
= 0; j
< vectorlen
; j
++) {
5313 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
5314 decrRefCount(vector
[j
].u
.cmpobj
);
5319 /* Create the string returned by the INFO command. This is decoupled
5320 * by the INFO command itself as we need to report the same information
5321 * on memory corruption problems. */
5322 static sds
genRedisInfoString(void) {
5324 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5327 info
= sdscatprintf(sdsempty(),
5328 "redis_version:%s\r\n"
5330 "multiplexing_api:%s\r\n"
5331 "uptime_in_seconds:%ld\r\n"
5332 "uptime_in_days:%ld\r\n"
5333 "connected_clients:%d\r\n"
5334 "connected_slaves:%d\r\n"
5335 "blocked_clients:%d\r\n"
5336 "used_memory:%zu\r\n"
5337 "changes_since_last_save:%lld\r\n"
5338 "bgsave_in_progress:%d\r\n"
5339 "last_save_time:%ld\r\n"
5340 "bgrewriteaof_in_progress:%d\r\n"
5341 "total_connections_received:%lld\r\n"
5342 "total_commands_processed:%lld\r\n"
5345 (sizeof(long) == 8) ? "64" : "32",
5349 listLength(server
.clients
)-listLength(server
.slaves
),
5350 listLength(server
.slaves
),
5351 server
.blockedclients
,
5354 server
.bgsavechildpid
!= -1,
5356 server
.bgrewritechildpid
!= -1,
5357 server
.stat_numconnections
,
5358 server
.stat_numcommands
,
5359 server
.masterhost
== NULL
? "master" : "slave"
5361 if (server
.masterhost
) {
5362 info
= sdscatprintf(info
,
5363 "master_host:%s\r\n"
5364 "master_port:%d\r\n"
5365 "master_link_status:%s\r\n"
5366 "master_last_io_seconds_ago:%d\r\n"
5369 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
5371 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
5374 for (j
= 0; j
< server
.dbnum
; j
++) {
5375 long long keys
, vkeys
;
5377 keys
= dictSize(server
.db
[j
].dict
);
5378 vkeys
= dictSize(server
.db
[j
].expires
);
5379 if (keys
|| vkeys
) {
5380 info
= sdscatprintf(info
, "db%d:keys=%lld,expires=%lld\r\n",
5387 static void infoCommand(redisClient
*c
) {
5388 sds info
= genRedisInfoString();
5389 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
5390 (unsigned long)sdslen(info
)));
5391 addReplySds(c
,info
);
5392 addReply(c
,shared
.crlf
);
5395 static void monitorCommand(redisClient
*c
) {
5396 /* ignore MONITOR if aleady slave or in monitor mode */
5397 if (c
->flags
& REDIS_SLAVE
) return;
5399 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
5401 listAddNodeTail(server
.monitors
,c
);
5402 addReply(c
,shared
.ok
);
5405 /* ================================= Expire ================================= */
5406 static int removeExpire(redisDb
*db
, robj
*key
) {
5407 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
5414 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
5415 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
5423 /* Return the expire time of the specified key, or -1 if no expire
5424 * is associated with this key (i.e. the key is non volatile) */
5425 static time_t getExpire(redisDb
*db
, robj
*key
) {
5428 /* No expire? return ASAP */
5429 if (dictSize(db
->expires
) == 0 ||
5430 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
5432 return (time_t) dictGetEntryVal(de
);
5435 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
5439 /* No expire? return ASAP */
5440 if (dictSize(db
->expires
) == 0 ||
5441 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5443 /* Lookup the expire */
5444 when
= (time_t) dictGetEntryVal(de
);
5445 if (time(NULL
) <= when
) return 0;
5447 /* Delete the key */
5448 dictDelete(db
->expires
,key
);
5449 return dictDelete(db
->dict
,key
) == DICT_OK
;
5452 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
5455 /* No expire? return ASAP */
5456 if (dictSize(db
->expires
) == 0 ||
5457 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5459 /* Delete the key */
5461 dictDelete(db
->expires
,key
);
5462 return dictDelete(db
->dict
,key
) == DICT_OK
;
5465 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
5468 de
= dictFind(c
->db
->dict
,key
);
5470 addReply(c
,shared
.czero
);
5474 if (deleteKey(c
->db
,key
)) server
.dirty
++;
5475 addReply(c
, shared
.cone
);
5478 time_t when
= time(NULL
)+seconds
;
5479 if (setExpire(c
->db
,key
,when
)) {
5480 addReply(c
,shared
.cone
);
5483 addReply(c
,shared
.czero
);
5489 static void expireCommand(redisClient
*c
) {
5490 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
5493 static void expireatCommand(redisClient
*c
) {
5494 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
5497 static void ttlCommand(redisClient
*c
) {
5501 expire
= getExpire(c
->db
,c
->argv
[1]);
5503 ttl
= (int) (expire
-time(NULL
));
5504 if (ttl
< 0) ttl
= -1;
5506 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
5509 /* ================================ MULTI/EXEC ============================== */
5511 /* Client state initialization for MULTI/EXEC */
5512 static void initClientMultiState(redisClient
*c
) {
5513 c
->mstate
.commands
= NULL
;
5514 c
->mstate
.count
= 0;
5517 /* Release all the resources associated with MULTI/EXEC state */
5518 static void freeClientMultiState(redisClient
*c
) {
5521 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5523 multiCmd
*mc
= c
->mstate
.commands
+j
;
5525 for (i
= 0; i
< mc
->argc
; i
++)
5526 decrRefCount(mc
->argv
[i
]);
5529 zfree(c
->mstate
.commands
);
5532 /* Add a new command into the MULTI commands queue */
5533 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
) {
5537 c
->mstate
.commands
= zrealloc(c
->mstate
.commands
,
5538 sizeof(multiCmd
)*(c
->mstate
.count
+1));
5539 mc
= c
->mstate
.commands
+c
->mstate
.count
;
5542 mc
->argv
= zmalloc(sizeof(robj
*)*c
->argc
);
5543 memcpy(mc
->argv
,c
->argv
,sizeof(robj
*)*c
->argc
);
5544 for (j
= 0; j
< c
->argc
; j
++)
5545 incrRefCount(mc
->argv
[j
]);
5549 static void multiCommand(redisClient
*c
) {
5550 c
->flags
|= REDIS_MULTI
;
5551 addReply(c
,shared
.ok
);
5554 static void execCommand(redisClient
*c
) {
5559 if (!(c
->flags
& REDIS_MULTI
)) {
5560 addReplySds(c
,sdsnew("-ERR EXEC without MULTI\r\n"));
5564 orig_argv
= c
->argv
;
5565 orig_argc
= c
->argc
;
5566 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->mstate
.count
));
5567 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5568 c
->argc
= c
->mstate
.commands
[j
].argc
;
5569 c
->argv
= c
->mstate
.commands
[j
].argv
;
5570 call(c
,c
->mstate
.commands
[j
].cmd
);
5572 c
->argv
= orig_argv
;
5573 c
->argc
= orig_argc
;
5574 freeClientMultiState(c
);
5575 initClientMultiState(c
);
5576 c
->flags
&= (~REDIS_MULTI
);
5579 /* =========================== Blocking Operations ========================= */
5581 /* Currently Redis blocking operations support is limited to list POP ops,
5582 * so the current implementation is not fully generic, but it is also not
5583 * completely specific so it will not require a rewrite to support new
5584 * kind of blocking operations in the future.
5586 * Still it's important to note that list blocking operations can be already
5587 * used as a notification mechanism in order to implement other blocking
5588 * operations at application level, so there must be a very strong evidence
5589 * of usefulness and generality before new blocking operations are implemented.
5591 * This is how the current blocking POP works, we use BLPOP as example:
5592 * - If the user calls BLPOP and the key exists and contains a non empty list
5593 * then LPOP is called instead. So BLPOP is semantically the same as LPOP
5594 * if there is not to block.
5595 * - If instead BLPOP is called and the key does not exist or the list is
5596 * empty we need to block. In order to do so we remove the notification for
5597 * new data to read in the client socket (so that we'll not serve new
5598 * requests if the blocking request is not served). Also we put the client
5599 * in a dictionary (db->blockingkeys) mapping keys to a list of clients
5600 * blocking for this keys.
5601 * - If a PUSH operation against a key with blocked clients waiting is
5602 * performed, we serve the first in the list: basically instead to push
5603 * the new element inside the list we return it to the (first / oldest)
5604 * blocking client, unblock the client, and remove it from the list.
5606 * The above comment and the source code should be enough in order to understand
5607 * the implementation and modify / fix it later.
5610 /* Set a client in blocking mode for the specified key, with the specified
5612 static void blockForKeys(redisClient
*c
, robj
**keys
, int numkeys
, time_t timeout
) {
5617 c
->blockingkeys
= zmalloc(sizeof(robj
*)*numkeys
);
5618 c
->blockingkeysnum
= numkeys
;
5619 c
->blockingto
= timeout
;
5620 for (j
= 0; j
< numkeys
; j
++) {
5621 /* Add the key in the client structure, to map clients -> keys */
5622 c
->blockingkeys
[j
] = keys
[j
];
5623 incrRefCount(keys
[j
]);
5625 /* And in the other "side", to map keys -> clients */
5626 de
= dictFind(c
->db
->blockingkeys
,keys
[j
]);
5630 /* For every key we take a list of clients blocked for it */
5632 retval
= dictAdd(c
->db
->blockingkeys
,keys
[j
],l
);
5633 incrRefCount(keys
[j
]);
5634 assert(retval
== DICT_OK
);
5636 l
= dictGetEntryVal(de
);
5638 listAddNodeTail(l
,c
);
5640 /* Mark the client as a blocked client */
5641 c
->flags
|= REDIS_BLOCKED
;
5642 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
5643 server
.blockedclients
++;
5646 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
5647 static void unblockClient(redisClient
*c
) {
5652 assert(c
->blockingkeys
!= NULL
);
5653 /* The client may wait for multiple keys, so unblock it for every key. */
5654 for (j
= 0; j
< c
->blockingkeysnum
; j
++) {
5655 /* Remove this client from the list of clients waiting for this key. */
5656 de
= dictFind(c
->db
->blockingkeys
,c
->blockingkeys
[j
]);
5658 l
= dictGetEntryVal(de
);
5659 listDelNode(l
,listSearchKey(l
,c
));
5660 /* If the list is empty we need to remove it to avoid wasting memory */
5661 if (listLength(l
) == 0)
5662 dictDelete(c
->db
->blockingkeys
,c
->blockingkeys
[j
]);
5663 decrRefCount(c
->blockingkeys
[j
]);
5665 /* Cleanup the client structure */
5666 zfree(c
->blockingkeys
);
5667 c
->blockingkeys
= NULL
;
5668 c
->flags
&= (~REDIS_BLOCKED
);
5669 server
.blockedclients
--;
5670 /* Ok now we are ready to get read events from socket, note that we
5671 * can't trap errors here as it's possible that unblockClients() is
5672 * called from freeClient() itself, and the only thing we can do
5673 * if we failed to register the READABLE event is to kill the client.
5674 * Still the following function should never fail in the real world as
5675 * we are sure the file descriptor is sane, and we exit on out of mem. */
5676 aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
, readQueryFromClient
, c
);
5677 /* As a final step we want to process data if there is some command waiting
5678 * in the input buffer. Note that this is safe even if unblockClient()
5679 * gets called from freeClient() because freeClient() will be smart
5680 * enough to call this function *after* c->querybuf was set to NULL. */
5681 if (c
->querybuf
&& sdslen(c
->querybuf
) > 0) processInputBuffer(c
);
5684 /* This should be called from any function PUSHing into lists.
5685 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5686 * 'ele' is the element pushed.
5688 * If the function returns 0 there was no client waiting for a list push
5691 * If the function returns 1 there was a client waiting for a list push
5692 * against this key, the element was passed to this client thus it's not
5693 * needed to actually add it to the list and the caller should return asap. */
5694 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
) {
5695 struct dictEntry
*de
;
5696 redisClient
*receiver
;
5700 de
= dictFind(c
->db
->blockingkeys
,key
);
5701 if (de
== NULL
) return 0;
5702 l
= dictGetEntryVal(de
);
5705 receiver
= ln
->value
;
5707 addReplySds(receiver
,sdsnew("*2\r\n"));
5708 addReplyBulkLen(receiver
,key
);
5709 addReply(receiver
,key
);
5710 addReply(receiver
,shared
.crlf
);
5711 addReplyBulkLen(receiver
,ele
);
5712 addReply(receiver
,ele
);
5713 addReply(receiver
,shared
.crlf
);
5714 unblockClient(receiver
);
5718 /* Blocking RPOP/LPOP */
5719 static void blockingPopGenericCommand(redisClient
*c
, int where
) {
5724 for (j
= 1; j
< c
->argc
-1; j
++) {
5725 o
= lookupKeyWrite(c
->db
,c
->argv
[j
]);
5727 if (o
->type
!= REDIS_LIST
) {
5728 addReply(c
,shared
.wrongtypeerr
);
5731 list
*list
= o
->ptr
;
5732 if (listLength(list
) != 0) {
5733 /* If the list contains elements fall back to the usual
5734 * non-blocking POP operation */
5735 robj
*argv
[2], **orig_argv
;
5738 /* We need to alter the command arguments before to call
5739 * popGenericCommand() as the command takes a single key. */
5740 orig_argv
= c
->argv
;
5741 orig_argc
= c
->argc
;
5742 argv
[1] = c
->argv
[j
];
5746 /* Also the return value is different, we need to output
5747 * the multi bulk reply header and the key name. The
5748 * "real" command will add the last element (the value)
5749 * for us. If this souds like an hack to you it's just
5750 * because it is... */
5751 addReplySds(c
,sdsnew("*2\r\n"));
5752 addReplyBulkLen(c
,argv
[1]);
5753 addReply(c
,argv
[1]);
5754 addReply(c
,shared
.crlf
);
5755 popGenericCommand(c
,where
);
5757 /* Fix the client structure with the original stuff */
5758 c
->argv
= orig_argv
;
5759 c
->argc
= orig_argc
;
5765 /* If the list is empty or the key does not exists we must block */
5766 timeout
= strtol(c
->argv
[c
->argc
-1]->ptr
,NULL
,10);
5767 if (timeout
> 0) timeout
+= time(NULL
);
5768 blockForKeys(c
,c
->argv
+1,c
->argc
-2,timeout
);
5771 static void blpopCommand(redisClient
*c
) {
5772 blockingPopGenericCommand(c
,REDIS_HEAD
);
5775 static void brpopCommand(redisClient
*c
) {
5776 blockingPopGenericCommand(c
,REDIS_TAIL
);
5779 /* =============================== Replication ============================= */
5781 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5782 ssize_t nwritten
, ret
= size
;
5783 time_t start
= time(NULL
);
5787 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
5788 nwritten
= write(fd
,ptr
,size
);
5789 if (nwritten
== -1) return -1;
5793 if ((time(NULL
)-start
) > timeout
) {
5801 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5802 ssize_t nread
, totread
= 0;
5803 time_t start
= time(NULL
);
5807 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
5808 nread
= read(fd
,ptr
,size
);
5809 if (nread
== -1) return -1;
5814 if ((time(NULL
)-start
) > timeout
) {
/* Read a line from 'fd' into the buffer 'ptr' of 'size' bytes, one byte at
 * a time using syncRead() with the given timeout.
 *
 * The line terminator ("\n" or "\r\n") is not stored and the buffer is
 * always null terminated. Reading stops at the newline or when the buffer
 * is full, whichever comes first.
 *
 * Returns the number of characters stored, or -1 on error or timeout. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    /* There must be room for the null terminator at least. */
    if (size <= 0) return -1;

    size--; /* reserve one byte for the null terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            /* Strip the '\r' of a "\r\n" terminator. */
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
            /* One byte of the buffer was consumed: without this the loop
             * bound never changes and a long line overruns the buffer. */
            size--;
        }
    }
    return nread;
}
5843 static void syncCommand(redisClient
*c
) {
5844 /* ignore SYNC if aleady slave or in monitor mode */
5845 if (c
->flags
& REDIS_SLAVE
) return;
5847 /* SYNC can't be issued when the server has pending data to send to
5848 * the client about already issued commands. We need a fresh reply
5849 * buffer registering the differences between the BGSAVE and the current
5850 * dataset, so that we can copy to other slaves if needed. */
5851 if (listLength(c
->reply
) != 0) {
5852 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5856 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
5857 /* Here we need to check if there is a background saving operation
5858 * in progress, or if it is required to start one */
5859 if (server
.bgsavechildpid
!= -1) {
5860 /* Ok a background save is in progress. Let's check if it is a good
5861 * one for replication, i.e. if there is another slave that is
5862 * registering differences since the server forked to save */
5866 listRewind(server
.slaves
);
5867 while((ln
= listYield(server
.slaves
))) {
5869 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
5872 /* Perfect, the server is already registering differences for
5873 * another slave. Set the right state, and copy the buffer. */
5874 listRelease(c
->reply
);
5875 c
->reply
= listDup(slave
->reply
);
5876 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5877 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
5879 /* No way, we need to wait for the next BGSAVE in order to
5880 * register differences */
5881 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5882 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
5885 /* Ok we don't have a BGSAVE in progress, let's start one */
5886 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
5887 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5888 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
5889 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
5892 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5895 c
->flags
|= REDIS_SLAVE
;
5897 listAddNodeTail(server
.slaves
,c
);
5901 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
5902 redisClient
*slave
= privdata
;
5904 REDIS_NOTUSED(mask
);
5905 char buf
[REDIS_IOBUF_LEN
];
5906 ssize_t nwritten
, buflen
;
5908 if (slave
->repldboff
== 0) {
5909 /* Write the bulk write count before to transfer the DB. In theory here
5910 * we don't know how much room there is in the output buffer of the
5911 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5912 * operations) will never be smaller than the few bytes we need. */
5915 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5917 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
5925 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
5926 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
5928 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
5929 (buflen
== 0) ? "premature EOF" : strerror(errno
));
5933 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
5934 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
5939 slave
->repldboff
+= nwritten
;
5940 if (slave
->repldboff
== slave
->repldbsize
) {
5941 close(slave
->repldbfd
);
5942 slave
->repldbfd
= -1;
5943 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5944 slave
->replstate
= REDIS_REPL_ONLINE
;
5945 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
5946 sendReplyToClient
, slave
) == AE_ERR
) {
5950 addReplySds(slave
,sdsempty());
5951 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
/* This function is called at the end of every background saving.
 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
 * otherwise REDIS_ERR is passed to the function.
 *
 * The goal of this function is to handle slaves waiting for a successful
 * background saving in order to perform non-blocking synchronization. */
5961 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
5963 int startbgsave
= 0;
5965 listRewind(server
.slaves
);
5966 while((ln
= listYield(server
.slaves
))) {
5967 redisClient
*slave
= ln
->value
;
5969 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
5971 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5972 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
5973 struct redis_stat buf
;
5975 if (bgsaveerr
!= REDIS_OK
) {
5977 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
5980 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
5981 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
5983 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
5986 slave
->repldboff
= 0;
5987 slave
->repldbsize
= buf
.st_size
;
5988 slave
->replstate
= REDIS_REPL_SEND_BULK
;
5989 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5990 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
5997 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5998 listRewind(server
.slaves
);
5999 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
6000 while((ln
= listYield(server
.slaves
))) {
6001 redisClient
*slave
= ln
->value
;
6003 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
6010 static int syncWithMaster(void) {
6011 char buf
[1024], tmpfile
[256], authcmd
[1024];
6013 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
6017 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
6022 /* AUTH with the master if required. */
6023 if(server
.masterauth
) {
6024 snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
);
6025 if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) {
6027 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",
6031 /* Read the AUTH result. */
6032 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
6034 redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s",
6038 if (buf
[0] != '+') {
6040 redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?");
6045 /* Issue the SYNC command */
6046 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
6048 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
6052 /* Read the bulk write count */
6053 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
6055 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
6059 if (buf
[0] != '$') {
6061 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
6064 dumpsize
= atoi(buf
+1);
6065 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
6066 /* Read the bulk write data on a temp file */
6067 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
6068 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
6071 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
6075 int nread
, nwritten
;
6077 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
6079 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
6085 nwritten
= write(dfd
,buf
,nread
);
6086 if (nwritten
== -1) {
6087 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
6095 if (rename(tmpfile
,server
.dbfilename
) == -1) {
6096 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
6102 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6103 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
6107 server
.master
= createClient(fd
);
6108 server
.master
->flags
|= REDIS_MASTER
;
6109 server
.master
->authenticated
= 1;
6110 server
.replstate
= REDIS_REPL_CONNECTED
;
6114 static void slaveofCommand(redisClient
*c
) {
6115 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
6116 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
6117 if (server
.masterhost
) {
6118 sdsfree(server
.masterhost
);
6119 server
.masterhost
= NULL
;
6120 if (server
.master
) freeClient(server
.master
);
6121 server
.replstate
= REDIS_REPL_NONE
;
6122 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
6125 sdsfree(server
.masterhost
);
6126 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
6127 server
.masterport
= atoi(c
->argv
[2]->ptr
);
6128 if (server
.master
) freeClient(server
.master
);
6129 server
.replstate
= REDIS_REPL_CONNECT
;
6130 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
6131 server
.masterhost
, server
.masterport
);
6133 addReply(c
,shared
.ok
);
6136 /* ============================ Maxmemory directive ======================== */
6138 /* This function gets called when 'maxmemory' is set on the config file to limit
6139 * the max memory used by the server, and we are out of memory.
6140 * This function will try to, in order:
6142 * - Free objects from the free list
6143 * - Try to remove keys with an EXPIRE set
 * If it is not possible to free enough memory to reach used-memory < maxmemory
 * the server will start refusing commands that will enlarge even more the
6149 static void freeMemoryIfNeeded(void) {
6150 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
6151 if (listLength(server
.objfreelist
)) {
6154 listNode
*head
= listFirst(server
.objfreelist
);
6155 o
= listNodeValue(head
);
6156 listDelNode(server
.objfreelist
,head
);
6159 int j
, k
, freed
= 0;
6161 for (j
= 0; j
< server
.dbnum
; j
++) {
6163 robj
*minkey
= NULL
;
6164 struct dictEntry
*de
;
6166 if (dictSize(server
.db
[j
].expires
)) {
6168 /* From a sample of three keys drop the one nearest to
6169 * the natural expire */
6170 for (k
= 0; k
< 3; k
++) {
6173 de
= dictGetRandomKey(server
.db
[j
].expires
);
6174 t
= (time_t) dictGetEntryVal(de
);
6175 if (minttl
== -1 || t
< minttl
) {
6176 minkey
= dictGetEntryKey(de
);
6180 deleteKey(server
.db
+j
,minkey
);
6183 if (!freed
) return; /* nothing to free... */
6188 /* ============================== Append Only file ========================== */
6190 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
6191 sds buf
= sdsempty();
6197 /* The DB this command was targetting is not the same as the last command
6198 * we appendend. To issue a SELECT command is needed. */
6199 if (dictid
!= server
.appendseldb
) {
6202 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
6203 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
6204 (unsigned long)strlen(seldb
),seldb
);
6205 server
.appendseldb
= dictid
;
6208 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6209 * EXPIREs into EXPIREATs calls */
6210 if (cmd
->proc
== expireCommand
) {
6213 tmpargv
[0] = createStringObject("EXPIREAT",8);
6214 tmpargv
[1] = argv
[1];
6215 incrRefCount(argv
[1]);
6216 when
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10);
6217 tmpargv
[2] = createObject(REDIS_STRING
,
6218 sdscatprintf(sdsempty(),"%ld",when
));
6222 /* Append the actual command */
6223 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
6224 for (j
= 0; j
< argc
; j
++) {
6227 o
= getDecodedObject(o
);
6228 buf
= sdscatprintf(buf
,"$%lu\r\n",(unsigned long)sdslen(o
->ptr
));
6229 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
6230 buf
= sdscatlen(buf
,"\r\n",2);
6234 /* Free the objects from the modified argv for EXPIREAT */
6235 if (cmd
->proc
== expireCommand
) {
6236 for (j
= 0; j
< 3; j
++)
6237 decrRefCount(argv
[j
]);
6240 /* We want to perform a single write. This should be guaranteed atomic
6241 * at least if the filesystem we are writing is a real physical one.
6242 * While this will save us against the server being killed I don't think
6243 * there is much to do about the whole server stopping for power problems
6245 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
6246 if (nwritten
!= (signed)sdslen(buf
)) {
6247 /* Ooops, we are in troubles. The best thing to do for now is
6248 * to simply exit instead to give the illusion that everything is
6249 * working as expected. */
6250 if (nwritten
== -1) {
6251 redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
));
6253 redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
));
6257 /* If a background append only file rewriting is in progress we want to
6258 * accumulate the differences between the child DB and the current one
6259 * in a buffer, so that when the child process will do its work we
6260 * can append the differences to the new append only file. */
6261 if (server
.bgrewritechildpid
!= -1)
6262 server
.bgrewritebuf
= sdscatlen(server
.bgrewritebuf
,buf
,sdslen(buf
));
6266 if (server
.appendfsync
== APPENDFSYNC_ALWAYS
||
6267 (server
.appendfsync
== APPENDFSYNC_EVERYSEC
&&
6268 now
-server
.lastfsync
> 1))
6270 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
6271 server
.lastfsync
= now
;
6275 /* In Redis commands are always executed in the context of a client, so in
6276 * order to load the append only file we need to create a fake client. */
6277 static struct redisClient
*createFakeClient(void) {
6278 struct redisClient
*c
= zmalloc(sizeof(*c
));
6282 c
->querybuf
= sdsempty();
6286 /* We set the fake client as a slave waiting for the synchronization
6287 * so that Redis will not try to send replies to this client. */
6288 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
6289 c
->reply
= listCreate();
6290 listSetFreeMethod(c
->reply
,decrRefCount
);
6291 listSetDupMethod(c
->reply
,dupClientReplyValue
);
6295 static void freeFakeClient(struct redisClient
*c
) {
6296 sdsfree(c
->querybuf
);
6297 listRelease(c
->reply
);
/* Replay the append log file. On success REDIS_OK is returned. On non fatal
 * error (the append only file is zero-length) REDIS_ERR is returned. On
 * fatal error an error message is logged and the program exits. */
6304 int loadAppendOnlyFile(char *filename
) {
6305 struct redisClient
*fakeClient
;
6306 FILE *fp
= fopen(filename
,"r");
6307 struct redis_stat sb
;
6309 if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size
== 0)
6313 redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
));
6317 fakeClient
= createFakeClient();
6324 struct redisCommand
*cmd
;
6326 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) {
6332 if (buf
[0] != '*') goto fmterr
;
6334 argv
= zmalloc(sizeof(robj
*)*argc
);
6335 for (j
= 0; j
< argc
; j
++) {
6336 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
;
6337 if (buf
[0] != '$') goto fmterr
;
6338 len
= strtol(buf
+1,NULL
,10);
6339 argsds
= sdsnewlen(NULL
,len
);
6340 if (len
&& fread(argsds
,len
,1,fp
) == 0) goto fmterr
;
6341 argv
[j
] = createObject(REDIS_STRING
,argsds
);
6342 if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */
6345 /* Command lookup */
6346 cmd
= lookupCommand(argv
[0]->ptr
);
6348 redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
);
6351 /* Try object sharing and encoding */
6352 if (server
.shareobjects
) {
6354 for(j
= 1; j
< argc
; j
++)
6355 argv
[j
] = tryObjectSharing(argv
[j
]);
6357 if (cmd
->flags
& REDIS_CMD_BULK
)
6358 tryObjectEncoding(argv
[argc
-1]);
6359 /* Run the command in the context of a fake client */
6360 fakeClient
->argc
= argc
;
6361 fakeClient
->argv
= argv
;
6362 cmd
->proc(fakeClient
);
6363 /* Discard the reply objects list from the fake client */
6364 while(listLength(fakeClient
->reply
))
6365 listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
));
6366 /* Clean up, ready for the next command */
6367 for (j
= 0; j
< argc
; j
++) decrRefCount(argv
[j
]);
6371 freeFakeClient(fakeClient
);
6376 redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file");
6378 redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
));
6382 redisLog(REDIS_WARNING
,"Bad file format reading the append only file");
6386 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6387 static int fwriteBulk(FILE *fp
, robj
*obj
) {
6389 obj
= getDecodedObject(obj
);
6390 snprintf(buf
,sizeof(buf
),"$%ld\r\n",(long)sdslen(obj
->ptr
));
6391 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) goto err
;
6392 if (sdslen(obj
->ptr
) && fwrite(obj
->ptr
,sdslen(obj
->ptr
),1,fp
) == 0)
6394 if (fwrite("\r\n",2,1,fp
) == 0) goto err
;
/* Write the double 'd' to 'fp' in bulk format: $<count>\r\n<payload>\r\n
 *
 * The payload is 'd' formatted with "%.17g" and <count> is the payload
 * length in bytes, CRLF excluded. Returns 1 on success, 0 as soon as one
 * of the writes fails. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char header[128], payload[128];
    size_t paylen;

    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    paylen = strlen(payload);
    /* The advertised count must not include the trailing CRLF. */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)paylen-2);
    if (fwrite(header,strlen(header),1,fp) != 1) return 0;
    if (fwrite(payload,paylen,1,fp) != 1) return 0;
    return 1;
}
/* Write the long 'l' to 'fp' in bulk format: $<count>\r\n<payload>\r\n
 *
 * The payload is the decimal representation of 'l' and <count> is its
 * length in bytes, CRLF excluded. Returns 1 on success, 0 as soon as one
 * of the writes fails. */
static int fwriteBulkLong(FILE *fp, long l) {
    char header[128], payload[128];
    size_t paylen;

    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    paylen = strlen(payload);
    /* The advertised count must not include the trailing CRLF. */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)paylen-2);
    if (fwrite(header,strlen(header),1,fp) != 1) return 0;
    if (fwrite(payload,paylen,1,fp) != 1) return 0;
    return 1;
}
6424 /* Write a sequence of commands able to fully rebuild the dataset into
6425 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6426 static int rewriteAppendOnlyFile(char *filename
) {
6427 dictIterator
*di
= NULL
;
6432 time_t now
= time(NULL
);
6434 /* Note that we have to use a different temp name here compared to the
6435 * one used by rewriteAppendOnlyFileBackground() function. */
6436 snprintf(tmpfile
,256,"temp-rewriteaof-%d.aof", (int) getpid());
6437 fp
= fopen(tmpfile
,"w");
6439 redisLog(REDIS_WARNING
, "Failed rewriting the append only file: %s", strerror(errno
));
6442 for (j
= 0; j
< server
.dbnum
; j
++) {
6443 char selectcmd
[] = "*2\r\n$6\r\nSELECT\r\n";
6444 redisDb
*db
= server
.db
+j
;
6446 if (dictSize(d
) == 0) continue;
6447 di
= dictGetIterator(d
);
6453 /* SELECT the new DB */
6454 if (fwrite(selectcmd
,sizeof(selectcmd
)-1,1,fp
) == 0) goto werr
;
6455 if (fwriteBulkLong(fp
,j
) == 0) goto werr
;
6457 /* Iterate this DB writing every entry */
6458 while((de
= dictNext(di
)) != NULL
) {
6459 robj
*key
= dictGetEntryKey(de
);
6460 robj
*o
= dictGetEntryVal(de
);
6461 time_t expiretime
= getExpire(db
,key
);
6463 /* Save the key and associated value */
6464 if (o
->type
== REDIS_STRING
) {
6465 /* Emit a SET command */
6466 char cmd
[]="*3\r\n$3\r\nSET\r\n";
6467 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6469 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6470 if (fwriteBulk(fp
,o
) == 0) goto werr
;
6471 } else if (o
->type
== REDIS_LIST
) {
6472 /* Emit the RPUSHes needed to rebuild the list */
6473 list
*list
= o
->ptr
;
6477 while((ln
= listYield(list
))) {
6478 char cmd
[]="*3\r\n$5\r\nRPUSH\r\n";
6479 robj
*eleobj
= listNodeValue(ln
);
6481 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6482 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6483 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6485 } else if (o
->type
== REDIS_SET
) {
6486 /* Emit the SADDs needed to rebuild the set */
6488 dictIterator
*di
= dictGetIterator(set
);
6491 while((de
= dictNext(di
)) != NULL
) {
6492 char cmd
[]="*3\r\n$4\r\nSADD\r\n";
6493 robj
*eleobj
= dictGetEntryKey(de
);
6495 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6496 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6497 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6499 dictReleaseIterator(di
);
6500 } else if (o
->type
== REDIS_ZSET
) {
6501 /* Emit the ZADDs needed to rebuild the sorted set */
6503 dictIterator
*di
= dictGetIterator(zs
->dict
);
6506 while((de
= dictNext(di
)) != NULL
) {
6507 char cmd
[]="*4\r\n$4\r\nZADD\r\n";
6508 robj
*eleobj
= dictGetEntryKey(de
);
6509 double *score
= dictGetEntryVal(de
);
6511 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6512 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6513 if (fwriteBulkDouble(fp
,*score
) == 0) goto werr
;
6514 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6516 dictReleaseIterator(di
);
6518 redisAssert(0 != 0);
6520 /* Save the expire time */
6521 if (expiretime
!= -1) {
6522 char cmd
[]="*3\r\n$8\r\nEXPIREAT\r\n";
6523 /* If this key is already expired skip it */
6524 if (expiretime
< now
) continue;
6525 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6526 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6527 if (fwriteBulkLong(fp
,expiretime
) == 0) goto werr
;
6530 dictReleaseIterator(di
);
6533 /* Make sure data will not remain on the OS's output buffers */
6538 /* Use RENAME to make sure the DB file is changed atomically only
6539 * if the generate DB file is ok. */
6540 if (rename(tmpfile
,filename
) == -1) {
6541 redisLog(REDIS_WARNING
,"Error moving temp append only file on the final destination: %s", strerror(errno
));
6545 redisLog(REDIS_NOTICE
,"SYNC append only file rewrite performed");
6551 redisLog(REDIS_WARNING
,"Write error writing append only file on disk: %s", strerror(errno
));
6552 if (di
) dictReleaseIterator(di
);
6556 /* This is how rewriting of the append only file in background works:
6558 * 1) The user calls BGREWRITEAOF
6559 * 2) Redis calls this function, that forks():
6560 * 2a) the child rewrite the append only file in a temp file.
6561 * 2b) the parent accumulates differences in server.bgrewritebuf.
 * 3) When the child finished '2a' exits.
 * 4) The parent will trap the exit code, if it's OK, will append the
 *    data accumulated into server.bgrewritebuf into the temp file, and
 *    finally will rename(2) the temp file in the actual file name.
 *    Then the new file is reopened as the new append only file. Profit!
6568 static int rewriteAppendOnlyFileBackground(void) {
6571 if (server
.bgrewritechildpid
!= -1) return REDIS_ERR
;
6572 if ((childpid
= fork()) == 0) {
6577 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6578 if (rewriteAppendOnlyFile(tmpfile
) == REDIS_OK
) {
6585 if (childpid
== -1) {
6586 redisLog(REDIS_WARNING
,
6587 "Can't rewrite append only file in background: fork: %s",
6591 redisLog(REDIS_NOTICE
,
6592 "Background append only file rewriting started by pid %d",childpid
);
6593 server
.bgrewritechildpid
= childpid
;
6594 /* We set appendseldb to -1 in order to force the next call to the
6595 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6596 * accumulated by the parent into server.bgrewritebuf will start
6597 * with a SELECT statement and it will be safe to merge. */
6598 server
.appendseldb
= -1;
6601 return REDIS_OK
; /* unreached */
6604 static void bgrewriteaofCommand(redisClient
*c
) {
6605 if (server
.bgrewritechildpid
!= -1) {
6606 addReplySds(c
,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6609 if (rewriteAppendOnlyFileBackground() == REDIS_OK
) {
6610 char *status
= "+Background append only file rewriting started\r\n";
6611 addReplySds(c
,sdsnew(status
));
6613 addReply(c
,shared
.err
);
6617 static void aofRemoveTempFile(pid_t childpid
) {
6620 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) childpid
);
6624 /* =============================== Virtual Memory =========================== */
6625 static void vmInit(void) {
6628 server
.vm_fp
= fopen("/tmp/redisvm","w+b");
6629 if (server
.vm_fp
== NULL
) {
6630 redisLog(REDIS_WARNING
,"Impossible to open the swap file. Exiting.");
6633 server
.vm_fd
= fileno(server
.vm_fp
);
6634 server
.vm_next_page
= 0;
6635 server
.vm_near_pages
= 0;
6636 totsize
= server
.vm_pages
*server
.vm_page_size
;
6637 redisLog(REDIS_NOTICE
,"Allocating %lld bytes of swap file",totsize
);
6638 if (ftruncate(server
.vm_fd
,totsize
) == -1) {
6639 redisLog(REDIS_WARNING
,"Can't ftruncate swap file: %s. Exiting.",
6643 redisLog(REDIS_NOTICE
,"Swap file allocated with success");
6645 server
.vm_bitmap
= zmalloc((server
.vm_near_pages
+7)/8);
6646 memset(server
.vm_bitmap
,0,(server
.vm_near_pages
+7)/8);
6647 /* Try to remove the swap file, so the OS will really delete it from the
6648 * file system when Redis exists. */
6649 unlink("/tmp/redisvm");
6652 /* Mark the page as used */
6653 static void vmMarkPageUsed(off_t page
) {
6654 off_t byte
= page
/8;
6656 server
.vm_bitmap
[byte
] |= 1<<bit
;
/* Mark N contiguous pages as used, with 'page' being the first.
 * The pages marked are page, page+1, ..., page+count-1. */
static void vmMarkPagesUsed(off_t page, off_t count) {
    off_t j;

    /* Offset by the loop index, not by 'count': the latter would touch only
     * the single page just past the end of the run. */
    for (j = 0; j < count; j++)
        vmMarkPageUsed(page+j);
}
6667 /* Mark the page as free */
6668 static void vmMarkPageFree(off_t page
) {
6669 off_t byte
= page
/8;
6671 server
.vm_bitmap
[byte
] &= ~(1<<bit
);
/* Mark N contiguous pages as free, with 'page' being the first.
 * The pages marked are page, page+1, ..., page+count-1. */
static void vmMarkPagesFree(off_t page, off_t count) {
    off_t j;

    /* Offset by the loop index, not by 'count': the latter would touch only
     * the single page just past the end of the run. */
    for (j = 0; j < count; j++)
        vmMarkPageFree(page+j);
}
6682 /* Test if the page is free */
6683 static int vmFreePage(off_t page
) {
6684 off_t byte
= page
/8;
6686 return server
.vm_bitmap
[byte
] & bit
;
6689 /* Find N contiguous free pages storing the first page of the cluster in *first.
6690 * Returns REDIS_OK if it was able to find N contiguous pages, otherwise
6691 * REDIS_ERR is returned.
6693 * This function uses a simple algorithm: we try to allocate
6694 * REDIS_VM_MAX_NEAR_PAGES sequentially, when we reach this limit we start
6695 * again from the start of the swap file searching for free spaces.
6697 * If it looks pretty clear that there are no free pages near our offset
6698 * we try to find less populated places doing a forward jump of
6699 * REDIS_VM_MAX_RANDOM_JUMP, then we start scanning again a few pages
6700 * without hurry, and then we jump again and so forth...
6702 * This function can be improved using a free list to avoid to guess
6703 * too much, since we could collect data about freed pages.
6705 * note: I implemented this function just after watching an episode of
6706 * Battlestar Galactica, where the hybrid was continuing to say "JUMP!"
6708 static int vmFindContiguousPages(off_t
*first
, int n
) {
6709 off_t base
, offset
= 0, since_jump
= 0, numfree
= 0;
6711 if (server
.vm_near_pages
== REDIS_VM_MAX_NEAR_PAGES
) {
6712 server
.vm_near_pages
= 0;
6713 server
.vm_next_page
= 0;
6715 server
.vm_near_pages
++; /* Yet another try for pages near to the old ones */
6716 base
= server
.vm_next_page
;
6718 while(offset
< server
.vm_pages
) {
6719 off_t
this = base
+offset
;
6721 /* If we overflow, restart from page zero */
6722 if (this >= server
.vm_pages
) {
6723 this -= server
.vm_pages
;
6725 /* Just overflowed, what we found on tail is no longer
6726 * interesting, as it's no longer contiguous. */
6730 if (vmFreePage(this)) {
6731 /* This is a free page */
6733 /* Already got N free pages? Return to the caller, with success */
6739 /* The current one is not a free page */
6743 /* Fast-forward if the current page is not free and we already
6744 * searched enough near this place. */
6746 if (!numfree
&& since_jump
>= REDIS_VM_MAX_RANDOM_JUMP
/4) {
6747 offset
+= random() % REDIS_VM_MAX_RANDOM_JUMP
;
6749 /* Note that even if we rewind after the jump, we are don't need
6750 * to make sure numfree is set to zero as we only jump *if* it
6751 * is set to zero. */
6753 /* Otherwise just check the next page */
6760 /* Swap the 'val' object relative to 'key' into disk. Store all the information
6761 * needed to later retrieve the object into the key object.
6762 * If we can't find enough contiguous empty pages to swap the object on disk
6763 * REDIS_ERR is returned. */
6764 static int vmSwapObject(robj
*key
, robj
*val
) {
6765 off_t pages
= rdbSavedObjectPages(val
);
6768 assert(key
->storage
== REDIS_VM_MEMORY
);
6769 if (vmFindContiguousPages(&page
,pages
) == REDIS_ERR
) return REDIS_ERR
;
6770 if (fseeko(server
.vm_fp
,page
*server
.vm_page_size
,SEEK_SET
) == -1) {
6771 redisLog(REDIS_WARNING
,
6772 "Critical VM problem in vmSwapObject(): can't seek: %s",
6776 rdbSaveObject(server
.vm_fp
,val
);
6777 key
->vm
.page
= page
;
6778 key
->vm
.usedpages
= pages
;
6779 key
->storage
= REDIS_VM_SWAPPED
;
6780 decrRefCount(val
); /* Deallocate the object from memory. */
6781 vmMarkPagesUsed(page
,pages
);
6785 /* Load the value object relative to the 'key' object from swap to memory.
6786 * The newly allocated object is returned. */
6787 static robj
*vmLoadObject(robj
*key
) {
6790 assert(key
->storage
== REDIS_VM_SWAPPED
);
6791 if (fseeko(server
.vm_fp
,key
->vm
.page
*server
.vm_page_size
,SEEK_SET
) == -1) {
6792 redisLog(REDIS_WARNING
,
6793 "Unrecoverable VM problem in vmLoadObject(): can't seek: %s",
6797 val
= rdbLoadObject(key
->type
,server
.vm_fp
);
6799 redisLog(REDIS_WARNING
, "Unrecoverable VM problem in vmLoadObject(): can't load object from swap file: %s", strerror(errno
));
6802 key
->storage
= REDIS_VM_MEMORY
;
6803 key
->vm
.atime
= server
.unixtime
;
6804 vmMarkPagesFree(key
->vm
.page
,key
->vm
.usedpages
);
6808 /* ================================= Debugging ============================== */
6810 static void debugCommand(redisClient
*c
) {
6811 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
6813 } else if (!strcasecmp(c
->argv
[1]->ptr
,"reload")) {
6814 if (rdbSave(server
.dbfilename
) != REDIS_OK
) {
6815 addReply(c
,shared
.err
);
6819 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6820 addReply(c
,shared
.err
);
6823 redisLog(REDIS_WARNING
,"DB reloaded by DEBUG RELOAD");
6824 addReply(c
,shared
.ok
);
6825 } else if (!strcasecmp(c
->argv
[1]->ptr
,"loadaof")) {
6827 if (loadAppendOnlyFile(server
.appendfilename
) != REDIS_OK
) {
6828 addReply(c
,shared
.err
);
6831 redisLog(REDIS_WARNING
,"Append Only File loaded by DEBUG LOADAOF");
6832 addReply(c
,shared
.ok
);
6833 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
6834 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
6838 addReply(c
,shared
.nokeyerr
);
6841 key
= dictGetEntryKey(de
);
6842 val
= dictGetEntryVal(de
);
6843 addReplySds(c
,sdscatprintf(sdsempty(),
6844 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d serializedlength:%lld\r\n",
6845 (void*)key
, key
->refcount
, (void*)val
, val
->refcount
,
6846 val
->encoding
, rdbSavedObjectLen(val
)));
6848 addReplySds(c
,sdsnew(
6849 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6853 static void _redisAssert(char *estr
) {
6854 redisLog(REDIS_WARNING
,"=== ASSERTION FAILED ===");
6855 redisLog(REDIS_WARNING
,"==> %s\n",estr
);
6856 #ifdef HAVE_BACKTRACE
6857 redisLog(REDIS_WARNING
,"(forcing SIGSEGV in order to print the stack trace)");
6862 /* =================================== Main! ================================ */
6865 int linuxOvercommitMemoryValue(void) {
6866 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
6870 if (fgets(buf
,64,fp
) == NULL
) {
6879 void linuxOvercommitMemoryWarning(void) {
6880 if (linuxOvercommitMemoryValue() == 0) {
6881 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6884 #endif /* __linux__ */
6886 static void daemonize(void) {
6890 if (fork() != 0) exit(0); /* parent exits */
6891 printf("New pid: %d\n", getpid());
6892 setsid(); /* create a new session */
6894 /* Every output goes to /dev/null. If Redis is daemonized but
6895 * the 'logfile' is set to 'stdout' in the configuration file
6896 * it will not log at all. */
6897 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
6898 dup2(fd
, STDIN_FILENO
);
6899 dup2(fd
, STDOUT_FILENO
);
6900 dup2(fd
, STDERR_FILENO
);
6901 if (fd
> STDERR_FILENO
) close(fd
);
6903 /* Try to write the pid file */
6904 fp
= fopen(server
.pidfile
,"w");
6906 fprintf(fp
,"%d\n",getpid());
/* Server entry point.
 *
 * argc/argv: command line; per the usage string below, the only accepted
 * argument is the path of a configuration file (argv[1]).
 *
 * Visible sequence: load the configuration, optionally daemonize, log the
 * start banner, emit the overcommit warning, load the dataset (append only
 * file or RDB file depending on server.appendonly), register the accept
 * handler on the listening descriptor and, at the end, delete the event
 * loop.
 * NOTE(review): several statements of this function are not visible in
 * this excerpt (the test guarding the first branch, the else branches,
 * whatever runs the event loop, and the return) -- read the comments below
 * with that in mind. */
6911 int main(int argc
, char **argv
) {
/* Branch for "a config file was given" (the guarding condition is outside
 * this excerpt): argv[1] is handed to loadServerConfig().
 * NOTE(review): presumably resetServerSaveParams() discards the built-in
 * save points so the file can define its own -- confirm. */
6914 resetServerSaveParams();
6915 loadServerConfig(argv
[1]);
/* More than one argument: print the usage line on stderr. */
6916 } else if (argc
> 2) {
6917 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
/* No config file: warn and go on with the default configuration. */
6920 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
/* Go to the background only when the configuration asks for it. */
6922 if (server
.daemonize
) daemonize();
6924 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
/* Warn when vm.overcommit_memory is 0.
 * NOTE(review): the function is defined inside a __linux__ section; the
 * guard around this call is not visible here -- confirm it exists. */
6926 linuxOvercommitMemoryWarning();
/* Dataset loading: with append only mode enabled the append only file is
 * replayed; a notice is logged only when loading reports REDIS_OK. */
6928 if (server
.appendonly
) {
6929 if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
)
6930 redisLog(REDIS_NOTICE
,"DB loaded from append only file");
/* NOTE(review): presumably the else-branch of the appendonly test (the
 * line in between is not visible): load the RDB file instead. */
6932 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
6933 redisLog(REDIS_NOTICE
,"DB loaded from disk");
/* Ask the event loop to call acceptHandler whenever server.fd becomes
 * readable; a registration failure is reported through oom(). */
6935 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
6936 acceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
6937 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
/* Release the event loop. */
6939 aeDeleteEventLoop(server
.el
);
6943 /* ============================= Backtrace support ========================= */
6945 #ifdef HAVE_BACKTRACE
/* Forward declaration: segvHandler() below calls findFuncName() before
 * its definition, which comes later in this file. */
6946 static char *findFuncName(void *pointer
, unsigned long *offset
);
6948 static void *getMcontextEip(ucontext_t
*uc
) {
6949 #if defined(__FreeBSD__)
6950 return (void*) uc
->uc_mcontext
.mc_eip
;
6951 #elif defined(__dietlibc__)
6952 return (void*) uc
->uc_mcontext
.eip
;
6953 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6955 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6957 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6959 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6960 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6961 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6963 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6965 #elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
6966 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
]; /* Linux 32/64 bit */
6967 #elif defined(__ia64__) /* Linux IA64 */
6968 return (void*) uc
->uc_mcontext
.sc_ip
;
/* Crash handler: logs the signal, the server info string and a backtrace.
 *
 * sig:    number of the signal being handled (printed in the banner).
 * info:   unused (silenced with REDIS_NOTUSED).
 * secret: context pointer of the three-argument handler form; it is cast
 *         to ucontext_t* so the interrupted program counter can be read.
 *
 * NOTE(review): the declarations of 'trace' and 'infostring', the else
 * branch of the per-frame test and the end of the function are not
 * visible in this excerpt. */
6974 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
6976 char **messages
= NULL
;
6977 int i
, trace_size
= 0;
6978 unsigned long offset
=0;
6979 ucontext_t
*uc
= (ucontext_t
*) secret
;
6981 REDIS_NOTUSED(info
);
6983 redisLog(REDIS_WARNING
,
6984 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
/* Dump the server info string into the log as well. */
6985 infostring
= genRedisInfoString();
6986 redisLog(REDIS_WARNING
, "%s",infostring
);
6987 /* It's not safe to sdsfree() the returned string under memory
6988 * corruption conditions. Let it leak as we are going to abort */
/* Collect at most 100 return addresses into 'trace'. */
6990 trace_size
= backtrace(trace
, 100);
6991 /* overwrite sigaction with caller's address */
6992 if (getMcontextEip(uc
) != NULL
) {
6993 trace
[1] = getMcontextEip(uc
);
/* Turn the addresses into printable strings.
 * NOTE(review): backtrace_symbols() may return NULL; messages[i] is
 * dereferenced below without a check. */
6995 messages
= backtrace_symbols(trace
, trace_size
);
/* Frame 0 is skipped: the loop starts at index 1. */
6997 for (i
=1; i
<trace_size
; ++i
) {
/* Look the address up in the static symbol table; 'offset' receives the
 * distance from the start of the matched symbol. */
6998 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
/* p points at the '+' of the libc-produced string, if there is one. */
7000 p
= strchr(messages
[i
],'+');
/* Print the libc string when our lookup found nothing, or when the number
 * after '+' is smaller than our offset (libc matched a closer symbol).
 * NOTE(review): strtol() is called with base 10; if the platform prints
 * the offset as 0x-prefixed hex this parses as 0 -- confirm. */
7001 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
7002 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
/* Otherwise print the frame using the name resolved by findFuncName().
 * NOTE(review): the last argument is unsigned int but is printed with %d. */
7004 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
7007 /* free(messages); Don't call free() with possibly corrupted memory. */
7011 static void setupSigSegvAction(void) {
7012 struct sigaction act
;
7014 sigemptyset (&act
.sa_mask
);
7015 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
7016 * is used. Otherwise, sa_handler is used */
7017 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
7018 act
.sa_sigaction
= segvHandler
;
7019 sigaction (SIGSEGV
, &act
, NULL
);
7020 sigaction (SIGBUS
, &act
, NULL
);
7021 sigaction (SIGFPE
, &act
, NULL
);
7022 sigaction (SIGILL
, &act
, NULL
);
7023 sigaction (SIGBUS
, &act
, NULL
);
7027 #include "staticsymbols.h"
7028 /* This function tries to convert a pointer into a function name. It's used in
7029 * order to provide a backtrace under segmentation fault that's able to
7030 * display functions declared as static (otherwise the backtrace is useless). */
/* pointer: address to resolve against symsTable.
 * offset:  out-parameter for the distance between 'pointer' and the start
 *          of the chosen symbol. NOTE(review): the store to *offset and
 *          the declaration of 'i' and 'ret' are not visible in this
 *          excerpt; the caller reads 'offset' after a non-NULL return.
 * Returns the .name of the selected symsTable entry, or NULL when no
 * entry was selected (ret == -1). */
7031 static char *findFuncName(void *pointer
, unsigned long *offset
){
7033 unsigned long off
, minoff
= 0;
7035 /* Try to match against the Symbol with the smallest offset */
/* The table is walked until an entry whose .pointer is zero. */
7036 for (i
=0; symsTable
[i
].pointer
; i
++) {
7037 unsigned long lp
= (unsigned long) pointer
;
/* Only symbols starting at or before the address are candidates; the
 * all-ones address is rejected outright. */
7039 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
7040 off
=lp
-symsTable
[i
].pointer
;
/* First candidate seen (ret still negative) or a closer one than the best
 * so far. NOTE(review): the body of this branch is not visible here. */
7041 if (ret
< 0 || off
< minoff
) {
/* No symbol starts at or below the address. */
7047 if (ret
== -1) return NULL
;
7049 return symsTable
[ret
].name
;
7051 #else /* HAVE_BACKTRACE */
/* Variant compiled when HAVE_BACKTRACE is not defined.
 * NOTE(review): the body is not visible in this excerpt -- presumably it
 * is empty, so no crash handler is installed in this build; confirm. */
7052 static void setupSigSegvAction(void) {
7054 #endif /* HAVE_BACKTRACE */