2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.3.2"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
64 #include "solarisfixes.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
95 /* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
107 this flag will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short, these commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
113 #define REDIS_STRING 0
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
128 /* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpret the length:
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
139 * Lengths up to 63 are stored using a single byte, most DB keys, and many
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining six bits specify a special encoding for the object
149 * according to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
155 /* Virtual memory object->where field. */
156 #define REDIS_VM_MEMORY 0 /* The object is on memory */
157 #define REDIS_VM_SWAPPED 1 /* The object is on disk */
158 #define REDIS_VM_SWAPPING 2 /* Redis is swapping this object on disk */
159 #define REDIS_VM_LOADING 3 /* Redis is loading this object from disk */
161 /* Virtual memory static configuration stuff.
162 * Check vmFindContiguousPages() to know more about these magic numbers. */
163 #define REDIS_VM_MAX_NEAR_PAGES 65536
164 #define REDIS_VM_MAX_RANDOM_JUMP 4096
167 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
168 #define REDIS_SLAVE 2 /* This client is a slave server */
169 #define REDIS_MASTER 4 /* This client is a master server */
170 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
171 #define REDIS_MULTI 16 /* This client is in a MULTI context */
172 #define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */
174 /* Slave replication state - slave side */
175 #define REDIS_REPL_NONE 0 /* No active replication */
176 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
177 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
179 /* Slave replication state - from the point of view of master
180 * Note that in SEND_BULK and ONLINE state the slave receives new updates
181 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
182 * to start the next background saving in order to send updates to it. */
183 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
184 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
185 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
186 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
188 /* List related stuff */
192 /* Sort operations */
193 #define REDIS_SORT_GET 0
194 #define REDIS_SORT_ASC 1
195 #define REDIS_SORT_DESC 2
196 #define REDIS_SORTKEY_MAX 1024
199 #define REDIS_DEBUG 0
200 #define REDIS_NOTICE 1
201 #define REDIS_WARNING 2
203 /* Anti-warning macro... */
204 #define REDIS_NOTUSED(V) ((void) V)
206 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
207 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
209 /* Append only defines */
210 #define APPENDFSYNC_NO 0
211 #define APPENDFSYNC_ALWAYS 1
212 #define APPENDFSYNC_EVERYSEC 2
214 /* We can print the stacktrace, so our assert is defined this way: */
215 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
216 static void _redisAssert(char *estr
);
218 /*================================= Data types ============================== */
220 /* A redis object, that is a type able to hold a string / list / set */
222 /* The VM object structure */
223 struct redisObjectVM
{
224 off_t page
; /* the page at which the object is stored on disk */
225 off_t usedpages
; /* number of pages used on disk */
226 time_t atime
; /* Last access time */
229 /* The actual Redis Object */
230 typedef struct redisObject
{
233 unsigned char encoding
;
234 unsigned char storage
; /* where? REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
235 unsigned char notused
;
237 /* VM fields, these are only allocated if VM is active, otherwise the
238 * object allocation function will just allocate
239 * sizeof(redisObject) minus sizeof(redisObjectVM), so using
240 * Redis without VM active will not have any overhead. */
241 struct redisObjectVM vm
;
244 /* Macro used to initialize a Redis object allocated on the stack.
245 * Note that this macro is taken near the structure definition to make sure
246 * we'll update it when the structure is changed, to avoid bugs like
247 * bug #85 introduced exactly in this way. */
248 #define initStaticStringObject(_var,_ptr) do { \
250 _var.type = REDIS_STRING; \
251 _var.encoding = REDIS_ENCODING_RAW; \
253 if (server.vm_enabled) _var.storage = REDIS_VM_MEMORY; \
256 typedef struct redisDb
{
257 dict
*dict
; /* The keyspace for this DB */
258 dict
*expires
; /* Timeout of keys with a timeout set */
259 dict
*blockingkeys
; /* Keys with clients waiting for data (BLPOP) */
263 /* Client MULTI/EXEC state */
264 typedef struct multiCmd
{
267 struct redisCommand
*cmd
;
270 typedef struct multiState
{
271 multiCmd
*commands
; /* Array of MULTI commands */
272 int count
; /* Total number of MULTI commands */
275 /* With multiplexing we need to take per-client state.
276 * Clients are taken in a linked list. */
277 typedef struct redisClient
{
282 robj
**argv
, **mbargv
;
284 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
285 int multibulk
; /* multi bulk command format active */
288 time_t lastinteraction
; /* time of the last interaction, used for timeout */
289 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
291 int slaveseldb
; /* slave selected db, if this client is a slave */
292 int authenticated
; /* when requirepass is non-NULL */
293 int replstate
; /* replication state if this is a slave */
294 int repldbfd
; /* replication DB file descriptor */
295 long repldboff
; /* replication DB file offset */
296 off_t repldbsize
; /* replication DB file size */
297 multiState mstate
; /* MULTI/EXEC state */
298 robj
**blockingkeys
; /* The key we waiting to terminate a blocking
299 * operation such as BLPOP. Otherwise NULL. */
300 int blockingkeysnum
; /* Number of blocking keys */
301 time_t blockingto
; /* Blocking operation timeout. If UNIX current time
302 * is >= blockingto then the operation timed out. */
310 /* Global server state structure */
315 dict
*sharingpool
; /* Poll used for object sharing */
316 unsigned int sharingpoolsize
;
317 long long dirty
; /* changes to DB from the last save */
319 list
*slaves
, *monitors
;
320 char neterr
[ANET_ERR_LEN
];
322 int cronloops
; /* number of times the cron function run */
323 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
324 time_t lastsave
; /* Unix time of last save succeeede */
325 size_t usedmemory
; /* Used memory in megabytes */
326 /* Fields used only for stats */
327 time_t stat_starttime
; /* server start time */
328 long long stat_numcommands
; /* number of processed commands */
329 long long stat_numconnections
; /* number of connections received */
342 pid_t bgsavechildpid
;
343 pid_t bgrewritechildpid
;
344 sds bgrewritebuf
; /* buffer taken by parent during oppend only rewrite */
345 struct saveparam
*saveparams
;
350 char *appendfilename
;
354 /* Replication related */
359 redisClient
*master
; /* client that is master for this slave */
361 unsigned int maxclients
;
362 unsigned long maxmemory
;
363 unsigned int blockedclients
;
364 /* Sort parameters - qsort_r() is only available under BSD so we
365 * have to take this state global, in order to pass it to sortCompare() */
369 /* Virtual memory configuration */
374 /* Virtual memory state */
377 off_t vm_next_page
; /* Next probably empty page */
378 off_t vm_near_pages
; /* Number of pages allocated sequentially */
379 unsigned char *vm_bitmap
; /* Bitmap of free/used pages */
380 time_t unixtime
; /* Unix time sampled every second. */
383 typedef void redisCommandProc(redisClient
*c
);
384 struct redisCommand
{
386 redisCommandProc
*proc
;
391 struct redisFunctionSym
{
393 unsigned long pointer
;
396 typedef struct _redisSortObject
{
404 typedef struct _redisSortOperation
{
407 } redisSortOperation
;
409 /* ZSETs use a specialized version of Skiplists */
411 typedef struct zskiplistNode
{
412 struct zskiplistNode
**forward
;
413 struct zskiplistNode
*backward
;
418 typedef struct zskiplist
{
419 struct zskiplistNode
*header
, *tail
;
420 unsigned long length
;
424 typedef struct zset
{
429 /* Our shared "common" objects */
431 struct sharedObjectsStruct
{
432 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
433 *colon
, *nullbulk
, *nullmultibulk
, *queued
,
434 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
435 *outofrangeerr
, *plus
,
436 *select0
, *select1
, *select2
, *select3
, *select4
,
437 *select5
, *select6
, *select7
, *select8
, *select9
;
440 /* Global vars that are actually used as constants. The following double
441 * values are used for double on-disk serialization, and are initialized
442 * at runtime to avoid strange compiler optimizations. */
444 static double R_Zero
, R_PosInf
, R_NegInf
, R_Nan
;
446 /*================================ Prototypes =============================== */
448 static void freeStringObject(robj
*o
);
449 static void freeListObject(robj
*o
);
450 static void freeSetObject(robj
*o
);
451 static void decrRefCount(void *o
);
452 static robj
*createObject(int type
, void *ptr
);
453 static void freeClient(redisClient
*c
);
454 static int rdbLoad(char *filename
);
455 static void addReply(redisClient
*c
, robj
*obj
);
456 static void addReplySds(redisClient
*c
, sds s
);
457 static void incrRefCount(robj
*o
);
458 static int rdbSaveBackground(char *filename
);
459 static robj
*createStringObject(char *ptr
, size_t len
);
460 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
461 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
462 static int syncWithMaster(void);
463 static robj
*tryObjectSharing(robj
*o
);
464 static int tryObjectEncoding(robj
*o
);
465 static robj
*getDecodedObject(robj
*o
);
466 static int removeExpire(redisDb
*db
, robj
*key
);
467 static int expireIfNeeded(redisDb
*db
, robj
*key
);
468 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
469 static int deleteKey(redisDb
*db
, robj
*key
);
470 static time_t getExpire(redisDb
*db
, robj
*key
);
471 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
472 static void updateSlavesWaitingBgsave(int bgsaveerr
);
473 static void freeMemoryIfNeeded(void);
474 static int processCommand(redisClient
*c
);
475 static void setupSigSegvAction(void);
476 static void rdbRemoveTempFile(pid_t childpid
);
477 static void aofRemoveTempFile(pid_t childpid
);
478 static size_t stringObjectLen(robj
*o
);
479 static void processInputBuffer(redisClient
*c
);
480 static zskiplist
*zslCreate(void);
481 static void zslFree(zskiplist
*zsl
);
482 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
483 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
484 static void initClientMultiState(redisClient
*c
);
485 static void freeClientMultiState(redisClient
*c
);
486 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
);
487 static void unblockClient(redisClient
*c
);
488 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
);
489 static void vmInit(void);
491 static void authCommand(redisClient
*c
);
492 static void pingCommand(redisClient
*c
);
493 static void echoCommand(redisClient
*c
);
494 static void setCommand(redisClient
*c
);
495 static void setnxCommand(redisClient
*c
);
496 static void getCommand(redisClient
*c
);
497 static void delCommand(redisClient
*c
);
498 static void existsCommand(redisClient
*c
);
499 static void incrCommand(redisClient
*c
);
500 static void decrCommand(redisClient
*c
);
501 static void incrbyCommand(redisClient
*c
);
502 static void decrbyCommand(redisClient
*c
);
503 static void selectCommand(redisClient
*c
);
504 static void randomkeyCommand(redisClient
*c
);
505 static void keysCommand(redisClient
*c
);
506 static void dbsizeCommand(redisClient
*c
);
507 static void lastsaveCommand(redisClient
*c
);
508 static void saveCommand(redisClient
*c
);
509 static void bgsaveCommand(redisClient
*c
);
510 static void bgrewriteaofCommand(redisClient
*c
);
511 static void shutdownCommand(redisClient
*c
);
512 static void moveCommand(redisClient
*c
);
513 static void renameCommand(redisClient
*c
);
514 static void renamenxCommand(redisClient
*c
);
515 static void lpushCommand(redisClient
*c
);
516 static void rpushCommand(redisClient
*c
);
517 static void lpopCommand(redisClient
*c
);
518 static void rpopCommand(redisClient
*c
);
519 static void llenCommand(redisClient
*c
);
520 static void lindexCommand(redisClient
*c
);
521 static void lrangeCommand(redisClient
*c
);
522 static void ltrimCommand(redisClient
*c
);
523 static void typeCommand(redisClient
*c
);
524 static void lsetCommand(redisClient
*c
);
525 static void saddCommand(redisClient
*c
);
526 static void sremCommand(redisClient
*c
);
527 static void smoveCommand(redisClient
*c
);
528 static void sismemberCommand(redisClient
*c
);
529 static void scardCommand(redisClient
*c
);
530 static void spopCommand(redisClient
*c
);
531 static void srandmemberCommand(redisClient
*c
);
532 static void sinterCommand(redisClient
*c
);
533 static void sinterstoreCommand(redisClient
*c
);
534 static void sunionCommand(redisClient
*c
);
535 static void sunionstoreCommand(redisClient
*c
);
536 static void sdiffCommand(redisClient
*c
);
537 static void sdiffstoreCommand(redisClient
*c
);
538 static void syncCommand(redisClient
*c
);
539 static void flushdbCommand(redisClient
*c
);
540 static void flushallCommand(redisClient
*c
);
541 static void sortCommand(redisClient
*c
);
542 static void lremCommand(redisClient
*c
);
543 static void rpoplpushcommand(redisClient
*c
);
544 static void infoCommand(redisClient
*c
);
545 static void mgetCommand(redisClient
*c
);
546 static void monitorCommand(redisClient
*c
);
547 static void expireCommand(redisClient
*c
);
548 static void expireatCommand(redisClient
*c
);
549 static void getsetCommand(redisClient
*c
);
550 static void ttlCommand(redisClient
*c
);
551 static void slaveofCommand(redisClient
*c
);
552 static void debugCommand(redisClient
*c
);
553 static void msetCommand(redisClient
*c
);
554 static void msetnxCommand(redisClient
*c
);
555 static void zaddCommand(redisClient
*c
);
556 static void zincrbyCommand(redisClient
*c
);
557 static void zrangeCommand(redisClient
*c
);
558 static void zrangebyscoreCommand(redisClient
*c
);
559 static void zrevrangeCommand(redisClient
*c
);
560 static void zcardCommand(redisClient
*c
);
561 static void zremCommand(redisClient
*c
);
562 static void zscoreCommand(redisClient
*c
);
563 static void zremrangebyscoreCommand(redisClient
*c
);
564 static void multiCommand(redisClient
*c
);
565 static void execCommand(redisClient
*c
);
566 static void blpopCommand(redisClient
*c
);
567 static void brpopCommand(redisClient
*c
);
569 /*================================= Globals ================================= */
572 static struct redisServer server
; /* server global state */
573 static struct redisCommand cmdTable
[] = {
574 {"get",getCommand
,2,REDIS_CMD_INLINE
},
575 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
576 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
577 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
578 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
579 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
580 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
581 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
582 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
583 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
584 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
585 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
586 {"brpop",brpopCommand
,-3,REDIS_CMD_INLINE
},
587 {"blpop",blpopCommand
,-3,REDIS_CMD_INLINE
},
588 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
589 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
590 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
591 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
592 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
593 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
594 {"rpoplpush",rpoplpushcommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
595 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
596 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
597 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
598 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
599 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
600 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
601 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
602 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
603 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
604 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
605 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
606 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
607 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
608 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
609 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
610 {"zincrby",zincrbyCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
611 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
612 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
613 {"zrange",zrangeCommand
,-4,REDIS_CMD_INLINE
},
614 {"zrangebyscore",zrangebyscoreCommand
,-4,REDIS_CMD_INLINE
},
615 {"zrevrange",zrevrangeCommand
,-4,REDIS_CMD_INLINE
},
616 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
617 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
618 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
619 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
620 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
621 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
622 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
623 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
624 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
625 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
626 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
627 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
628 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
629 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
630 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
631 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
632 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
633 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
634 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
635 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
636 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
637 {"bgrewriteaof",bgrewriteaofCommand
,1,REDIS_CMD_INLINE
},
638 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
639 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
640 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
641 {"multi",multiCommand
,1,REDIS_CMD_INLINE
},
642 {"exec",execCommand
,1,REDIS_CMD_INLINE
},
643 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
644 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
645 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
646 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
647 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
648 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
649 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
650 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
651 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
655 /*============================ Utility functions ============================ */
657 /* Glob-style pattern matching. */
658 int stringmatchlen(const char *pattern
, int patternLen
,
659 const char *string
, int stringLen
, int nocase
)
664 while (pattern
[1] == '*') {
669 return 1; /* match */
671 if (stringmatchlen(pattern
+1, patternLen
-1,
672 string
, stringLen
, nocase
))
673 return 1; /* match */
677 return 0; /* no match */
681 return 0; /* no match */
691 not = pattern
[0] == '^';
698 if (pattern
[0] == '\\') {
701 if (pattern
[0] == string
[0])
703 } else if (pattern
[0] == ']') {
705 } else if (patternLen
== 0) {
709 } else if (pattern
[1] == '-' && patternLen
>= 3) {
710 int start
= pattern
[0];
711 int end
= pattern
[2];
719 start
= tolower(start
);
725 if (c
>= start
&& c
<= end
)
729 if (pattern
[0] == string
[0])
732 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
742 return 0; /* no match */
748 if (patternLen
>= 2) {
755 if (pattern
[0] != string
[0])
756 return 0; /* no match */
758 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
759 return 0; /* no match */
767 if (stringLen
== 0) {
768 while(*pattern
== '*') {
775 if (patternLen
== 0 && stringLen
== 0)
780 static void redisLog(int level
, const char *fmt
, ...) {
784 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
788 if (level
>= server
.verbosity
) {
794 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
795 fprintf(fp
,"%s %c ",buf
,c
[level
]);
796 vfprintf(fp
, fmt
, ap
);
802 if (server
.logfile
) fclose(fp
);
805 /*====================== Hash table type implementation ==================== */
807 /* This is an hash table type that uses the SDS dynamic strings library as
808 * keys and redis objects as values (objects can hold SDS strings,
811 static void dictVanillaFree(void *privdata
, void *val
)
813 DICT_NOTUSED(privdata
);
/* Dict value destructor for values that are lists: hands 'val' to
 * listRelease(). 'privdata' is not used. */
817 static void dictListDestructor(void *privdata
, void *val
)
819 DICT_NOTUSED(privdata
);
820 listRelease((list
*)val
);
823 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
827 DICT_NOTUSED(privdata
);
829 l1
= sdslen((sds
)key1
);
830 l2
= sdslen((sds
)key2
);
831 if (l1
!= l2
) return 0;
832 return memcmp(key1
, key2
, l1
) == 0;
835 static void dictRedisObjectDestructor(void *privdata
, void *val
)
837 DICT_NOTUSED(privdata
);
842 static int dictObjKeyCompare(void *privdata
, const void *key1
,
845 const robj
*o1
= key1
, *o2
= key2
;
846 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
849 static unsigned int dictObjHash(const void *key
) {
851 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
854 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
857 robj
*o1
= (robj
*) key1
, *o2
= (robj
*) key2
;
860 o1
= getDecodedObject(o1
);
861 o2
= getDecodedObject(o2
);
862 cmp
= sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
868 static unsigned int dictEncObjHash(const void *key
) {
869 robj
*o
= (robj
*) key
;
871 o
= getDecodedObject(o
);
872 unsigned int hash
= dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
877 static dictType setDictType
= {
878 dictEncObjHash
, /* hash function */
881 dictEncObjKeyCompare
, /* key compare */
882 dictRedisObjectDestructor
, /* key destructor */
883 NULL
/* val destructor */
886 static dictType zsetDictType
= {
887 dictEncObjHash
, /* hash function */
890 dictEncObjKeyCompare
, /* key compare */
891 dictRedisObjectDestructor
, /* key destructor */
892 dictVanillaFree
/* val destructor of malloc(sizeof(double)) */
895 static dictType hashDictType
= {
896 dictObjHash
, /* hash function */
899 dictObjKeyCompare
, /* key compare */
900 dictRedisObjectDestructor
, /* key destructor */
901 dictRedisObjectDestructor
/* val destructor */
904 /* Keylist hash table type has unencoded redis objects as keys and
905 * lists as values. It's used for blocking operations (BLPOP) */
906 static dictType keylistDictType
= {
907 dictObjHash
, /* hash function */
910 dictObjKeyCompare
, /* key compare */
911 dictRedisObjectDestructor
, /* key destructor */
912 dictListDestructor
/* val destructor */
915 /* ========================= Random utility functions ======================= */
917 /* Redis generally does not try to recover from out of memory conditions
918 * when allocating objects or strings, it is not clear if it will be possible
919 * to report this condition to the client since the networking layer itself
920 * is based on heap allocation for send buffers, so we simply abort.
921 * At least the code will be simpler to read... */
922 static void oom(const char *msg
) {
923 redisLog(REDIS_WARNING
, "%s: Out of memory\n",msg
);
928 /* ====================== Redis server networking stuff ===================== */
929 static void closeTimedoutClients(void) {
932 time_t now
= time(NULL
);
934 listRewind(server
.clients
);
935 while ((ln
= listYield(server
.clients
)) != NULL
) {
936 c
= listNodeValue(ln
);
937 if (server
.maxidletime
&&
938 !(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
939 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
940 (now
- c
->lastinteraction
> server
.maxidletime
))
942 redisLog(REDIS_DEBUG
,"Closing idle client");
944 } else if (c
->flags
& REDIS_BLOCKED
) {
945 if (c
->blockingto
!= 0 && c
->blockingto
< now
) {
946 addReply(c
,shared
.nullmultibulk
);
/* Return non-zero when the hash table has more than DICT_HT_INITIAL_SIZE
 * slots, holds at least one element, and its fill percentage
 * (used*100/size) is below REDIS_HT_MINFILL. Return zero otherwise. */
953 static int htNeedsResize(dict
*dict
) {
954 long long size
, used
;
956 size
= dictSlots(dict
);
957 used
= dictSize(dict
);
958 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
959 (used
*100/size
< REDIS_HT_MINFILL
));
962 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
963 * we resize the hash table to save memory */
964 static void tryResizeHashTables(void) {
967 for (j
= 0; j
< server
.dbnum
; j
++) {
968 if (htNeedsResize(server
.db
[j
].dict
)) {
969 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
970 dictResize(server
.db
[j
].dict
);
971 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
973 if (htNeedsResize(server
.db
[j
].expires
))
974 dictResize(server
.db
[j
].expires
);
978 /* A background saving child (BGSAVE) terminated its work. Handle this. */
979 void backgroundSaveDoneHandler(int statloc
) {
980 int exitcode
= WEXITSTATUS(statloc
);
981 int bysignal
= WIFSIGNALED(statloc
);
983 if (!bysignal
&& exitcode
== 0) {
984 redisLog(REDIS_NOTICE
,
985 "Background saving terminated with success");
987 server
.lastsave
= time(NULL
);
988 } else if (!bysignal
&& exitcode
!= 0) {
989 redisLog(REDIS_WARNING
, "Background saving error");
991 redisLog(REDIS_WARNING
,
992 "Background saving terminated by signal");
993 rdbRemoveTempFile(server
.bgsavechildpid
);
995 server
.bgsavechildpid
= -1;
996 /* Possibly there are slaves waiting for a BGSAVE in order to be served
997 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
998 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
1001 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
1003 void backgroundRewriteDoneHandler(int statloc
) {
1004 int exitcode
= WEXITSTATUS(statloc
);
1005 int bysignal
= WIFSIGNALED(statloc
);
1007 if (!bysignal
&& exitcode
== 0) {
1011 redisLog(REDIS_NOTICE
,
1012 "Background append only file rewriting terminated with success");
1013 /* Now it's time to flush the differences accumulated by the parent */
1014 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) server
.bgrewritechildpid
);
1015 fd
= open(tmpfile
,O_WRONLY
|O_APPEND
);
1017 redisLog(REDIS_WARNING
, "Not able to open the temp append only file produced by the child: %s", strerror(errno
));
1020 /* Flush our data... */
1021 if (write(fd
,server
.bgrewritebuf
,sdslen(server
.bgrewritebuf
)) !=
1022 (signed) sdslen(server
.bgrewritebuf
)) {
1023 redisLog(REDIS_WARNING
, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno
));
1027 redisLog(REDIS_NOTICE
,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server
.bgrewritebuf
));
1028 /* Now our work is to rename the temp file into the stable file. And
1029 * switch the file descriptor used by the server for append only. */
1030 if (rename(tmpfile
,server
.appendfilename
) == -1) {
1031 redisLog(REDIS_WARNING
,"Can't rename the temp append only file into the stable one: %s", strerror(errno
));
1035 /* Mission completed... almost */
1036 redisLog(REDIS_NOTICE
,"Append only file successfully rewritten.");
1037 if (server
.appendfd
!= -1) {
1038 /* If append only is actually enabled... */
1039 close(server
.appendfd
);
1040 server
.appendfd
= fd
;
1042 server
.appendseldb
= -1; /* Make sure it will issue SELECT */
1043 redisLog(REDIS_NOTICE
,"The new append only file was selected for future appends.");
1045 /* If append only is disabled we just generate a dump in this
1046 * format. Why not? */
1049 } else if (!bysignal
&& exitcode
!= 0) {
1050 redisLog(REDIS_WARNING
, "Background append only file rewriting error");
1052 redisLog(REDIS_WARNING
,
1053 "Background append only file rewriting terminated by signal");
1056 sdsfree(server
.bgrewritebuf
);
1057 server
.bgrewritebuf
= sdsempty();
1058 aofRemoveTempFile(server
.bgrewritechildpid
);
1059 server
.bgrewritechildpid
= -1;
1062 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
1063 int j
, loops
= server
.cronloops
++;
1064 REDIS_NOTUSED(eventLoop
);
1066 REDIS_NOTUSED(clientData
);
1068 /* We take a cached value of the unix time in the global state because
1069 * with virtual memory and aging there is the need to store the current time
1070 * in objects at every object access, and accuracy is not needed.
1071 * To access a global var is faster than calling time(NULL) */
1072 server
.unixtime
= time(NULL
);
1074 /* Update the global state with the amount of used memory */
1075 server
.usedmemory
= zmalloc_used_memory();
1077 /* Show some info about non-empty databases */
1078 for (j
= 0; j
< server
.dbnum
; j
++) {
1079 long long size
, used
, vkeys
;
1081 size
= dictSlots(server
.db
[j
].dict
);
1082 used
= dictSize(server
.db
[j
].dict
);
1083 vkeys
= dictSize(server
.db
[j
].expires
);
1084 if (!(loops
% 5) && (used
|| vkeys
)) {
1085 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
1086 /* dictPrintStats(server.dict); */
1090 /* We don't want to resize the hash tables while a background saving
1091 * is in progress: the saving child is created using fork() that is
1092 * implemented with a copy-on-write semantic in most modern systems, so
1093 * if we resize the HT while there is the saving child at work actually
1094 * a lot of memory movements in the parent will cause a lot of pages
1096 if (server
.bgsavechildpid
== -1) tryResizeHashTables();
1098 /* Show information about connected clients */
1100 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1101 listLength(server
.clients
)-listLength(server
.slaves
),
1102 listLength(server
.slaves
),
1104 dictSize(server
.sharingpool
));
1107 /* Close connections of timedout clients */
1108 if ((server
.maxidletime
&& !(loops
% 10)) || server
.blockedclients
)
1109 closeTimedoutClients();
1111 /* Check if a background saving or AOF rewrite in progress terminated */
1112 if (server
.bgsavechildpid
!= -1 || server
.bgrewritechildpid
!= -1) {
1116 if ((pid
= wait3(&statloc
,WNOHANG
,NULL
)) != 0) {
1117 if (pid
== server
.bgsavechildpid
) {
1118 backgroundSaveDoneHandler(statloc
);
1120 backgroundRewriteDoneHandler(statloc
);
1124 /* If there is not a background saving in progress check if
1125 * we have to save now */
1126 time_t now
= time(NULL
);
1127 for (j
= 0; j
< server
.saveparamslen
; j
++) {
1128 struct saveparam
*sp
= server
.saveparams
+j
;
1130 if (server
.dirty
>= sp
->changes
&&
1131 now
-server
.lastsave
> sp
->seconds
) {
1132 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
1133 sp
->changes
, sp
->seconds
);
1134 rdbSaveBackground(server
.dbfilename
);
1140 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1141 * will use few CPU cycles if there are few expiring keys, otherwise
1142 * it will get more aggressive to avoid that too much memory is used by
1143 * keys that can be removed from the keyspace. */
1144 for (j
= 0; j
< server
.dbnum
; j
++) {
1146 redisDb
*db
= server
.db
+j
;
1148 /* Continue to expire if at the end of the cycle more than 25%
1149 * of the keys were expired. */
1151 int num
= dictSize(db
->expires
);
1152 time_t now
= time(NULL
);
1155 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
1156 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
1161 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
1162 t
= (time_t) dictGetEntryVal(de
);
1164 deleteKey(db
,dictGetEntryKey(de
));
1168 } while (expired
> REDIS_EXPIRELOOKUPS_PER_CRON
/4);
1171 /* Check if we should connect to a MASTER */
1172 if (server
.replstate
== REDIS_REPL_CONNECT
) {
1173 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
1174 if (syncWithMaster() == REDIS_OK
) {
1175 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
1181 static void createSharedObjects(void) {
1182 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
1183 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
1184 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
1185 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
1186 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
1187 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
1188 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
1189 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
1190 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
1191 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
1192 shared
.queued
= createObject(REDIS_STRING
,sdsnew("+QUEUED\r\n"));
1193 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
1194 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1195 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
1196 "-ERR no such key\r\n"));
1197 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
1198 "-ERR syntax error\r\n"));
1199 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
1200 "-ERR source and destination objects are the same\r\n"));
1201 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
1202 "-ERR index out of range\r\n"));
1203 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
1204 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
1205 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
1206 shared
.select0
= createStringObject("select 0\r\n",10);
1207 shared
.select1
= createStringObject("select 1\r\n",10);
1208 shared
.select2
= createStringObject("select 2\r\n",10);
1209 shared
.select3
= createStringObject("select 3\r\n",10);
1210 shared
.select4
= createStringObject("select 4\r\n",10);
1211 shared
.select5
= createStringObject("select 5\r\n",10);
1212 shared
.select6
= createStringObject("select 6\r\n",10);
1213 shared
.select7
= createStringObject("select 7\r\n",10);
1214 shared
.select8
= createStringObject("select 8\r\n",10);
1215 shared
.select9
= createStringObject("select 9\r\n",10);
1218 static void appendServerSaveParams(time_t seconds
, int changes
) {
1219 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1220 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1221 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1222 server
.saveparamslen
++;
1225 static void resetServerSaveParams() {
1226 zfree(server
.saveparams
);
1227 server
.saveparams
= NULL
;
1228 server
.saveparamslen
= 0;
1231 static void initServerConfig() {
1232 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1233 server
.port
= REDIS_SERVERPORT
;
1234 server
.verbosity
= REDIS_DEBUG
;
1235 server
.maxidletime
= REDIS_MAXIDLETIME
;
1236 server
.saveparams
= NULL
;
1237 server
.logfile
= NULL
; /* NULL = log on standard output */
1238 server
.bindaddr
= NULL
;
1239 server
.glueoutputbuf
= 1;
1240 server
.daemonize
= 0;
1241 server
.appendonly
= 0;
1242 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1243 server
.lastfsync
= time(NULL
);
1244 server
.appendfd
= -1;
1245 server
.appendseldb
= -1; /* Make sure the first time will not match */
1246 server
.pidfile
= "/var/run/redis.pid";
1247 server
.dbfilename
= "dump.rdb";
1248 server
.appendfilename
= "appendonly.aof";
1249 server
.requirepass
= NULL
;
1250 server
.shareobjects
= 0;
1251 server
.rdbcompression
= 1;
1252 server
.sharingpoolsize
= 1024;
1253 server
.maxclients
= 0;
1254 server
.blockedclients
= 0;
1255 server
.maxmemory
= 0;
1256 server
.vm_enabled
= 0;
1257 server
.vm_page_size
= 256; /* 256 bytes per page */
1258 server
.vm_pages
= 1024*1024*100; /* 104 millions of pages */
1259 server
.vm_max_memory
= 1024LL*1024*1024*1; /* 1 GB of RAM */
1261 resetServerSaveParams();
1263 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1264 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1265 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1266 /* Replication related */
1268 server
.masterauth
= NULL
;
1269 server
.masterhost
= NULL
;
1270 server
.masterport
= 6379;
1271 server
.master
= NULL
;
1272 server
.replstate
= REDIS_REPL_NONE
;
1274 /* Double constants initialization */
1276 R_PosInf
= 1.0/R_Zero
;
1277 R_NegInf
= -1.0/R_Zero
;
1278 R_Nan
= R_Zero
/R_Zero
;
1281 static void initServer() {
1284 signal(SIGHUP
, SIG_IGN
);
1285 signal(SIGPIPE
, SIG_IGN
);
1286 setupSigSegvAction();
1288 server
.clients
= listCreate();
1289 server
.slaves
= listCreate();
1290 server
.monitors
= listCreate();
1291 server
.objfreelist
= listCreate();
1292 createSharedObjects();
1293 server
.el
= aeCreateEventLoop();
1294 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1295 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
1296 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1297 if (server
.fd
== -1) {
1298 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
1301 for (j
= 0; j
< server
.dbnum
; j
++) {
1302 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1303 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1304 server
.db
[j
].blockingkeys
= dictCreate(&keylistDictType
,NULL
);
1305 server
.db
[j
].id
= j
;
1307 server
.cronloops
= 0;
1308 server
.bgsavechildpid
= -1;
1309 server
.bgrewritechildpid
= -1;
1310 server
.bgrewritebuf
= sdsempty();
1311 server
.lastsave
= time(NULL
);
1313 server
.usedmemory
= 0;
1314 server
.stat_numcommands
= 0;
1315 server
.stat_numconnections
= 0;
1316 server
.stat_starttime
= time(NULL
);
1317 server
.unixtime
= time(NULL
);
1318 aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
);
1320 if (server
.appendonly
) {
1321 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644);
1322 if (server
.appendfd
== -1) {
1323 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1329 if (server
.vm_enabled
) vmInit();
1332 /* Empty the whole database */
1333 static long long emptyDb() {
1335 long long removed
= 0;
1337 for (j
= 0; j
< server
.dbnum
; j
++) {
1338 removed
+= dictSize(server
.db
[j
].dict
);
1339 dictEmpty(server
.db
[j
].dict
);
1340 dictEmpty(server
.db
[j
].expires
);
/* Convert a yes/no configuration argument to an integer. The match is
 * case insensitive: "yes" gives 1, "no" gives 0, anything else -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) {
        return 1;
    }
    if (strcasecmp(s,"no") == 0) {
        return 0;
    }
    return -1;
}
1351 /* I agree, this is a very rudimentary way to load a configuration...
1352 It will be improved later if the config gets more complex. */
1353 static void loadServerConfig(char *filename
) {
1355 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1359 if (filename
[0] == '-' && filename
[1] == '\0')
1362 if ((fp
= fopen(filename
,"r")) == NULL
) {
1363 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1368 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1374 line
= sdstrim(line
," \t\r\n");
1376 /* Skip comments and blank lines*/
1377 if (line
[0] == '#' || line
[0] == '\0') {
1382 /* Split into arguments */
1383 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1384 sdstolower(argv
[0]);
1386 /* Execute config directives */
1387 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1388 server
.maxidletime
= atoi(argv
[1]);
1389 if (server
.maxidletime
< 0) {
1390 err
= "Invalid timeout value"; goto loaderr
;
1392 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1393 server
.port
= atoi(argv
[1]);
1394 if (server
.port
< 1 || server
.port
> 65535) {
1395 err
= "Invalid port"; goto loaderr
;
1397 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1398 server
.bindaddr
= zstrdup(argv
[1]);
1399 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1400 int seconds
= atoi(argv
[1]);
1401 int changes
= atoi(argv
[2]);
1402 if (seconds
< 1 || changes
< 0) {
1403 err
= "Invalid save parameters"; goto loaderr
;
1405 appendServerSaveParams(seconds
,changes
);
1406 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1407 if (chdir(argv
[1]) == -1) {
1408 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1409 argv
[1], strerror(errno
));
1412 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1413 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1414 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1415 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1417 err
= "Invalid log level. Must be one of debug, notice, warning";
1420 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1423 server
.logfile
= zstrdup(argv
[1]);
1424 if (!strcasecmp(server
.logfile
,"stdout")) {
1425 zfree(server
.logfile
);
1426 server
.logfile
= NULL
;
1428 if (server
.logfile
) {
1429 /* Test if we are able to open the file. The server will not
1430 * be able to abort just for this problem later... */
1431 logfp
= fopen(server
.logfile
,"a");
1432 if (logfp
== NULL
) {
1433 err
= sdscatprintf(sdsempty(),
1434 "Can't open the log file: %s", strerror(errno
));
1439 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1440 server
.dbnum
= atoi(argv
[1]);
1441 if (server
.dbnum
< 1) {
1442 err
= "Invalid number of databases"; goto loaderr
;
1444 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1445 server
.maxclients
= atoi(argv
[1]);
1446 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1447 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1448 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1449 server
.masterhost
= sdsnew(argv
[1]);
1450 server
.masterport
= atoi(argv
[2]);
1451 server
.replstate
= REDIS_REPL_CONNECT
;
1452 } else if (!strcasecmp(argv
[0],"masterauth") && argc
== 2) {
1453 server
.masterauth
= zstrdup(argv
[1]);
1454 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1455 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1456 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1458 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1459 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1460 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1462 } else if (!strcasecmp(argv
[0],"rdbcompression") && argc
== 2) {
1463 if ((server
.rdbcompression
= yesnotoi(argv
[1])) == -1) {
1464 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1466 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1467 server
.sharingpoolsize
= atoi(argv
[1]);
1468 if (server
.sharingpoolsize
< 1) {
1469 err
= "invalid object sharing pool size"; goto loaderr
;
1471 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1472 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1473 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1475 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1476 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1477 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1479 } else if (!strcasecmp(argv
[0],"appendfsync") && argc
== 2) {
1480 if (!strcasecmp(argv
[1],"no")) {
1481 server
.appendfsync
= APPENDFSYNC_NO
;
1482 } else if (!strcasecmp(argv
[1],"always")) {
1483 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1484 } else if (!strcasecmp(argv
[1],"everysec")) {
1485 server
.appendfsync
= APPENDFSYNC_EVERYSEC
;
1487 err
= "argument must be 'no', 'always' or 'everysec'";
1490 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1491 server
.requirepass
= zstrdup(argv
[1]);
1492 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1493 server
.pidfile
= zstrdup(argv
[1]);
1494 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1495 server
.dbfilename
= zstrdup(argv
[1]);
1496 } else if (!strcasecmp(argv
[0],"vm-enabled") && argc
== 2) {
1497 if ((server
.vm_enabled
= yesnotoi(argv
[1])) == -1) {
1498 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1501 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1503 for (j
= 0; j
< argc
; j
++)
1508 if (fp
!= stdin
) fclose(fp
);
1512 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1513 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1514 fprintf(stderr
, ">>> '%s'\n", line
);
1515 fprintf(stderr
, "%s\n", err
);
1519 static void freeClientArgv(redisClient
*c
) {
1522 for (j
= 0; j
< c
->argc
; j
++)
1523 decrRefCount(c
->argv
[j
]);
1524 for (j
= 0; j
< c
->mbargc
; j
++)
1525 decrRefCount(c
->mbargv
[j
]);
1530 static void freeClient(redisClient
*c
) {
1533 /* Note that if the client we are freeing is blocked into a blocking
1534 * call, we have to set querybuf to NULL *before* to call unblockClient()
1535 * to avoid processInputBuffer() will get called. Also it is important
1536 * to remove the file events after this, because this call adds
1537 * the READABLE event. */
1538 sdsfree(c
->querybuf
);
1540 if (c
->flags
& REDIS_BLOCKED
)
1543 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1544 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1545 listRelease(c
->reply
);
1548 ln
= listSearchKey(server
.clients
,c
);
1549 redisAssert(ln
!= NULL
);
1550 listDelNode(server
.clients
,ln
);
1551 if (c
->flags
& REDIS_SLAVE
) {
1552 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1554 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1555 ln
= listSearchKey(l
,c
);
1556 redisAssert(ln
!= NULL
);
1559 if (c
->flags
& REDIS_MASTER
) {
1560 server
.master
= NULL
;
1561 server
.replstate
= REDIS_REPL_CONNECT
;
1565 freeClientMultiState(c
);
1569 #define GLUEREPLY_UP_TO (1024)
1570 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1572 char buf
[GLUEREPLY_UP_TO
];
1576 listRewind(c
->reply
);
1577 while((ln
= listYield(c
->reply
))) {
1581 objlen
= sdslen(o
->ptr
);
1582 if (copylen
+ objlen
<= GLUEREPLY_UP_TO
) {
1583 memcpy(buf
+copylen
,o
->ptr
,objlen
);
1585 listDelNode(c
->reply
,ln
);
1587 if (copylen
== 0) return;
1591 /* Now the output buffer is empty, add the new single element */
1592 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,copylen
));
1593 listAddNodeHead(c
->reply
,o
);
1596 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1597 redisClient
*c
= privdata
;
1598 int nwritten
= 0, totwritten
= 0, objlen
;
1601 REDIS_NOTUSED(mask
);
1603 /* Use writev() if we have enough buffers to send */
1604 if (!server
.glueoutputbuf
&&
1605 listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD
&&
1606 !(c
->flags
& REDIS_MASTER
))
1608 sendReplyToClientWritev(el
, fd
, privdata
, mask
);
1612 while(listLength(c
->reply
)) {
1613 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1614 glueReplyBuffersIfNeeded(c
);
1616 o
= listNodeValue(listFirst(c
->reply
));
1617 objlen
= sdslen(o
->ptr
);
1620 listDelNode(c
->reply
,listFirst(c
->reply
));
1624 if (c
->flags
& REDIS_MASTER
) {
1625 /* Don't reply to a master */
1626 nwritten
= objlen
- c
->sentlen
;
1628 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1629 if (nwritten
<= 0) break;
1631 c
->sentlen
+= nwritten
;
1632 totwritten
+= nwritten
;
1633 /* If we fully sent the object on head go to the next one */
1634 if (c
->sentlen
== objlen
) {
1635 listDelNode(c
->reply
,listFirst(c
->reply
));
1638 /* Note that we avoid sending more than REDIS_MAX_WRITE_PER_EVENT
1639 * bytes, in a single threaded server it's a good idea to serve
1640 * other clients as well, even if a very large request comes from
1641 * super fast link that is always able to accept data (in real world
1642 * scenario think about 'KEYS *' against the loopback interface) */
1643 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1645 if (nwritten
== -1) {
1646 if (errno
== EAGAIN
) {
1649 redisLog(REDIS_DEBUG
,
1650 "Error writing to client: %s", strerror(errno
));
1655 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1656 if (listLength(c
->reply
) == 0) {
1658 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1662 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
)
1664 redisClient
*c
= privdata
;
1665 int nwritten
= 0, totwritten
= 0, objlen
, willwrite
;
1667 struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
];
1668 int offset
, ion
= 0;
1670 REDIS_NOTUSED(mask
);
1673 while (listLength(c
->reply
)) {
1674 offset
= c
->sentlen
;
1678 /* fill-in the iov[] array */
1679 for(node
= listFirst(c
->reply
); node
; node
= listNextNode(node
)) {
1680 o
= listNodeValue(node
);
1681 objlen
= sdslen(o
->ptr
);
1683 if (totwritten
+ objlen
- offset
> REDIS_MAX_WRITE_PER_EVENT
)
1686 if(ion
== REDIS_WRITEV_IOVEC_COUNT
)
1687 break; /* no more iovecs */
1689 iov
[ion
].iov_base
= ((char*)o
->ptr
) + offset
;
1690 iov
[ion
].iov_len
= objlen
- offset
;
1691 willwrite
+= objlen
- offset
;
1692 offset
= 0; /* just for the first item */
1699 /* write all collected blocks at once */
1700 if((nwritten
= writev(fd
, iov
, ion
)) < 0) {
1701 if (errno
!= EAGAIN
) {
1702 redisLog(REDIS_DEBUG
,
1703 "Error writing to client: %s", strerror(errno
));
1710 totwritten
+= nwritten
;
1711 offset
= c
->sentlen
;
1713 /* remove written robjs from c->reply */
1714 while (nwritten
&& listLength(c
->reply
)) {
1715 o
= listNodeValue(listFirst(c
->reply
));
1716 objlen
= sdslen(o
->ptr
);
1718 if(nwritten
>= objlen
- offset
) {
1719 listDelNode(c
->reply
, listFirst(c
->reply
));
1720 nwritten
-= objlen
- offset
;
1724 c
->sentlen
+= nwritten
;
1732 c
->lastinteraction
= time(NULL
);
1734 if (listLength(c
->reply
) == 0) {
1736 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1740 static struct redisCommand
*lookupCommand(char *name
) {
1742 while(cmdTable
[j
].name
!= NULL
) {
1743 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1749 /* resetClient prepare the client to process the next command */
1750 static void resetClient(redisClient
*c
) {
1756 /* Call() is the core of Redis execution of a command */
1757 static void call(redisClient
*c
, struct redisCommand
*cmd
) {
1760 dirty
= server
.dirty
;
1762 if (server
.appendonly
&& server
.dirty
-dirty
)
1763 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1764 if (server
.dirty
-dirty
&& listLength(server
.slaves
))
1765 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1766 if (listLength(server
.monitors
))
1767 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1768 server
.stat_numcommands
++;
1771 /* If this function gets called we already read a whole
1772 * command, arguments are in the client argv/argc fields.
1773 * processCommand() executes the command or prepares the
1774 * server for a bulk read from the client.
1776 * If 1 is returned the client is still alive and valid and
1777 * other operations can be performed by the caller. Otherwise
1778 * if 0 is returned the client was destroyed (i.e. after QUIT). */
1779 static int processCommand(redisClient
*c
) {
1780 struct redisCommand
*cmd
;
1782 /* Free some memory if needed (maxmemory setting) */
1783 if (server
.maxmemory
) freeMemoryIfNeeded();
1785 /* Handle the multi bulk command type. This is an alternative protocol
1786 * supported by Redis in order to receive commands that are composed of
1787 * multiple binary-safe "bulk" arguments. The latency of processing is
1788 * a bit higher but this allows things like multi-sets, so if this
1789 * protocol is used only for MSET and similar commands this is a big win. */
1790 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1791 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1792 if (c
->multibulk
<= 0) {
1796 decrRefCount(c
->argv
[c
->argc
-1]);
1800 } else if (c
->multibulk
) {
1801 if (c
->bulklen
== -1) {
1802 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1803 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1807 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1808 decrRefCount(c
->argv
[0]);
1809 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1811 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1816 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1820 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1821 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1825 if (c
->multibulk
== 0) {
1829 /* Here we need to swap the multi-bulk argc/argv with the
1830 * normal argc/argv of the client structure. */
1832 c
->argv
= c
->mbargv
;
1833 c
->mbargv
= auxargv
;
1836 c
->argc
= c
->mbargc
;
1837 c
->mbargc
= auxargc
;
1839 /* We need to set bulklen to something different than -1
1840 * in order for the code below to process the command without
1841 * to try to read the last argument of a bulk command as
1842 * a special argument. */
1844 /* continue below and process the command */
1851 /* -- end of multi bulk commands processing -- */
1853 /* The QUIT command is handled as a special case. Normal command
1854 * procs are unable to close the client connection safely */
1855 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1859 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1862 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1863 (char*)c
->argv
[0]->ptr
));
1866 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1867 (c
->argc
< -cmd
->arity
)) {
1869 sdscatprintf(sdsempty(),
1870 "-ERR wrong number of arguments for '%s' command\r\n",
1874 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1875 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1878 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1879 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1881 decrRefCount(c
->argv
[c
->argc
-1]);
1882 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1884 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1889 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1890 /* It is possible that the bulk read is already in the
1891 * buffer. Check this condition and handle it accordingly.
1892 * This is just a fast path, alternative to call processInputBuffer().
1893 * It's a good idea since the code is small and this condition
1894 * happens most of the times. */
1895 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1896 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1898 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1903 /* Let's try to share objects on the command arguments vector */
1904 if (server
.shareobjects
) {
1906 for(j
= 1; j
< c
->argc
; j
++)
1907 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1909 /* Let's try to encode the bulk object to save space. */
1910 if (cmd
->flags
& REDIS_CMD_BULK
)
1911 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1913 /* Check if the user is authenticated */
1914 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1915 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1920 /* Exec the command */
1921 if (c
->flags
& REDIS_MULTI
&& cmd
->proc
!= execCommand
) {
1922 queueMultiCommand(c
,cmd
);
1923 addReply(c
,shared
.queued
);
1928 /* Prepare the client for the next command */
1929 if (c
->flags
& REDIS_CLOSE
) {
1937 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1941 /* (args*2)+1 is enough room for args, spaces, newlines */
1942 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1944 if (argc
<= REDIS_STATIC_ARGS
) {
1947 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1950 for (j
= 0; j
< argc
; j
++) {
1951 if (j
!= 0) outv
[outc
++] = shared
.space
;
1952 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1955 lenobj
= createObject(REDIS_STRING
,
1956 sdscatprintf(sdsempty(),"%lu\r\n",
1957 (unsigned long) stringObjectLen(argv
[j
])));
1958 lenobj
->refcount
= 0;
1959 outv
[outc
++] = lenobj
;
1961 outv
[outc
++] = argv
[j
];
1963 outv
[outc
++] = shared
.crlf
;
1965 /* Increment all the refcounts at start and decrement at end in order to
1966 * be sure to free objects if there is no slave in a replication state
1967 * able to be fed with commands */
1968 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1970 while((ln
= listYield(slaves
))) {
1971 redisClient
*slave
= ln
->value
;
1973 /* Don't feed slaves that are still waiting for BGSAVE to start */
1974 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1976 /* Feed all the other slaves, MONITORs and so on */
1977 if (slave
->slaveseldb
!= dictid
) {
1981 case 0: selectcmd
= shared
.select0
; break;
1982 case 1: selectcmd
= shared
.select1
; break;
1983 case 2: selectcmd
= shared
.select2
; break;
1984 case 3: selectcmd
= shared
.select3
; break;
1985 case 4: selectcmd
= shared
.select4
; break;
1986 case 5: selectcmd
= shared
.select5
; break;
1987 case 6: selectcmd
= shared
.select6
; break;
1988 case 7: selectcmd
= shared
.select7
; break;
1989 case 8: selectcmd
= shared
.select8
; break;
1990 case 9: selectcmd
= shared
.select9
; break;
1992 selectcmd
= createObject(REDIS_STRING
,
1993 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1994 selectcmd
->refcount
= 0;
1997 addReply(slave
,selectcmd
);
1998 slave
->slaveseldb
= dictid
;
2000 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
2002 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
2003 if (outv
!= static_outv
) zfree(outv
);
2006 static void processInputBuffer(redisClient
*c
) {
2008 /* Before processing the input buffer, make sure the client is not
2009 * waiting for a blocking operation such as BLPOP. Note that the first
2010 * iteration the client is never blocked, otherwise the processInputBuffer
2011 * would not be called at all, but after the execution of the first commands
2012 * in the input buffer the client may be blocked, and the "goto again"
2013 * will try to reiterate. The following line will make it return asap. */
2014 if (c
->flags
& REDIS_BLOCKED
) return;
2015 if (c
->bulklen
== -1) {
2016 /* Read the first line of the query */
2017 char *p
= strchr(c
->querybuf
,'\n');
2024 query
= c
->querybuf
;
2025 c
->querybuf
= sdsempty();
2026 querylen
= 1+(p
-(query
));
2027 if (sdslen(query
) > querylen
) {
2028 /* leave data after the first line of the query in the buffer */
2029 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
2031 *p
= '\0'; /* remove "\n" */
2032 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
2033 sdsupdatelen(query
);
2035 /* Now we can split the query in arguments */
2036 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
2039 if (c
->argv
) zfree(c
->argv
);
2040 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
2042 for (j
= 0; j
< argc
; j
++) {
2043 if (sdslen(argv
[j
])) {
2044 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
2052 /* Execute the command. If the client is still valid
2053 * after processCommand() returns and there is something
2054 * in the query buffer, try to process the next command. */
2055 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
2057 /* Nothing to process, argc == 0. Just process the query
2058 * buffer if it's not empty or return to the caller */
2059 if (sdslen(c
->querybuf
)) goto again
;
2062 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
2063 redisLog(REDIS_DEBUG
, "Client protocol error");
2068 /* Bulk read handling. Note that if we are at this point
2069 the client already sent a command terminated with a newline,
2070 we are reading the bulk data that is actually the last
2071 argument of the command. */
2072 int qbl
= sdslen(c
->querybuf
);
2074 if (c
->bulklen
<= qbl
) {
2075 /* Copy everything but the final CRLF as final argument */
2076 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
2078 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
2079 /* Process the command. If the client is still valid after
2080 * the processing and there is more data in the buffer
2081 * try to parse it. */
2082 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
2088 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2089 redisClient
*c
= (redisClient
*) privdata
;
2090 char buf
[REDIS_IOBUF_LEN
];
2093 REDIS_NOTUSED(mask
);
2095 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
2097 if (errno
== EAGAIN
) {
2100 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
2104 } else if (nread
== 0) {
2105 redisLog(REDIS_DEBUG
, "Client closed connection");
2110 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
2111 c
->lastinteraction
= time(NULL
);
2115 processInputBuffer(c
);
2118 static int selectDb(redisClient
*c
, int id
) {
2119 if (id
< 0 || id
>= server
.dbnum
)
2121 c
->db
= &server
.db
[id
];
2125 static void *dupClientReplyValue(void *o
) {
2126 incrRefCount((robj
*)o
);
2130 static redisClient
*createClient(int fd
) {
2131 redisClient
*c
= zmalloc(sizeof(*c
));
2133 anetNonBlock(NULL
,fd
);
2134 anetTcpNoDelay(NULL
,fd
);
2135 if (!c
) return NULL
;
2138 c
->querybuf
= sdsempty();
2147 c
->lastinteraction
= time(NULL
);
2148 c
->authenticated
= 0;
2149 c
->replstate
= REDIS_REPL_NONE
;
2150 c
->reply
= listCreate();
2151 c
->blockingkeys
= NULL
;
2152 c
->blockingkeysnum
= 0;
2153 listSetFreeMethod(c
->reply
,decrRefCount
);
2154 listSetDupMethod(c
->reply
,dupClientReplyValue
);
2155 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
2156 readQueryFromClient
, c
) == AE_ERR
) {
2160 listAddNodeTail(server
.clients
,c
);
2161 initClientMultiState(c
);
2165 static void addReply(redisClient
*c
, robj
*obj
) {
2166 if (listLength(c
->reply
) == 0 &&
2167 (c
->replstate
== REDIS_REPL_NONE
||
2168 c
->replstate
== REDIS_REPL_ONLINE
) &&
2169 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
2170 sendReplyToClient
, c
) == AE_ERR
) return;
2171 listAddNodeTail(c
->reply
,getDecodedObject(obj
));
2174 static void addReplySds(redisClient
*c
, sds s
) {
2175 robj
*o
= createObject(REDIS_STRING
,s
);
2180 static void addReplyDouble(redisClient
*c
, double d
) {
2183 snprintf(buf
,sizeof(buf
),"%.17g",d
);
2184 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2185 (unsigned long) strlen(buf
),buf
));
2188 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
2191 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
2192 len
= sdslen(obj
->ptr
);
2194 long n
= (long)obj
->ptr
;
2196 /* Compute how many bytes will take this integer as a radix 10 string */
2202 while((n
= n
/10) != 0) {
2206 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len
));
2209 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2214 REDIS_NOTUSED(mask
);
2215 REDIS_NOTUSED(privdata
);
2217 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
2218 if (cfd
== AE_ERR
) {
2219 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
2222 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
2223 if ((c
= createClient(cfd
)) == NULL
) {
2224 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
2225 close(cfd
); /* May be already closed, just ingore errors */
2228 /* If the maxclients directive is set and this is one client too many, close
2229 * the connection. Note that we create the client first instead of checking
2230 * for this condition beforehand, since now the socket is already set in nonblocking
2231 * mode and we can send an error for free using the Kernel I/O */
2232 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
2233 char *err
= "-ERR max number of clients reached\r\n";
2235 /* That's a best effort error message, don't check write errors */
2236 if (write(c
->fd
,err
,strlen(err
)) == -1) {
2237 /* Nothing to do, Just to avoid the warning... */
2242 server
.stat_numconnections
++;
2245 /* ======================= Redis objects implementation ===================== */
2247 static robj
*createObject(int type
, void *ptr
) {
2250 if (listLength(server
.objfreelist
)) {
2251 listNode
*head
= listFirst(server
.objfreelist
);
2252 o
= listNodeValue(head
);
2253 listDelNode(server
.objfreelist
,head
);
2255 if (server
.vm_enabled
) {
2256 o
= zmalloc(sizeof(*o
));
2258 o
= zmalloc(sizeof(*o
)-sizeof(struct redisObjectVM
));
2262 o
->encoding
= REDIS_ENCODING_RAW
;
2265 if (server
.vm_enabled
) {
2266 o
->vm
.atime
= server
.unixtime
;
2267 o
->storage
= REDIS_VM_MEMORY
;
2272 static robj
*createStringObject(char *ptr
, size_t len
) {
2273 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
2276 static robj
*createListObject(void) {
2277 list
*l
= listCreate();
2279 listSetFreeMethod(l
,decrRefCount
);
2280 return createObject(REDIS_LIST
,l
);
2283 static robj
*createSetObject(void) {
2284 dict
*d
= dictCreate(&setDictType
,NULL
);
2285 return createObject(REDIS_SET
,d
);
2288 static robj
*createZsetObject(void) {
2289 zset
*zs
= zmalloc(sizeof(*zs
));
2291 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
2292 zs
->zsl
= zslCreate();
2293 return createObject(REDIS_ZSET
,zs
);
2296 static void freeStringObject(robj
*o
) {
2297 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2302 static void freeListObject(robj
*o
) {
2303 listRelease((list
*) o
->ptr
);
2306 static void freeSetObject(robj
*o
) {
2307 dictRelease((dict
*) o
->ptr
);
2310 static void freeZsetObject(robj
*o
) {
2313 dictRelease(zs
->dict
);
2318 static void freeHashObject(robj
*o
) {
2319 dictRelease((dict
*) o
->ptr
);
2322 static void incrRefCount(robj
*o
) {
2324 #ifdef DEBUG_REFCOUNT
2325 if (o
->type
== REDIS_STRING
)
2326 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
2330 static void decrRefCount(void *obj
) {
2333 #ifdef DEBUG_REFCOUNT
2334 if (o
->type
== REDIS_STRING
)
2335 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
2337 if (--(o
->refcount
) == 0) {
2339 case REDIS_STRING
: freeStringObject(o
); break;
2340 case REDIS_LIST
: freeListObject(o
); break;
2341 case REDIS_SET
: freeSetObject(o
); break;
2342 case REDIS_ZSET
: freeZsetObject(o
); break;
2343 case REDIS_HASH
: freeHashObject(o
); break;
2344 default: redisAssert(0 != 0); break;
2346 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2347 !listAddNodeHead(server
.objfreelist
,o
))
2352 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2353 dictEntry
*de
= dictFind(db
->dict
,key
);
2355 robj
*o
= dictGetEntryVal(de
);
2357 /* Update the access time of the key for the aging algorithm. */
2358 if (server
.vm_enabled
) o
->vm
.atime
= server
.unixtime
;
2365 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2366 expireIfNeeded(db
,key
);
2367 return lookupKey(db
,key
);
2370 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2371 deleteIfVolatile(db
,key
);
2372 return lookupKey(db
,key
);
2375 static int deleteKey(redisDb
*db
, robj
*key
) {
2378 /* We need to protect key from destruction: after the first dictDelete()
2379 * it may happen that 'key' is no longer valid if we don't increment
2380 * its reference count. This may happen when we get the object reference
2381 * directly from the hash table with dictRandomKey() or dict iterators */
2383 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2384 retval
= dictDelete(db
->dict
,key
);
2387 return retval
== DICT_OK
;
2390 /* Try to share an object against the shared objects pool */
2391 static robj
*tryObjectSharing(robj
*o
) {
2392 struct dictEntry
*de
;
2395 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2397 redisAssert(o
->type
== REDIS_STRING
);
2398 de
= dictFind(server
.sharingpool
,o
);
2400 robj
*shared
= dictGetEntryKey(de
);
2402 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2403 dictGetEntryVal(de
) = (void*) c
;
2404 incrRefCount(shared
);
2408 /* Here we are using a stream algorithm: every time an object is
2409 * shared we increment its count, every time there is a miss we
2410 * decrement the counter of a random object. If this object reaches
2411 * zero we remove the object and put the current object instead. */
2412 if (dictSize(server
.sharingpool
) >=
2413 server
.sharingpoolsize
) {
2414 de
= dictGetRandomKey(server
.sharingpool
);
2415 redisAssert(de
!= NULL
);
2416 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2417 dictGetEntryVal(de
) = (void*) c
;
2419 dictDelete(server
.sharingpool
,de
->key
);
2422 c
= 0; /* If the pool is empty we want to add this object */
2427 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2428 redisAssert(retval
== DICT_OK
);
2435 /* Check if the nul-terminated string 's' can be represented by a long
2436 * (that is, is a number that fits into long without any other space or
2437 * character before or after the digits).
2439 * If so, the function returns REDIS_OK and *longval is set to the value
2440 * of the number. Otherwise REDIS_ERR is returned */
2441 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2442 char buf
[32], *endptr
;
2446 value
= strtol(s
, &endptr
, 10);
2447 if (endptr
[0] != '\0') return REDIS_ERR
;
2448 slen
= snprintf(buf
,32,"%ld",value
);
2450 /* If the number converted back into a string is not identical
2451 * then it's not possible to encode the string as integer */
2452 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2453 if (longval
) *longval
= value
;
2457 /* Try to encode a string object in order to save space */
2458 static int tryObjectEncoding(robj
*o
) {
2462 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2463 return REDIS_ERR
; /* Already encoded */
2465 /* It's not safe to encode shared objects: shared objects can be shared
2466 * everywhere in the "object space" of Redis. Encoded objects can only
2467 * appear as "values" (and not, for instance, as keys) */
2468 if (o
->refcount
> 1) return REDIS_ERR
;
2470 /* Currently we try to encode only strings */
2471 redisAssert(o
->type
== REDIS_STRING
);
2473 /* Check if we can represent this string as a long integer */
2474 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2476 /* Ok, this object can be encoded */
2477 o
->encoding
= REDIS_ENCODING_INT
;
2479 o
->ptr
= (void*) value
;
2483 /* Get a decoded version of an encoded object (returned as a new object).
2484 * If the object is already raw-encoded just increment the ref count. */
2485 static robj
*getDecodedObject(robj
*o
) {
2488 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2492 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2495 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2496 dec
= createStringObject(buf
,strlen(buf
));
2499 redisAssert(1 != 1);
2503 /* Compare two string objects via strcmp() or alike.
2504 * Note that the objects may be integer-encoded. In such a case we
2505 * use snprintf() to get a string representation of the numbers on the stack
2506 * and compare the strings, it's much faster than calling getDecodedObject().
2508 * Important note: if objects are not integer encoded, but binary-safe strings,
2509 * sdscmp() from sds.c will apply memcmp() so this function can be considered
2511 static int compareStringObjects(robj
*a
, robj
*b
) {
2512 redisAssert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2513 char bufa
[128], bufb
[128], *astr
, *bstr
;
2516 if (a
== b
) return 0;
2517 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2518 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2524 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2525 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2531 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2534 static size_t stringObjectLen(robj
*o
) {
2535 redisAssert(o
->type
== REDIS_STRING
);
2536 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2537 return sdslen(o
->ptr
);
2541 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2545 /*============================ RDB saving/loading =========================== */
/* Write the single byte 'type' to 'fp'.
 *
 * Returns 0 when the byte was written, -1 when fwrite() wrote nothing. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    if (fwrite(&type,1,1,fp) == 0) return -1;
    return 0;
}
/* Write the time 't' to 'fp' as a 32 bit signed integer.
 *
 * The value is first converted to int32_t (so anything outside that range
 * is altered by the cast) and its 4 bytes are written as they sit in
 * memory.
 *
 * Returns 0 on success, -1 when fwrite() wrote nothing. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;
    if (fwrite(&t32,4,1,fp) == 0) return -1;
    return 0;
}
2558 /* check rdbLoadLen() comments for more info */
2559 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2560 unsigned char buf
[2];
2563 /* Save a 6 bit len */
2564 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2565 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2566 } else if (len
< (1<<14)) {
2567 /* Save a 14 bit len */
2568 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2570 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2572 /* Save a 32 bit len */
2573 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2574 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2576 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2581 /* String objects in the form "2391" "-100" without any space and with a
2582 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2583 * encoded as integers to save space */
2584 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2586 char *endptr
, buf
[32];
2588 /* Check if it's possible to encode this value as a number */
2589 value
= strtoll(s
, &endptr
, 10);
2590 if (endptr
[0] != '\0') return 0;
2591 snprintf(buf
,32,"%lld",value
);
2593 /* If the number converted back into a string is not identical
2594 * then it's not possible to encode the string as integer */
2595 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2597 /* Finally check if it fits in our ranges */
2598 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2599 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2600 enc
[1] = value
&0xFF;
2602 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2603 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2604 enc
[1] = value
&0xFF;
2605 enc
[2] = (value
>>8)&0xFF;
2607 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2608 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2609 enc
[1] = value
&0xFF;
2610 enc
[2] = (value
>>8)&0xFF;
2611 enc
[3] = (value
>>16)&0xFF;
2612 enc
[4] = (value
>>24)&0xFF;
2619 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2620 unsigned int comprlen
, outlen
;
2624 /* We require at least four bytes compression for this to be worth it */
2625 outlen
= sdslen(obj
->ptr
)-4;
2626 if (outlen
<= 0) return 0;
2627 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2628 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2629 if (comprlen
== 0) {
2633 /* Data compressed! Let's save it on disk */
2634 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2635 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2636 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2637 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2638 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2647 /* Save a string object as [len][data] on disk. If the object is a string
2648 * representation of an integer value we try to save it in a special form */
2649 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2653 len
= sdslen(obj
->ptr
);
2655 /* Try integer encoding */
2657 unsigned char buf
[5];
2658 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2659 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2664 /* Try LZF compression - under 20 bytes it's unable to compress even
2665 * aaaaaaaaaaaaaaaaaa so skip it */
2666 if (server
.rdbcompression
&& len
> 20) {
2669 retval
= rdbSaveLzfStringObject(fp
,obj
);
2670 if (retval
== -1) return -1;
2671 if (retval
> 0) return 0;
2672 /* retval == 0 means data can't be compressed, save the old way */
2675 /* Store verbatim */
2676 if (rdbSaveLen(fp
,len
) == -1) return -1;
2677 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2681 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2682 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2685 obj
= getDecodedObject(obj
);
2686 retval
= rdbSaveStringObjectRaw(fp
,obj
);
2691 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
2692 * 8 bit integer specifying the length of the representation.
2693 * This 8 bit integer has special values in order to specify the following
2699 static int rdbSaveDoubleValue(FILE *fp
, double val
) {
2700 unsigned char buf
[128];
2706 } else if (!isfinite(val
)) {
2708 buf
[0] = (val
< 0) ? 255 : 254;
2710 snprintf((char*)buf
+1,sizeof(buf
)-1,"%.17g",val
);
2711 buf
[0] = strlen((char*)buf
+1);
2714 if (fwrite(buf
,len
,1,fp
) == 0) return -1;
2718 /* Save a Redis object. */
2719 static int rdbSaveObject(FILE *fp
, robj
*o
) {
2720 if (o
->type
== REDIS_STRING
) {
2721 /* Save a string value */
2722 if (rdbSaveStringObject(fp
,o
) == -1) return -1;
2723 } else if (o
->type
== REDIS_LIST
) {
2724 /* Save a list value */
2725 list
*list
= o
->ptr
;
2729 if (rdbSaveLen(fp
,listLength(list
)) == -1) return -1;
2730 while((ln
= listYield(list
))) {
2731 robj
*eleobj
= listNodeValue(ln
);
2733 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2735 } else if (o
->type
== REDIS_SET
) {
2736 /* Save a set value */
2738 dictIterator
*di
= dictGetIterator(set
);
2741 if (rdbSaveLen(fp
,dictSize(set
)) == -1) return -1;
2742 while((de
= dictNext(di
)) != NULL
) {
2743 robj
*eleobj
= dictGetEntryKey(de
);
2745 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2747 dictReleaseIterator(di
);
2748 } else if (o
->type
== REDIS_ZSET
) {
2749 /* Save a sorted set value */
2751 dictIterator
*di
= dictGetIterator(zs
->dict
);
2754 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) return -1;
2755 while((de
= dictNext(di
)) != NULL
) {
2756 robj
*eleobj
= dictGetEntryKey(de
);
2757 double *score
= dictGetEntryVal(de
);
2759 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2760 if (rdbSaveDoubleValue(fp
,*score
) == -1) return -1;
2762 dictReleaseIterator(di
);
2764 redisAssert(0 != 0);
2769 /* Return the length the object will have on disk if saved with
2770 * the rdbSaveObject() function. Currently we use a trick to get
2771 * this length with very little changes to the code. In the future
2772 * we could switch to a faster solution. */
2773 static off_t
rdbSavedObjectLen(robj
*o
) {
2774 static FILE *fp
= NULL
;
2776 if (fp
== NULL
) fp
= fopen("/dev/null","w");
2780 assert(rdbSaveObject(fp
,o
) != 1);
2784 /* Return the number of pages required to save this object in the swap file */
2785 static off_t
rdbSavedObjectPages(robj
*o
) {
2786 off_t bytes
= rdbSavedObjectLen(o
);
2788 return (bytes
+(server
.vm_page_size
-1))/server
.vm_page_size
;
2791 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2792 static int rdbSave(char *filename
) {
2793 dictIterator
*di
= NULL
;
2798 time_t now
= time(NULL
);
2800 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2801 fp
= fopen(tmpfile
,"w");
2803 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2806 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2807 for (j
= 0; j
< server
.dbnum
; j
++) {
2808 redisDb
*db
= server
.db
+j
;
2810 if (dictSize(d
) == 0) continue;
2811 di
= dictGetIterator(d
);
2817 /* Write the SELECT DB opcode */
2818 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2819 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2821 /* Iterate this DB writing every entry */
2822 while((de
= dictNext(di
)) != NULL
) {
2823 robj
*key
= dictGetEntryKey(de
);
2824 robj
*o
= dictGetEntryVal(de
);
2825 time_t expiretime
= getExpire(db
,key
);
2827 /* Save the expire time */
2828 if (expiretime
!= -1) {
2829 /* If this key is already expired skip it */
2830 if (expiretime
< now
) continue;
2831 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2832 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2834 /* Save the key and associated value */
2835 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2836 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2837 /* Save the actual value */
2838 if (rdbSaveObject(fp
,o
) == -1) goto werr
;
2840 dictReleaseIterator(di
);
2843 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2845 /* Make sure data will not remain on the OS's output buffers */
2850 /* Use RENAME to make sure the DB file is changed atomically only
2851 * if the generated DB file is ok. */
2852 if (rename(tmpfile
,filename
) == -1) {
2853 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
2857 redisLog(REDIS_NOTICE
,"DB saved on disk");
2859 server
.lastsave
= time(NULL
);
2865 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2866 if (di
) dictReleaseIterator(di
);
2870 static int rdbSaveBackground(char *filename
) {
2873 if (server
.bgsavechildpid
!= -1) return REDIS_ERR
;
2874 if ((childpid
= fork()) == 0) {
2877 if (rdbSave(filename
) == REDIS_OK
) {
2884 if (childpid
== -1) {
2885 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2889 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2890 server
.bgsavechildpid
= childpid
;
2893 return REDIS_OK
; /* unreached */
2896 static void rdbRemoveTempFile(pid_t childpid
) {
2899 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
/* Read one byte from 'fp' and return it as a non negative int.
 *
 * Returns -1 when fread() could not read the byte. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;
    if (fread(&type,1,1,fp) == 0) return -1;
    return type;
}
/* Read a 32 bit signed integer (4 bytes, as laid out in memory) from 'fp'
 * and return it converted to time_t.
 *
 * Returns -1 when fread() could not read the 4 bytes; callers cannot tell
 * that apart from a stored value of -1. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t t32;
    if (fread(&t32,4,1,fp) == 0) return -1;
    return (time_t) t32;
}
2915 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2916 * of this file for a description of how these are stored on disk.
2918 * isencoded is set to 1 if the length read is not actually a length but
2919 * an "encoding type", check the above comments for more info */
2920 static uint32_t rdbLoadLen(FILE *fp
, int *isencoded
) {
2921 unsigned char buf
[2];
2925 if (isencoded
) *isencoded
= 0;
2926 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2927 type
= (buf
[0]&0xC0)>>6;
2928 if (type
== REDIS_RDB_6BITLEN
) {
2929 /* Read a 6 bit len */
2931 } else if (type
== REDIS_RDB_ENCVAL
) {
2932 /* Read a 6 bit len encoding type */
2933 if (isencoded
) *isencoded
= 1;
2935 } else if (type
== REDIS_RDB_14BITLEN
) {
2936 /* Read a 14 bit len */
2937 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2938 return ((buf
[0]&0x3F)<<8)|buf
[1];
2940 /* Read a 32 bit len */
2941 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2946 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2947 unsigned char enc
[4];
2950 if (enctype
== REDIS_RDB_ENC_INT8
) {
2951 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2952 val
= (signed char)enc
[0];
2953 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2955 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2956 v
= enc
[0]|(enc
[1]<<8);
2958 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2960 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2961 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2964 val
= 0; /* anti-warning */
2967 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2970 static robj
*rdbLoadLzfStringObject(FILE*fp
) {
2971 unsigned int len
, clen
;
2972 unsigned char *c
= NULL
;
2975 if ((clen
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2976 if ((len
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2977 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2978 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2979 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2980 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2982 return createObject(REDIS_STRING
,val
);
2989 static robj
*rdbLoadStringObject(FILE*fp
) {
2994 len
= rdbLoadLen(fp
,&isencoded
);
2997 case REDIS_RDB_ENC_INT8
:
2998 case REDIS_RDB_ENC_INT16
:
2999 case REDIS_RDB_ENC_INT32
:
3000 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
3001 case REDIS_RDB_ENC_LZF
:
3002 return tryObjectSharing(rdbLoadLzfStringObject(fp
));
3008 if (len
== REDIS_RDB_LENERR
) return NULL
;
3009 val
= sdsnewlen(NULL
,len
);
3010 if (len
&& fread(val
,len
,1,fp
) == 0) {
3014 return tryObjectSharing(createObject(REDIS_STRING
,val
));
3017 /* For information about double serialization check rdbSaveDoubleValue() */
3018 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
3022 if (fread(&len
,1,1,fp
) == 0) return -1;
3024 case 255: *val
= R_NegInf
; return 0;
3025 case 254: *val
= R_PosInf
; return 0;
3026 case 253: *val
= R_Nan
; return 0;
3028 if (fread(buf
,len
,1,fp
) == 0) return -1;
3030 sscanf(buf
, "%lg", val
);
3035 /* Load a Redis object of the specified type from the specified file.
3036 * On success a newly allocated object is returned, otherwise NULL. */
3037 static robj
*rdbLoadObject(int type
, FILE *fp
) {
3040 if (type
== REDIS_STRING
) {
3041 /* Read string value */
3042 if ((o
= rdbLoadStringObject(fp
)) == NULL
) return NULL
;
3043 tryObjectEncoding(o
);
3044 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
3045 /* Read list/set value */
3048 if ((listlen
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3049 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
3050 /* Load every single element of the list/set */
3054 if ((ele
= rdbLoadStringObject(fp
)) == NULL
) return NULL
;
3055 tryObjectEncoding(ele
);
3056 if (type
== REDIS_LIST
) {
3057 listAddNodeTail((list
*)o
->ptr
,ele
);
3059 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
3062 } else if (type
== REDIS_ZSET
) {
3063 /* Read sorted set value */
3067 if ((zsetlen
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3068 o
= createZsetObject();
3070 /* Load every single element of the sorted set */
3073 double *score
= zmalloc(sizeof(double));
3075 if ((ele
= rdbLoadStringObject(fp
)) == NULL
) return NULL
;
3076 tryObjectEncoding(ele
);
3077 if (rdbLoadDoubleValue(fp
,score
) == -1) return NULL
;
3078 dictAdd(zs
->dict
,ele
,score
);
3079 zslInsert(zs
->zsl
,*score
,ele
);
3080 incrRefCount(ele
); /* added to skiplist */
3083 redisAssert(0 != 0);
3088 static int rdbLoad(char *filename
) {
3090 robj
*keyobj
= NULL
;
3092 int type
, retval
, rdbver
;
3093 dict
*d
= server
.db
[0].dict
;
3094 redisDb
*db
= server
.db
+0;
3096 time_t expiretime
= -1, now
= time(NULL
);
3098 fp
= fopen(filename
,"r");
3099 if (!fp
) return REDIS_ERR
;
3100 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
3102 if (memcmp(buf
,"REDIS",5) != 0) {
3104 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
3107 rdbver
= atoi(buf
+5);
3110 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
3117 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
3118 if (type
== REDIS_EXPIRETIME
) {
3119 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
3120 /* We read the time so we need to read the object type again */
3121 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
3123 if (type
== REDIS_EOF
) break;
3124 /* Handle SELECT DB opcode as a special case */
3125 if (type
== REDIS_SELECTDB
) {
3126 if ((dbid
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
)
3128 if (dbid
>= (unsigned)server
.dbnum
) {
3129 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
3132 db
= server
.db
+dbid
;
3137 if ((keyobj
= rdbLoadStringObject(fp
)) == NULL
) goto eoferr
;
3139 if ((o
= rdbLoadObject(type
,fp
)) == NULL
) goto eoferr
;
3140 /* Add the new object in the hash table */
3141 retval
= dictAdd(d
,keyobj
,o
);
3142 if (retval
== DICT_ERR
) {
3143 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
3146 /* Set the expire time if needed */
3147 if (expiretime
!= -1) {
3148 setExpire(db
,keyobj
,expiretime
);
3149 /* Delete this key if already expired */
3150 if (expiretime
< now
) deleteKey(db
,keyobj
);
3158 eoferr
: /* unexpected end of file is handled here with a fatal exit */
3159 if (keyobj
) decrRefCount(keyobj
);
3160 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
3162 return REDIS_ERR
; /* Just to avoid warning */
3165 /*================================== Commands =============================== */
3167 static void authCommand(redisClient
*c
) {
3168 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
3169 c
->authenticated
= 1;
3170 addReply(c
,shared
.ok
);
3172 c
->authenticated
= 0;
3173 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3177 static void pingCommand(redisClient
*c
) {
3178 addReply(c
,shared
.pong
);
3181 static void echoCommand(redisClient
*c
) {
3182 addReplyBulkLen(c
,c
->argv
[1]);
3183 addReply(c
,c
->argv
[1]);
3184 addReply(c
,shared
.crlf
);
3187 /*=================================== Strings =============================== */
3189 static void setGenericCommand(redisClient
*c
, int nx
) {
3192 if (nx
) deleteIfVolatile(c
->db
,c
->argv
[1]);
3193 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3194 if (retval
== DICT_ERR
) {
3196 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3197 incrRefCount(c
->argv
[2]);
3199 addReply(c
,shared
.czero
);
3203 incrRefCount(c
->argv
[1]);
3204 incrRefCount(c
->argv
[2]);
3207 removeExpire(c
->db
,c
->argv
[1]);
3208 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3211 static void setCommand(redisClient
*c
) {
3212 setGenericCommand(c
,0);
3215 static void setnxCommand(redisClient
*c
) {
3216 setGenericCommand(c
,1);
3219 static int getGenericCommand(redisClient
*c
) {
3220 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3223 addReply(c
,shared
.nullbulk
);
3226 if (o
->type
!= REDIS_STRING
) {
3227 addReply(c
,shared
.wrongtypeerr
);
3230 addReplyBulkLen(c
,o
);
3232 addReply(c
,shared
.crlf
);
3238 static void getCommand(redisClient
*c
) {
3239 getGenericCommand(c
);
3242 static void getsetCommand(redisClient
*c
) {
3243 if (getGenericCommand(c
) == REDIS_ERR
) return;
3244 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
3245 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3247 incrRefCount(c
->argv
[1]);
3249 incrRefCount(c
->argv
[2]);
3251 removeExpire(c
->db
,c
->argv
[1]);
3254 static void mgetCommand(redisClient
*c
) {
3257 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
3258 for (j
= 1; j
< c
->argc
; j
++) {
3259 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
3261 addReply(c
,shared
.nullbulk
);
3263 if (o
->type
!= REDIS_STRING
) {
3264 addReply(c
,shared
.nullbulk
);
3266 addReplyBulkLen(c
,o
);
3268 addReply(c
,shared
.crlf
);
3274 static void msetGenericCommand(redisClient
*c
, int nx
) {
3275 int j
, busykeys
= 0;
3277 if ((c
->argc
% 2) == 0) {
3278 addReplySds(c
,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3281 /* Handle the NX flag. The MSETNX semantic is to return zero and not
3282 * set anything at all if at least one key already exists. */
3284 for (j
= 1; j
< c
->argc
; j
+= 2) {
3285 if (lookupKeyWrite(c
->db
,c
->argv
[j
]) != NULL
) {
3291 addReply(c
, shared
.czero
);
3295 for (j
= 1; j
< c
->argc
; j
+= 2) {
3298 tryObjectEncoding(c
->argv
[j
+1]);
3299 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3300 if (retval
== DICT_ERR
) {
3301 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3302 incrRefCount(c
->argv
[j
+1]);
3304 incrRefCount(c
->argv
[j
]);
3305 incrRefCount(c
->argv
[j
+1]);
3307 removeExpire(c
->db
,c
->argv
[j
]);
3309 server
.dirty
+= (c
->argc
-1)/2;
3310 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3313 static void msetCommand(redisClient
*c
) {
3314 msetGenericCommand(c
,0);
3317 static void msetnxCommand(redisClient
*c
) {
3318 msetGenericCommand(c
,1);
3321 static void incrDecrCommand(redisClient
*c
, long long incr
) {
3326 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3330 if (o
->type
!= REDIS_STRING
) {
3335 if (o
->encoding
== REDIS_ENCODING_RAW
)
3336 value
= strtoll(o
->ptr
, &eptr
, 10);
3337 else if (o
->encoding
== REDIS_ENCODING_INT
)
3338 value
= (long)o
->ptr
;
3340 redisAssert(1 != 1);
3345 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
3346 tryObjectEncoding(o
);
3347 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
3348 if (retval
== DICT_ERR
) {
3349 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
3350 removeExpire(c
->db
,c
->argv
[1]);
3352 incrRefCount(c
->argv
[1]);
3355 addReply(c
,shared
.colon
);
3357 addReply(c
,shared
.crlf
);
3360 static void incrCommand(redisClient
*c
) {
3361 incrDecrCommand(c
,1);
3364 static void decrCommand(redisClient
*c
) {
3365 incrDecrCommand(c
,-1);
3368 static void incrbyCommand(redisClient
*c
) {
3369 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3370 incrDecrCommand(c
,incr
);
3373 static void decrbyCommand(redisClient
*c
) {
3374 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3375 incrDecrCommand(c
,-incr
);
3378 /* ========================= Type agnostic commands ========================= */
3380 static void delCommand(redisClient
*c
) {
3383 for (j
= 1; j
< c
->argc
; j
++) {
3384 if (deleteKey(c
->db
,c
->argv
[j
])) {
3391 addReply(c
,shared
.czero
);
3394 addReply(c
,shared
.cone
);
3397 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
3402 static void existsCommand(redisClient
*c
) {
3403 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
/* selectCommand: switches the client to the DB whose index is given in
 * argv[1]. Replies "-ERR invalid DB index" when selectDb() returns
 * REDIS_ERR.
 * NOTE(review): source lines 3408 and 3411 are not present in this listing;
 * the shared.ok reply is presumably the else branch -- confirm. */
3406 static void selectCommand(redisClient
*c
) {
/* atoi() reports no conversion errors: text that is not a number yields 0. */
3407 int id
= atoi(c
->argv
[1]->ptr
);
3409 if (selectDb(c
,id
) == REDIS_ERR
) {
3410 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
3412 addReply(c
,shared
.ok
);
3416 static void randomkeyCommand(redisClient
*c
) {
3420 de
= dictGetRandomKey(c
->db
->dict
);
3421 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
3424 addReply(c
,shared
.plus
);
3425 addReply(c
,shared
.crlf
);
3427 addReply(c
,shared
.plus
);
3428 addReply(c
,dictGetEntryKey(de
));
3429 addReply(c
,shared
.crlf
);
3433 static void keysCommand(redisClient
*c
) {
3436 sds pattern
= c
->argv
[1]->ptr
;
3437 int plen
= sdslen(pattern
);
3438 unsigned long numkeys
= 0, keyslen
= 0;
3439 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3441 di
= dictGetIterator(c
->db
->dict
);
3443 decrRefCount(lenobj
);
3444 while((de
= dictNext(di
)) != NULL
) {
3445 robj
*keyobj
= dictGetEntryKey(de
);
3447 sds key
= keyobj
->ptr
;
3448 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3449 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3450 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3452 addReply(c
,shared
.space
);
3455 keyslen
+= sdslen(key
);
3459 dictReleaseIterator(di
);
3460 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3461 addReply(c
,shared
.crlf
);
/* dbsizeCommand: formats dictSize() of the client's selected DB dict as
 * ":<count>\r\n".
 * NOTE(review): source line 3465, which opens the statement, is not present
 * in this listing; the extra closing parenthesis shows the formatted string
 * is an argument of an enclosing call. */
3464 static void dbsizeCommand(redisClient
*c
) {
3466 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
/* lastsaveCommand: formats server.lastsave as ":<value>\r\n" using %lu.
 * NOTE(review): source line 3470, which opens the statement, is not present
 * in this listing; the extra closing parenthesis shows the formatted string
 * is an argument of an enclosing call. */
3469 static void lastsaveCommand(redisClient
*c
) {
3471 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
3474 static void typeCommand(redisClient
*c
) {
3478 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3483 case REDIS_STRING
: type
= "+string"; break;
3484 case REDIS_LIST
: type
= "+list"; break;
3485 case REDIS_SET
: type
= "+set"; break;
3486 case REDIS_ZSET
: type
= "+zset"; break;
3487 default: type
= "unknown"; break;
3490 addReplySds(c
,sdsnew(type
));
3491 addReply(c
,shared
.crlf
);
/* saveCommand: saves to server.dbfilename through rdbSave() and replies
 * shared.ok when it returns REDIS_OK; the shared.err reply is presumably the
 * failure branch.
 * NOTE(review): source lines 3497-3498, 3501 and 3503 are not present in
 * this listing, so the early return and the else placement are assumed. */
3494 static void saveCommand(redisClient
*c
) {
/* bgsavechildpid != -1 is answered with an error instead of saving. */
3495 if (server
.bgsavechildpid
!= -1) {
3496 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3499 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3500 addReply(c
,shared
.ok
);
3502 addReply(c
,shared
.err
);
/* bgsaveCommand: starts a save of server.dbfilename through
 * rdbSaveBackground(). Replies "+Background saving started" when it returns
 * REDIS_OK; the shared.err reply is presumably the failure branch.
 * NOTE(review): source lines 3509-3510, 3514 and 3516 are not present in
 * this listing, so the early return and the else placement are assumed. */
3506 static void bgsaveCommand(redisClient
*c
) {
/* bgsavechildpid != -1 is answered with an error instead of starting. */
3507 if (server
.bgsavechildpid
!= -1) {
3508 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3511 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3512 char *status
= "+Background saving started\r\n";
3513 addReplySds(c
,sdsnew(status
));
3515 addReply(c
,shared
.err
);
3519 static void shutdownCommand(redisClient
*c
) {
3520 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
3521 /* Kill the saving child if there is a background saving in progress.
3522 We want to avoid race conditions, for instance our saving child may
3523 overwrite the synchronous save done by SHUTDOWN. */
3524 if (server
.bgsavechildpid
!= -1) {
3525 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3526 kill(server
.bgsavechildpid
,SIGKILL
);
3527 rdbRemoveTempFile(server
.bgsavechildpid
);
3529 if (server
.appendonly
) {
3530 /* Append only file: fsync() the AOF and exit */
3531 fsync(server
.appendfd
);
3534 /* Snapshotting. Perform a SYNC SAVE and exit */
3535 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3536 if (server
.daemonize
)
3537 unlink(server
.pidfile
);
3538 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3539 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3542 /* Ooops.. error saving! The best we can do is to continue operating.
3543 * Note that if there was a background saving process, in the next
3544 * cron() Redis will be notified that the background saving aborted,
3545 * handling special stuff like slaves pending for synchronization... */
3546 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3547 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3552 static void renameGenericCommand(redisClient
*c
, int nx
) {
3555 /* To use the same key as src and dst is probably an error */
3556 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3557 addReply(c
,shared
.sameobjecterr
);
3561 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3563 addReply(c
,shared
.nokeyerr
);
3567 deleteIfVolatile(c
->db
,c
->argv
[2]);
3568 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3571 addReply(c
,shared
.czero
);
3574 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3576 incrRefCount(c
->argv
[2]);
3578 deleteKey(c
->db
,c
->argv
[1]);
3580 addReply(c
,nx
? shared
.cone
: shared
.ok
);
3583 static void renameCommand(redisClient
*c
) {
3584 renameGenericCommand(c
,0);
3587 static void renamenxCommand(redisClient
*c
) {
3588 renameGenericCommand(c
,1);
3591 static void moveCommand(redisClient
*c
) {
3596 /* Obtain source and target DB pointers */
3599 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3600 addReply(c
,shared
.outofrangeerr
);
3604 selectDb(c
,srcid
); /* Back to the source DB */
3606 /* If the target DB is the same as the source DB, the request
3607 * is probably an error. */
3609 addReply(c
,shared
.sameobjecterr
);
3613 /* Check if the element exists and get a reference */
3614 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3616 addReply(c
,shared
.czero
);
3620 /* Try to add the element to the target DB */
3621 deleteIfVolatile(dst
,c
->argv
[1]);
3622 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3623 addReply(c
,shared
.czero
);
3626 incrRefCount(c
->argv
[1]);
3629 /* OK! key moved, free the entry in the source DB */
3630 deleteKey(src
,c
->argv
[1]);
3632 addReply(c
,shared
.cone
);
3635 /* =================================== Lists ================================ */
3636 static void pushGenericCommand(redisClient
*c
, int where
) {
3640 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3642 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3643 addReply(c
,shared
.ok
);
3646 lobj
= createListObject();
3648 if (where
== REDIS_HEAD
) {
3649 listAddNodeHead(list
,c
->argv
[2]);
3651 listAddNodeTail(list
,c
->argv
[2]);
3653 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
3654 incrRefCount(c
->argv
[1]);
3655 incrRefCount(c
->argv
[2]);
3657 if (lobj
->type
!= REDIS_LIST
) {
3658 addReply(c
,shared
.wrongtypeerr
);
3661 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3662 addReply(c
,shared
.ok
);
3666 if (where
== REDIS_HEAD
) {
3667 listAddNodeHead(list
,c
->argv
[2]);
3669 listAddNodeTail(list
,c
->argv
[2]);
3671 incrRefCount(c
->argv
[2]);
3674 addReply(c
,shared
.ok
);
3677 static void lpushCommand(redisClient
*c
) {
3678 pushGenericCommand(c
,REDIS_HEAD
);
3681 static void rpushCommand(redisClient
*c
) {
3682 pushGenericCommand(c
,REDIS_TAIL
);
3685 static void llenCommand(redisClient
*c
) {
3689 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3691 addReply(c
,shared
.czero
);
3694 if (o
->type
!= REDIS_LIST
) {
3695 addReply(c
,shared
.wrongtypeerr
);
3698 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3703 static void lindexCommand(redisClient
*c
) {
3705 int index
= atoi(c
->argv
[2]->ptr
);
3707 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3709 addReply(c
,shared
.nullbulk
);
3711 if (o
->type
!= REDIS_LIST
) {
3712 addReply(c
,shared
.wrongtypeerr
);
3714 list
*list
= o
->ptr
;
3717 ln
= listIndex(list
, index
);
3719 addReply(c
,shared
.nullbulk
);
3721 robj
*ele
= listNodeValue(ln
);
3722 addReplyBulkLen(c
,ele
);
3724 addReply(c
,shared
.crlf
);
3730 static void lsetCommand(redisClient
*c
) {
3732 int index
= atoi(c
->argv
[2]->ptr
);
3734 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3736 addReply(c
,shared
.nokeyerr
);
3738 if (o
->type
!= REDIS_LIST
) {
3739 addReply(c
,shared
.wrongtypeerr
);
3741 list
*list
= o
->ptr
;
3744 ln
= listIndex(list
, index
);
3746 addReply(c
,shared
.outofrangeerr
);
3748 robj
*ele
= listNodeValue(ln
);
3751 listNodeValue(ln
) = c
->argv
[3];
3752 incrRefCount(c
->argv
[3]);
3753 addReply(c
,shared
.ok
);
3760 static void popGenericCommand(redisClient
*c
, int where
) {
3763 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3765 addReply(c
,shared
.nullbulk
);
3767 if (o
->type
!= REDIS_LIST
) {
3768 addReply(c
,shared
.wrongtypeerr
);
3770 list
*list
= o
->ptr
;
3773 if (where
== REDIS_HEAD
)
3774 ln
= listFirst(list
);
3776 ln
= listLast(list
);
3779 addReply(c
,shared
.nullbulk
);
3781 robj
*ele
= listNodeValue(ln
);
3782 addReplyBulkLen(c
,ele
);
3784 addReply(c
,shared
.crlf
);
3785 listDelNode(list
,ln
);
3792 static void lpopCommand(redisClient
*c
) {
3793 popGenericCommand(c
,REDIS_HEAD
);
3796 static void rpopCommand(redisClient
*c
) {
3797 popGenericCommand(c
,REDIS_TAIL
);
3800 static void lrangeCommand(redisClient
*c
) {
3802 int start
= atoi(c
->argv
[2]->ptr
);
3803 int end
= atoi(c
->argv
[3]->ptr
);
3805 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3807 addReply(c
,shared
.nullmultibulk
);
3809 if (o
->type
!= REDIS_LIST
) {
3810 addReply(c
,shared
.wrongtypeerr
);
3812 list
*list
= o
->ptr
;
3814 int llen
= listLength(list
);
3818 /* convert negative indexes */
3819 if (start
< 0) start
= llen
+start
;
3820 if (end
< 0) end
= llen
+end
;
3821 if (start
< 0) start
= 0;
3822 if (end
< 0) end
= 0;
3824 /* indexes sanity checks */
3825 if (start
> end
|| start
>= llen
) {
3826 /* Out of range start or start > end result in empty list */
3827 addReply(c
,shared
.emptymultibulk
);
3830 if (end
>= llen
) end
= llen
-1;
3831 rangelen
= (end
-start
)+1;
3833 /* Return the result in form of a multi-bulk reply */
3834 ln
= listIndex(list
, start
);
3835 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3836 for (j
= 0; j
< rangelen
; j
++) {
3837 ele
= listNodeValue(ln
);
3838 addReplyBulkLen(c
,ele
);
3840 addReply(c
,shared
.crlf
);
3847 static void ltrimCommand(redisClient
*c
) {
3849 int start
= atoi(c
->argv
[2]->ptr
);
3850 int end
= atoi(c
->argv
[3]->ptr
);
3852 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3854 addReply(c
,shared
.ok
);
3856 if (o
->type
!= REDIS_LIST
) {
3857 addReply(c
,shared
.wrongtypeerr
);
3859 list
*list
= o
->ptr
;
3861 int llen
= listLength(list
);
3862 int j
, ltrim
, rtrim
;
3864 /* convert negative indexes */
3865 if (start
< 0) start
= llen
+start
;
3866 if (end
< 0) end
= llen
+end
;
3867 if (start
< 0) start
= 0;
3868 if (end
< 0) end
= 0;
3870 /* indexes sanity checks */
3871 if (start
> end
|| start
>= llen
) {
3872 /* Out of range start or start > end result in empty list */
3876 if (end
>= llen
) end
= llen
-1;
3881 /* Remove list elements to perform the trim */
3882 for (j
= 0; j
< ltrim
; j
++) {
3883 ln
= listFirst(list
);
3884 listDelNode(list
,ln
);
3886 for (j
= 0; j
< rtrim
; j
++) {
3887 ln
= listLast(list
);
3888 listDelNode(list
,ln
);
3891 addReply(c
,shared
.ok
);
3896 static void lremCommand(redisClient
*c
) {
3899 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3901 addReply(c
,shared
.czero
);
3903 if (o
->type
!= REDIS_LIST
) {
3904 addReply(c
,shared
.wrongtypeerr
);
3906 list
*list
= o
->ptr
;
3907 listNode
*ln
, *next
;
3908 int toremove
= atoi(c
->argv
[2]->ptr
);
3913 toremove
= -toremove
;
3916 ln
= fromtail
? list
->tail
: list
->head
;
3918 robj
*ele
= listNodeValue(ln
);
3920 next
= fromtail
? ln
->prev
: ln
->next
;
3921 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3922 listDelNode(list
,ln
);
3925 if (toremove
&& removed
== toremove
) break;
3929 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3934 /* This is the semantic of this command:
3935 * RPOPLPUSH srclist dstlist:
3936 * IF LLEN(srclist) > 0
3937 * element = RPOP srclist
3938 * LPUSH dstlist element
3945 * The idea is to be able to get an element from a list in a reliable way
3946 * since the element is not just returned but pushed against another list
3947 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3949 static void rpoplpushcommand(redisClient
*c
) {
3952 sobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3954 addReply(c
,shared
.nullbulk
);
3956 if (sobj
->type
!= REDIS_LIST
) {
3957 addReply(c
,shared
.wrongtypeerr
);
3959 list
*srclist
= sobj
->ptr
;
3960 listNode
*ln
= listLast(srclist
);
3963 addReply(c
,shared
.nullbulk
);
3965 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3966 robj
*ele
= listNodeValue(ln
);
3969 if (dobj
&& dobj
->type
!= REDIS_LIST
) {
3970 addReply(c
,shared
.wrongtypeerr
);
3974 /* Add the element to the target list (unless it's directly
3975 * passed to some BLPOP-ing client) */
3976 if (!handleClientsWaitingListPush(c
,c
->argv
[2],ele
)) {
3978 /* Create the list if the key does not exist */
3979 dobj
= createListObject();
3980 dictAdd(c
->db
->dict
,c
->argv
[2],dobj
);
3981 incrRefCount(c
->argv
[2]);
3983 dstlist
= dobj
->ptr
;
3984 listAddNodeHead(dstlist
,ele
);
3988 /* Send the element to the client as reply as well */
3989 addReplyBulkLen(c
,ele
);
3991 addReply(c
,shared
.crlf
);
3993 /* Finally remove the element from the source list */
3994 listDelNode(srclist
,ln
);
4002 /* ==================================== Sets ================================ */
4004 static void saddCommand(redisClient
*c
) {
4007 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4009 set
= createSetObject();
4010 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
4011 incrRefCount(c
->argv
[1]);
4013 if (set
->type
!= REDIS_SET
) {
4014 addReply(c
,shared
.wrongtypeerr
);
4018 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
4019 incrRefCount(c
->argv
[2]);
4021 addReply(c
,shared
.cone
);
4023 addReply(c
,shared
.czero
);
4027 static void sremCommand(redisClient
*c
) {
4030 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4032 addReply(c
,shared
.czero
);
4034 if (set
->type
!= REDIS_SET
) {
4035 addReply(c
,shared
.wrongtypeerr
);
4038 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
4040 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
4041 addReply(c
,shared
.cone
);
4043 addReply(c
,shared
.czero
);
4048 static void smoveCommand(redisClient
*c
) {
4049 robj
*srcset
, *dstset
;
4051 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4052 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
4054 /* If the source key does not exist return 0, if it's of the wrong type
4056 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
4057 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
4060 /* Error if the destination key is not a set as well */
4061 if (dstset
&& dstset
->type
!= REDIS_SET
) {
4062 addReply(c
,shared
.wrongtypeerr
);
4065 /* Remove the element from the source set */
4066 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
4067 /* Key not found in the src set! return zero */
4068 addReply(c
,shared
.czero
);
4072 /* Add the element to the destination set */
4074 dstset
= createSetObject();
4075 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
4076 incrRefCount(c
->argv
[2]);
4078 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
4079 incrRefCount(c
->argv
[3]);
4080 addReply(c
,shared
.cone
);
4083 static void sismemberCommand(redisClient
*c
) {
4086 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4088 addReply(c
,shared
.czero
);
4090 if (set
->type
!= REDIS_SET
) {
4091 addReply(c
,shared
.wrongtypeerr
);
4094 if (dictFind(set
->ptr
,c
->argv
[2]))
4095 addReply(c
,shared
.cone
);
4097 addReply(c
,shared
.czero
);
4101 static void scardCommand(redisClient
*c
) {
4105 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4107 addReply(c
,shared
.czero
);
4110 if (o
->type
!= REDIS_SET
) {
4111 addReply(c
,shared
.wrongtypeerr
);
4114 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4120 static void spopCommand(redisClient
*c
) {
4124 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4126 addReply(c
,shared
.nullbulk
);
4128 if (set
->type
!= REDIS_SET
) {
4129 addReply(c
,shared
.wrongtypeerr
);
4132 de
= dictGetRandomKey(set
->ptr
);
4134 addReply(c
,shared
.nullbulk
);
4136 robj
*ele
= dictGetEntryKey(de
);
4138 addReplyBulkLen(c
,ele
);
4140 addReply(c
,shared
.crlf
);
4141 dictDelete(set
->ptr
,ele
);
4142 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
4148 static void srandmemberCommand(redisClient
*c
) {
4152 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4154 addReply(c
,shared
.nullbulk
);
4156 if (set
->type
!= REDIS_SET
) {
4157 addReply(c
,shared
.wrongtypeerr
);
4160 de
= dictGetRandomKey(set
->ptr
);
4162 addReply(c
,shared
.nullbulk
);
4164 robj
*ele
= dictGetEntryKey(de
);
4166 addReplyBulkLen(c
,ele
);
4168 addReply(c
,shared
.crlf
);
4173 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
4174 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
4176 return dictSize(*d1
)-dictSize(*d2
);
4179 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, unsigned long setsnum
, robj
*dstkey
) {
4180 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4183 robj
*lenobj
= NULL
, *dstset
= NULL
;
4184 unsigned long j
, cardinality
= 0;
4186 for (j
= 0; j
< setsnum
; j
++) {
4190 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4191 lookupKeyRead(c
->db
,setskeys
[j
]);
4195 if (deleteKey(c
->db
,dstkey
))
4197 addReply(c
,shared
.czero
);
4199 addReply(c
,shared
.nullmultibulk
);
4203 if (setobj
->type
!= REDIS_SET
) {
4205 addReply(c
,shared
.wrongtypeerr
);
4208 dv
[j
] = setobj
->ptr
;
4210 /* Sort sets from the smallest to largest, this will improve our
4211 * algorithm's performance */
4212 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
4214 /* The first thing we should output is the total number of elements...
4215 * since this is a multi-bulk write, but at this stage we don't know
4216 * the intersection set size, so we use a trick, append an empty object
4217 * to the output list and save the pointer to later modify it with the
4220 lenobj
= createObject(REDIS_STRING
,NULL
);
4222 decrRefCount(lenobj
);
4224 /* If we have a target key where to store the resulting set
4225 * create this key with an empty set inside */
4226 dstset
= createSetObject();
4229 /* Iterate all the elements of the first (smallest) set, and test
4230 * the element against all the other sets, if at least one set does
4231 * not include the element it is discarded */
4232 di
= dictGetIterator(dv
[0]);
4234 while((de
= dictNext(di
)) != NULL
) {
4237 for (j
= 1; j
< setsnum
; j
++)
4238 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
4240 continue; /* at least one set does not contain the member */
4241 ele
= dictGetEntryKey(de
);
4243 addReplyBulkLen(c
,ele
);
4245 addReply(c
,shared
.crlf
);
4248 dictAdd(dstset
->ptr
,ele
,NULL
);
4252 dictReleaseIterator(di
);
4255 /* Store the resulting set into the target */
4256 deleteKey(c
->db
,dstkey
);
4257 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4258 incrRefCount(dstkey
);
4262 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%lu\r\n",cardinality
);
4264 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4265 dictSize((dict
*)dstset
->ptr
)));
4271 static void sinterCommand(redisClient
*c
) {
4272 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
4275 static void sinterstoreCommand(redisClient
*c
) {
4276 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
/* Operation selectors passed as the 'op' argument of
 * sunionDiffGenericCommand(). */
4279 #define REDIS_OP_UNION 0
4280 #define REDIS_OP_DIFF 1
4282 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
4283 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4286 robj
*dstset
= NULL
;
4287 int j
, cardinality
= 0;
4289 for (j
= 0; j
< setsnum
; j
++) {
4293 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4294 lookupKeyRead(c
->db
,setskeys
[j
]);
4299 if (setobj
->type
!= REDIS_SET
) {
4301 addReply(c
,shared
.wrongtypeerr
);
4304 dv
[j
] = setobj
->ptr
;
4307 /* We need a temp set object to store our union. If the dstkey
4308 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4309 * this set object will be the resulting object to set into the target key*/
4310 dstset
= createSetObject();
4312 /* Iterate all the elements of all the sets, add every element a single
4313 * time to the result set */
4314 for (j
= 0; j
< setsnum
; j
++) {
4315 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
4316 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
4318 di
= dictGetIterator(dv
[j
]);
4320 while((de
= dictNext(di
)) != NULL
) {
4323 /* dictAdd will not add the same element multiple times */
4324 ele
= dictGetEntryKey(de
);
4325 if (op
== REDIS_OP_UNION
|| j
== 0) {
4326 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
4330 } else if (op
== REDIS_OP_DIFF
) {
4331 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
4336 dictReleaseIterator(di
);
4338 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
4341 /* Output the content of the resulting set, if not in STORE mode */
4343 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
4344 di
= dictGetIterator(dstset
->ptr
);
4345 while((de
= dictNext(di
)) != NULL
) {
4348 ele
= dictGetEntryKey(de
);
4349 addReplyBulkLen(c
,ele
);
4351 addReply(c
,shared
.crlf
);
4353 dictReleaseIterator(di
);
4355 /* If we have a target key where to store the resulting set
4356 * create this key with the result set inside */
4357 deleteKey(c
->db
,dstkey
);
4358 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4359 incrRefCount(dstkey
);
4364 decrRefCount(dstset
);
4366 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4367 dictSize((dict
*)dstset
->ptr
)));
4373 static void sunionCommand(redisClient
*c
) {
4374 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
4377 static void sunionstoreCommand(redisClient
*c
) {
4378 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
4381 static void sdiffCommand(redisClient
*c
) {
4382 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
4385 static void sdiffstoreCommand(redisClient
*c
) {
4386 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
4389 /* ==================================== ZSets =============================== */
4391 /* ZSETs are ordered sets using two data structures to hold the same elements
4392 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4395 * The elements are added to an hash table mapping Redis objects to scores.
4396 * At the same time the elements are added to a skip list mapping scores
4397 * to Redis objects (so objects are sorted by scores in this "view"). */
4399 /* This skiplist implementation is almost a C translation of the original
4400 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4401 * Alternative to Balanced Trees", modified in three ways:
4402 * a) this implementation allows for repeated values.
4403 * b) the comparison is not just by key (our 'score') but by satellite data.
4404 * c) there is a back pointer, so it's a doubly linked list with the back
4405 * pointers being only at "level 1". This allows to traverse the list
4406 * from tail to head, useful for ZREVRANGE. */
/* zslCreateNode: allocates a skiplist node with zmalloc() together with a
 * forward-pointer array holding 'level' slots.
 * NOTE(review): source lines 4410 and 4412-4415 are not present in this
 * listing; storing 'score'/'obj' and returning the node presumably happen
 * there -- confirm. */
4408 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
4409 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
/* One forward pointer per level of the new node. */
4411 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
4417 static zskiplist
*zslCreate(void) {
4421 zsl
= zmalloc(sizeof(*zsl
));
4424 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
4425 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
4426 zsl
->header
->forward
[j
] = NULL
;
4427 zsl
->header
->backward
= NULL
;
/* zslFreeNode: drops one reference on node->obj and frees the node's
 * forward-pointer array.
 * NOTE(review): source line 4435 is not present in this listing; the node
 * itself is presumably freed there -- confirm. */
4432 static void zslFreeNode(zskiplistNode
*node
) {
4433 decrRefCount(node
->obj
);
4434 zfree(node
->forward
);
4438 static void zslFree(zskiplist
*zsl
) {
4439 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
4441 zfree(zsl
->header
->forward
);
4444 next
= node
->forward
[0];
4451 static int zslRandomLevel(void) {
4453 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
4458 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
4459 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4463 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4464 while (x
->forward
[i
] &&
4465 (x
->forward
[i
]->score
< score
||
4466 (x
->forward
[i
]->score
== score
&&
4467 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4471 /* we assume the key is not already inside, since we allow duplicated
4472 * scores, and the re-insertion of score and redis object should never
4473 * happen since the caller of zslInsert() should test in the hash table
4474 * if the element is already inside or not. */
4475 level
= zslRandomLevel();
4476 if (level
> zsl
->level
) {
4477 for (i
= zsl
->level
; i
< level
; i
++)
4478 update
[i
] = zsl
->header
;
4481 x
= zslCreateNode(level
,score
,obj
);
4482 for (i
= 0; i
< level
; i
++) {
4483 x
->forward
[i
] = update
[i
]->forward
[i
];
4484 update
[i
]->forward
[i
] = x
;
4486 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
4488 x
->forward
[0]->backward
= x
;
4494 /* Delete an element with matching score/object from the skiplist. */
4495 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
4496 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4500 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4501 while (x
->forward
[i
] &&
4502 (x
->forward
[i
]->score
< score
||
4503 (x
->forward
[i
]->score
== score
&&
4504 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4508 /* We may have multiple elements with the same score, what we need
4509 * is to find the element with both the right score and object. */
4511 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
4512 for (i
= 0; i
< zsl
->level
; i
++) {
4513 if (update
[i
]->forward
[i
] != x
) break;
4514 update
[i
]->forward
[i
] = x
->forward
[i
];
4516 if (x
->forward
[0]) {
4517 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4520 zsl
->tail
= x
->backward
;
4523 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4528 return 0; /* not found */
4530 return 0; /* not found */
4533 /* Delete all the elements with score between min and max from the skiplist.
4534 * Min and max are inclusive, so every element with min <= score <= max is deleted.
4535 * Note that this function takes the reference to the hash table view of the
4536 * sorted set, in order to remove the elements from the hash table too. */
4537 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4538 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4539 unsigned long removed
= 0;
4543 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4544 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
4548 /* We may have multiple elements with the same score, what we need
4549 * is to find the element with both the right score and object. */
4551 while (x
&& x
->score
<= max
) {
4552 zskiplistNode
*next
;
4554 for (i
= 0; i
< zsl
->level
; i
++) {
4555 if (update
[i
]->forward
[i
] != x
) break;
4556 update
[i
]->forward
[i
] = x
->forward
[i
];
4558 if (x
->forward
[0]) {
4559 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4562 zsl
->tail
= x
->backward
;
4564 next
= x
->forward
[0];
4565 dictDelete(dict
,x
->obj
);
4567 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4573 return removed
; /* not found */
4576 /* Find the first node having a score equal or greater than the specified one.
4577 * Returns NULL if there is no match. */
4578 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4583 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4584 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
4587 /* We may have multiple elements with the same score, what we need
4588 * is to find the element with both the right score and object. */
4589 return x
->forward
[0];
4592 /* The actual Z-commands implementations */
4594 /* This generic command implements both ZADD and ZINCRBY.
4595 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4596 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4597 static void zaddGenericCommand(redisClient
*c
, robj
*key
, robj
*ele
, double scoreval
, int doincrement
) {
4602 zsetobj
= lookupKeyWrite(c
->db
,key
);
4603 if (zsetobj
== NULL
) {
4604 zsetobj
= createZsetObject();
4605 dictAdd(c
->db
->dict
,key
,zsetobj
);
4608 if (zsetobj
->type
!= REDIS_ZSET
) {
4609 addReply(c
,shared
.wrongtypeerr
);
4615 /* Ok now since we implement both ZADD and ZINCRBY here the code
4616 * needs to handle the two different conditions. It's all about setting
4617 * '*score', that is, the new score to set, to the right value. */
4618 score
= zmalloc(sizeof(double));
4622 /* Read the old score. If the element was not present starts from 0 */
4623 de
= dictFind(zs
->dict
,ele
);
4625 double *oldscore
= dictGetEntryVal(de
);
4626 *score
= *oldscore
+ scoreval
;
4634 /* What follows is a simple remove and re-insert operation that is common
4635 * to both ZADD and ZINCRBY... */
4636 if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) {
4637 /* case 1: New element */
4638 incrRefCount(ele
); /* added to hash */
4639 zslInsert(zs
->zsl
,*score
,ele
);
4640 incrRefCount(ele
); /* added to skiplist */
4643 addReplyDouble(c
,*score
);
4645 addReply(c
,shared
.cone
);
4650 /* case 2: Score update operation */
4651 de
= dictFind(zs
->dict
,ele
);
4652 redisAssert(de
!= NULL
);
4653 oldscore
= dictGetEntryVal(de
);
4654 if (*score
!= *oldscore
) {
4657 /* Remove and insert the element in the skip list with new score */
4658 deleted
= zslDelete(zs
->zsl
,*oldscore
,ele
);
4659 redisAssert(deleted
!= 0);
4660 zslInsert(zs
->zsl
,*score
,ele
);
4662 /* Update the score in the hash table */
4663 dictReplace(zs
->dict
,ele
,score
);
4669 addReplyDouble(c
,*score
);
4671 addReply(c
,shared
.czero
);
4675 static void zaddCommand(redisClient
*c
) {
4678 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4679 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0);
4682 static void zincrbyCommand(redisClient
*c
) {
4685 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4686 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1);
4689 static void zremCommand(redisClient
*c
) {
4693 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4694 if (zsetobj
== NULL
) {
4695 addReply(c
,shared
.czero
);
4701 if (zsetobj
->type
!= REDIS_ZSET
) {
4702 addReply(c
,shared
.wrongtypeerr
);
4706 de
= dictFind(zs
->dict
,c
->argv
[2]);
4708 addReply(c
,shared
.czero
);
4711 /* Delete from the skiplist */
4712 oldscore
= dictGetEntryVal(de
);
4713 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
4714 redisAssert(deleted
!= 0);
4716 /* Delete from the hash table */
4717 dictDelete(zs
->dict
,c
->argv
[2]);
4718 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4720 addReply(c
,shared
.cone
);
4724 static void zremrangebyscoreCommand(redisClient
*c
) {
4725 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4726 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4730 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4731 if (zsetobj
== NULL
) {
4732 addReply(c
,shared
.czero
);
4736 if (zsetobj
->type
!= REDIS_ZSET
) {
4737 addReply(c
,shared
.wrongtypeerr
);
4741 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
4742 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4743 server
.dirty
+= deleted
;
4744 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
4748 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
4750 int start
= atoi(c
->argv
[2]->ptr
);
4751 int end
= atoi(c
->argv
[3]->ptr
);
4754 if (c
->argc
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) {
4756 } else if (c
->argc
>= 5) {
4757 addReply(c
,shared
.syntaxerr
);
4761 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4763 addReply(c
,shared
.nullmultibulk
);
4765 if (o
->type
!= REDIS_ZSET
) {
4766 addReply(c
,shared
.wrongtypeerr
);
4768 zset
*zsetobj
= o
->ptr
;
4769 zskiplist
*zsl
= zsetobj
->zsl
;
4772 int llen
= zsl
->length
;
4776 /* convert negative indexes */
4777 if (start
< 0) start
= llen
+start
;
4778 if (end
< 0) end
= llen
+end
;
4779 if (start
< 0) start
= 0;
4780 if (end
< 0) end
= 0;
4782 /* indexes sanity checks */
4783 if (start
> end
|| start
>= llen
) {
4784 /* Out of range start or start > end result in empty list */
4785 addReply(c
,shared
.emptymultibulk
);
4788 if (end
>= llen
) end
= llen
-1;
4789 rangelen
= (end
-start
)+1;
4791 /* Return the result in form of a multi-bulk reply */
4797 ln
= zsl
->header
->forward
[0];
4799 ln
= ln
->forward
[0];
4802 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",
4803 withscores
? (rangelen
*2) : rangelen
));
4804 for (j
= 0; j
< rangelen
; j
++) {
4806 addReplyBulkLen(c
,ele
);
4808 addReply(c
,shared
.crlf
);
4810 addReplyDouble(c
,ln
->score
);
4811 ln
= reverse
? ln
->backward
: ln
->forward
[0];
4817 static void zrangeCommand(redisClient
*c
) {
4818 zrangeGenericCommand(c
,0);
4821 static void zrevrangeCommand(redisClient
*c
) {
4822 zrangeGenericCommand(c
,1);
4825 static void zrangebyscoreCommand(redisClient
*c
) {
4827 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4828 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4829 int offset
= 0, limit
= -1;
4831 if (c
->argc
!= 4 && c
->argc
!= 7) {
4833 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4835 } else if (c
->argc
== 7 && strcasecmp(c
->argv
[4]->ptr
,"limit")) {
4836 addReply(c
,shared
.syntaxerr
);
4838 } else if (c
->argc
== 7) {
4839 offset
= atoi(c
->argv
[5]->ptr
);
4840 limit
= atoi(c
->argv
[6]->ptr
);
4841 if (offset
< 0) offset
= 0;
4844 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4846 addReply(c
,shared
.nullmultibulk
);
4848 if (o
->type
!= REDIS_ZSET
) {
4849 addReply(c
,shared
.wrongtypeerr
);
4851 zset
*zsetobj
= o
->ptr
;
4852 zskiplist
*zsl
= zsetobj
->zsl
;
4855 unsigned int rangelen
= 0;
4857 /* Get the first node with the score >= min */
4858 ln
= zslFirstWithScore(zsl
,min
);
4860 /* No element matching the specified interval */
4861 addReply(c
,shared
.emptymultibulk
);
4865 /* We don't know in advance how many matching elements there
4866 * are in the list, so we push this object that will represent
4867 * the multi-bulk length in the output buffer, and will "fix"
4869 lenobj
= createObject(REDIS_STRING
,NULL
);
4871 decrRefCount(lenobj
);
4873 while(ln
&& ln
->score
<= max
) {
4876 ln
= ln
->forward
[0];
4879 if (limit
== 0) break;
4881 addReplyBulkLen(c
,ele
);
4883 addReply(c
,shared
.crlf
);
4884 ln
= ln
->forward
[0];
4886 if (limit
> 0) limit
--;
4888 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
4893 static void zcardCommand(redisClient
*c
) {
4897 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4899 addReply(c
,shared
.czero
);
4902 if (o
->type
!= REDIS_ZSET
) {
4903 addReply(c
,shared
.wrongtypeerr
);
4906 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",zs
->zsl
->length
));
4911 static void zscoreCommand(redisClient
*c
) {
4915 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4917 addReply(c
,shared
.nullbulk
);
4920 if (o
->type
!= REDIS_ZSET
) {
4921 addReply(c
,shared
.wrongtypeerr
);
4926 de
= dictFind(zs
->dict
,c
->argv
[2]);
4928 addReply(c
,shared
.nullbulk
);
4930 double *score
= dictGetEntryVal(de
);
4932 addReplyDouble(c
,*score
);
4938 /* ========================= Non type-specific commands ==================== */
4940 static void flushdbCommand(redisClient
*c
) {
4941 server
.dirty
+= dictSize(c
->db
->dict
);
4942 dictEmpty(c
->db
->dict
);
4943 dictEmpty(c
->db
->expires
);
4944 addReply(c
,shared
.ok
);
4947 static void flushallCommand(redisClient
*c
) {
4948 server
.dirty
+= emptyDb();
4949 addReply(c
,shared
.ok
);
4950 rdbSave(server
.dbfilename
);
4954 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
4955 redisSortOperation
*so
= zmalloc(sizeof(*so
));
4957 so
->pattern
= pattern
;
4961 /* Return the value associated to the key with a name obtained
4962 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4963 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
4967 int prefixlen
, sublen
, postfixlen
;
4968 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4972 char buf
[REDIS_SORTKEY_MAX
+1];
4975 /* If the pattern is "#" return the substitution object itself in order
4976 * to implement the "SORT ... GET #" feature. */
4977 spat
= pattern
->ptr
;
4978 if (spat
[0] == '#' && spat
[1] == '\0') {
4982 /* The substitution object may be specially encoded. If so we create
4983 * a decoded object on the fly. Otherwise getDecodedObject will just
4984 * increment the ref count, that we'll decrement later. */
4985 subst
= getDecodedObject(subst
);
4988 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
4989 p
= strchr(spat
,'*');
4991 decrRefCount(subst
);
4996 sublen
= sdslen(ssub
);
4997 postfixlen
= sdslen(spat
)-(prefixlen
+1);
4998 memcpy(keyname
.buf
,spat
,prefixlen
);
4999 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
5000 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
5001 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
5002 keyname
.len
= prefixlen
+sublen
+postfixlen
;
5004 initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(long)*2))
5005 decrRefCount(subst
);
5007 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
5008 return lookupKeyRead(db
,&keyobj
);
5011 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
5012 * the additional parameter is not standard but a BSD-specific we have to
5013 * pass sorting parameters via the global 'server' structure */
5014 static int sortCompare(const void *s1
, const void *s2
) {
5015 const redisSortObject
*so1
= s1
, *so2
= s2
;
5018 if (!server
.sort_alpha
) {
5019 /* Numeric sorting. Here it's trivial as we precomputed scores */
5020 if (so1
->u
.score
> so2
->u
.score
) {
5022 } else if (so1
->u
.score
< so2
->u
.score
) {
5028 /* Alphanumeric sorting */
5029 if (server
.sort_bypattern
) {
5030 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
5031 /* At least one compare object is NULL */
5032 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
5034 else if (so1
->u
.cmpobj
== NULL
)
5039 /* We have both the objects, use strcoll */
5040 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
5043 /* Compare elements directly */
5046 dec1
= getDecodedObject(so1
->obj
);
5047 dec2
= getDecodedObject(so2
->obj
);
5048 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
5053 return server
.sort_desc
? -cmp
: cmp
;
5056 /* The SORT command is the most complex command in Redis. Warning: this code
5057 * is optimized for speed and a bit less for readability */
5058 static void sortCommand(redisClient
*c
) {
5061 int desc
= 0, alpha
= 0;
5062 int limit_start
= 0, limit_count
= -1, start
, end
;
5063 int j
, dontsort
= 0, vectorlen
;
5064 int getop
= 0; /* GET operation counter */
5065 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
5066 redisSortObject
*vector
; /* Resulting vector to sort */
5068 /* Lookup the key to sort. It must be of the right types */
5069 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
5070 if (sortval
== NULL
) {
5071 addReply(c
,shared
.nullmultibulk
);
5074 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
5075 sortval
->type
!= REDIS_ZSET
)
5077 addReply(c
,shared
.wrongtypeerr
);
5081 /* Create a list of operations to perform for every sorted element.
5082 * Operations can be GET/DEL/INCR/DECR */
5083 operations
= listCreate();
5084 listSetFreeMethod(operations
,zfree
);
5087 /* Now we need to protect sortval incrementing its count, in the future
5088 * SORT may have options able to overwrite/delete keys during the sorting
5089 * and the sorted key itself may get destroyed */
5090 incrRefCount(sortval
);
5092 /* The SORT command has an SQL-alike syntax, parse it */
5093 while(j
< c
->argc
) {
5094 int leftargs
= c
->argc
-j
-1;
5095 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
5097 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
5099 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
5101 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
5102 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
5103 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
5105 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
5106 storekey
= c
->argv
[j
+1];
5108 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
5109 sortby
= c
->argv
[j
+1];
5110 /* If the BY pattern does not contain '*', i.e. it is constant,
5111 * we don't need to sort nor to lookup the weight keys. */
5112 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
5114 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
5115 listAddNodeTail(operations
,createSortOperation(
5116 REDIS_SORT_GET
,c
->argv
[j
+1]));
5120 decrRefCount(sortval
);
5121 listRelease(operations
);
5122 addReply(c
,shared
.syntaxerr
);
5128 /* Load the sorting vector with all the objects to sort */
5129 switch(sortval
->type
) {
5130 case REDIS_LIST
: vectorlen
= listLength((list
*)sortval
->ptr
); break;
5131 case REDIS_SET
: vectorlen
= dictSize((dict
*)sortval
->ptr
); break;
5132 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
5133 default: vectorlen
= 0; redisAssert(0); /* Avoid GCC warning */
5135 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
5138 if (sortval
->type
== REDIS_LIST
) {
5139 list
*list
= sortval
->ptr
;
5143 while((ln
= listYield(list
))) {
5144 robj
*ele
= ln
->value
;
5145 vector
[j
].obj
= ele
;
5146 vector
[j
].u
.score
= 0;
5147 vector
[j
].u
.cmpobj
= NULL
;
5155 if (sortval
->type
== REDIS_SET
) {
5158 zset
*zs
= sortval
->ptr
;
5162 di
= dictGetIterator(set
);
5163 while((setele
= dictNext(di
)) != NULL
) {
5164 vector
[j
].obj
= dictGetEntryKey(setele
);
5165 vector
[j
].u
.score
= 0;
5166 vector
[j
].u
.cmpobj
= NULL
;
5169 dictReleaseIterator(di
);
5171 redisAssert(j
== vectorlen
);
5173 /* Now it's time to load the right scores in the sorting vector */
5174 if (dontsort
== 0) {
5175 for (j
= 0; j
< vectorlen
; j
++) {
5179 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
5180 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
5182 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
5184 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
5185 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
5187 /* Don't need to decode the object if it's
5188 * integer-encoded (the only encoding supported) so
5189 * far. We can just cast it */
5190 if (byval
->encoding
== REDIS_ENCODING_INT
) {
5191 vector
[j
].u
.score
= (long)byval
->ptr
;
5193 redisAssert(1 != 1);
5198 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
5199 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
5201 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
5202 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
5204 redisAssert(1 != 1);
5211 /* We are ready to sort the vector... perform a bit of sanity check
5212 * on the LIMIT option too. We'll use a partial version of quicksort. */
5213 start
= (limit_start
< 0) ? 0 : limit_start
;
5214 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
5215 if (start
>= vectorlen
) {
5216 start
= vectorlen
-1;
5219 if (end
>= vectorlen
) end
= vectorlen
-1;
5221 if (dontsort
== 0) {
5222 server
.sort_desc
= desc
;
5223 server
.sort_alpha
= alpha
;
5224 server
.sort_bypattern
= sortby
? 1 : 0;
5225 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
5226 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
5228 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
5231 /* Send command output to the output buffer, performing the specified
5232 * GET/DEL/INCR/DECR operations if any. */
5233 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
5234 if (storekey
== NULL
) {
5235 /* STORE option not specified, send the sorting result to the client */
5236 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
5237 for (j
= start
; j
<= end
; j
++) {
5240 addReplyBulkLen(c
,vector
[j
].obj
);
5241 addReply(c
,vector
[j
].obj
);
5242 addReply(c
,shared
.crlf
);
5244 listRewind(operations
);
5245 while((ln
= listYield(operations
))) {
5246 redisSortOperation
*sop
= ln
->value
;
5247 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5250 if (sop
->type
== REDIS_SORT_GET
) {
5251 if (!val
|| val
->type
!= REDIS_STRING
) {
5252 addReply(c
,shared
.nullbulk
);
5254 addReplyBulkLen(c
,val
);
5256 addReply(c
,shared
.crlf
);
5259 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5264 robj
*listObject
= createListObject();
5265 list
*listPtr
= (list
*) listObject
->ptr
;
5267 /* STORE option specified, set the sorting result as a List object */
5268 for (j
= start
; j
<= end
; j
++) {
5271 listAddNodeTail(listPtr
,vector
[j
].obj
);
5272 incrRefCount(vector
[j
].obj
);
5274 listRewind(operations
);
5275 while((ln
= listYield(operations
))) {
5276 redisSortOperation
*sop
= ln
->value
;
5277 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5280 if (sop
->type
== REDIS_SORT_GET
) {
5281 if (!val
|| val
->type
!= REDIS_STRING
) {
5282 listAddNodeTail(listPtr
,createStringObject("",0));
5284 listAddNodeTail(listPtr
,val
);
5288 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5292 if (dictReplace(c
->db
->dict
,storekey
,listObject
)) {
5293 incrRefCount(storekey
);
5295 /* Note: we add 1 because the DB is dirty anyway since even if the
5296 * SORT result is empty a new key is set and maybe the old content
5298 server
.dirty
+= 1+outputlen
;
5299 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",outputlen
));
5303 decrRefCount(sortval
);
5304 listRelease(operations
);
5305 for (j
= 0; j
< vectorlen
; j
++) {
5306 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
5307 decrRefCount(vector
[j
].u
.cmpobj
);
5312 /* Create the string returned by the INFO command. This is decoupled
5313 * from the INFO command itself as we need to report the same information
5314 * on memory corruption problems. */
5315 static sds
genRedisInfoString(void) {
5317 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5320 info
= sdscatprintf(sdsempty(),
5321 "redis_version:%s\r\n"
5323 "multiplexing_api:%s\r\n"
5324 "uptime_in_seconds:%ld\r\n"
5325 "uptime_in_days:%ld\r\n"
5326 "connected_clients:%d\r\n"
5327 "connected_slaves:%d\r\n"
5328 "blocked_clients:%d\r\n"
5329 "used_memory:%zu\r\n"
5330 "changes_since_last_save:%lld\r\n"
5331 "bgsave_in_progress:%d\r\n"
5332 "last_save_time:%ld\r\n"
5333 "bgrewriteaof_in_progress:%d\r\n"
5334 "total_connections_received:%lld\r\n"
5335 "total_commands_processed:%lld\r\n"
5338 (sizeof(long) == 8) ? "64" : "32",
5342 listLength(server
.clients
)-listLength(server
.slaves
),
5343 listLength(server
.slaves
),
5344 server
.blockedclients
,
5347 server
.bgsavechildpid
!= -1,
5349 server
.bgrewritechildpid
!= -1,
5350 server
.stat_numconnections
,
5351 server
.stat_numcommands
,
5352 server
.masterhost
== NULL
? "master" : "slave"
5354 if (server
.masterhost
) {
5355 info
= sdscatprintf(info
,
5356 "master_host:%s\r\n"
5357 "master_port:%d\r\n"
5358 "master_link_status:%s\r\n"
5359 "master_last_io_seconds_ago:%d\r\n"
5362 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
5364 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
5367 for (j
= 0; j
< server
.dbnum
; j
++) {
5368 long long keys
, vkeys
;
5370 keys
= dictSize(server
.db
[j
].dict
);
5371 vkeys
= dictSize(server
.db
[j
].expires
);
5372 if (keys
|| vkeys
) {
5373 info
= sdscatprintf(info
, "db%d:keys=%lld,expires=%lld\r\n",
5380 static void infoCommand(redisClient
*c
) {
5381 sds info
= genRedisInfoString();
5382 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
5383 (unsigned long)sdslen(info
)));
5384 addReplySds(c
,info
);
5385 addReply(c
,shared
.crlf
);
5388 static void monitorCommand(redisClient
*c
) {
5389 /* ignore MONITOR if already slave or in monitor mode */
5390 if (c
->flags
& REDIS_SLAVE
) return;
5392 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
5394 listAddNodeTail(server
.monitors
,c
);
5395 addReply(c
,shared
.ok
);
5398 /* ================================= Expire ================================= */
5399 static int removeExpire(redisDb
*db
, robj
*key
) {
5400 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
5407 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
5408 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
5416 /* Return the expire time of the specified key, or -1 if no expire
5417 * is associated with this key (i.e. the key is non volatile) */
5418 static time_t getExpire(redisDb
*db
, robj
*key
) {
5421 /* No expire? return ASAP */
5422 if (dictSize(db
->expires
) == 0 ||
5423 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
5425 return (time_t) dictGetEntryVal(de
);
5428 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
5432 /* No expire? return ASAP */
5433 if (dictSize(db
->expires
) == 0 ||
5434 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5436 /* Lookup the expire */
5437 when
= (time_t) dictGetEntryVal(de
);
5438 if (time(NULL
) <= when
) return 0;
5440 /* Delete the key */
5441 dictDelete(db
->expires
,key
);
5442 return dictDelete(db
->dict
,key
) == DICT_OK
;
5445 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
5448 /* No expire? return ASAP */
5449 if (dictSize(db
->expires
) == 0 ||
5450 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5452 /* Delete the key */
5454 dictDelete(db
->expires
,key
);
5455 return dictDelete(db
->dict
,key
) == DICT_OK
;
5458 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
5461 de
= dictFind(c
->db
->dict
,key
);
5463 addReply(c
,shared
.czero
);
5467 if (deleteKey(c
->db
,key
)) server
.dirty
++;
5468 addReply(c
, shared
.cone
);
5471 time_t when
= time(NULL
)+seconds
;
5472 if (setExpire(c
->db
,key
,when
)) {
5473 addReply(c
,shared
.cone
);
5476 addReply(c
,shared
.czero
);
5482 static void expireCommand(redisClient
*c
) {
5483 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
5486 static void expireatCommand(redisClient
*c
) {
5487 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
5490 static void ttlCommand(redisClient
*c
) {
5494 expire
= getExpire(c
->db
,c
->argv
[1]);
5496 ttl
= (int) (expire
-time(NULL
));
5497 if (ttl
< 0) ttl
= -1;
5499 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
5502 /* ================================ MULTI/EXEC ============================== */
5504 /* Client state initialization for MULTI/EXEC */
5505 static void initClientMultiState(redisClient
*c
) {
5506 c
->mstate
.commands
= NULL
;
5507 c
->mstate
.count
= 0;
5510 /* Release all the resources associated with MULTI/EXEC state */
5511 static void freeClientMultiState(redisClient
*c
) {
5514 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5516 multiCmd
*mc
= c
->mstate
.commands
+j
;
5518 for (i
= 0; i
< mc
->argc
; i
++)
5519 decrRefCount(mc
->argv
[i
]);
5522 zfree(c
->mstate
.commands
);
5525 /* Add a new command into the MULTI commands queue */
5526 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
) {
5530 c
->mstate
.commands
= zrealloc(c
->mstate
.commands
,
5531 sizeof(multiCmd
)*(c
->mstate
.count
+1));
5532 mc
= c
->mstate
.commands
+c
->mstate
.count
;
5535 mc
->argv
= zmalloc(sizeof(robj
*)*c
->argc
);
5536 memcpy(mc
->argv
,c
->argv
,sizeof(robj
*)*c
->argc
);
5537 for (j
= 0; j
< c
->argc
; j
++)
5538 incrRefCount(mc
->argv
[j
]);
5542 static void multiCommand(redisClient
*c
) {
5543 c
->flags
|= REDIS_MULTI
;
5544 addReply(c
,shared
.ok
);
5547 static void execCommand(redisClient
*c
) {
5552 if (!(c
->flags
& REDIS_MULTI
)) {
5553 addReplySds(c
,sdsnew("-ERR EXEC without MULTI\r\n"));
5557 orig_argv
= c
->argv
;
5558 orig_argc
= c
->argc
;
5559 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->mstate
.count
));
5560 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5561 c
->argc
= c
->mstate
.commands
[j
].argc
;
5562 c
->argv
= c
->mstate
.commands
[j
].argv
;
5563 call(c
,c
->mstate
.commands
[j
].cmd
);
5565 c
->argv
= orig_argv
;
5566 c
->argc
= orig_argc
;
5567 freeClientMultiState(c
);
5568 initClientMultiState(c
);
5569 c
->flags
&= (~REDIS_MULTI
);
5572 /* =========================== Blocking Operations ========================= */
5574 /* Currently Redis blocking operations support is limited to list POP ops,
5575 * so the current implementation is not fully generic, but it is also not
5576 * completely specific so it will not require a rewrite to support new
5577 * kind of blocking operations in the future.
5579 * Still it's important to note that list blocking operations can be already
5580 * used as a notification mechanism in order to implement other blocking
5581 * operations at application level, so there must be a very strong evidence
5582 * of usefulness and generality before new blocking operations are implemented.
5584 * This is how the current blocking POP works, we use BLPOP as example:
5585 * - If the user calls BLPOP and the key exists and contains a non empty list
5586 * then LPOP is called instead. So BLPOP is semantically the same as LPOP
5587 * if there is no need to block.
5588 * - If instead BLPOP is called and the key does not exist or the list is
5589 * empty we need to block. In order to do so we remove the notification for
5590 * new data to read in the client socket (so that we'll not serve new
5591 * requests if the blocking request is not served). Also we put the client
5592 * in a dictionary (db->blockingkeys) mapping keys to a list of clients
5593 * blocking for these keys.
5594 * - If a PUSH operation against a key with blocked clients waiting is
5595 * performed, we serve the first in the list: basically instead of pushing
5596 * the new element inside the list we return it to the (first / oldest)
5597 * blocking client, unblock the client, and remove it from the list.
5599 * The above comment and the source code should be enough in order to understand
5600 * the implementation and modify / fix it later.
5603 /* Set a client in blocking mode for the specified key, with the specified
5605 static void blockForKeys(redisClient
*c
, robj
**keys
, int numkeys
, time_t timeout
) {
5610 c
->blockingkeys
= zmalloc(sizeof(robj
*)*numkeys
);
5611 c
->blockingkeysnum
= numkeys
;
5612 c
->blockingto
= timeout
;
5613 for (j
= 0; j
< numkeys
; j
++) {
5614 /* Add the key in the client structure, to map clients -> keys */
5615 c
->blockingkeys
[j
] = keys
[j
];
5616 incrRefCount(keys
[j
]);
5618 /* And in the other "side", to map keys -> clients */
5619 de
= dictFind(c
->db
->blockingkeys
,keys
[j
]);
5623 /* For every key we take a list of clients blocked for it */
5625 retval
= dictAdd(c
->db
->blockingkeys
,keys
[j
],l
);
5626 incrRefCount(keys
[j
]);
5627 assert(retval
== DICT_OK
);
5629 l
= dictGetEntryVal(de
);
5631 listAddNodeTail(l
,c
);
5633 /* Mark the client as a blocked client */
5634 c
->flags
|= REDIS_BLOCKED
;
5635 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
5636 server
.blockedclients
++;
5639 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
5640 static void unblockClient(redisClient
*c
) {
5645 assert(c
->blockingkeys
!= NULL
);
5646 /* The client may wait for multiple keys, so unblock it for every key. */
5647 for (j
= 0; j
< c
->blockingkeysnum
; j
++) {
5648 /* Remove this client from the list of clients waiting for this key. */
5649 de
= dictFind(c
->db
->blockingkeys
,c
->blockingkeys
[j
]);
5651 l
= dictGetEntryVal(de
);
5652 listDelNode(l
,listSearchKey(l
,c
));
5653 /* If the list is empty we need to remove it to avoid wasting memory */
5654 if (listLength(l
) == 0)
5655 dictDelete(c
->db
->blockingkeys
,c
->blockingkeys
[j
]);
5656 decrRefCount(c
->blockingkeys
[j
]);
5658 /* Cleanup the client structure */
5659 zfree(c
->blockingkeys
);
5660 c
->blockingkeys
= NULL
;
5661 c
->flags
&= (~REDIS_BLOCKED
);
5662 server
.blockedclients
--;
5663 /* Ok now we are ready to get read events from socket, note that we
5664 * can't trap errors here as it's possible that unblockClients() is
5665 * called from freeClient() itself, and the only thing we can do
5666 * if we failed to register the READABLE event is to kill the client.
5667 * Still the following function should never fail in the real world as
5668 * we are sure the file descriptor is sane, and we exit on out of mem. */
5669 aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
, readQueryFromClient
, c
);
5670 /* As a final step we want to process data if there is some command waiting
5671 * in the input buffer. Note that this is safe even if unblockClient()
5672 * gets called from freeClient() because freeClient() will be smart
5673 * enough to call this function *after* c->querybuf was set to NULL. */
5674 if (c
->querybuf
&& sdslen(c
->querybuf
) > 0) processInputBuffer(c
);
5677 /* This should be called from any function PUSHing into lists.
5678 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5679 * 'ele' is the element pushed.
5681 * If the function returns 0 there was no client waiting for a list push
5684 * If the function returns 1 there was a client waiting for a list push
5685 * against this key, the element was passed to this client thus it's not
5686 * needed to actually add it to the list and the caller should return asap. */
5687 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
) {
5688 struct dictEntry
*de
;
5689 redisClient
*receiver
;
5693 de
= dictFind(c
->db
->blockingkeys
,key
);
5694 if (de
== NULL
) return 0;
5695 l
= dictGetEntryVal(de
);
5698 receiver
= ln
->value
;
5700 addReplySds(receiver
,sdsnew("*2\r\n"));
5701 addReplyBulkLen(receiver
,key
);
5702 addReply(receiver
,key
);
5703 addReply(receiver
,shared
.crlf
);
5704 addReplyBulkLen(receiver
,ele
);
5705 addReply(receiver
,ele
);
5706 addReply(receiver
,shared
.crlf
);
5707 unblockClient(receiver
);
5711 /* Blocking RPOP/LPOP */
5712 static void blockingPopGenericCommand(redisClient
*c
, int where
) {
5717 for (j
= 1; j
< c
->argc
-1; j
++) {
5718 o
= lookupKeyWrite(c
->db
,c
->argv
[j
]);
5720 if (o
->type
!= REDIS_LIST
) {
5721 addReply(c
,shared
.wrongtypeerr
);
5724 list
*list
= o
->ptr
;
5725 if (listLength(list
) != 0) {
5726 /* If the list contains elements fall back to the usual
5727 * non-blocking POP operation */
5728 robj
*argv
[2], **orig_argv
;
5731 /* We need to alter the command arguments before to call
5732 * popGenericCommand() as the command takes a single key. */
5733 orig_argv
= c
->argv
;
5734 orig_argc
= c
->argc
;
5735 argv
[1] = c
->argv
[j
];
5739 /* Also the return value is different, we need to output
5740 * the multi bulk reply header and the key name. The
5741 * "real" command will add the last element (the value)
5742 * for us. If this sounds like a hack to you it's just
5743 * because it is... */
5744 addReplySds(c
,sdsnew("*2\r\n"));
5745 addReplyBulkLen(c
,argv
[1]);
5746 addReply(c
,argv
[1]);
5747 addReply(c
,shared
.crlf
);
5748 popGenericCommand(c
,where
);
5750 /* Fix the client structure with the original stuff */
5751 c
->argv
= orig_argv
;
5752 c
->argc
= orig_argc
;
5758 /* If the list is empty or the key does not exist we must block */
5759 timeout
= strtol(c
->argv
[c
->argc
-1]->ptr
,NULL
,10);
5760 if (timeout
> 0) timeout
+= time(NULL
);
5761 blockForKeys(c
,c
->argv
+1,c
->argc
-2,timeout
);
5764 static void blpopCommand(redisClient
*c
) {
5765 blockingPopGenericCommand(c
,REDIS_HEAD
);
5768 static void brpopCommand(redisClient
*c
) {
5769 blockingPopGenericCommand(c
,REDIS_TAIL
);
5772 /* =============================== Replication ============================= */
5774 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5775 ssize_t nwritten
, ret
= size
;
5776 time_t start
= time(NULL
);
5780 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
5781 nwritten
= write(fd
,ptr
,size
);
5782 if (nwritten
== -1) return -1;
5786 if ((time(NULL
)-start
) > timeout
) {
5794 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5795 ssize_t nread
, totread
= 0;
5796 time_t start
= time(NULL
);
5800 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
5801 nread
= read(fd
,ptr
,size
);
5802 if (nread
== -1) return -1;
5807 if ((time(NULL
)-start
) > timeout
) {
5815 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5822 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
5825 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
5836 static void syncCommand(redisClient
*c
) {
5837 /* ignore SYNC if already slave or in monitor mode */
5838 if (c
->flags
& REDIS_SLAVE
) return;
5840 /* SYNC can't be issued when the server has pending data to send to
5841 * the client about already issued commands. We need a fresh reply
5842 * buffer registering the differences between the BGSAVE and the current
5843 * dataset, so that we can copy to other slaves if needed. */
5844 if (listLength(c
->reply
) != 0) {
5845 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5849 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
5850 /* Here we need to check if there is a background saving operation
5851 * in progress, or if it is required to start one */
5852 if (server
.bgsavechildpid
!= -1) {
5853 /* Ok a background save is in progress. Let's check if it is a good
5854 * one for replication, i.e. if there is another slave that is
5855 * registering differences since the server forked to save */
5859 listRewind(server
.slaves
);
5860 while((ln
= listYield(server
.slaves
))) {
5862 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
5865 /* Perfect, the server is already registering differences for
5866 * another slave. Set the right state, and copy the buffer. */
5867 listRelease(c
->reply
);
5868 c
->reply
= listDup(slave
->reply
);
5869 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5870 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
5872 /* No way, we need to wait for the next BGSAVE in order to
5873 * register differences */
5874 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5875 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
5878 /* Ok we don't have a BGSAVE in progress, let's start one */
5879 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
5880 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5881 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
5882 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
5885 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5888 c
->flags
|= REDIS_SLAVE
;
5890 listAddNodeTail(server
.slaves
,c
);
5894 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
5895 redisClient
*slave
= privdata
;
5897 REDIS_NOTUSED(mask
);
5898 char buf
[REDIS_IOBUF_LEN
];
5899 ssize_t nwritten
, buflen
;
5901 if (slave
->repldboff
== 0) {
5902 /* Write the bulk write count before transferring the DB. In theory here
5903 * we don't know how much room there is in the output buffer of the
5904 * socket, but in practice SO_SNDLOWAT (the minimum count for output
5905 * operations) will never be smaller than the few bytes we need. */
5908 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5910 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
5918 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
5919 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
5921 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
5922 (buflen
== 0) ? "premature EOF" : strerror(errno
));
5926 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
5927 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
5932 slave
->repldboff
+= nwritten
;
5933 if (slave
->repldboff
== slave
->repldbsize
) {
5934 close(slave
->repldbfd
);
5935 slave
->repldbfd
= -1;
5936 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5937 slave
->replstate
= REDIS_REPL_ONLINE
;
5938 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
5939 sendReplyToClient
, slave
) == AE_ERR
) {
5943 addReplySds(slave
,sdsempty());
5944 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
5948 /* This function is called at the end of every background saving.
5949 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5950 * otherwise REDIS_ERR is passed to the function.
5952 * The goal of this function is to handle slaves waiting for a successful
5953 * background saving in order to perform non-blocking synchronization. */
5954 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
5956 int startbgsave
= 0;
5958 listRewind(server
.slaves
);
5959 while((ln
= listYield(server
.slaves
))) {
5960 redisClient
*slave
= ln
->value
;
5962 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
5964 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5965 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
5966 struct redis_stat buf
;
5968 if (bgsaveerr
!= REDIS_OK
) {
5970 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
5973 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
5974 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
5976 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
5979 slave
->repldboff
= 0;
5980 slave
->repldbsize
= buf
.st_size
;
5981 slave
->replstate
= REDIS_REPL_SEND_BULK
;
5982 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5983 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
5990 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5991 listRewind(server
.slaves
);
5992 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
5993 while((ln
= listYield(server
.slaves
))) {
5994 redisClient
*slave
= ln
->value
;
5996 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
6003 static int syncWithMaster(void) {
6004 char buf
[1024], tmpfile
[256], authcmd
[1024];
6006 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
6010 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
6015 /* AUTH with the master if required. */
6016 if(server
.masterauth
) {
6017 snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
);
6018 if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) {
6020 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",
6024 /* Read the AUTH result. */
6025 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
6027 redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s",
6031 if (buf
[0] != '+') {
6033 redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?");
6038 /* Issue the SYNC command */
6039 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
6041 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
6045 /* Read the bulk write count */
6046 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
6048 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
6052 if (buf
[0] != '$') {
6054 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
6057 dumpsize
= atoi(buf
+1);
6058 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
6059 /* Read the bulk write data on a temp file */
6060 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
6061 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
6064 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
6068 int nread
, nwritten
;
6070 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
6072 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
6078 nwritten
= write(dfd
,buf
,nread
);
6079 if (nwritten
== -1) {
6080 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
6088 if (rename(tmpfile
,server
.dbfilename
) == -1) {
6089 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
6095 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6096 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
6100 server
.master
= createClient(fd
);
6101 server
.master
->flags
|= REDIS_MASTER
;
6102 server
.master
->authenticated
= 1;
6103 server
.replstate
= REDIS_REPL_CONNECTED
;
6107 static void slaveofCommand(redisClient
*c
) {
6108 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
6109 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
6110 if (server
.masterhost
) {
6111 sdsfree(server
.masterhost
);
6112 server
.masterhost
= NULL
;
6113 if (server
.master
) freeClient(server
.master
);
6114 server
.replstate
= REDIS_REPL_NONE
;
6115 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
6118 sdsfree(server
.masterhost
);
6119 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
6120 server
.masterport
= atoi(c
->argv
[2]->ptr
);
6121 if (server
.master
) freeClient(server
.master
);
6122 server
.replstate
= REDIS_REPL_CONNECT
;
6123 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
6124 server
.masterhost
, server
.masterport
);
6126 addReply(c
,shared
.ok
);
6129 /* ============================ Maxmemory directive ======================== */
6131 /* This function gets called when 'maxmemory' is set on the config file to limit
6132 * the max memory used by the server, and we are out of memory.
6133 * This function will try to, in order:
6135 * - Free objects from the free list
6136 * - Try to remove keys with an EXPIRE set
6138 * If it is not possible to free enough memory to reach used-memory < maxmemory,
6139 * the server will start refusing commands that will enlarge even more the
6142 static void freeMemoryIfNeeded(void) {
6143 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
6144 if (listLength(server
.objfreelist
)) {
6147 listNode
*head
= listFirst(server
.objfreelist
);
6148 o
= listNodeValue(head
);
6149 listDelNode(server
.objfreelist
,head
);
6152 int j
, k
, freed
= 0;
6154 for (j
= 0; j
< server
.dbnum
; j
++) {
6156 robj
*minkey
= NULL
;
6157 struct dictEntry
*de
;
6159 if (dictSize(server
.db
[j
].expires
)) {
6161 /* From a sample of three keys drop the one nearest to
6162 * the natural expire */
6163 for (k
= 0; k
< 3; k
++) {
6166 de
= dictGetRandomKey(server
.db
[j
].expires
);
6167 t
= (time_t) dictGetEntryVal(de
);
6168 if (minttl
== -1 || t
< minttl
) {
6169 minkey
= dictGetEntryKey(de
);
6173 deleteKey(server
.db
+j
,minkey
);
6176 if (!freed
) return; /* nothing to free... */
6181 /* ============================== Append Only file ========================== */
6183 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
6184 sds buf
= sdsempty();
6190 /* The DB this command was targeting is not the same as the last command
6191 * we appended. A SELECT command must be issued first. */
6192 if (dictid
!= server
.appendseldb
) {
6195 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
6196 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
6197 (unsigned long)strlen(seldb
),seldb
);
6198 server
.appendseldb
= dictid
;
6201 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6202 * EXPIREs into EXPIREATs calls */
6203 if (cmd
->proc
== expireCommand
) {
6206 tmpargv
[0] = createStringObject("EXPIREAT",8);
6207 tmpargv
[1] = argv
[1];
6208 incrRefCount(argv
[1]);
6209 when
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10);
6210 tmpargv
[2] = createObject(REDIS_STRING
,
6211 sdscatprintf(sdsempty(),"%ld",when
));
6215 /* Append the actual command */
6216 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
6217 for (j
= 0; j
< argc
; j
++) {
6220 o
= getDecodedObject(o
);
6221 buf
= sdscatprintf(buf
,"$%lu\r\n",(unsigned long)sdslen(o
->ptr
));
6222 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
6223 buf
= sdscatlen(buf
,"\r\n",2);
6227 /* Free the objects from the modified argv for EXPIREAT */
6228 if (cmd
->proc
== expireCommand
) {
6229 for (j
= 0; j
< 3; j
++)
6230 decrRefCount(argv
[j
]);
6233 /* We want to perform a single write. This should be guaranteed atomic
6234 * at least if the filesystem we are writing is a real physical one.
6235 * While this will save us against the server being killed I don't think
6236 * there is much to do about the whole server stopping for power problems
6238 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
6239 if (nwritten
!= (signed)sdslen(buf
)) {
6240 /* Ooops, we are in trouble. The best thing to do for now is
6241 * to simply exit instead of giving the illusion that everything is
6242 * working as expected. */
6243 if (nwritten
== -1) {
6244 redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
));
6246 redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
));
6250 /* If a background append only file rewriting is in progress we want to
6251 * accumulate the differences between the child DB and the current one
6252 * in a buffer, so that when the child process will do its work we
6253 * can append the differences to the new append only file. */
6254 if (server
.bgrewritechildpid
!= -1)
6255 server
.bgrewritebuf
= sdscatlen(server
.bgrewritebuf
,buf
,sdslen(buf
));
6259 if (server
.appendfsync
== APPENDFSYNC_ALWAYS
||
6260 (server
.appendfsync
== APPENDFSYNC_EVERYSEC
&&
6261 now
-server
.lastfsync
> 1))
6263 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
6264 server
.lastfsync
= now
;
6268 /* In Redis commands are always executed in the context of a client, so in
6269 * order to load the append only file we need to create a fake client. */
6270 static struct redisClient
*createFakeClient(void) {
6271 struct redisClient
*c
= zmalloc(sizeof(*c
));
6275 c
->querybuf
= sdsempty();
6279 /* We set the fake client as a slave waiting for the synchronization
6280 * so that Redis will not try to send replies to this client. */
6281 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
6282 c
->reply
= listCreate();
6283 listSetFreeMethod(c
->reply
,decrRefCount
);
6284 listSetDupMethod(c
->reply
,dupClientReplyValue
);
6288 static void freeFakeClient(struct redisClient
*c
) {
6289 sdsfree(c
->querybuf
);
6290 listRelease(c
->reply
);
6294 /* Replay the append log file. On success REDIS_OK is returned. On non fatal
6295 * error (the append only file is zero-length) REDIS_ERR is returned. On
6296 * fatal error an error message is logged and the program exits. */
6297 int loadAppendOnlyFile(char *filename
) {
6298 struct redisClient
*fakeClient
;
6299 FILE *fp
= fopen(filename
,"r");
6300 struct redis_stat sb
;
6302 if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size
== 0)
6306 redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
));
6310 fakeClient
= createFakeClient();
6317 struct redisCommand
*cmd
;
6319 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) {
6325 if (buf
[0] != '*') goto fmterr
;
6327 argv
= zmalloc(sizeof(robj
*)*argc
);
6328 for (j
= 0; j
< argc
; j
++) {
6329 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
;
6330 if (buf
[0] != '$') goto fmterr
;
6331 len
= strtol(buf
+1,NULL
,10);
6332 argsds
= sdsnewlen(NULL
,len
);
6333 if (len
&& fread(argsds
,len
,1,fp
) == 0) goto fmterr
;
6334 argv
[j
] = createObject(REDIS_STRING
,argsds
);
6335 if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */
6338 /* Command lookup */
6339 cmd
= lookupCommand(argv
[0]->ptr
);
6341 redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
);
6344 /* Try object sharing and encoding */
6345 if (server
.shareobjects
) {
6347 for(j
= 1; j
< argc
; j
++)
6348 argv
[j
] = tryObjectSharing(argv
[j
]);
6350 if (cmd
->flags
& REDIS_CMD_BULK
)
6351 tryObjectEncoding(argv
[argc
-1]);
6352 /* Run the command in the context of a fake client */
6353 fakeClient
->argc
= argc
;
6354 fakeClient
->argv
= argv
;
6355 cmd
->proc(fakeClient
);
6356 /* Discard the reply objects list from the fake client */
6357 while(listLength(fakeClient
->reply
))
6358 listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
));
6359 /* Clean up, ready for the next command */
6360 for (j
= 0; j
< argc
; j
++) decrRefCount(argv
[j
]);
6364 freeFakeClient(fakeClient
);
6369 redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file");
6371 redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
));
6375 redisLog(REDIS_WARNING
,"Bad file format reading the append only file");
6379 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6380 static int fwriteBulk(FILE *fp
, robj
*obj
) {
6382 obj
= getDecodedObject(obj
);
6383 snprintf(buf
,sizeof(buf
),"$%ld\r\n",(long)sdslen(obj
->ptr
));
6384 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) goto err
;
6385 if (sdslen(obj
->ptr
) && fwrite(obj
->ptr
,sdslen(obj
->ptr
),1,fp
) == 0)
6387 if (fwrite("\r\n",2,1,fp
) == 0) goto err
;
/* Emit a double on 'fp' as a bulk item: "$<count>\r\n<payload>\r\n", where
 * the payload is the value printed with 17 significant digits and <count>
 * is the payload length without the trailing CRLF.
 * Returns 1 on success, 0 if either write fails. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char payload[128];
    char header[128];
    size_t digits;

    snprintf(payload, sizeof(payload), "%.17g\r\n", d);
    /* The CRLF terminator is not part of the advertised length. */
    digits = strlen(payload) - 2;
    snprintf(header, sizeof(header), "$%lu\r\n", (unsigned long)digits);

    if (fwrite(header, strlen(header), 1, fp) == 0 ||
        fwrite(payload, strlen(payload), 1, fp) == 0)
        return 0;
    return 1;
}
/* Emit a long on 'fp' as a bulk item: "$<count>\r\n<payload>\r\n", where
 * the payload is the decimal representation of 'l' and <count> is the
 * payload length without the trailing CRLF.
 * Returns 1 on success, 0 if either write fails. */
static int fwriteBulkLong(FILE *fp, long l) {
    char payload[128];
    char header[128];
    size_t digits;

    snprintf(payload, sizeof(payload), "%ld\r\n", l);
    /* The CRLF terminator is not part of the advertised length. */
    digits = strlen(payload) - 2;
    snprintf(header, sizeof(header), "$%lu\r\n", (unsigned long)digits);

    if (fwrite(header, strlen(header), 1, fp) == 0 ||
        fwrite(payload, strlen(payload), 1, fp) == 0)
        return 0;
    return 1;
}
6417 /* Write a sequence of commands able to fully rebuild the dataset into
6418 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6419 static int rewriteAppendOnlyFile(char *filename
) {
6420 dictIterator
*di
= NULL
;
6425 time_t now
= time(NULL
);
6427 /* Note that we have to use a different temp name here compared to the
6428 * one used by rewriteAppendOnlyFileBackground() function. */
6429 snprintf(tmpfile
,256,"temp-rewriteaof-%d.aof", (int) getpid());
6430 fp
= fopen(tmpfile
,"w");
6432 redisLog(REDIS_WARNING
, "Failed rewriting the append only file: %s", strerror(errno
));
6435 for (j
= 0; j
< server
.dbnum
; j
++) {
6436 char selectcmd
[] = "*2\r\n$6\r\nSELECT\r\n";
6437 redisDb
*db
= server
.db
+j
;
6439 if (dictSize(d
) == 0) continue;
6440 di
= dictGetIterator(d
);
6446 /* SELECT the new DB */
6447 if (fwrite(selectcmd
,sizeof(selectcmd
)-1,1,fp
) == 0) goto werr
;
6448 if (fwriteBulkLong(fp
,j
) == 0) goto werr
;
6450 /* Iterate this DB writing every entry */
6451 while((de
= dictNext(di
)) != NULL
) {
6452 robj
*key
= dictGetEntryKey(de
);
6453 robj
*o
= dictGetEntryVal(de
);
6454 time_t expiretime
= getExpire(db
,key
);
6456 /* Save the key and associated value */
6457 if (o
->type
== REDIS_STRING
) {
6458 /* Emit a SET command */
6459 char cmd
[]="*3\r\n$3\r\nSET\r\n";
6460 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6462 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6463 if (fwriteBulk(fp
,o
) == 0) goto werr
;
6464 } else if (o
->type
== REDIS_LIST
) {
6465 /* Emit the RPUSHes needed to rebuild the list */
6466 list
*list
= o
->ptr
;
6470 while((ln
= listYield(list
))) {
6471 char cmd
[]="*3\r\n$5\r\nRPUSH\r\n";
6472 robj
*eleobj
= listNodeValue(ln
);
6474 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6475 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6476 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6478 } else if (o
->type
== REDIS_SET
) {
6479 /* Emit the SADDs needed to rebuild the set */
6481 dictIterator
*di
= dictGetIterator(set
);
6484 while((de
= dictNext(di
)) != NULL
) {
6485 char cmd
[]="*3\r\n$4\r\nSADD\r\n";
6486 robj
*eleobj
= dictGetEntryKey(de
);
6488 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6489 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6490 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6492 dictReleaseIterator(di
);
6493 } else if (o
->type
== REDIS_ZSET
) {
6494 /* Emit the ZADDs needed to rebuild the sorted set */
6496 dictIterator
*di
= dictGetIterator(zs
->dict
);
6499 while((de
= dictNext(di
)) != NULL
) {
6500 char cmd
[]="*4\r\n$4\r\nZADD\r\n";
6501 robj
*eleobj
= dictGetEntryKey(de
);
6502 double *score
= dictGetEntryVal(de
);
6504 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6505 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6506 if (fwriteBulkDouble(fp
,*score
) == 0) goto werr
;
6507 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6509 dictReleaseIterator(di
);
6511 redisAssert(0 != 0);
6513 /* Save the expire time */
6514 if (expiretime
!= -1) {
6515 char cmd
[]="*3\r\n$8\r\nEXPIREAT\r\n";
6516 /* If this key is already expired skip it */
6517 if (expiretime
< now
) continue;
6518 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6519 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6520 if (fwriteBulkLong(fp
,expiretime
) == 0) goto werr
;
6523 dictReleaseIterator(di
);
6526 /* Make sure data will not remain on the OS's output buffers */
6531 /* Use RENAME to make sure the DB file is changed atomically only
6532 * if the generated DB file is ok. */
6533 if (rename(tmpfile
,filename
) == -1) {
6534 redisLog(REDIS_WARNING
,"Error moving temp append only file on the final destination: %s", strerror(errno
));
6538 redisLog(REDIS_NOTICE
,"SYNC append only file rewrite performed");
6544 redisLog(REDIS_WARNING
,"Write error writing append only file on disk: %s", strerror(errno
));
6545 if (di
) dictReleaseIterator(di
);
6549 /* This is how rewriting of the append only file in background works:
6551 * 1) The user calls BGREWRITEAOF
6552 * 2) Redis calls this function, that forks():
6553 * 2a) the child rewrite the append only file in a temp file.
6554 * 2b) the parent accumulates differences in server.bgrewritebuf.
6555 * 3) When the child has finished '2a' it exits.
6556 * 4) The parent will trap the exit code, if it's OK, will append the
6557 * data accumulated into server.bgrewritebuf into the temp file, and
6558 * finally will rename(2) the temp file in the actual file name.
6559 * Then the new file is reopened as the new append only file. Profit!
6561 static int rewriteAppendOnlyFileBackground(void) {
6564 if (server
.bgrewritechildpid
!= -1) return REDIS_ERR
;
6565 if ((childpid
= fork()) == 0) {
6570 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6571 if (rewriteAppendOnlyFile(tmpfile
) == REDIS_OK
) {
6578 if (childpid
== -1) {
6579 redisLog(REDIS_WARNING
,
6580 "Can't rewrite append only file in background: fork: %s",
6584 redisLog(REDIS_NOTICE
,
6585 "Background append only file rewriting started by pid %d",childpid
);
6586 server
.bgrewritechildpid
= childpid
;
6587 /* We set appendseldb to -1 in order to force the next call to the
6588 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6589 * accumulated by the parent into server.bgrewritebuf will start
6590 * with a SELECT statement and it will be safe to merge. */
6591 server
.appendseldb
= -1;
6594 return REDIS_OK
; /* unreached */
6597 static void bgrewriteaofCommand(redisClient
*c
) {
6598 if (server
.bgrewritechildpid
!= -1) {
6599 addReplySds(c
,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6602 if (rewriteAppendOnlyFileBackground() == REDIS_OK
) {
6603 char *status
= "+Background append only file rewriting started\r\n";
6604 addReplySds(c
,sdsnew(status
));
6606 addReply(c
,shared
.err
);
6610 static void aofRemoveTempFile(pid_t childpid
) {
6613 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) childpid
);
6617 /* =============================== Virtual Memory =========================== */
6618 static void vmInit(void) {
6621 server
.vm_fp
= fopen("/tmp/redisvm","w+b");
6622 if (server
.vm_fp
== NULL
) {
6623 redisLog(REDIS_WARNING
,"Impossible to open the swap file. Exiting.");
6626 server
.vm_fd
= fileno(server
.vm_fp
);
6627 server
.vm_next_page
= 0;
6628 server
.vm_near_pages
= 0;
6629 totsize
= server
.vm_pages
*server
.vm_page_size
;
6630 redisLog(REDIS_NOTICE
,"Allocating %lld bytes of swap file",totsize
);
6631 if (ftruncate(server
.vm_fd
,totsize
) == -1) {
6632 redisLog(REDIS_WARNING
,"Can't ftruncate swap file: %s. Exiting.",
6636 redisLog(REDIS_NOTICE
,"Swap file allocated with success");
6638 server
.vm_bitmap
= zmalloc((server
.vm_near_pages
+7)/8);
6639 memset(server
.vm_bitmap
,0,(server
.vm_near_pages
+7)/8);
6640 /* Try to remove the swap file, so the OS will really delete it from the
6641 * file system when Redis exists. */
6642 unlink("/tmp/redisvm");
6645 /* Mark the page as used */
6646 static void vmMarkPageUsed(off_t page
) {
6647 off_t byte
= page
/8;
6649 server
.vm_bitmap
[byte
] |= 1<<bit
;
/* Mark N contiguous pages as used, with 'page' being the first.
 * Every page in the range [page, page+count) gets its bitmap bit set. */
static void vmMarkPagesUsed(off_t page, off_t count) {
    off_t j;

    /* The offset must be the loop index: using page+count would flag the
     * single page just past the range 'count' times and leave the range
     * itself unmarked. */
    for (j = 0; j < count; j++)
        vmMarkPageUsed(page+j);
}
6660 /* Mark the page as free */
6661 static void vmMarkPageFree(off_t page
) {
6662 off_t byte
= page
/8;
6664 server
.vm_bitmap
[byte
] &= ~(1<<bit
);
/* Mark N contiguous pages as free, with 'page' being the first.
 * Every page in the range [page, page+count) gets its bitmap bit cleared. */
static void vmMarkPagesFree(off_t page, off_t count) {
    off_t j;

    /* The offset must be the loop index: using page+count would clear the
     * single page just past the range 'count' times and leave the range
     * itself still marked as used. */
    for (j = 0; j < count; j++)
        vmMarkPageFree(page+j);
}
6675 /* Test if the page is free */
6676 static int vmFreePage(off_t page
) {
6677 off_t byte
= page
/8;
6679 return server
.vm_bitmap
[byte
] & bit
;
6682 /* Find N contiguous free pages storing the first page of the cluster in *first.
6683 * Returns REDIS_OK if it was able to find N contiguous pages, otherwise
6684 * REDIS_ERR is returned.
6686 * This function uses a simple algorithm: we try to allocate
6687 * REDIS_VM_MAX_NEAR_PAGES sequentially, when we reach this limit we start
6688 * again from the start of the swap file searching for free spaces.
6690 * If it looks pretty clear that there are no free pages near our offset
6691 * we try to find less populated places doing a forward jump of
6692 * REDIS_VM_MAX_RANDOM_JUMP, then we start scanning again a few pages
6693 * without hurry, and then we jump again and so forth...
6695 * This function can be improved using a free list to avoid to guess
6696 * too much, since we could collect data about freed pages.
6698 * note: I implemented this function just after watching an episode of
6699 * Battlestar Galactica, where the hybrid was continuing to say "JUMP!"
6701 static int vmFindContiguousPages(off_t
*first
, int n
) {
6702 off_t base
, offset
= 0, since_jump
= 0, numfree
= 0;
6704 if (server
.vm_near_pages
== REDIS_VM_MAX_NEAR_PAGES
) {
6705 server
.vm_near_pages
= 0;
6706 server
.vm_next_page
= 0;
6708 server
.vm_near_pages
++; /* Yet another try for pages near to the old ones */
6709 base
= server
.vm_next_page
;
6711 while(offset
< server
.vm_pages
) {
6712 off_t
this = base
+offset
;
6714 /* If we overflow, restart from page zero */
6715 if (this >= server
.vm_pages
) {
6716 this -= server
.vm_pages
;
6718 /* Just overflowed, what we found on tail is no longer
6719 * interesting, as it's no longer contiguous. */
6723 if (vmFreePage(this)) {
6724 /* This is a free page */
6726 /* Already got N free pages? Return to the caller, with success */
6732 /* The current one is not a free page */
6736 /* Fast-forward if the current page is not free and we already
6737 * searched enough near this place. */
6739 if (!numfree
&& since_jump
>= REDIS_VM_MAX_RANDOM_JUMP
/4) {
6740 offset
+= random() % REDIS_VM_MAX_RANDOM_JUMP
;
6742 /* Note that even if we rewind after the jump, we don't need
6743 * to make sure numfree is set to zero as we only jump *if* it
6744 * is set to zero. */
6746 /* Otherwise just check the next page */
6753 /* Swap the 'val' object relative to 'key' into disk. Store all the information
6754 * needed to later retrieve the object into the key object.
6755 * If we can't find enough contiguous empty pages to swap the object on disk
6756 * REDIS_ERR is returned. */
6757 static int vmSwapObject(robj
*key
, robj
*val
) {
6758 off_t pages
= rdbSavedObjectPages(val
);
6761 assert(key
->storage
== REDIS_VM_MEMORY
);
6762 if (vmFindContiguousPages(&page
,pages
) == REDIS_ERR
) return REDIS_ERR
;
6763 if (fseeko(server
.vm_fp
,page
*server
.vm_page_size
,SEEK_SET
) == -1) {
6764 redisLog(REDIS_WARNING
,
6765 "Critical VM problem in vmSwapObject(): can't seek: %s",
6769 rdbSaveObject(server
.vm_fp
,val
);
6770 key
->vm
.page
= page
;
6771 key
->vm
.usedpages
= pages
;
6772 key
->storage
= REDIS_VM_SWAPPED
;
6773 decrRefCount(val
); /* Deallocate the object from memory. */
6774 vmMarkPagesUsed(page
,pages
);
6778 /* Load the value object relative to the 'key' object from swap to memory.
6779 * The newly allocated object is returned. */
6780 static robj
*vmLoadObject(robj
*key
) {
6783 assert(key
->storage
== REDIS_VM_SWAPPED
);
6784 if (fseeko(server
.vm_fp
,key
->vm
.page
*server
.vm_page_size
,SEEK_SET
) == -1) {
6785 redisLog(REDIS_WARNING
,
6786 "Unrecoverable VM problem in vmLoadObject(): can't seek: %s",
6790 val
= rdbLoadObject(key
->type
,server
.vm_fp
);
6792 redisLog(REDIS_WARNING
, "Unrecoverable VM problem in vmLoadObject(): can't load object from swap file: %s", strerror(errno
));
6795 key
->storage
= REDIS_VM_MEMORY
;
6796 key
->vm
.atime
= server
.unixtime
;
6797 vmMarkPagesFree(key
->vm
.page
,key
->vm
.usedpages
);
6801 /* ================================= Debugging ============================== */
6803 static void debugCommand(redisClient
*c
) {
6804 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
6806 } else if (!strcasecmp(c
->argv
[1]->ptr
,"reload")) {
6807 if (rdbSave(server
.dbfilename
) != REDIS_OK
) {
6808 addReply(c
,shared
.err
);
6812 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6813 addReply(c
,shared
.err
);
6816 redisLog(REDIS_WARNING
,"DB reloaded by DEBUG RELOAD");
6817 addReply(c
,shared
.ok
);
6818 } else if (!strcasecmp(c
->argv
[1]->ptr
,"loadaof")) {
6820 if (loadAppendOnlyFile(server
.appendfilename
) != REDIS_OK
) {
6821 addReply(c
,shared
.err
);
6824 redisLog(REDIS_WARNING
,"Append Only File loaded by DEBUG LOADAOF");
6825 addReply(c
,shared
.ok
);
6826 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
6827 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
6831 addReply(c
,shared
.nokeyerr
);
6834 key
= dictGetEntryKey(de
);
6835 val
= dictGetEntryVal(de
);
6836 addReplySds(c
,sdscatprintf(sdsempty(),
6837 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d serializedlength:%lld\r\n",
6838 (void*)key
, key
->refcount
, (void*)val
, val
->refcount
,
6839 val
->encoding
, rdbSavedObjectLen(val
)));
6841 addReplySds(c
,sdsnew(
6842 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6846 static void _redisAssert(char *estr
) {
6847 redisLog(REDIS_WARNING
,"=== ASSERTION FAILED ===");
6848 redisLog(REDIS_WARNING
,"==> %s\n",estr
);
6849 #ifdef HAVE_BACKTRACE
6850 redisLog(REDIS_WARNING
,"(forcing SIGSEGV in order to print the stack trace)");
6855 /* =================================== Main! ================================ */
6858 int linuxOvercommitMemoryValue(void) {
6859 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
6863 if (fgets(buf
,64,fp
) == NULL
) {
6872 void linuxOvercommitMemoryWarning(void) {
6873 if (linuxOvercommitMemoryValue() == 0) {
6874 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6877 #endif /* __linux__ */
6879 static void daemonize(void) {
6883 if (fork() != 0) exit(0); /* parent exits */
6884 printf("New pid: %d\n", getpid());
6885 setsid(); /* create a new session */
6887 /* Every output goes to /dev/null. If Redis is daemonized but
6888 * the 'logfile' is set to 'stdout' in the configuration file
6889 * it will not log at all. */
6890 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
6891 dup2(fd
, STDIN_FILENO
);
6892 dup2(fd
, STDOUT_FILENO
);
6893 dup2(fd
, STDERR_FILENO
);
6894 if (fd
> STDERR_FILENO
) close(fd
);
6896 /* Try to write the pid file */
6897 fp
= fopen(server
.pidfile
,"w");
6899 fprintf(fp
,"%d\n",getpid());
/* Program entry point.
 *
 * What the visible statements do, in order: load the configuration file
 * named by argv[1], reject more than one argument with a usage message,
 * warn when no configuration file is given, daemonize when
 * server.daemonize is set, log the version, load the dataset, register
 * acceptHandler for readable events on server.fd and delete the event
 * loop.
 *
 * NOTE(review): this listing appears to have lost several lines (the
 * test that guards the first branch, the end of the usage branch, the
 * separator between the two dataset loaders, and whatever runs between
 * "ready to accept connections" and aeDeleteEventLoop()). Confirm the
 * exact control flow against the complete file. */
6904 int main(int argc
, char **argv
) {
/* A config file was given: argv[1] is handed to loadServerConfig().
 * Presumably resetServerSaveParams() clears the default save points
 * first -- TODO confirm. */
6907 resetServerSaveParams();
6908 loadServerConfig(argv
[1]);
/* More than one argument is a usage error. */
6909 } else if (argc
> 2) {
6910 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
/* Per the message itself, this is logged when no config file was given. */
6913 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
/* Go to the background only when the configuration asks for it. */
6915 if (server
.daemonize
) daemonize();
6917 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
6919 linuxOvercommitMemoryWarning();
/* Dataset loading: the append only file is used when server.appendonly
 * is set. The RDB load below is presumably the alternative branch --
 * the line separating the two is not visible here. */
6921 if (server
.appendonly
) {
6922 if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
)
6923 redisLog(REDIS_NOTICE
,"DB loaded from append only file");
6925 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
6926 redisLog(REDIS_NOTICE
,"DB loaded from disk");
/* Have acceptHandler called whenever server.fd becomes readable; a
 * failure to register the event is reported through oom(). */
6928 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
6929 acceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
6930 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
/* Release the event loop. */
6932 aeDeleteEventLoop(server
.el
);
6936 /* ============================= Backtrace support ========================= */
6938 #ifdef HAVE_BACKTRACE
/* Forward declaration: findFuncName() is defined further down, after
 * staticsymbols.h is included, but segvHandler() calls it before that
 * point. */
6939 static char *findFuncName(void *pointer
, unsigned long *offset
);
6941 static void *getMcontextEip(ucontext_t
*uc
) {
6942 #if defined(__FreeBSD__)
6943 return (void*) uc
->uc_mcontext
.mc_eip
;
6944 #elif defined(__dietlibc__)
6945 return (void*) uc
->uc_mcontext
.eip
;
6946 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6948 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6950 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6952 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6953 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6954 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6956 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6958 #elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
6959 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
]; /* Linux 32/64 bit */
6960 #elif defined(__ia64__) /* Linux IA64 */
6961 return (void*) uc
->uc_mcontext
.sc_ip
;
/* Crash handler, installed through sa_sigaction by setupSigSegvAction().
 *
 * It logs the signal number together with the Redis version, the string
 * returned by genRedisInfoString(), and one line per backtrace frame.
 * For each frame the name found by findFuncName() is preferred; the
 * text produced by backtrace_symbols() is the fallback.
 *
 * sig    - number of the signal being handled.
 * info   - unused.
 * secret - the context pointer passed by the kernel, used as a
 *          ucontext_t to recover the faulting instruction pointer.
 *
 * NOTE(review): the declarations of 'trace' and 'infostring', some
 * closing braces and whatever follows the loop are not visible in this
 * listing -- confirm against the complete file. */
6967 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
6969 char **messages
= NULL
;
6970 int i
, trace_size
= 0;
6971 unsigned long offset
=0;
6972 ucontext_t
*uc
= (ucontext_t
*) secret
;
6974 REDIS_NOTUSED(info
);
6976 redisLog(REDIS_WARNING
,
6977 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
6978 infostring
= genRedisInfoString();
6979 redisLog(REDIS_WARNING
, "%s",infostring
);
6980 /* It's not safe to sdsfree() the returned string under memory
6981 * corruption conditions. Let it leak as we are going to abort */
/* Capture at most 100 frames. */
6983 trace_size
= backtrace(trace
, 100);
6984 /* overwrite sigaction with caller's address */
/* NOTE(review): slot 1 is written without checking that backtrace()
 * returned at least two frames. */
6985 if (getMcontextEip(uc
) != NULL
) {
6986 trace
[1] = getMcontextEip(uc
);
/* NOTE(review): the result of backtrace_symbols() is indexed below
 * without a NULL check. */
6988 messages
= backtrace_symbols(trace
, trace_size
);
/* Frame 0 is skipped -- presumably it is this handler itself. */
6990 for (i
=1; i
<trace_size
; ++i
) {
6991 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
/* Locate the "+offset" part of the backtrace_symbols() text, if any. */
6993 p
= strchr(messages
[i
],'+');
/* Use the backtrace_symbols() line as is when no symbol of ours
 * matched, or when the offset it reports is smaller than ours.
 * NOTE(review): strtol() is called with base 10; an offset printed as
 * "+0x1f" would parse as 0 -- confirm the format on target platforms. */
6994 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
6995 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
/* Otherwise print our own line: frame index, address, name, offset.
 * NOTE(review): the last conversion is %d while the argument is cast
 * to unsigned int; %u would match the type. */
6997 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
7000 /* free(messages); Don't call free() with possibly corrupted memory. */
7004 static void setupSigSegvAction(void) {
7005 struct sigaction act
;
7007 sigemptyset (&act
.sa_mask
);
7008 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
7009 * is used. Otherwise, sa_handler is used */
7010 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
7011 act
.sa_sigaction
= segvHandler
;
7012 sigaction (SIGSEGV
, &act
, NULL
);
7013 sigaction (SIGBUS
, &act
, NULL
);
7014 sigaction (SIGFPE
, &act
, NULL
);
7015 sigaction (SIGILL
, &act
, NULL
);
7016 sigaction (SIGBUS
, &act
, NULL
);
7020 #include "staticsymbols.h"
7021 /* This function try to convert a pointer into a function name. It's used in
7022 * oreder to provide a backtrace under segmentation fault that's able to
7023 * display functions declared as static (otherwise the backtrace is useless). */
7024 static char *findFuncName(void *pointer
, unsigned long *offset
){
7026 unsigned long off
, minoff
= 0;
7028 /* Try to match against the Symbol with the smallest offset */
7029 for (i
=0; symsTable
[i
].pointer
; i
++) {
7030 unsigned long lp
= (unsigned long) pointer
;
7032 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
7033 off
=lp
-symsTable
[i
].pointer
;
7034 if (ret
< 0 || off
< minoff
) {
7040 if (ret
== -1) return NULL
;
7042 return symsTable
[ret
].name
;
7044 #else /* HAVE_BACKTRACE */
/* Variant compiled when HAVE_BACKTRACE is not defined. No statement is
 * visible in its body in this listing, so it appears to be a no-op that
 * only keeps callers compiling. */
7045 static void setupSigSegvAction(void) {
7047 #endif /* HAVE_BACKTRACE */