2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.3.2"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
64 #include "solarisfixes.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
95 /* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
107 this flag will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short these commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
113 #define REDIS_STRING 0
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
128 /* Defines related to the dump file format. Storing 32 bit lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpret the length:
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specifies the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
139 * Lengths up to 63 are stored using a single byte, most DB keys, and many
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining six bits specify a special encoding for the object
149 * according to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
155 /* Virtual memory object->where field. */
156 #define REDIS_VM_MEMORY 0 /* The object is on memory */
157 #define REDIS_VM_SWAPPED 1 /* The object is on disk */
158 #define REDIS_VM_SWAPPING 2 /* Redis is swapping this object on disk */
159 #define REDIS_VM_LOADING 3 /* Redis is loading this object from disk */
162 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
163 #define REDIS_SLAVE 2 /* This client is a slave server */
164 #define REDIS_MASTER 4 /* This client is a master server */
165 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
166 #define REDIS_MULTI 16 /* This client is in a MULTI context */
167 #define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */
169 /* Slave replication state - slave side */
170 #define REDIS_REPL_NONE 0 /* No active replication */
171 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
172 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
174 /* Slave replication state - from the point of view of master
175 * Note that in SEND_BULK and ONLINE state the slave receives new updates
176 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
177 * to start the next background saving in order to send updates to it. */
178 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
179 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
180 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
181 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
183 /* List related stuff */
187 /* Sort operations */
188 #define REDIS_SORT_GET 0
189 #define REDIS_SORT_ASC 1
190 #define REDIS_SORT_DESC 2
191 #define REDIS_SORTKEY_MAX 1024
194 #define REDIS_DEBUG 0
195 #define REDIS_NOTICE 1
196 #define REDIS_WARNING 2
198 /* Anti-warning macro... */
199 #define REDIS_NOTUSED(V) ((void) V)
201 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
202 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
204 /* Append only defines */
205 #define APPENDFSYNC_NO 0
206 #define APPENDFSYNC_ALWAYS 1
207 #define APPENDFSYNC_EVERYSEC 2
209 /* We can print the stacktrace, so our assert is defined this way: */
210 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
211 static void _redisAssert(char *estr
);
213 /*================================= Data types ============================== */
215 /* A redis object, that is a type able to hold a string / list / set */
217 /* The VM object structure */
218 struct redisObjectVM
{
219 off_t offset
; /* the page at which the object is stored on disk */
220 int pages
; /* number of pages used on disk */
223 /* The actual Redis Object */
224 typedef struct redisObject
{
227 unsigned char encoding
;
228 unsigned char storage
; /* where? REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
229 unsigned char notused
;
231 /* VM fields, these are only allocated if VM is active, otherwise the
232 * object allocation function will just allocate
233 * sizeof(redisObject) minus sizeof(redisObjectVM), so using
234 * Redis without VM active will not have any overhead. */
235 struct redisObjectVM vm
;
238 /* Macro used to initialize a Redis object allocated on the stack.
239 * Note that this macro is kept near the structure definition to make sure
240 * we'll update it when the structure is changed, to avoid bugs like
241 * bug #85 introduced exactly in this way. */
242 #define initStaticStringObject(_var,_ptr) do { \
244 _var.type = REDIS_STRING; \
245 _var.encoding = REDIS_ENCODING_RAW; \
249 typedef struct redisDb
{
250 dict
*dict
; /* The keyspace for this DB */
251 dict
*expires
; /* Timeout of keys with a timeout set */
252 dict
*blockingkeys
; /* Keys with clients waiting for data (BLPOP) */
256 /* Client MULTI/EXEC state */
257 typedef struct multiCmd
{
260 struct redisCommand
*cmd
;
263 typedef struct multiState
{
264 multiCmd
*commands
; /* Array of MULTI commands */
265 int count
; /* Total number of MULTI commands */
268 /* With multiplexing we need to keep per-client state.
269 * Clients are kept in a linked list. */
270 typedef struct redisClient
{
275 robj
**argv
, **mbargv
;
277 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
278 int multibulk
; /* multi bulk command format active */
281 time_t lastinteraction
; /* time of the last interaction, used for timeout */
282 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
284 int slaveseldb
; /* slave selected db, if this client is a slave */
285 int authenticated
; /* when requirepass is non-NULL */
286 int replstate
; /* replication state if this is a slave */
287 int repldbfd
; /* replication DB file descriptor */
288 long repldboff
; /* replication DB file offset */
289 off_t repldbsize
; /* replication DB file size */
290 multiState mstate
; /* MULTI/EXEC state */
291 robj
**blockingkeys
; /* The keys we are waiting on to terminate a blocking
292 * operation such as BLPOP. Otherwise NULL. */
293 int blockingkeysnum
; /* Number of blocking keys */
294 time_t blockingto
; /* Blocking operation timeout (0 means no timeout). If the current
295 * UNIX time is > blockingto then the operation timed out. */
303 /* Global server state structure */
308 dict
*sharingpool
; /* Pool used for object sharing */
309 unsigned int sharingpoolsize
;
310 long long dirty
; /* changes to DB from the last save */
312 list
*slaves
, *monitors
;
313 char neterr
[ANET_ERR_LEN
];
315 int cronloops
; /* number of times the cron function run */
316 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
317 time_t lastsave
; /* Unix time of the last successful save */
318 size_t usedmemory
; /* Used memory in bytes, as reported by zmalloc_used_memory() */
319 /* Fields used only for stats */
320 time_t stat_starttime
; /* server start time */
321 long long stat_numcommands
; /* number of processed commands */
322 long long stat_numconnections
; /* number of connections received */
335 pid_t bgsavechildpid
;
336 pid_t bgrewritechildpid
;
337 sds bgrewritebuf
; /* buffer kept by the parent during append only rewrite */
338 struct saveparam
*saveparams
;
343 char *appendfilename
;
347 /* Replication related */
352 redisClient
*master
; /* client that is master for this slave */
354 unsigned int maxclients
;
355 unsigned long maxmemory
;
356 unsigned int blockedclients
;
357 /* Sort parameters - qsort_r() is only available under BSD so we
358 * have to keep this state global, in order to pass it to sortCompare() */
362 /* Virtual memory configuration */
367 /* Virtual memory state */
370 off_t vm_next_page
; /* Next probably empty page */
371 off_t vm_near_pages
; /* Number of pages allocated sequentially */
374 typedef void redisCommandProc(redisClient
*c
);
375 struct redisCommand
{
377 redisCommandProc
*proc
;
382 struct redisFunctionSym
{
384 unsigned long pointer
;
387 typedef struct _redisSortObject
{
395 typedef struct _redisSortOperation
{
398 } redisSortOperation
;
400 /* ZSETs use a specialized version of Skiplists */
402 typedef struct zskiplistNode
{
403 struct zskiplistNode
**forward
;
404 struct zskiplistNode
*backward
;
409 typedef struct zskiplist
{
410 struct zskiplistNode
*header
, *tail
;
411 unsigned long length
;
415 typedef struct zset
{
420 /* Our shared "common" objects */
422 struct sharedObjectsStruct
{
423 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
424 *colon
, *nullbulk
, *nullmultibulk
, *queued
,
425 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
426 *outofrangeerr
, *plus
,
427 *select0
, *select1
, *select2
, *select3
, *select4
,
428 *select5
, *select6
, *select7
, *select8
, *select9
;
431 /* Global vars that are actually used as constants. The following double
432 * values are used for double on-disk serialization, and are initialized
433 * at runtime to avoid strange compiler optimizations. */
435 static double R_Zero
, R_PosInf
, R_NegInf
, R_Nan
;
437 /*================================ Prototypes =============================== */
439 static void freeStringObject(robj
*o
);
440 static void freeListObject(robj
*o
);
441 static void freeSetObject(robj
*o
);
442 static void decrRefCount(void *o
);
443 static robj
*createObject(int type
, void *ptr
);
444 static void freeClient(redisClient
*c
);
445 static int rdbLoad(char *filename
);
446 static void addReply(redisClient
*c
, robj
*obj
);
447 static void addReplySds(redisClient
*c
, sds s
);
448 static void incrRefCount(robj
*o
);
449 static int rdbSaveBackground(char *filename
);
450 static robj
*createStringObject(char *ptr
, size_t len
);
451 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
452 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
453 static int syncWithMaster(void);
454 static robj
*tryObjectSharing(robj
*o
);
455 static int tryObjectEncoding(robj
*o
);
456 static robj
*getDecodedObject(robj
*o
);
457 static int removeExpire(redisDb
*db
, robj
*key
);
458 static int expireIfNeeded(redisDb
*db
, robj
*key
);
459 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
460 static int deleteKey(redisDb
*db
, robj
*key
);
461 static time_t getExpire(redisDb
*db
, robj
*key
);
462 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
463 static void updateSlavesWaitingBgsave(int bgsaveerr
);
464 static void freeMemoryIfNeeded(void);
465 static int processCommand(redisClient
*c
);
466 static void setupSigSegvAction(void);
467 static void rdbRemoveTempFile(pid_t childpid
);
468 static void aofRemoveTempFile(pid_t childpid
);
469 static size_t stringObjectLen(robj
*o
);
470 static void processInputBuffer(redisClient
*c
);
471 static zskiplist
*zslCreate(void);
472 static void zslFree(zskiplist
*zsl
);
473 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
474 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
475 static void initClientMultiState(redisClient
*c
);
476 static void freeClientMultiState(redisClient
*c
);
477 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
);
478 static void unblockClient(redisClient
*c
);
479 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
);
480 static void vmInit(void);
482 static void authCommand(redisClient
*c
);
483 static void pingCommand(redisClient
*c
);
484 static void echoCommand(redisClient
*c
);
485 static void setCommand(redisClient
*c
);
486 static void setnxCommand(redisClient
*c
);
487 static void getCommand(redisClient
*c
);
488 static void delCommand(redisClient
*c
);
489 static void existsCommand(redisClient
*c
);
490 static void incrCommand(redisClient
*c
);
491 static void decrCommand(redisClient
*c
);
492 static void incrbyCommand(redisClient
*c
);
493 static void decrbyCommand(redisClient
*c
);
494 static void selectCommand(redisClient
*c
);
495 static void randomkeyCommand(redisClient
*c
);
496 static void keysCommand(redisClient
*c
);
497 static void dbsizeCommand(redisClient
*c
);
498 static void lastsaveCommand(redisClient
*c
);
499 static void saveCommand(redisClient
*c
);
500 static void bgsaveCommand(redisClient
*c
);
501 static void bgrewriteaofCommand(redisClient
*c
);
502 static void shutdownCommand(redisClient
*c
);
503 static void moveCommand(redisClient
*c
);
504 static void renameCommand(redisClient
*c
);
505 static void renamenxCommand(redisClient
*c
);
506 static void lpushCommand(redisClient
*c
);
507 static void rpushCommand(redisClient
*c
);
508 static void lpopCommand(redisClient
*c
);
509 static void rpopCommand(redisClient
*c
);
510 static void llenCommand(redisClient
*c
);
511 static void lindexCommand(redisClient
*c
);
512 static void lrangeCommand(redisClient
*c
);
513 static void ltrimCommand(redisClient
*c
);
514 static void typeCommand(redisClient
*c
);
515 static void lsetCommand(redisClient
*c
);
516 static void saddCommand(redisClient
*c
);
517 static void sremCommand(redisClient
*c
);
518 static void smoveCommand(redisClient
*c
);
519 static void sismemberCommand(redisClient
*c
);
520 static void scardCommand(redisClient
*c
);
521 static void spopCommand(redisClient
*c
);
522 static void srandmemberCommand(redisClient
*c
);
523 static void sinterCommand(redisClient
*c
);
524 static void sinterstoreCommand(redisClient
*c
);
525 static void sunionCommand(redisClient
*c
);
526 static void sunionstoreCommand(redisClient
*c
);
527 static void sdiffCommand(redisClient
*c
);
528 static void sdiffstoreCommand(redisClient
*c
);
529 static void syncCommand(redisClient
*c
);
530 static void flushdbCommand(redisClient
*c
);
531 static void flushallCommand(redisClient
*c
);
532 static void sortCommand(redisClient
*c
);
533 static void lremCommand(redisClient
*c
);
534 static void rpoplpushcommand(redisClient
*c
);
535 static void infoCommand(redisClient
*c
);
536 static void mgetCommand(redisClient
*c
);
537 static void monitorCommand(redisClient
*c
);
538 static void expireCommand(redisClient
*c
);
539 static void expireatCommand(redisClient
*c
);
540 static void getsetCommand(redisClient
*c
);
541 static void ttlCommand(redisClient
*c
);
542 static void slaveofCommand(redisClient
*c
);
543 static void debugCommand(redisClient
*c
);
544 static void msetCommand(redisClient
*c
);
545 static void msetnxCommand(redisClient
*c
);
546 static void zaddCommand(redisClient
*c
);
547 static void zincrbyCommand(redisClient
*c
);
548 static void zrangeCommand(redisClient
*c
);
549 static void zrangebyscoreCommand(redisClient
*c
);
550 static void zrevrangeCommand(redisClient
*c
);
551 static void zcardCommand(redisClient
*c
);
552 static void zremCommand(redisClient
*c
);
553 static void zscoreCommand(redisClient
*c
);
554 static void zremrangebyscoreCommand(redisClient
*c
);
555 static void multiCommand(redisClient
*c
);
556 static void execCommand(redisClient
*c
);
557 static void blpopCommand(redisClient
*c
);
558 static void brpopCommand(redisClient
*c
);
560 /*================================= Globals ================================= */
563 static struct redisServer server
; /* server global state */
564 static struct redisCommand cmdTable
[] = {
565 {"get",getCommand
,2,REDIS_CMD_INLINE
},
566 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
567 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
568 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
569 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
570 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
571 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
572 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
573 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
574 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
575 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
576 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
577 {"brpop",brpopCommand
,-3,REDIS_CMD_INLINE
},
578 {"blpop",blpopCommand
,-3,REDIS_CMD_INLINE
},
579 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
580 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
581 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
582 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
583 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
584 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
585 {"rpoplpush",rpoplpushcommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
586 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
587 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
588 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
589 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
590 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
591 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
592 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
593 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
594 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
595 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
596 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
597 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
598 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
599 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
600 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
601 {"zincrby",zincrbyCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
602 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
603 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
604 {"zrange",zrangeCommand
,-4,REDIS_CMD_INLINE
},
605 {"zrangebyscore",zrangebyscoreCommand
,-4,REDIS_CMD_INLINE
},
606 {"zrevrange",zrevrangeCommand
,-4,REDIS_CMD_INLINE
},
607 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
608 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
609 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
610 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
611 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
612 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
613 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
614 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
615 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
616 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
617 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
618 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
619 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
620 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
621 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
622 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
623 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
624 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
625 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
626 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
627 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
628 {"bgrewriteaof",bgrewriteaofCommand
,1,REDIS_CMD_INLINE
},
629 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
630 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
631 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
632 {"multi",multiCommand
,1,REDIS_CMD_INLINE
},
633 {"exec",execCommand
,1,REDIS_CMD_INLINE
},
634 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
635 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
636 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
637 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
638 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
639 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
640 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
641 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
642 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
646 /*============================ Utility functions ============================ */
648 /* Glob-style pattern matching. */
649 int stringmatchlen(const char *pattern
, int patternLen
,
650 const char *string
, int stringLen
, int nocase
)
655 while (pattern
[1] == '*') {
660 return 1; /* match */
662 if (stringmatchlen(pattern
+1, patternLen
-1,
663 string
, stringLen
, nocase
))
664 return 1; /* match */
668 return 0; /* no match */
672 return 0; /* no match */
682 not = pattern
[0] == '^';
689 if (pattern
[0] == '\\') {
692 if (pattern
[0] == string
[0])
694 } else if (pattern
[0] == ']') {
696 } else if (patternLen
== 0) {
700 } else if (pattern
[1] == '-' && patternLen
>= 3) {
701 int start
= pattern
[0];
702 int end
= pattern
[2];
710 start
= tolower(start
);
716 if (c
>= start
&& c
<= end
)
720 if (pattern
[0] == string
[0])
723 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
733 return 0; /* no match */
739 if (patternLen
>= 2) {
746 if (pattern
[0] != string
[0])
747 return 0; /* no match */
749 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
750 return 0; /* no match */
758 if (stringLen
== 0) {
759 while(*pattern
== '*') {
766 if (patternLen
== 0 && stringLen
== 0)
771 static void redisLog(int level
, const char *fmt
, ...) {
775 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
779 if (level
>= server
.verbosity
) {
785 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
786 fprintf(fp
,"%s %c ",buf
,c
[level
]);
787 vfprintf(fp
, fmt
, ap
);
793 if (server
.logfile
) fclose(fp
);
796 /*====================== Hash table type implementation ==================== */
798 /* This is a hash table type that uses the SDS dynamic strings library as
799 * keys and redis objects as values (objects can hold SDS strings,
802 static void dictVanillaFree(void *privdata
, void *val
)
804 DICT_NOTUSED(privdata
);
808 static void dictListDestructor(void *privdata
, void *val
)
810 DICT_NOTUSED(privdata
);
811 listRelease((list
*)val
);
814 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
818 DICT_NOTUSED(privdata
);
820 l1
= sdslen((sds
)key1
);
821 l2
= sdslen((sds
)key2
);
822 if (l1
!= l2
) return 0;
823 return memcmp(key1
, key2
, l1
) == 0;
826 static void dictRedisObjectDestructor(void *privdata
, void *val
)
828 DICT_NOTUSED(privdata
);
833 static int dictObjKeyCompare(void *privdata
, const void *key1
,
836 const robj
*o1
= key1
, *o2
= key2
;
837 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
840 static unsigned int dictObjHash(const void *key
) {
842 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
845 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
848 robj
*o1
= (robj
*) key1
, *o2
= (robj
*) key2
;
851 o1
= getDecodedObject(o1
);
852 o2
= getDecodedObject(o2
);
853 cmp
= sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
859 static unsigned int dictEncObjHash(const void *key
) {
860 robj
*o
= (robj
*) key
;
862 o
= getDecodedObject(o
);
863 unsigned int hash
= dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
868 static dictType setDictType
= {
869 dictEncObjHash
, /* hash function */
872 dictEncObjKeyCompare
, /* key compare */
873 dictRedisObjectDestructor
, /* key destructor */
874 NULL
/* val destructor */
877 static dictType zsetDictType
= {
878 dictEncObjHash
, /* hash function */
881 dictEncObjKeyCompare
, /* key compare */
882 dictRedisObjectDestructor
, /* key destructor */
883 dictVanillaFree
/* val destructor of malloc(sizeof(double)) */
886 static dictType hashDictType
= {
887 dictObjHash
, /* hash function */
890 dictObjKeyCompare
, /* key compare */
891 dictRedisObjectDestructor
, /* key destructor */
892 dictRedisObjectDestructor
/* val destructor */
895 /* Keylist hash table type has unencoded redis objects as keys and
896 * lists as values. It's used for blocking operations (BLPOP) */
897 static dictType keylistDictType
= {
898 dictObjHash
, /* hash function */
901 dictObjKeyCompare
, /* key compare */
902 dictRedisObjectDestructor
, /* key destructor */
903 dictListDestructor
/* val destructor */
906 /* ========================= Random utility functions ======================= */
908 /* Redis generally does not try to recover from out of memory conditions
909 * when allocating objects or strings, it is not clear if it will be possible
910 * to report this condition to the client since the networking layer itself
911 * is based on heap allocation for send buffers, so we simply abort.
912 * At least the code will be simpler to read... */
913 static void oom(const char *msg
) {
914 redisLog(REDIS_WARNING
, "%s: Out of memory\n",msg
);
919 /* ====================== Redis server networking stuff ===================== */
920 static void closeTimedoutClients(void) {
923 time_t now
= time(NULL
);
925 listRewind(server
.clients
);
926 while ((ln
= listYield(server
.clients
)) != NULL
) {
927 c
= listNodeValue(ln
);
928 if (server
.maxidletime
&&
929 !(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
930 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
931 (now
- c
->lastinteraction
> server
.maxidletime
))
933 redisLog(REDIS_DEBUG
,"Closing idle client");
935 } else if (c
->flags
& REDIS_BLOCKED
) {
936 if (c
->blockingto
!= 0 && c
->blockingto
< now
) {
937 addReply(c
,shared
.nullmultibulk
);
944 static int htNeedsResize(dict
*dict
) {
945 long long size
, used
;
947 size
= dictSlots(dict
);
948 used
= dictSize(dict
);
949 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
950 (used
*100/size
< REDIS_HT_MINFILL
));
953 /* If the percentage of used slots in the HT falls below REDIS_HT_MINFILL
954 * we resize the hash table to save memory */
955 static void tryResizeHashTables(void) {
958 for (j
= 0; j
< server
.dbnum
; j
++) {
959 if (htNeedsResize(server
.db
[j
].dict
)) {
960 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
961 dictResize(server
.db
[j
].dict
);
962 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
964 if (htNeedsResize(server
.db
[j
].expires
))
965 dictResize(server
.db
[j
].expires
);
969 /* A background saving child (BGSAVE) terminated its work. Handle this. */
970 void backgroundSaveDoneHandler(int statloc
) {
971 int exitcode
= WEXITSTATUS(statloc
);
972 int bysignal
= WIFSIGNALED(statloc
);
974 if (!bysignal
&& exitcode
== 0) {
975 redisLog(REDIS_NOTICE
,
976 "Background saving terminated with success");
978 server
.lastsave
= time(NULL
);
979 } else if (!bysignal
&& exitcode
!= 0) {
980 redisLog(REDIS_WARNING
, "Background saving error");
982 redisLog(REDIS_WARNING
,
983 "Background saving terminated by signal");
984 rdbRemoveTempFile(server
.bgsavechildpid
);
986 server
.bgsavechildpid
= -1;
987 /* Possibly there are slaves waiting for a BGSAVE in order to be served
988 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
989 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
/* backgroundRewriteDoneHandler(statloc): called with the wait status of the
 * BGREWRITEAOF child. On a clean exit the visible code builds the child's
 * temp file name ("temp-rewriteaof-bg-<pid>.aof"), opens it with
 * O_WRONLY|O_APPEND, writes server.bgrewritebuf to it, renames it to
 * server.appendfilename and, when server.appendfd != -1, closes the old
 * descriptor, switches to the new one and resets server.appendseldb to -1.
 * A non-zero exit code or a signal is only logged. Afterwards the buffer
 * is freed and replaced by an empty sds string, aofRemoveTempFile() is
 * called for the child pid and server.bgrewritechildpid is reset to -1.
 * NOTE(review): several statements of the body are not visible here. */
992 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
994 void backgroundRewriteDoneHandler(int statloc
) {
995 int exitcode
= WEXITSTATUS(statloc
);
996 int bysignal
= WIFSIGNALED(statloc
);
998 if (!bysignal
&& exitcode
== 0) {
1002 redisLog(REDIS_NOTICE
,
1003 "Background append only file rewriting terminated with success");
1004 /* Now it's time to flush the differences accumulated by the parent */
1005 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) server
.bgrewritechildpid
);
1006 fd
= open(tmpfile
,O_WRONLY
|O_APPEND
);
1008 redisLog(REDIS_WARNING
, "Not able to open the temp append only file produced by the child: %s", strerror(errno
));
1011 /* Flush our data... */
1012 if (write(fd
,server
.bgrewritebuf
,sdslen(server
.bgrewritebuf
)) !=
1013 (signed) sdslen(server
.bgrewritebuf
)) {
1014 redisLog(REDIS_WARNING
, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno
));
1018 redisLog(REDIS_NOTICE
,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server
.bgrewritebuf
));
1019 /* Now our work is to rename the temp file into the stable file. And
1020 * switch the file descriptor used by the server for append only. */
1021 if (rename(tmpfile
,server
.appendfilename
) == -1) {
1022 redisLog(REDIS_WARNING
,"Can't rename the temp append only file into the stable one: %s", strerror(errno
));
1026 /* Mission completed... almost */
1027 redisLog(REDIS_NOTICE
,"Append only file successfully rewritten.");
1028 if (server
.appendfd
!= -1) {
1029 /* If append only is actually enabled... */
1030 close(server
.appendfd
);
1031 server
.appendfd
= fd
;
1033 server
.appendseldb
= -1; /* Make sure it will issue SELECT */
1034 redisLog(REDIS_NOTICE
,"The new append only file was selected for future appends.");
1036 /* If append only is disabled we just generate a dump in this
1037 * format. Why not? */
1040 } else if (!bysignal
&& exitcode
!= 0) {
1041 redisLog(REDIS_WARNING
, "Background append only file rewriting error");
1043 redisLog(REDIS_WARNING
,
1044 "Background append only file rewriting terminated by signal");
/* Drop the accumulated diff and start over with an empty buffer. */
1047 sdsfree(server
.bgrewritebuf
);
1048 server
.bgrewritebuf
= sdsempty();
1049 aofRemoveTempFile(server
.bgrewritechildpid
);
1050 server
.bgrewritechildpid
= -1;
/* serverCron: periodic housekeeping callback (it is registered through
 * aeCreateTimeEvent() elsewhere in this file). The visible statements, in
 * order: bump server.cronloops, refresh server.usedmemory, log per-database
 * key counts every 5th loop, try to resize the hash tables only when no
 * BGSAVE child is running, log client statistics, close timed out clients,
 * reap a finished BGSAVE / BGREWRITEAOF child with wait3(WNOHANG) or else
 * check the configured save points, expire a sample of volatile keys in
 * every database, and finally connect to the master when
 * server.replstate == REDIS_REPL_CONNECT.
 * eventLoop and clientData are marked unused with REDIS_NOTUSED().
 * NOTE(review): the return statement and several other lines are not
 * visible here, so the returned value is not documented. */
1053 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
1054 int j
, loops
= server
.cronloops
++;
1055 REDIS_NOTUSED(eventLoop
);
1057 REDIS_NOTUSED(clientData
);
1059 /* Update the global state with the amount of used memory */
1060 server
.usedmemory
= zmalloc_used_memory();
1062 /* Show some info about non-empty databases */
1063 for (j
= 0; j
< server
.dbnum
; j
++) {
1064 long long size
, used
, vkeys
;
1066 size
= dictSlots(server
.db
[j
].dict
);
1067 used
= dictSize(server
.db
[j
].dict
);
1068 vkeys
= dictSize(server
.db
[j
].expires
);
/* Log only every 5th loop, and only for databases holding keys. */
1069 if (!(loops
% 5) && (used
|| vkeys
)) {
1070 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
1071 /* dictPrintStats(server.dict); */
1075 /* We don't want to resize the hash tables while a background saving
1076 * is in progress: the saving child is created using fork() that is
1077 * implemented with a copy-on-write semantic in most modern systems, so
1078 * if we resize the HT while there is the saving child at work actually
1079 * a lot of memory movements in the parent will cause a lot of pages
1081 if (server
.bgsavechildpid
== -1) tryResizeHashTables();
1083 /* Show information about connected clients */
1085 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1086 listLength(server
.clients
)-listLength(server
.slaves
),
1087 listLength(server
.slaves
),
1089 dictSize(server
.sharingpool
));
1092 /* Close connections of timedout clients */
1093 if ((server
.maxidletime
&& !(loops
% 10)) || server
.blockedclients
)
1094 closeTimedoutClients();
1096 /* Check if a background saving or AOF rewrite in progress terminated */
1097 if (server
.bgsavechildpid
!= -1 || server
.bgrewritechildpid
!= -1) {
/* WNOHANG: wait3() does not block when no child has changed state. */
1101 if ((pid
= wait3(&statloc
,WNOHANG
,NULL
)) != 0) {
1102 if (pid
== server
.bgsavechildpid
) {
1103 backgroundSaveDoneHandler(statloc
);
1105 backgroundRewriteDoneHandler(statloc
);
1109 /* If there is not a background saving in progress check if
1110 * we have to save now */
1111 time_t now
= time(NULL
);
1112 for (j
= 0; j
< server
.saveparamslen
; j
++) {
1113 struct saveparam
*sp
= server
.saveparams
+j
;
/* A save point fires when at least sp->changes changes are pending and
 * more than sp->seconds seconds elapsed since server.lastsave. */
1115 if (server
.dirty
>= sp
->changes
&&
1116 now
-server
.lastsave
> sp
->seconds
) {
1117 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
1118 sp
->changes
, sp
->seconds
);
1119 rdbSaveBackground(server
.dbfilename
);
1125 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1126 * will use few CPU cycles if there are few expiring keys, otherwise
1127 * it will get more aggressive to avoid that too much memory is used by
1128 * keys that can be removed from the keyspace. */
1129 for (j
= 0; j
< server
.dbnum
; j
++) {
1131 redisDb
*db
= server
.db
+j
;
1133 /* Continue to expire if at the end of the cycle more than 25%
1134 * of the keys were expired. */
1136 int num
= dictSize(db
->expires
);
1137 time_t now
= time(NULL
);
/* Never sample more than REDIS_EXPIRELOOKUPS_PER_CRON keys per round. */
1140 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
1141 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
1146 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
1147 t
= (time_t) dictGetEntryVal(de
);
1149 deleteKey(db
,dictGetEntryKey(de
));
1153 } while (expired
> REDIS_EXPIRELOOKUPS_PER_CRON
/4);
1156 /* Check if we should connect to a MASTER */
1157 if (server
.replstate
== REDIS_REPL_CONNECT
) {
1158 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
1159 if (syncWithMaster() == REDIS_OK
) {
1160 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
1166 static void createSharedObjects(void) {
1167 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
1168 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
1169 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
1170 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
1171 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
1172 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
1173 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
1174 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
1175 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
1176 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
1177 shared
.queued
= createObject(REDIS_STRING
,sdsnew("+QUEUED\r\n"));
1178 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
1179 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1180 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
1181 "-ERR no such key\r\n"));
1182 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
1183 "-ERR syntax error\r\n"));
1184 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
1185 "-ERR source and destination objects are the same\r\n"));
1186 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
1187 "-ERR index out of range\r\n"));
1188 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
1189 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
1190 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
1191 shared
.select0
= createStringObject("select 0\r\n",10);
1192 shared
.select1
= createStringObject("select 1\r\n",10);
1193 shared
.select2
= createStringObject("select 2\r\n",10);
1194 shared
.select3
= createStringObject("select 3\r\n",10);
1195 shared
.select4
= createStringObject("select 4\r\n",10);
1196 shared
.select5
= createStringObject("select 5\r\n",10);
1197 shared
.select6
= createStringObject("select 6\r\n",10);
1198 shared
.select7
= createStringObject("select 7\r\n",10);
1199 shared
.select8
= createStringObject("select 8\r\n",10);
1200 shared
.select9
= createStringObject("select 9\r\n",10);
1203 static void appendServerSaveParams(time_t seconds
, int changes
) {
1204 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1205 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1206 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1207 server
.saveparamslen
++;
1210 static void resetServerSaveParams() {
1211 zfree(server
.saveparams
);
1212 server
.saveparams
= NULL
;
1213 server
.saveparamslen
= 0;
/* Fill the global `server` structure with the built-in default
 * configuration: network, logging, persistence (RDB and append only file),
 * object sharing, limits and virtual memory settings. It then resets the
 * save points and installs three default ones, sets the replication
 * defaults and derives the R_PosInf / R_NegInf / R_Nan double constants
 * from R_Zero.
 * NOTE(review): a few statements of the body are not visible here (for
 * instance the assignment that gives R_Zero its value). */
1216 static void initServerConfig() {
1217 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1218 server
.port
= REDIS_SERVERPORT
;
1219 server
.verbosity
= REDIS_DEBUG
;
1220 server
.maxidletime
= REDIS_MAXIDLETIME
;
1221 server
.saveparams
= NULL
;
1222 server
.logfile
= NULL
; /* NULL = log on standard output */
1223 server
.bindaddr
= NULL
;
1224 server
.glueoutputbuf
= 1;
1225 server
.daemonize
= 0;
1226 server
.appendonly
= 0;
1227 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1228 server
.lastfsync
= time(NULL
);
1229 server
.appendfd
= -1;
1230 server
.appendseldb
= -1; /* Make sure the first time will not match */
1231 server
.pidfile
= "/var/run/redis.pid";
1232 server
.dbfilename
= "dump.rdb";
1233 server
.appendfilename
= "appendonly.aof";
1234 server
.requirepass
= NULL
;
1235 server
.shareobjects
= 0;
1236 server
.rdbcompression
= 1;
1237 server
.sharingpoolsize
= 1024;
1238 server
.maxclients
= 0;
1239 server
.blockedclients
= 0;
1240 server
.maxmemory
= 0;
1241 server
.vm_enabled
= 0;
1242 server
.vm_page_size
= 256; /* 256 bytes per page */
1243 server
.vm_pages
= 1024*1024*100; /* 104 millions of pages */
1244 server
.vm_max_memory
= 1024LL*1024*1024*1; /* 1 GB of RAM */
/* Start from an empty save point list, then add the defaults. */
1246 resetServerSaveParams();
1248 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1249 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1250 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1251 /* Replication related */
1253 server
.masterauth
= NULL
;
1254 server
.masterhost
= NULL
;
1255 server
.masterport
= 6379;
1256 server
.master
= NULL
;
1257 server
.replstate
= REDIS_REPL_NONE
;
1259 /* Double constants initialization */
1261 R_PosInf
= 1.0/R_Zero
;
1262 R_NegInf
= -1.0/R_Zero
;
1263 R_Nan
= R_Zero
/R_Zero
;
/* Allocate and initialize the runtime state of the server. The visible
 * statements: ignore SIGHUP and SIGPIPE, install the SIGSEGV action, create
 * the client / slave / monitor / free-object lists, the shared objects, the
 * event loop, the database array and the sharing pool, open the listening
 * TCP socket with anetTcpServer() (a failure is logged at WARNING level),
 * create the three dictionaries of every database, reset counters and
 * statistics, register serverCron() as a time event, open the append only
 * file when server.appendonly is set and call vmInit() when
 * server.vm_enabled is set.
 * NOTE(review): what happens after a failed socket or file open is not
 * visible here. */
1266 static void initServer() {
1269 signal(SIGHUP
, SIG_IGN
);
1270 signal(SIGPIPE
, SIG_IGN
);
1271 setupSigSegvAction();
1273 server
.clients
= listCreate();
1274 server
.slaves
= listCreate();
1275 server
.monitors
= listCreate();
1276 server
.objfreelist
= listCreate();
1277 createSharedObjects();
1278 server
.el
= aeCreateEventLoop();
1279 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1280 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
1281 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1282 if (server
.fd
== -1) {
1283 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
/* Per-database state: key space, expires and blocking keys dictionaries. */
1286 for (j
= 0; j
< server
.dbnum
; j
++) {
1287 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1288 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1289 server
.db
[j
].blockingkeys
= dictCreate(&keylistDictType
,NULL
);
1290 server
.db
[j
].id
= j
;
1292 server
.cronloops
= 0;
1293 server
.bgsavechildpid
= -1;
1294 server
.bgrewritechildpid
= -1;
1295 server
.bgrewritebuf
= sdsempty();
1296 server
.lastsave
= time(NULL
);
1298 server
.usedmemory
= 0;
1299 server
.stat_numcommands
= 0;
1300 server
.stat_numconnections
= 0;
1301 server
.stat_starttime
= time(NULL
);
1302 aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
);
1304 if (server
.appendonly
) {
/* Opened for appending, created with mode 0644 if it does not exist. */
1305 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644);
1306 if (server
.appendfd
== -1) {
1307 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1313 if (server
.vm_enabled
) vmInit();
1316 /* Empty the whole database */
1317 static long long emptyDb() {
1319 long long removed
= 0;
1321 for (j
= 0; j
< server
.dbnum
; j
++) {
1322 removed
+= dictSize(server
.db
[j
].dict
);
1323 dictEmpty(server
.db
[j
].dict
);
1324 dictEmpty(server
.db
[j
].expires
);
/* Convert a "yes" / "no" configuration argument into an integer.
 *
 * The comparison is case insensitive.
 *
 * Returns 1 for "yes", 0 for "no" and -1 for any other input (including a
 * NULL pointer). The configuration loader compares the result against -1
 * to report "argument must be 'yes' or 'no'". */
static int yesnotoi(char *s) {
    if (s == NULL) return -1;
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1; /* neither "yes" nor "no" */
}
1335 /* I agree, this is a very rudimental way to load a configuration...
1336 will improve later if the config gets more complex */
/* loadServerConfig(filename): read the configuration file line by line
 * with fgets() (at most REDIS_CONFIGLINE_MAX characters per read), trim
 * blanks and line terminators, skip comments ('#') and empty lines, split
 * the rest on single spaces, lowercase the directive name and apply the
 * matching directive to the global `server` structure. A directive with a
 * bad value, an unknown name or a wrong argument count jumps to the
 * `loaderr` label, where a fatal error naming the line number, the line
 * and the reason is printed on stderr.
 * NOTE(review): the file name "-" is tested explicitly and the stream is
 * only closed when it is not stdin, so "-" presumably selects standard
 * input - the assignment itself is not visible here. What follows the
 * error report (presumably process termination) is not visible either. */
1337 static void loadServerConfig(char *filename
) {
1339 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1343 if (filename
[0] == '-' && filename
[1] == '\0')
1346 if ((fp
= fopen(filename
,"r")) == NULL
) {
1347 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1352 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1358 line
= sdstrim(line
," \t\r\n");
1360 /* Skip comments and blank lines*/
1361 if (line
[0] == '#' || line
[0] == '\0') {
1366 /* Split into arguments */
1367 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1368 sdstolower(argv
[0]);
1370 /* Execute config directives */
1371 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1372 server
.maxidletime
= atoi(argv
[1]);
1373 if (server
.maxidletime
< 0) {
1374 err
= "Invalid timeout value"; goto loaderr
;
1376 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1377 server
.port
= atoi(argv
[1]);
1378 if (server
.port
< 1 || server
.port
> 65535) {
1379 err
= "Invalid port"; goto loaderr
;
1381 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1382 server
.bindaddr
= zstrdup(argv
[1]);
1383 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1384 int seconds
= atoi(argv
[1]);
1385 int changes
= atoi(argv
[2]);
1386 if (seconds
< 1 || changes
< 0) {
1387 err
= "Invalid save parameters"; goto loaderr
;
1389 appendServerSaveParams(seconds
,changes
);
1390 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1391 if (chdir(argv
[1]) == -1) {
1392 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1393 argv
[1], strerror(errno
));
1396 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1397 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1398 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1399 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1401 err
= "Invalid log level. Must be one of debug, notice, warning";
1404 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1407 server
.logfile
= zstrdup(argv
[1]);
/* "stdout" is stored as NULL, which means log on standard output. */
1408 if (!strcasecmp(server
.logfile
,"stdout")) {
1409 zfree(server
.logfile
);
1410 server
.logfile
= NULL
;
1412 if (server
.logfile
) {
1413 /* Test if we are able to open the file. The server will not
1414 * be able to abort just for this problem later... */
1415 logfp
= fopen(server
.logfile
,"a");
1416 if (logfp
== NULL
) {
1417 err
= sdscatprintf(sdsempty(),
1418 "Can't open the log file: %s", strerror(errno
));
1423 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1424 server
.dbnum
= atoi(argv
[1]);
1425 if (server
.dbnum
< 1) {
1426 err
= "Invalid number of databases"; goto loaderr
;
1428 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1429 server
.maxclients
= atoi(argv
[1]);
1430 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1431 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1432 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1433 server
.masterhost
= sdsnew(argv
[1]);
1434 server
.masterport
= atoi(argv
[2]);
1435 server
.replstate
= REDIS_REPL_CONNECT
;
1436 } else if (!strcasecmp(argv
[0],"masterauth") && argc
== 2) {
1437 server
.masterauth
= zstrdup(argv
[1]);
1438 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1439 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1440 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1442 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1443 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1444 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1446 } else if (!strcasecmp(argv
[0],"rdbcompression") && argc
== 2) {
1447 if ((server
.rdbcompression
= yesnotoi(argv
[1])) == -1) {
1448 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1450 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1451 server
.sharingpoolsize
= atoi(argv
[1]);
1452 if (server
.sharingpoolsize
< 1) {
1453 err
= "invalid object sharing pool size"; goto loaderr
;
1455 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1456 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1457 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1459 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1460 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1461 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1463 } else if (!strcasecmp(argv
[0],"appendfsync") && argc
== 2) {
1464 if (!strcasecmp(argv
[1],"no")) {
1465 server
.appendfsync
= APPENDFSYNC_NO
;
1466 } else if (!strcasecmp(argv
[1],"always")) {
1467 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1468 } else if (!strcasecmp(argv
[1],"everysec")) {
1469 server
.appendfsync
= APPENDFSYNC_EVERYSEC
;
1471 err
= "argument must be 'no', 'always' or 'everysec'";
1474 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1475 server
.requirepass
= zstrdup(argv
[1]);
1476 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1477 server
.pidfile
= zstrdup(argv
[1]);
1478 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1479 server
.dbfilename
= zstrdup(argv
[1]);
1480 } else if (!strcasecmp(argv
[0],"vm-enabled") && argc
== 2) {
1481 if ((server
.vm_enabled
= yesnotoi(argv
[1])) == -1) {
1482 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1485 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1487 for (j
= 0; j
< argc
; j
++)
1492 if (fp
!= stdin
) fclose(fp
);
/* Error report: line number, offending line and reason go to stderr. */
1496 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1497 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1498 fprintf(stderr
, ">>> '%s'\n", line
);
1499 fprintf(stderr
, "%s\n", err
);
/* Release the references held by the client's argument vectors: every
 * object in c->argv[0..argc-1] and in c->mbargv[0..mbargc-1] has its
 * reference count decremented.
 * NOTE(review): the statements following the two loops are not visible
 * here. */
1503 static void freeClientArgv(redisClient
*c
) {
1506 for (j
= 0; j
< c
->argc
; j
++)
1507 decrRefCount(c
->argv
[j
]);
1508 for (j
= 0; j
< c
->mbargc
; j
++)
1509 decrRefCount(c
->mbargv
[j
]);
/* Tear down a client. The visible statements: free the query buffer,
 * handle the REDIS_BLOCKED case, remove the readable and writable file
 * events for c->fd, release the reply list, unlink the client from
 * server.clients, look it up in server.monitors or server.slaves when it
 * has the REDIS_SLAVE flag, reset server.master and schedule a reconnect
 * (server.replstate = REDIS_REPL_CONNECT) when it is the master link, and
 * release the MULTI state.
 * NOTE(review): several statements are not visible here (e.g. what is done
 * for a blocked client and the final release of `c`). */
1514 static void freeClient(redisClient
*c
) {
1517 /* Note that if the client we are freeing is blocked into a blocking
1518 * call, we have to set querybuf to NULL *before* to call unblockClient()
1519 * to avoid processInputBuffer() will get called. Also it is important
1520 * to remove the file events after this, because this call adds
1521 * the READABLE event. */
1522 sdsfree(c
->querybuf
);
1524 if (c
->flags
& REDIS_BLOCKED
)
1527 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1528 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1529 listRelease(c
->reply
);
/* The client must be present in the global client list. */
1532 ln
= listSearchKey(server
.clients
,c
);
1533 redisAssert(ln
!= NULL
);
1534 listDelNode(server
.clients
,ln
);
1535 if (c
->flags
& REDIS_SLAVE
) {
1536 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
/* MONITOR clients are kept in server.monitors, slaves in server.slaves. */
1538 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1539 ln
= listSearchKey(l
,c
);
1540 redisAssert(ln
!= NULL
);
1543 if (c
->flags
& REDIS_MASTER
) {
1544 server
.master
= NULL
;
1545 server
.replstate
= REDIS_REPL_CONNECT
;
1549 freeClientMultiState(c
);
/* glueReplyBuffersIfNeeded(c): walk the reply list of the client and copy
 * the objects that fit into a GLUEREPLY_UP_TO bytes stack buffer, deleting
 * the copied nodes; when something was copied, a single string object
 * holding the glued bytes is pushed on the head of the list.
 * NOTE(review): parts of the loop (how `o` is obtained, how `copylen` is
 * advanced, the stop condition) are not visible here. */
1553 #define GLUEREPLY_UP_TO (1024)
1554 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1556 char buf
[GLUEREPLY_UP_TO
];
1560 listRewind(c
->reply
);
1561 while((ln
= listYield(c
->reply
))) {
1565 objlen
= sdslen(o
->ptr
);
/* Copy the object only while the total still fits in buf. */
1566 if (copylen
+ objlen
<= GLUEREPLY_UP_TO
) {
1567 memcpy(buf
+copylen
,o
->ptr
,objlen
);
1569 listDelNode(c
->reply
,ln
);
1571 if (copylen
== 0) return;
1575 /* Now the output buffer is empty, add the new single element */
1576 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,copylen
));
1577 listAddNodeHead(c
->reply
,o
);
/* sendReplyToClient: event handler that writes pending replies to the
 * client stored in privdata. When output gluing is disabled, the reply
 * list is longer than REDIS_WRITEV_THRESHOLD and the client is not the
 * master, the work is delegated to sendReplyToClientWritev(). Otherwise
 * the head object of c->reply is written with write(), starting at offset
 * c->sentlen, until the list is empty, a write fails or more than
 * REDIS_MAX_WRITE_PER_EVENT bytes were sent. Nothing is actually written
 * to a master: the object is just accounted as sent. c->lastinteraction
 * is refreshed when something was written and the writable event is
 * removed once the list is empty.
 * NOTE(review): several lines are not visible here (e.g. the handling of
 * write errors other than EAGAIN). */
1580 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1581 redisClient
*c
= privdata
;
1582 int nwritten
= 0, totwritten
= 0, objlen
;
1585 REDIS_NOTUSED(mask
);
1587 /* Use writev() if we have enough buffers to send */
1588 if (!server
.glueoutputbuf
&&
1589 listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD
&&
1590 !(c
->flags
& REDIS_MASTER
))
1592 sendReplyToClientWritev(el
, fd
, privdata
, mask
);
1596 while(listLength(c
->reply
)) {
1597 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1598 glueReplyBuffersIfNeeded(c
);
1600 o
= listNodeValue(listFirst(c
->reply
));
1601 objlen
= sdslen(o
->ptr
);
1604 listDelNode(c
->reply
,listFirst(c
->reply
));
1608 if (c
->flags
& REDIS_MASTER
) {
1609 /* Don't reply to a master */
1610 nwritten
= objlen
- c
->sentlen
;
1612 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1613 if (nwritten
<= 0) break;
1615 c
->sentlen
+= nwritten
;
1616 totwritten
+= nwritten
;
1617 /* If we fully sent the object on head go to the next one */
1618 if (c
->sentlen
== objlen
) {
1619 listDelNode(c
->reply
,listFirst(c
->reply
));
1622 /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT
1623 * bytes, in a single threaded server it's a good idea to serve
1624 * other clients as well, even if a very large request comes from
1625 * super fast link that is always able to accept data (in real world
1626 * scenario think about 'KEYS *' against the loopback interface) */
1627 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1629 if (nwritten
== -1) {
1630 if (errno
== EAGAIN
) {
1633 redisLog(REDIS_DEBUG
,
1634 "Error writing to client: %s", strerror(errno
));
1639 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
/* Nothing left to send: stop watching the socket for writability. */
1640 if (listLength(c
->reply
) == 0) {
1642 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
/* sendReplyToClientWritev: variant of the reply writer that gathers up to
 * REDIS_WRITEV_IOVEC_COUNT reply objects into an iovec array and sends
 * them with a single writev() call per round. The first object of a round
 * is sent starting at offset c->sentlen. After the call, fully written
 * objects are removed from c->reply and the remainder of a partially
 * written one is remembered in c->sentlen. The writable event is removed
 * once the reply list is empty.
 * NOTE(review): several lines are not visible here (e.g. the
 * initialisation of `willwrite`, the increment of `ion` and the exits
 * taken on errors or when REDIS_MAX_WRITE_PER_EVENT is exceeded). */
1646 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
)
1648 redisClient
*c
= privdata
;
1649 int nwritten
= 0, totwritten
= 0, objlen
, willwrite
;
1651 struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
];
1652 int offset
, ion
= 0;
1654 REDIS_NOTUSED(mask
);
1657 while (listLength(c
->reply
)) {
1658 offset
= c
->sentlen
;
1662 /* fill-in the iov[] array */
1663 for(node
= listFirst(c
->reply
); node
; node
= listNextNode(node
)) {
1664 o
= listNodeValue(node
);
1665 objlen
= sdslen(o
->ptr
);
1667 if (totwritten
+ objlen
- offset
> REDIS_MAX_WRITE_PER_EVENT
)
1670 if(ion
== REDIS_WRITEV_IOVEC_COUNT
)
1671 break; /* no more iovecs */
1673 iov
[ion
].iov_base
= ((char*)o
->ptr
) + offset
;
1674 iov
[ion
].iov_len
= objlen
- offset
;
1675 willwrite
+= objlen
- offset
;
1676 offset
= 0; /* just for the first item */
1683 /* write all collected blocks at once */
1684 if((nwritten
= writev(fd
, iov
, ion
)) < 0) {
1685 if (errno
!= EAGAIN
) {
1686 redisLog(REDIS_DEBUG
,
1687 "Error writing to client: %s", strerror(errno
));
1694 totwritten
+= nwritten
;
1695 offset
= c
->sentlen
;
1697 /* remove written robjs from c->reply */
1698 while (nwritten
&& listLength(c
->reply
)) {
1699 o
= listNodeValue(listFirst(c
->reply
));
1700 objlen
= sdslen(o
->ptr
);
/* The head object was sent completely: drop it and go on. */
1702 if(nwritten
>= objlen
- offset
) {
1703 listDelNode(c
->reply
, listFirst(c
->reply
));
1704 nwritten
-= objlen
- offset
;
1708 c
->sentlen
+= nwritten
;
1716 c
->lastinteraction
= time(NULL
);
1718 if (listLength(c
->reply
) == 0) {
1720 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1724 static struct redisCommand
*lookupCommand(char *name
) {
1726 while(cmdTable
[j
].name
!= NULL
) {
1727 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1733 /* resetClient prepare the client to process the next command */
1734 static void resetClient(redisClient
*c
) {
1740 /* Call() is the core of Redis execution of a command */
/* server.dirty is sampled into `dirty` first; after the command ran, a
 * difference between server.dirty and the sample means the command
 * modified the data set. In that case the command is appended to the
 * append only file (when enabled) and fed to the slaves (when there are
 * any). Monitors, if any, receive every command. Finally
 * server.stat_numcommands is incremented.
 * NOTE(review): the invocation of the command itself is not visible here;
 * presumably it sits between the sampling and the first check. */
1741 static void call(redisClient
*c
, struct redisCommand
*cmd
) {
1744 dirty
= server
.dirty
;
1746 if (server
.appendonly
&& server
.dirty
-dirty
)
1747 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1748 if (server
.dirty
-dirty
&& listLength(server
.slaves
))
1749 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1750 if (listLength(server
.monitors
))
1751 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1752 server
.stat_numcommands
++;
1755 /* If this function gets called we already read a whole
1756 * command, arguments are in the client argv/argc fields.
1757 * processCommand() execute the command or prepare the
1758 * server for a bulk read from the client.
1760 * If 1 is returned the client is still alive and valid
1761 * and other operations can be performed by the caller. Otherwise
1762 * if 0 is returned the client was destroyed (i.e. after QUIT). */
/* Visible stages, in order: free memory when `maxmemory` is set; handle
 * the multi bulk protocol ("*<count>" followed by "$<len>" arguments that
 * are accumulated in c->mbargv and finally swapped with c->argv); special
 * case QUIT; look the command up and validate its arity; refuse
 * REDIS_CMD_DENYOOM commands when over `maxmemory`; for REDIS_CMD_BULK
 * commands parse the bulk length from the last argument and read the
 * payload from the query buffer when it is already there; try object
 * sharing and encoding; check authentication; then either queue the
 * command (client in MULTI state and command is not EXEC) or execute it.
 * Bulk lengths outside 0..1024*1024*1024 are rejected with an error reply.
 * NOTE(review): many lines, including every return statement, are not
 * visible here. */
1763 static int processCommand(redisClient
*c
) {
1764 struct redisCommand
*cmd
;
1766 /* Free some memory if needed (maxmemory setting) */
1767 if (server
.maxmemory
) freeMemoryIfNeeded();
1769 /* Handle the multi bulk command type. This is an alternative protocol
1770 * supported by Redis in order to receive commands that are composed of
1771 * multiple binary-safe "bulk" arguments. The latency of processing is
1772 * a bit higher but this allows things like multi-sets, so if this
1773 * protocol is used only for MSET and similar commands this is a big win. */
1774 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1775 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1776 if (c
->multibulk
<= 0) {
1780 decrRefCount(c
->argv
[c
->argc
-1]);
1784 } else if (c
->multibulk
) {
1785 if (c
->bulklen
== -1) {
1786 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1787 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1791 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1792 decrRefCount(c
->argv
[0]);
1793 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1795 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1800 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1804 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1805 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1809 if (c
->multibulk
== 0) {
1813 /* Here we need to swap the multi-bulk argc/argv with the
1814 * normal argc/argv of the client structure. */
1816 c
->argv
= c
->mbargv
;
1817 c
->mbargv
= auxargv
;
1820 c
->argc
= c
->mbargc
;
1821 c
->mbargc
= auxargc
;
1823 /* We need to set bulklen to something different than -1
1824 * in order for the code below to process the command without
1825 * to try to read the last argument of a bulk command as
1826 * a special argument. */
1828 /* continue below and process the command */
1835 /* -- end of multi bulk commands processing -- */
1837 /* The QUIT command is handled as a special case. Normal command
1838 * procs are unable to close the client connection safely */
1839 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1843 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1846 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1847 (char*)c
->argv
[0]->ptr
));
/* A positive arity must match argc exactly; a negative arity -N means
 * that at least N arguments are required. */
1850 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1851 (c
->argc
< -cmd
->arity
)) {
1853 sdscatprintf(sdsempty(),
1854 "-ERR wrong number of arguments for '%s' command\r\n",
1858 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1859 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1862 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1863 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1865 decrRefCount(c
->argv
[c
->argc
-1]);
1866 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1868 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1873 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1874 /* It is possible that the bulk read is already in the
1875 * buffer. Check this condition and handle it accordingly.
1876 * This is just a fast path, alternative to call processInputBuffer().
1877 * It's a good idea since the code is small and this condition
1878 * happens most of the times. */
1879 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1880 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1882 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1887 /* Let's try to share objects on the command arguments vector */
1888 if (server
.shareobjects
) {
1890 for(j
= 1; j
< c
->argc
; j
++)
1891 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1893 /* Let's try to encode the bulk object to save space. */
1894 if (cmd
->flags
& REDIS_CMD_BULK
)
1895 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1897 /* Check if the user is authenticated */
1898 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1899 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1904 /* Exec the command */
1905 if (c
->flags
& REDIS_MULTI
&& cmd
->proc
!= execCommand
) {
1906 queueMultiCommand(c
,cmd
);
1907 addReply(c
,shared
.queued
);
1912 /* Prepare the client for the next command */
1913 if (c
->flags
& REDIS_CLOSE
) {
/* replicationFeedSlaves: send a command to every client in `slaves` (the
 * callers visible in this file pass either server.slaves or
 * server.monitors).
 *
 * slaves: list of clients to feed.
 * cmd:    command descriptor; only its flags are read here.
 * dictid: id of the database the command was executed against.
 * argv / argc: the command arguments.
 *
 * The output vector is built from the arguments separated by shared.space
 * and terminated by shared.crlf; for REDIS_CMD_BULK commands a
 * "<length>\r\n" object is emitted at the position of the last argument.
 * A stack array is used while argc <= REDIS_STATIC_ARGS, a heap array
 * otherwise (freed at the end). Clients still in
 * REDIS_REPL_WAIT_BGSAVE_START are skipped, and a SELECT command is sent
 * first to clients whose selected db differs from dictid.
 * NOTE(review): several lines are not visible here (e.g. the `switch`
 * head and the rewind of the list before iterating). */
1921 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1925 /* (args*2)+1 is enough room for args, spaces, newlines */
1926 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1928 if (argc
<= REDIS_STATIC_ARGS
) {
1931 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1934 for (j
= 0; j
< argc
; j
++) {
1935 if (j
!= 0) outv
[outc
++] = shared
.space
;
1936 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1939 lenobj
= createObject(REDIS_STRING
,
1940 sdscatprintf(sdsempty(),"%lu\r\n",
1941 (unsigned long) stringObjectLen(argv
[j
])));
1942 lenobj
->refcount
= 0;
1943 outv
[outc
++] = lenobj
;
1945 outv
[outc
++] = argv
[j
];
1947 outv
[outc
++] = shared
.crlf
;
1949 /* Increment all the refcounts at start and decrement at end in order to
1950 * be sure to free objects if there is no slave in a replication state
1951 * able to be feed with commands */
1952 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1954 while((ln
= listYield(slaves
))) {
1955 redisClient
*slave
= ln
->value
;
1957 /* Don't feed slaves that are still waiting for BGSAVE to start */
1958 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1960 /* Feed all the other slaves, MONITORs and so on */
1961 if (slave
->slaveseldb
!= dictid
) {
/* Databases 0..9 use the pre-built shared SELECT objects. */
1965 case 0: selectcmd
= shared
.select0
; break;
1966 case 1: selectcmd
= shared
.select1
; break;
1967 case 2: selectcmd
= shared
.select2
; break;
1968 case 3: selectcmd
= shared
.select3
; break;
1969 case 4: selectcmd
= shared
.select4
; break;
1970 case 5: selectcmd
= shared
.select5
; break;
1971 case 6: selectcmd
= shared
.select6
; break;
1972 case 7: selectcmd
= shared
.select7
; break;
1973 case 8: selectcmd
= shared
.select8
; break;
1974 case 9: selectcmd
= shared
.select9
; break;
1976 selectcmd
= createObject(REDIS_STRING
,
1977 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1978 selectcmd
->refcount
= 0;
1981 addReply(slave
,selectcmd
);
1982 slave
->slaveseldb
= dictid
;
1984 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1986 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
/* Only the heap allocated vector has to be released. */
1987 if (outv
!= static_outv
) zfree(outv
);
/* processInputBuffer: parse the client query buffer and run the commands
 * found in it. While c->bulklen == -1 the buffer is handled as text: the
 * first line (up to '\n', with an optional trailing '\r' removed) is
 * detached from the buffer, split on single spaces and its non-empty
 * tokens become the c->argv objects, then processCommand() is called.
 * Otherwise c->bulklen bytes of bulk payload are awaited: once they are in
 * the buffer, everything but the final CRLF becomes the last argument and
 * processCommand() is called. In both cases parsing restarts at the
 * `again` label while the client is still valid and data is left.
 * NOTE(review): several lines are not visible here (e.g. the `again`
 * label itself and what is done after a protocol error). */
1990 static void processInputBuffer(redisClient
*c
) {
1992 /* Before to process the input buffer, make sure the client is not
1993 * waiting for a blocking operation such as BLPOP. Note that the first
1994 * iteration the client is never blocked, otherwise the processInputBuffer
1995 * would not be called at all, but after the execution of the first commands
1996 * in the input buffer the client may be blocked, and the "goto again"
1997 * will try to reiterate. The following line will make it return asap. */
1998 if (c
->flags
& REDIS_BLOCKED
) return;
1999 if (c
->bulklen
== -1) {
2000 /* Read the first line of the query */
2001 char *p
= strchr(c
->querybuf
,'\n');
2008 query
= c
->querybuf
;
2009 c
->querybuf
= sdsempty();
/* Length of the first line, newline included. */
2010 querylen
= 1+(p
-(query
));
2011 if (sdslen(query
) > querylen
) {
2012 /* leave data after the first line of the query in the buffer */
2013 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
2015 *p
= '\0'; /* remove "\n" */
2016 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
2017 sdsupdatelen(query
);
2019 /* Now we can split the query in arguments */
2020 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
2023 if (c
->argv
) zfree(c
->argv
);
2024 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
2026 for (j
= 0; j
< argc
; j
++) {
2027 if (sdslen(argv
[j
])) {
2028 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
2036 /* Execute the command. If the client is still valid
2037 * after processCommand() return and there is something
2038 * on the query buffer try to process the next command. */
2039 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
2041 /* Nothing to process, argc == 0. Just process the query
2042 * buffer if it's not empty or return to the caller */
2043 if (sdslen(c
->querybuf
)) goto again
;
2046 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
2047 redisLog(REDIS_DEBUG
, "Client protocol error");
2052 /* Bulk read handling. Note that if we are at this point
2053 the client already sent a command terminated with a newline,
2054 we are reading the bulk data that is actually the last
2055 argument of the command. */
2056 int qbl
= sdslen(c
->querybuf
);
2058 if (c
->bulklen
<= qbl
) {
2059 /* Copy everything but the final CRLF as final argument */
2060 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
2062 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
2063 /* Process the command. If the client is still valid after
2064 * the processing and there is more data in the buffer
2065 * try to parse it. */
2066 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
2072 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2073 redisClient
*c
= (redisClient
*) privdata
;
2074 char buf
[REDIS_IOBUF_LEN
];
2077 REDIS_NOTUSED(mask
);
2079 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
2081 if (errno
== EAGAIN
) {
2084 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
2088 } else if (nread
== 0) {
2089 redisLog(REDIS_DEBUG
, "Client closed connection");
2094 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
2095 c
->lastinteraction
= time(NULL
);
2099 processInputBuffer(c
);
2102 static int selectDb(redisClient
*c
, int id
) {
2103 if (id
< 0 || id
>= server
.dbnum
)
2105 c
->db
= &server
.db
[id
];
2109 static void *dupClientReplyValue(void *o
) {
2110 incrRefCount((robj
*)o
);
2114 static redisClient
*createClient(int fd
) {
2115 redisClient
*c
= zmalloc(sizeof(*c
));
2117 anetNonBlock(NULL
,fd
);
2118 anetTcpNoDelay(NULL
,fd
);
2119 if (!c
) return NULL
;
2122 c
->querybuf
= sdsempty();
2131 c
->lastinteraction
= time(NULL
);
2132 c
->authenticated
= 0;
2133 c
->replstate
= REDIS_REPL_NONE
;
2134 c
->reply
= listCreate();
2135 c
->blockingkeys
= NULL
;
2136 c
->blockingkeysnum
= 0;
2137 listSetFreeMethod(c
->reply
,decrRefCount
);
2138 listSetDupMethod(c
->reply
,dupClientReplyValue
);
2139 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
2140 readQueryFromClient
, c
) == AE_ERR
) {
2144 listAddNodeTail(server
.clients
,c
);
2145 initClientMultiState(c
);
2149 static void addReply(redisClient
*c
, robj
*obj
) {
2150 if (listLength(c
->reply
) == 0 &&
2151 (c
->replstate
== REDIS_REPL_NONE
||
2152 c
->replstate
== REDIS_REPL_ONLINE
) &&
2153 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
2154 sendReplyToClient
, c
) == AE_ERR
) return;
2155 listAddNodeTail(c
->reply
,getDecodedObject(obj
));
2158 static void addReplySds(redisClient
*c
, sds s
) {
2159 robj
*o
= createObject(REDIS_STRING
,s
);
2164 static void addReplyDouble(redisClient
*c
, double d
) {
2167 snprintf(buf
,sizeof(buf
),"%.17g",d
);
2168 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2169 (unsigned long) strlen(buf
),buf
));
2172 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
2175 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
2176 len
= sdslen(obj
->ptr
);
2178 long n
= (long)obj
->ptr
;
2180 /* Compute how many bytes this integer will take as a radix 10 string */
2186 while((n
= n
/10) != 0) {
2190 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len
));
2193 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2198 REDIS_NOTUSED(mask
);
2199 REDIS_NOTUSED(privdata
);
2201 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
2202 if (cfd
== AE_ERR
) {
2203 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
2206 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
2207 if ((c
= createClient(cfd
)) == NULL
) {
2208 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
2209 close(cfd
); /* May be already closed, just ingore errors */
2212 /* If the maxclients directive is set and this is one client more... close the
2213 * connection. Note that we create the client first instead of checking
2214 * for this condition beforehand, since now the socket is already set in nonblocking
2215 * mode and we can send an error for free using the Kernel I/O */
2216 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
2217 char *err
= "-ERR max number of clients reached\r\n";
2219 /* That's a best effort error message, don't check write errors */
2220 if (write(c
->fd
,err
,strlen(err
)) == -1) {
2221 /* Nothing to do, Just to avoid the warning... */
2226 server
.stat_numconnections
++;
2229 /* ======================= Redis objects implementation ===================== */
2231 static robj
*createObject(int type
, void *ptr
) {
2234 if (listLength(server
.objfreelist
)) {
2235 listNode
*head
= listFirst(server
.objfreelist
);
2236 o
= listNodeValue(head
);
2237 listDelNode(server
.objfreelist
,head
);
2239 if (server
.vm_enabled
) {
2240 o
= zmalloc(sizeof(*o
));
2242 o
= zmalloc(sizeof(*o
)-sizeof(struct redisObjectVM
));
2246 o
->encoding
= REDIS_ENCODING_RAW
;
2252 static robj
*createStringObject(char *ptr
, size_t len
) {
2253 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
2256 static robj
*createListObject(void) {
2257 list
*l
= listCreate();
2259 listSetFreeMethod(l
,decrRefCount
);
2260 return createObject(REDIS_LIST
,l
);
2263 static robj
*createSetObject(void) {
2264 dict
*d
= dictCreate(&setDictType
,NULL
);
2265 return createObject(REDIS_SET
,d
);
2268 static robj
*createZsetObject(void) {
2269 zset
*zs
= zmalloc(sizeof(*zs
));
2271 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
2272 zs
->zsl
= zslCreate();
2273 return createObject(REDIS_ZSET
,zs
);
2276 static void freeStringObject(robj
*o
) {
2277 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2282 static void freeListObject(robj
*o
) {
2283 listRelease((list
*) o
->ptr
);
2286 static void freeSetObject(robj
*o
) {
2287 dictRelease((dict
*) o
->ptr
);
2290 static void freeZsetObject(robj
*o
) {
2293 dictRelease(zs
->dict
);
2298 static void freeHashObject(robj
*o
) {
2299 dictRelease((dict
*) o
->ptr
);
2302 static void incrRefCount(robj
*o
) {
2304 #ifdef DEBUG_REFCOUNT
2305 if (o
->type
== REDIS_STRING
)
2306 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
2310 static void decrRefCount(void *obj
) {
2313 #ifdef DEBUG_REFCOUNT
2314 if (o
->type
== REDIS_STRING
)
2315 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
2317 if (--(o
->refcount
) == 0) {
2319 case REDIS_STRING
: freeStringObject(o
); break;
2320 case REDIS_LIST
: freeListObject(o
); break;
2321 case REDIS_SET
: freeSetObject(o
); break;
2322 case REDIS_ZSET
: freeZsetObject(o
); break;
2323 case REDIS_HASH
: freeHashObject(o
); break;
2324 default: redisAssert(0 != 0); break;
2326 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2327 !listAddNodeHead(server
.objfreelist
,o
))
2332 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2333 dictEntry
*de
= dictFind(db
->dict
,key
);
2334 return de
? dictGetEntryVal(de
) : NULL
;
2337 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2338 expireIfNeeded(db
,key
);
2339 return lookupKey(db
,key
);
2342 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2343 deleteIfVolatile(db
,key
);
2344 return lookupKey(db
,key
);
2347 static int deleteKey(redisDb
*db
, robj
*key
) {
2350 /* We need to protect key from destruction: after the first dictDelete()
2351 * it may happen that 'key' is no longer valid if we don't increment
2352 * its count. This may happen when we get the object reference directly
2353 * from the hash table with dictRandomKey() or dict iterators */
2355 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2356 retval
= dictDelete(db
->dict
,key
);
2359 return retval
== DICT_OK
;
2362 /* Try to share an object against the shared objects pool */
2363 static robj
*tryObjectSharing(robj
*o
) {
2364 struct dictEntry
*de
;
2367 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2369 redisAssert(o
->type
== REDIS_STRING
);
2370 de
= dictFind(server
.sharingpool
,o
);
2372 robj
*shared
= dictGetEntryKey(de
);
2374 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2375 dictGetEntryVal(de
) = (void*) c
;
2376 incrRefCount(shared
);
2380 /* Here we are using a stream algorithm: Every time an object is
2381 * shared we increment its count, every time there is a miss we
2382 * decrement the counter of a random object. If this object reaches
2383 * zero we remove the object and put the current object instead. */
2384 if (dictSize(server
.sharingpool
) >=
2385 server
.sharingpoolsize
) {
2386 de
= dictGetRandomKey(server
.sharingpool
);
2387 redisAssert(de
!= NULL
);
2388 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2389 dictGetEntryVal(de
) = (void*) c
;
2391 dictDelete(server
.sharingpool
,de
->key
);
2394 c
= 0; /* If the pool is empty we want to add this object */
2399 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2400 redisAssert(retval
== DICT_OK
);
2407 /* Check if the nul-terminated string 's' can be represented by a long
2408 * (that is, is a number that fits into long without any other space or
2409 * character before or after the digits).
2411 * If so, the function returns REDIS_OK and *longval is set to the value
2412 * of the number. Otherwise REDIS_ERR is returned */
2413 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2414 char buf
[32], *endptr
;
2418 value
= strtol(s
, &endptr
, 10);
2419 if (endptr
[0] != '\0') return REDIS_ERR
;
2420 slen
= snprintf(buf
,32,"%ld",value
);
2422 /* If the number converted back into a string is not identical
2423 * then it's not possible to encode the string as integer */
2424 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2425 if (longval
) *longval
= value
;
2429 /* Try to encode a string object in order to save space */
2430 static int tryObjectEncoding(robj
*o
) {
2434 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2435 return REDIS_ERR
; /* Already encoded */
2437 /* It's not safe to encode shared objects: shared objects can be shared
2438 * everywhere in the "object space" of Redis. Encoded objects can only
2439 * appear as "values" (and not, for instance, as keys) */
2440 if (o
->refcount
> 1) return REDIS_ERR
;
2442 /* Currently we try to encode only strings */
2443 redisAssert(o
->type
== REDIS_STRING
);
2445 /* Check if we can represent this string as a long integer */
2446 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2448 /* Ok, this object can be encoded */
2449 o
->encoding
= REDIS_ENCODING_INT
;
2451 o
->ptr
= (void*) value
;
2455 /* Get a decoded version of an encoded object (returned as a new object).
2456 * If the object is already raw-encoded just increment the ref count. */
2457 static robj
*getDecodedObject(robj
*o
) {
2460 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2464 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2467 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2468 dec
= createStringObject(buf
,strlen(buf
));
2471 redisAssert(1 != 1);
2475 /* Compare two string objects via strcmp() or alike.
2476 * Note that the objects may be integer-encoded. In such a case we
2477 * use snprintf() to get a string representation of the numbers on the stack
2478 * and compare the strings, it's much faster than calling getDecodedObject().
2480 * Important note: if objects are not integer encoded, but binary-safe strings,
2481 * sdscmp() from sds.c will apply memcmp() so this function can be considered
2483 static int compareStringObjects(robj
*a
, robj
*b
) {
2484 redisAssert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2485 char bufa
[128], bufb
[128], *astr
, *bstr
;
2488 if (a
== b
) return 0;
2489 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2490 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2496 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2497 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2503 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2506 static size_t stringObjectLen(robj
*o
) {
2507 redisAssert(o
->type
== REDIS_STRING
);
2508 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2509 return sdslen(o
->ptr
);
2513 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2517 /*============================ RDB saving/loading =========================== */
2519 static int rdbSaveType(FILE *fp
, unsigned char type
) {
2520 if (fwrite(&type
,1,1,fp
) == 0) return -1;
2524 static int rdbSaveTime(FILE *fp
, time_t t
) {
2525 int32_t t32
= (int32_t) t
;
2526 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
2530 /* check rdbLoadLen() comments for more info */
2531 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2532 unsigned char buf
[2];
2535 /* Save a 6 bit len */
2536 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2537 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2538 } else if (len
< (1<<14)) {
2539 /* Save a 14 bit len */
2540 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2542 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2544 /* Save a 32 bit len */
2545 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2546 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2548 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2553 /* String objects in the form "2391" "-100" without any space and with a
2554 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2555 * encoded as integers to save space */
2556 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2558 char *endptr
, buf
[32];
2560 /* Check if it's possible to encode this value as a number */
2561 value
= strtoll(s
, &endptr
, 10);
2562 if (endptr
[0] != '\0') return 0;
2563 snprintf(buf
,32,"%lld",value
);
2565 /* If the number converted back into a string is not identical
2566 * then it's not possible to encode the string as integer */
2567 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2569 /* Finally check if it fits in our ranges */
2570 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2571 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2572 enc
[1] = value
&0xFF;
2574 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2575 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2576 enc
[1] = value
&0xFF;
2577 enc
[2] = (value
>>8)&0xFF;
2579 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2580 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2581 enc
[1] = value
&0xFF;
2582 enc
[2] = (value
>>8)&0xFF;
2583 enc
[3] = (value
>>16)&0xFF;
2584 enc
[4] = (value
>>24)&0xFF;
2591 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2592 unsigned int comprlen
, outlen
;
2596 /* We require at least four bytes compression for this to be worth it */
2597 outlen
= sdslen(obj
->ptr
)-4;
2598 if (outlen
<= 0) return 0;
2599 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2600 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2601 if (comprlen
== 0) {
2605 /* Data compressed! Let's save it on disk */
2606 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2607 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2608 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2609 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2610 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2619 /* Save a string object as [len][data] on disk. If the object is a string
2620 * representation of an integer value we try to save it in a special form */
2621 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2625 len
= sdslen(obj
->ptr
);
2627 /* Try integer encoding */
2629 unsigned char buf
[5];
2630 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2631 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2636 /* Try LZF compression - under 20 bytes it's unable to compress even
2637 * aaaaaaaaaaaaaaaaaa so skip it */
2638 if (server
.rdbcompression
&& len
> 20) {
2641 retval
= rdbSaveLzfStringObject(fp
,obj
);
2642 if (retval
== -1) return -1;
2643 if (retval
> 0) return 0;
2644 /* retval == 0 means data can't be compressed, save the old way */
2647 /* Store verbatim */
2648 if (rdbSaveLen(fp
,len
) == -1) return -1;
2649 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2653 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2654 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2657 obj
= getDecodedObject(obj
);
2658 retval
= rdbSaveStringObjectRaw(fp
,obj
);
2663 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
2664 * 8 bit integer specifying the length of the representation.
2665 * This 8 bit integer has special values in order to specify the following
2671 static int rdbSaveDoubleValue(FILE *fp
, double val
) {
2672 unsigned char buf
[128];
2678 } else if (!isfinite(val
)) {
2680 buf
[0] = (val
< 0) ? 255 : 254;
2682 snprintf((char*)buf
+1,sizeof(buf
)-1,"%.17g",val
);
2683 buf
[0] = strlen((char*)buf
+1);
2686 if (fwrite(buf
,len
,1,fp
) == 0) return -1;
2690 /* Save a Redis object. */
2691 static int rdbSaveObject(FILE *fp
, robj
*o
) {
2692 if (o
->type
== REDIS_STRING
) {
2693 /* Save a string value */
2694 if (rdbSaveStringObject(fp
,o
) == -1) return -1;
2695 } else if (o
->type
== REDIS_LIST
) {
2696 /* Save a list value */
2697 list
*list
= o
->ptr
;
2701 if (rdbSaveLen(fp
,listLength(list
)) == -1) return -1;
2702 while((ln
= listYield(list
))) {
2703 robj
*eleobj
= listNodeValue(ln
);
2705 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2707 } else if (o
->type
== REDIS_SET
) {
2708 /* Save a set value */
2710 dictIterator
*di
= dictGetIterator(set
);
2713 if (rdbSaveLen(fp
,dictSize(set
)) == -1) return -1;
2714 while((de
= dictNext(di
)) != NULL
) {
2715 robj
*eleobj
= dictGetEntryKey(de
);
2717 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2719 dictReleaseIterator(di
);
2720 } else if (o
->type
== REDIS_ZSET
) {
2721 /* Save a sorted set value */
2723 dictIterator
*di
= dictGetIterator(zs
->dict
);
2726 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) return -1;
2727 while((de
= dictNext(di
)) != NULL
) {
2728 robj
*eleobj
= dictGetEntryKey(de
);
2729 double *score
= dictGetEntryVal(de
);
2731 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2732 if (rdbSaveDoubleValue(fp
,*score
) == -1) return -1;
2734 dictReleaseIterator(di
);
2736 redisAssert(0 != 0);
2741 /* Return the length the object will have on disk if saved with
2742 * the rdbSaveObject() function. Currently we use a trick to get
2743 * this length with very little changes to the code. In the future
2744 * we could switch to a faster solution. */
2745 static off_t
rdbSavedObjectLen(robj
*o
) {
2746 static FILE *fp
= NULL
;
2748 if (fp
== NULL
) fp
= fopen("/dev/null","w");
2752 assert(rdbSaveObject(fp
,o
) != 1);
2756 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2757 static int rdbSave(char *filename
) {
2758 dictIterator
*di
= NULL
;
2763 time_t now
= time(NULL
);
2765 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2766 fp
= fopen(tmpfile
,"w");
2768 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2771 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2772 for (j
= 0; j
< server
.dbnum
; j
++) {
2773 redisDb
*db
= server
.db
+j
;
2775 if (dictSize(d
) == 0) continue;
2776 di
= dictGetIterator(d
);
2782 /* Write the SELECT DB opcode */
2783 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2784 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2786 /* Iterate this DB writing every entry */
2787 while((de
= dictNext(di
)) != NULL
) {
2788 robj
*key
= dictGetEntryKey(de
);
2789 robj
*o
= dictGetEntryVal(de
);
2790 time_t expiretime
= getExpire(db
,key
);
2792 /* Save the expire time */
2793 if (expiretime
!= -1) {
2794 /* If this key is already expired skip it */
2795 if (expiretime
< now
) continue;
2796 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2797 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2799 /* Save the key and associated value */
2800 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2801 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2802 /* Save the actual value */
2803 if (rdbSaveObject(fp
,o
) == -1) goto werr
;
2805 dictReleaseIterator(di
);
2808 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2810 /* Make sure data will not remain on the OS's output buffers */
2815 /* Use RENAME to make sure the DB file is changed atomically only
2816 * if the generated DB file is ok. */
2817 if (rename(tmpfile
,filename
) == -1) {
2818 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
2822 redisLog(REDIS_NOTICE
,"DB saved on disk");
2824 server
.lastsave
= time(NULL
);
2830 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2831 if (di
) dictReleaseIterator(di
);
2835 static int rdbSaveBackground(char *filename
) {
2838 if (server
.bgsavechildpid
!= -1) return REDIS_ERR
;
2839 if ((childpid
= fork()) == 0) {
2842 if (rdbSave(filename
) == REDIS_OK
) {
2849 if (childpid
== -1) {
2850 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2854 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2855 server
.bgsavechildpid
= childpid
;
2858 return REDIS_OK
; /* unreached */
2861 static void rdbRemoveTempFile(pid_t childpid
) {
2864 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
2868 static int rdbLoadType(FILE *fp
) {
2870 if (fread(&type
,1,1,fp
) == 0) return -1;
2874 static time_t rdbLoadTime(FILE *fp
) {
2876 if (fread(&t32
,4,1,fp
) == 0) return -1;
2877 return (time_t) t32
;
2880 /* Load an encoded length from the DB, see the REDIS_RDB_* defines at the top
2881 * of this file for a description of how these are stored on disk.
2883 * isencoded is set to 1 if the value read is not actually a length but
2884 * an "encoding type", check the above comments for more info */
2885 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2886 unsigned char buf
[2];
2889 if (isencoded
) *isencoded
= 0;
2891 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2896 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2897 type
= (buf
[0]&0xC0)>>6;
2898 if (type
== REDIS_RDB_6BITLEN
) {
2899 /* Read a 6 bit len */
2901 } else if (type
== REDIS_RDB_ENCVAL
) {
2902 /* Read a 6 bit len encoding type */
2903 if (isencoded
) *isencoded
= 1;
2905 } else if (type
== REDIS_RDB_14BITLEN
) {
2906 /* Read a 14 bit len */
2907 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2908 return ((buf
[0]&0x3F)<<8)|buf
[1];
2910 /* Read a 32 bit len */
2911 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2917 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2918 unsigned char enc
[4];
2921 if (enctype
== REDIS_RDB_ENC_INT8
) {
2922 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2923 val
= (signed char)enc
[0];
2924 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2926 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2927 v
= enc
[0]|(enc
[1]<<8);
2929 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2931 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2932 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2935 val
= 0; /* anti-warning */
2938 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2941 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2942 unsigned int len
, clen
;
2943 unsigned char *c
= NULL
;
2946 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2947 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2948 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2949 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2950 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2951 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2953 return createObject(REDIS_STRING
,val
);
2960 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2965 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2968 case REDIS_RDB_ENC_INT8
:
2969 case REDIS_RDB_ENC_INT16
:
2970 case REDIS_RDB_ENC_INT32
:
2971 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2972 case REDIS_RDB_ENC_LZF
:
2973 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2979 if (len
== REDIS_RDB_LENERR
) return NULL
;
2980 val
= sdsnewlen(NULL
,len
);
2981 if (len
&& fread(val
,len
,1,fp
) == 0) {
2985 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2988 /* For information about double serialization check rdbSaveDoubleValue() */
2989 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
2993 if (fread(&len
,1,1,fp
) == 0) return -1;
2995 case 255: *val
= R_NegInf
; return 0;
2996 case 254: *val
= R_PosInf
; return 0;
2997 case 253: *val
= R_Nan
; return 0;
2999 if (fread(buf
,len
,1,fp
) == 0) return -1;
3001 sscanf(buf
, "%lg", val
);
3006 static int rdbLoad(char *filename
) {
3008 robj
*keyobj
= NULL
;
3010 int type
, retval
, rdbver
;
3011 dict
*d
= server
.db
[0].dict
;
3012 redisDb
*db
= server
.db
+0;
3014 time_t expiretime
= -1, now
= time(NULL
);
3016 fp
= fopen(filename
,"r");
3017 if (!fp
) return REDIS_ERR
;
3018 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
3020 if (memcmp(buf
,"REDIS",5) != 0) {
3022 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
3025 rdbver
= atoi(buf
+5);
3028 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
3035 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
3036 if (type
== REDIS_EXPIRETIME
) {
3037 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
3038 /* We read the time so we need to read the object type again */
3039 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
3041 if (type
== REDIS_EOF
) break;
3042 /* Handle SELECT DB opcode as a special case */
3043 if (type
== REDIS_SELECTDB
) {
3044 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
3046 if (dbid
>= (unsigned)server
.dbnum
) {
3047 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
3050 db
= server
.db
+dbid
;
3055 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
3057 if (type
== REDIS_STRING
) {
3058 /* Read string value */
3059 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
3060 tryObjectEncoding(o
);
3061 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
3062 /* Read list/set value */
3065 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
3067 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
3068 /* Load every single element of the list/set */
3072 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
3073 tryObjectEncoding(ele
);
3074 if (type
== REDIS_LIST
) {
3075 listAddNodeTail((list
*)o
->ptr
,ele
);
3077 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
3080 } else if (type
== REDIS_ZSET
) {
3081 /* Read sorted set value */
3085 if ((zsetlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
3087 o
= createZsetObject();
3089 /* Load every single element of the sorted set */
3092 double *score
= zmalloc(sizeof(double));
3094 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
3095 tryObjectEncoding(ele
);
3096 if (rdbLoadDoubleValue(fp
,score
) == -1) goto eoferr
;
3097 dictAdd(zs
->dict
,ele
,score
);
3098 zslInsert(zs
->zsl
,*score
,ele
);
3099 incrRefCount(ele
); /* added to skiplist */
3102 redisAssert(0 != 0);
3104 /* Add the new object in the hash table */
3105 retval
= dictAdd(d
,keyobj
,o
);
3106 if (retval
== DICT_ERR
) {
3107 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
3110 /* Set the expire time if needed */
3111 if (expiretime
!= -1) {
3112 setExpire(db
,keyobj
,expiretime
);
3113 /* Delete this key if already expired */
3114 if (expiretime
< now
) deleteKey(db
,keyobj
);
3122 eoferr
: /* unexpected end of file is handled here with a fatal exit */
3123 if (keyobj
) decrRefCount(keyobj
);
3124 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
3126 return REDIS_ERR
; /* Just to avoid warning */
3129 /*================================== Commands =============================== */
3131 static void authCommand(redisClient
*c
) {
3132 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
3133 c
->authenticated
= 1;
3134 addReply(c
,shared
.ok
);
3136 c
->authenticated
= 0;
3137 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3141 static void pingCommand(redisClient
*c
) {
3142 addReply(c
,shared
.pong
);
3145 static void echoCommand(redisClient
*c
) {
3146 addReplyBulkLen(c
,c
->argv
[1]);
3147 addReply(c
,c
->argv
[1]);
3148 addReply(c
,shared
.crlf
);
3151 /*=================================== Strings =============================== */
3153 static void setGenericCommand(redisClient
*c
, int nx
) {
3156 if (nx
) deleteIfVolatile(c
->db
,c
->argv
[1]);
3157 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3158 if (retval
== DICT_ERR
) {
3160 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3161 incrRefCount(c
->argv
[2]);
3163 addReply(c
,shared
.czero
);
3167 incrRefCount(c
->argv
[1]);
3168 incrRefCount(c
->argv
[2]);
3171 removeExpire(c
->db
,c
->argv
[1]);
3172 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3175 static void setCommand(redisClient
*c
) {
3176 setGenericCommand(c
,0);
3179 static void setnxCommand(redisClient
*c
) {
3180 setGenericCommand(c
,1);
3183 static int getGenericCommand(redisClient
*c
) {
3184 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3187 addReply(c
,shared
.nullbulk
);
3190 if (o
->type
!= REDIS_STRING
) {
3191 addReply(c
,shared
.wrongtypeerr
);
3194 addReplyBulkLen(c
,o
);
3196 addReply(c
,shared
.crlf
);
3202 static void getCommand(redisClient
*c
) {
3203 getGenericCommand(c
);
3206 static void getsetCommand(redisClient
*c
) {
3207 if (getGenericCommand(c
) == REDIS_ERR
) return;
3208 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
3209 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3211 incrRefCount(c
->argv
[1]);
3213 incrRefCount(c
->argv
[2]);
3215 removeExpire(c
->db
,c
->argv
[1]);
3218 static void mgetCommand(redisClient
*c
) {
3221 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
3222 for (j
= 1; j
< c
->argc
; j
++) {
3223 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
3225 addReply(c
,shared
.nullbulk
);
3227 if (o
->type
!= REDIS_STRING
) {
3228 addReply(c
,shared
.nullbulk
);
3230 addReplyBulkLen(c
,o
);
3232 addReply(c
,shared
.crlf
);
3238 static void msetGenericCommand(redisClient
*c
, int nx
) {
3239 int j
, busykeys
= 0;
3241 if ((c
->argc
% 2) == 0) {
3242 addReplySds(c
,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3245 /* Handle the NX flag. The MSETNX semantics are to return zero and
3246 * set nothing at all if at least one key already exists. */
3248 for (j
= 1; j
< c
->argc
; j
+= 2) {
3249 if (lookupKeyWrite(c
->db
,c
->argv
[j
]) != NULL
) {
3255 addReply(c
, shared
.czero
);
3259 for (j
= 1; j
< c
->argc
; j
+= 2) {
3262 tryObjectEncoding(c
->argv
[j
+1]);
3263 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3264 if (retval
== DICT_ERR
) {
3265 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3266 incrRefCount(c
->argv
[j
+1]);
3268 incrRefCount(c
->argv
[j
]);
3269 incrRefCount(c
->argv
[j
+1]);
3271 removeExpire(c
->db
,c
->argv
[j
]);
3273 server
.dirty
+= (c
->argc
-1)/2;
3274 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3277 static void msetCommand(redisClient
*c
) {
3278 msetGenericCommand(c
,0);
3281 static void msetnxCommand(redisClient
*c
) {
3282 msetGenericCommand(c
,1);
3285 static void incrDecrCommand(redisClient
*c
, long long incr
) {
3290 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3294 if (o
->type
!= REDIS_STRING
) {
3299 if (o
->encoding
== REDIS_ENCODING_RAW
)
3300 value
= strtoll(o
->ptr
, &eptr
, 10);
3301 else if (o
->encoding
== REDIS_ENCODING_INT
)
3302 value
= (long)o
->ptr
;
3304 redisAssert(1 != 1);
3309 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
3310 tryObjectEncoding(o
);
3311 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
3312 if (retval
== DICT_ERR
) {
3313 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
3314 removeExpire(c
->db
,c
->argv
[1]);
3316 incrRefCount(c
->argv
[1]);
3319 addReply(c
,shared
.colon
);
3321 addReply(c
,shared
.crlf
);
3324 static void incrCommand(redisClient
*c
) {
3325 incrDecrCommand(c
,1);
3328 static void decrCommand(redisClient
*c
) {
3329 incrDecrCommand(c
,-1);
3332 static void incrbyCommand(redisClient
*c
) {
3333 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3334 incrDecrCommand(c
,incr
);
3337 static void decrbyCommand(redisClient
*c
) {
3338 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3339 incrDecrCommand(c
,-incr
);
3342 /* ========================= Type agnostic commands ========================= */
3344 static void delCommand(redisClient
*c
) {
3347 for (j
= 1; j
< c
->argc
; j
++) {
3348 if (deleteKey(c
->db
,c
->argv
[j
])) {
3355 addReply(c
,shared
.czero
);
3358 addReply(c
,shared
.cone
);
3361 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
3366 static void existsCommand(redisClient
*c
) {
3367 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
3370 static void selectCommand(redisClient
*c
) {
3371 int id
= atoi(c
->argv
[1]->ptr
);
3373 if (selectDb(c
,id
) == REDIS_ERR
) {
3374 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
3376 addReply(c
,shared
.ok
);
3380 static void randomkeyCommand(redisClient
*c
) {
3384 de
= dictGetRandomKey(c
->db
->dict
);
3385 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
3388 addReply(c
,shared
.plus
);
3389 addReply(c
,shared
.crlf
);
3391 addReply(c
,shared
.plus
);
3392 addReply(c
,dictGetEntryKey(de
));
3393 addReply(c
,shared
.crlf
);
3397 static void keysCommand(redisClient
*c
) {
3400 sds pattern
= c
->argv
[1]->ptr
;
3401 int plen
= sdslen(pattern
);
3402 unsigned long numkeys
= 0, keyslen
= 0;
3403 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3405 di
= dictGetIterator(c
->db
->dict
);
3407 decrRefCount(lenobj
);
3408 while((de
= dictNext(di
)) != NULL
) {
3409 robj
*keyobj
= dictGetEntryKey(de
);
3411 sds key
= keyobj
->ptr
;
3412 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3413 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3414 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3416 addReply(c
,shared
.space
);
3419 keyslen
+= sdslen(key
);
3423 dictReleaseIterator(di
);
3424 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3425 addReply(c
,shared
.crlf
);
3428 static void dbsizeCommand(redisClient
*c
) {
3430 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
3433 static void lastsaveCommand(redisClient
*c
) {
3435 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
3438 static void typeCommand(redisClient
*c
) {
3442 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3447 case REDIS_STRING
: type
= "+string"; break;
3448 case REDIS_LIST
: type
= "+list"; break;
3449 case REDIS_SET
: type
= "+set"; break;
3450 case REDIS_ZSET
: type
= "+zset"; break;
3451 default: type
= "unknown"; break;
3454 addReplySds(c
,sdsnew(type
));
3455 addReply(c
,shared
.crlf
);
3458 static void saveCommand(redisClient
*c
) {
3459 if (server
.bgsavechildpid
!= -1) {
3460 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3463 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3464 addReply(c
,shared
.ok
);
3466 addReply(c
,shared
.err
);
3470 static void bgsaveCommand(redisClient
*c
) {
3471 if (server
.bgsavechildpid
!= -1) {
3472 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3475 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3476 char *status
= "+Background saving started\r\n";
3477 addReplySds(c
,sdsnew(status
));
3479 addReply(c
,shared
.err
);
3483 static void shutdownCommand(redisClient
*c
) {
3484 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
3485 /* Kill the saving child if there is a background saving in progress.
3486 We want to avoid race conditions, for instance our saving child may
3487 overwrite the synchronous saving done by SHUTDOWN. */
3488 if (server
.bgsavechildpid
!= -1) {
3489 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3490 kill(server
.bgsavechildpid
,SIGKILL
);
3491 rdbRemoveTempFile(server
.bgsavechildpid
);
3493 if (server
.appendonly
) {
3494 /* Append only file: fsync() the AOF and exit */
3495 fsync(server
.appendfd
);
3498 /* Snapshotting. Perform a SYNC SAVE and exit */
3499 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3500 if (server
.daemonize
)
3501 unlink(server
.pidfile
);
3502 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3503 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3506 /* Ooops.. error saving! The best we can do is to continue operating.
3507 * Note that if there was a background saving process, in the next
3508 * cron() Redis will be notified that the background saving aborted,
3509 * handling special stuff like slaves pending for synchronization... */
3510 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3511 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3516 static void renameGenericCommand(redisClient
*c
, int nx
) {
3519 /* To use the same key as src and dst is probably an error */
3520 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3521 addReply(c
,shared
.sameobjecterr
);
3525 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3527 addReply(c
,shared
.nokeyerr
);
3531 deleteIfVolatile(c
->db
,c
->argv
[2]);
3532 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3535 addReply(c
,shared
.czero
);
3538 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3540 incrRefCount(c
->argv
[2]);
3542 deleteKey(c
->db
,c
->argv
[1]);
3544 addReply(c
,nx
? shared
.cone
: shared
.ok
);
3547 static void renameCommand(redisClient
*c
) {
3548 renameGenericCommand(c
,0);
3551 static void renamenxCommand(redisClient
*c
) {
3552 renameGenericCommand(c
,1);
3555 static void moveCommand(redisClient
*c
) {
3560 /* Obtain source and target DB pointers */
3563 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3564 addReply(c
,shared
.outofrangeerr
);
3568 selectDb(c
,srcid
); /* Back to the source DB */
3570 /* If the target DB is the same as the source DB
3571 * it is probably an error. */
3573 addReply(c
,shared
.sameobjecterr
);
3577 /* Check if the element exists and get a reference */
3578 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3580 addReply(c
,shared
.czero
);
3584 /* Try to add the element to the target DB */
3585 deleteIfVolatile(dst
,c
->argv
[1]);
3586 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3587 addReply(c
,shared
.czero
);
3590 incrRefCount(c
->argv
[1]);
3593 /* OK! key moved, free the entry in the source DB */
3594 deleteKey(src
,c
->argv
[1]);
3596 addReply(c
,shared
.cone
);
3599 /* =================================== Lists ================================ */
3600 static void pushGenericCommand(redisClient
*c
, int where
) {
3604 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3606 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3607 addReply(c
,shared
.ok
);
3610 lobj
= createListObject();
3612 if (where
== REDIS_HEAD
) {
3613 listAddNodeHead(list
,c
->argv
[2]);
3615 listAddNodeTail(list
,c
->argv
[2]);
3617 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
3618 incrRefCount(c
->argv
[1]);
3619 incrRefCount(c
->argv
[2]);
3621 if (lobj
->type
!= REDIS_LIST
) {
3622 addReply(c
,shared
.wrongtypeerr
);
3625 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3626 addReply(c
,shared
.ok
);
3630 if (where
== REDIS_HEAD
) {
3631 listAddNodeHead(list
,c
->argv
[2]);
3633 listAddNodeTail(list
,c
->argv
[2]);
3635 incrRefCount(c
->argv
[2]);
3638 addReply(c
,shared
.ok
);
3641 static void lpushCommand(redisClient
*c
) {
3642 pushGenericCommand(c
,REDIS_HEAD
);
3645 static void rpushCommand(redisClient
*c
) {
3646 pushGenericCommand(c
,REDIS_TAIL
);
3649 static void llenCommand(redisClient
*c
) {
3653 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3655 addReply(c
,shared
.czero
);
3658 if (o
->type
!= REDIS_LIST
) {
3659 addReply(c
,shared
.wrongtypeerr
);
3662 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3667 static void lindexCommand(redisClient
*c
) {
3669 int index
= atoi(c
->argv
[2]->ptr
);
3671 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3673 addReply(c
,shared
.nullbulk
);
3675 if (o
->type
!= REDIS_LIST
) {
3676 addReply(c
,shared
.wrongtypeerr
);
3678 list
*list
= o
->ptr
;
3681 ln
= listIndex(list
, index
);
3683 addReply(c
,shared
.nullbulk
);
3685 robj
*ele
= listNodeValue(ln
);
3686 addReplyBulkLen(c
,ele
);
3688 addReply(c
,shared
.crlf
);
3694 static void lsetCommand(redisClient
*c
) {
3696 int index
= atoi(c
->argv
[2]->ptr
);
3698 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3700 addReply(c
,shared
.nokeyerr
);
3702 if (o
->type
!= REDIS_LIST
) {
3703 addReply(c
,shared
.wrongtypeerr
);
3705 list
*list
= o
->ptr
;
3708 ln
= listIndex(list
, index
);
3710 addReply(c
,shared
.outofrangeerr
);
3712 robj
*ele
= listNodeValue(ln
);
3715 listNodeValue(ln
) = c
->argv
[3];
3716 incrRefCount(c
->argv
[3]);
3717 addReply(c
,shared
.ok
);
3724 static void popGenericCommand(redisClient
*c
, int where
) {
3727 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3729 addReply(c
,shared
.nullbulk
);
3731 if (o
->type
!= REDIS_LIST
) {
3732 addReply(c
,shared
.wrongtypeerr
);
3734 list
*list
= o
->ptr
;
3737 if (where
== REDIS_HEAD
)
3738 ln
= listFirst(list
);
3740 ln
= listLast(list
);
3743 addReply(c
,shared
.nullbulk
);
3745 robj
*ele
= listNodeValue(ln
);
3746 addReplyBulkLen(c
,ele
);
3748 addReply(c
,shared
.crlf
);
3749 listDelNode(list
,ln
);
3756 static void lpopCommand(redisClient
*c
) {
3757 popGenericCommand(c
,REDIS_HEAD
);
3760 static void rpopCommand(redisClient
*c
) {
3761 popGenericCommand(c
,REDIS_TAIL
);
3764 static void lrangeCommand(redisClient
*c
) {
3766 int start
= atoi(c
->argv
[2]->ptr
);
3767 int end
= atoi(c
->argv
[3]->ptr
);
3769 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3771 addReply(c
,shared
.nullmultibulk
);
3773 if (o
->type
!= REDIS_LIST
) {
3774 addReply(c
,shared
.wrongtypeerr
);
3776 list
*list
= o
->ptr
;
3778 int llen
= listLength(list
);
3782 /* convert negative indexes */
3783 if (start
< 0) start
= llen
+start
;
3784 if (end
< 0) end
= llen
+end
;
3785 if (start
< 0) start
= 0;
3786 if (end
< 0) end
= 0;
3788 /* indexes sanity checks */
3789 if (start
> end
|| start
>= llen
) {
3790 /* Out of range start or start > end result in empty list */
3791 addReply(c
,shared
.emptymultibulk
);
3794 if (end
>= llen
) end
= llen
-1;
3795 rangelen
= (end
-start
)+1;
3797 /* Return the result in form of a multi-bulk reply */
3798 ln
= listIndex(list
, start
);
3799 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3800 for (j
= 0; j
< rangelen
; j
++) {
3801 ele
= listNodeValue(ln
);
3802 addReplyBulkLen(c
,ele
);
3804 addReply(c
,shared
.crlf
);
3811 static void ltrimCommand(redisClient
*c
) {
3813 int start
= atoi(c
->argv
[2]->ptr
);
3814 int end
= atoi(c
->argv
[3]->ptr
);
3816 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3818 addReply(c
,shared
.ok
);
3820 if (o
->type
!= REDIS_LIST
) {
3821 addReply(c
,shared
.wrongtypeerr
);
3823 list
*list
= o
->ptr
;
3825 int llen
= listLength(list
);
3826 int j
, ltrim
, rtrim
;
3828 /* convert negative indexes */
3829 if (start
< 0) start
= llen
+start
;
3830 if (end
< 0) end
= llen
+end
;
3831 if (start
< 0) start
= 0;
3832 if (end
< 0) end
= 0;
3834 /* indexes sanity checks */
3835 if (start
> end
|| start
>= llen
) {
3836 /* Out of range start or start > end result in empty list */
3840 if (end
>= llen
) end
= llen
-1;
3845 /* Remove list elements to perform the trim */
3846 for (j
= 0; j
< ltrim
; j
++) {
3847 ln
= listFirst(list
);
3848 listDelNode(list
,ln
);
3850 for (j
= 0; j
< rtrim
; j
++) {
3851 ln
= listLast(list
);
3852 listDelNode(list
,ln
);
3855 addReply(c
,shared
.ok
);
3860 static void lremCommand(redisClient
*c
) {
3863 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3865 addReply(c
,shared
.czero
);
3867 if (o
->type
!= REDIS_LIST
) {
3868 addReply(c
,shared
.wrongtypeerr
);
3870 list
*list
= o
->ptr
;
3871 listNode
*ln
, *next
;
3872 int toremove
= atoi(c
->argv
[2]->ptr
);
3877 toremove
= -toremove
;
3880 ln
= fromtail
? list
->tail
: list
->head
;
3882 robj
*ele
= listNodeValue(ln
);
3884 next
= fromtail
? ln
->prev
: ln
->next
;
3885 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3886 listDelNode(list
,ln
);
3889 if (toremove
&& removed
== toremove
) break;
3893 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3898 /* This is the semantic of this command:
3899 * RPOPLPUSH srclist dstlist:
3900 * IF LLEN(srclist) > 0
3901 * element = RPOP srclist
3902 * LPUSH dstlist element
3909 * The idea is to be able to get an element from a list in a reliable way
3910 * since the element is not just returned but pushed against another list
3911 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3913 static void rpoplpushcommand(redisClient
*c
) {
3916 sobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3918 addReply(c
,shared
.nullbulk
);
3920 if (sobj
->type
!= REDIS_LIST
) {
3921 addReply(c
,shared
.wrongtypeerr
);
3923 list
*srclist
= sobj
->ptr
;
3924 listNode
*ln
= listLast(srclist
);
3927 addReply(c
,shared
.nullbulk
);
3929 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3930 robj
*ele
= listNodeValue(ln
);
3933 if (dobj
&& dobj
->type
!= REDIS_LIST
) {
3934 addReply(c
,shared
.wrongtypeerr
);
3938 /* Add the element to the target list (unless it's directly
3939 * passed to some BLPOP-ing client) */
3940 if (!handleClientsWaitingListPush(c
,c
->argv
[2],ele
)) {
3942 /* Create the list if the key does not exist */
3943 dobj
= createListObject();
3944 dictAdd(c
->db
->dict
,c
->argv
[2],dobj
);
3945 incrRefCount(c
->argv
[2]);
3947 dstlist
= dobj
->ptr
;
3948 listAddNodeHead(dstlist
,ele
);
3952 /* Send the element to the client as reply as well */
3953 addReplyBulkLen(c
,ele
);
3955 addReply(c
,shared
.crlf
);
3957 /* Finally remove the element from the source list */
3958 listDelNode(srclist
,ln
);
3966 /* ==================================== Sets ================================ */
3968 static void saddCommand(redisClient
*c
) {
3971 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3973 set
= createSetObject();
3974 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3975 incrRefCount(c
->argv
[1]);
3977 if (set
->type
!= REDIS_SET
) {
3978 addReply(c
,shared
.wrongtypeerr
);
3982 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3983 incrRefCount(c
->argv
[2]);
3985 addReply(c
,shared
.cone
);
3987 addReply(c
,shared
.czero
);
3991 static void sremCommand(redisClient
*c
) {
3994 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3996 addReply(c
,shared
.czero
);
3998 if (set
->type
!= REDIS_SET
) {
3999 addReply(c
,shared
.wrongtypeerr
);
4002 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
4004 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
4005 addReply(c
,shared
.cone
);
4007 addReply(c
,shared
.czero
);
4012 static void smoveCommand(redisClient
*c
) {
4013 robj
*srcset
, *dstset
;
4015 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4016 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
4018 /* If the source key does not exist return 0, if it's of the wrong type
4020 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
4021 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
4024 /* Error if the destination key is not a set as well */
4025 if (dstset
&& dstset
->type
!= REDIS_SET
) {
4026 addReply(c
,shared
.wrongtypeerr
);
4029 /* Remove the element from the source set */
4030 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
4031 /* Key not found in the src set! return zero */
4032 addReply(c
,shared
.czero
);
4036 /* Add the element to the destination set */
4038 dstset
= createSetObject();
4039 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
4040 incrRefCount(c
->argv
[2]);
4042 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
4043 incrRefCount(c
->argv
[3]);
4044 addReply(c
,shared
.cone
);
4047 static void sismemberCommand(redisClient
*c
) {
4050 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4052 addReply(c
,shared
.czero
);
4054 if (set
->type
!= REDIS_SET
) {
4055 addReply(c
,shared
.wrongtypeerr
);
4058 if (dictFind(set
->ptr
,c
->argv
[2]))
4059 addReply(c
,shared
.cone
);
4061 addReply(c
,shared
.czero
);
4065 static void scardCommand(redisClient
*c
) {
4069 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4071 addReply(c
,shared
.czero
);
4074 if (o
->type
!= REDIS_SET
) {
4075 addReply(c
,shared
.wrongtypeerr
);
4078 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4084 static void spopCommand(redisClient
*c
) {
4088 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4090 addReply(c
,shared
.nullbulk
);
4092 if (set
->type
!= REDIS_SET
) {
4093 addReply(c
,shared
.wrongtypeerr
);
4096 de
= dictGetRandomKey(set
->ptr
);
4098 addReply(c
,shared
.nullbulk
);
4100 robj
*ele
= dictGetEntryKey(de
);
4102 addReplyBulkLen(c
,ele
);
4104 addReply(c
,shared
.crlf
);
4105 dictDelete(set
->ptr
,ele
);
4106 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
4112 static void srandmemberCommand(redisClient
*c
) {
4116 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4118 addReply(c
,shared
.nullbulk
);
4120 if (set
->type
!= REDIS_SET
) {
4121 addReply(c
,shared
.wrongtypeerr
);
4124 de
= dictGetRandomKey(set
->ptr
);
4126 addReply(c
,shared
.nullbulk
);
4128 robj
*ele
= dictGetEntryKey(de
);
4130 addReplyBulkLen(c
,ele
);
4132 addReply(c
,shared
.crlf
);
4137 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
4138 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
4140 return dictSize(*d1
)-dictSize(*d2
);
4143 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, unsigned long setsnum
, robj
*dstkey
) {
4144 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4147 robj
*lenobj
= NULL
, *dstset
= NULL
;
4148 unsigned long j
, cardinality
= 0;
4150 for (j
= 0; j
< setsnum
; j
++) {
4154 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4155 lookupKeyRead(c
->db
,setskeys
[j
]);
4159 if (deleteKey(c
->db
,dstkey
))
4161 addReply(c
,shared
.czero
);
4163 addReply(c
,shared
.nullmultibulk
);
4167 if (setobj
->type
!= REDIS_SET
) {
4169 addReply(c
,shared
.wrongtypeerr
);
4172 dv
[j
] = setobj
->ptr
;
4174 /* Sort sets from the smallest to largest, this will improve our
4175 * algorithm's performance */
4176 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
4178 /* The first thing we should output is the total number of elements...
4179 * since this is a multi-bulk write, but at this stage we don't know
4180 * the intersection set size, so we use a trick, append an empty object
4181 * to the output list and save the pointer to later modify it with the
4184 lenobj
= createObject(REDIS_STRING
,NULL
);
4186 decrRefCount(lenobj
);
4188 /* If we have a target key where to store the resulting set
4189 * create this key with an empty set inside */
4190 dstset
= createSetObject();
4193 /* Iterate all the elements of the first (smallest) set, and test
4194 * the element against all the other sets, if at least one set does
4195 * not include the element it is discarded */
4196 di
= dictGetIterator(dv
[0]);
4198 while((de
= dictNext(di
)) != NULL
) {
4201 for (j
= 1; j
< setsnum
; j
++)
4202 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
4204 continue; /* at least one set does not contain the member */
4205 ele
= dictGetEntryKey(de
);
4207 addReplyBulkLen(c
,ele
);
4209 addReply(c
,shared
.crlf
);
4212 dictAdd(dstset
->ptr
,ele
,NULL
);
4216 dictReleaseIterator(di
);
4219 /* Store the resulting set into the target */
4220 deleteKey(c
->db
,dstkey
);
4221 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4222 incrRefCount(dstkey
);
4226 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%lu\r\n",cardinality
);
4228 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4229 dictSize((dict
*)dstset
->ptr
)));
4235 static void sinterCommand(redisClient
*c
) {
4236 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
4239 static void sinterstoreCommand(redisClient
*c
) {
4240 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
4243 #define REDIS_OP_UNION 0
4244 #define REDIS_OP_DIFF 1
4246 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
4247 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4250 robj
*dstset
= NULL
;
4251 int j
, cardinality
= 0;
4253 for (j
= 0; j
< setsnum
; j
++) {
4257 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4258 lookupKeyRead(c
->db
,setskeys
[j
]);
4263 if (setobj
->type
!= REDIS_SET
) {
4265 addReply(c
,shared
.wrongtypeerr
);
4268 dv
[j
] = setobj
->ptr
;
4271 /* We need a temp set object to store our union. If the dstkey
4272 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4273 * this set object will be the resulting object to set into the target key*/
4274 dstset
= createSetObject();
4276 /* Iterate all the elements of all the sets, add every element a single
4277 * time to the result set */
4278 for (j
= 0; j
< setsnum
; j
++) {
4279 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
4280 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
4282 di
= dictGetIterator(dv
[j
]);
4284 while((de
= dictNext(di
)) != NULL
) {
4287 /* dictAdd will not add the same element multiple times */
4288 ele
= dictGetEntryKey(de
);
4289 if (op
== REDIS_OP_UNION
|| j
== 0) {
4290 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
4294 } else if (op
== REDIS_OP_DIFF
) {
4295 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
4300 dictReleaseIterator(di
);
4302 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
4305 /* Output the content of the resulting set, if not in STORE mode */
4307 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
4308 di
= dictGetIterator(dstset
->ptr
);
4309 while((de
= dictNext(di
)) != NULL
) {
4312 ele
= dictGetEntryKey(de
);
4313 addReplyBulkLen(c
,ele
);
4315 addReply(c
,shared
.crlf
);
4317 dictReleaseIterator(di
);
4319 /* If we have a target key where to store the resulting set
4320 * create this key with the result set inside */
4321 deleteKey(c
->db
,dstkey
);
4322 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4323 incrRefCount(dstkey
);
4328 decrRefCount(dstset
);
4330 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4331 dictSize((dict
*)dstset
->ptr
)));
4337 static void sunionCommand(redisClient
*c
) {
4338 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
4341 static void sunionstoreCommand(redisClient
*c
) {
4342 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
4345 static void sdiffCommand(redisClient
*c
) {
4346 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
4349 static void sdiffstoreCommand(redisClient
*c
) {
4350 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
4353 /* ==================================== ZSets =============================== */
4355 /* ZSETs are ordered sets using two data structures to hold the same elements
4356 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4359 * The elements are added to a hash table mapping Redis objects to scores.
4360 * At the same time the elements are added to a skip list mapping scores
4361 * to Redis objects (so objects are sorted by scores in this "view"). */
4363 /* This skiplist implementation is almost a C translation of the original
4364 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4365 * Alternative to Balanced Trees", modified in three ways:
4366 * a) this implementation allows for repeated values.
4367 * b) the comparison is not just by key (our 'score') but by satellite data.
4368 * c) there is a back pointer, so it's a doubly linked list with the back
4369 * pointers being only at "level 1". This allows traversing the list
4370 * from tail to head, useful for ZREVRANGE. */
4372 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
4373 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
4375 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
4381 static zskiplist
*zslCreate(void) {
4385 zsl
= zmalloc(sizeof(*zsl
));
4388 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
4389 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
4390 zsl
->header
->forward
[j
] = NULL
;
4391 zsl
->header
->backward
= NULL
;
4396 static void zslFreeNode(zskiplistNode
*node
) {
4397 decrRefCount(node
->obj
);
4398 zfree(node
->forward
);
4402 static void zslFree(zskiplist
*zsl
) {
4403 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
4405 zfree(zsl
->header
->forward
);
4408 next
= node
->forward
[0];
4415 static int zslRandomLevel(void) {
4417 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
4422 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
4423 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4427 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4428 while (x
->forward
[i
] &&
4429 (x
->forward
[i
]->score
< score
||
4430 (x
->forward
[i
]->score
== score
&&
4431 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4435 /* we assume the key is not already inside, since we allow duplicated
4436 * scores, and the re-insertion of score and redis object should never
4437 * happen since the caller of zslInsert() should test in the hash table
4438 * if the element is already inside or not. */
4439 level
= zslRandomLevel();
4440 if (level
> zsl
->level
) {
4441 for (i
= zsl
->level
; i
< level
; i
++)
4442 update
[i
] = zsl
->header
;
4445 x
= zslCreateNode(level
,score
,obj
);
4446 for (i
= 0; i
< level
; i
++) {
4447 x
->forward
[i
] = update
[i
]->forward
[i
];
4448 update
[i
]->forward
[i
] = x
;
4450 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
4452 x
->forward
[0]->backward
= x
;
4458 /* Delete an element with matching score/object from the skiplist. */
4459 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
4460 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4464 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4465 while (x
->forward
[i
] &&
4466 (x
->forward
[i
]->score
< score
||
4467 (x
->forward
[i
]->score
== score
&&
4468 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4472 /* We may have multiple elements with the same score, what we need
4473 * is to find the element with both the right score and object. */
4475 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
4476 for (i
= 0; i
< zsl
->level
; i
++) {
4477 if (update
[i
]->forward
[i
] != x
) break;
4478 update
[i
]->forward
[i
] = x
->forward
[i
];
4480 if (x
->forward
[0]) {
4481 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4484 zsl
->tail
= x
->backward
;
4487 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4492 return 0; /* not found */
4494 return 0; /* not found */
4497 /* Delete all the elements with score between min and max from the skiplist.
4498 * Min and max are inclusive, so a score >= min && score <= max is deleted.
4499 * Note that this function takes the reference to the hash table view of the
4500 * sorted set, in order to remove the elements from the hash table too. */
4501 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4502 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4503 unsigned long removed
= 0;
4507 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4508 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
4512 /* We may have multiple elements with the same score, what we need
4513 * is to find the element with both the right score and object. */
4515 while (x
&& x
->score
<= max
) {
4516 zskiplistNode
*next
;
4518 for (i
= 0; i
< zsl
->level
; i
++) {
4519 if (update
[i
]->forward
[i
] != x
) break;
4520 update
[i
]->forward
[i
] = x
->forward
[i
];
4522 if (x
->forward
[0]) {
4523 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4526 zsl
->tail
= x
->backward
;
4528 next
= x
->forward
[0];
4529 dictDelete(dict
,x
->obj
);
4531 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4537 return removed
; /* not found */
4540 /* Find the first node having a score equal to or greater than the specified one.
4541 * Returns NULL if there is no match. */
4542 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4547 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4548 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
4551 /* We may have multiple elements with the same score, what we need
4552 * is to find the element with both the right score and object. */
4553 return x
->forward
[0];
4556 /* The actual Z-commands implementations */
4558 /* This generic command implements both ZADD and ZINCRBY.
4559 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4560 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4561 static void zaddGenericCommand(redisClient
*c
, robj
*key
, robj
*ele
, double scoreval
, int doincrement
) {
4566 zsetobj
= lookupKeyWrite(c
->db
,key
);
4567 if (zsetobj
== NULL
) {
4568 zsetobj
= createZsetObject();
4569 dictAdd(c
->db
->dict
,key
,zsetobj
);
4572 if (zsetobj
->type
!= REDIS_ZSET
) {
4573 addReply(c
,shared
.wrongtypeerr
);
4579 /* Ok now since we implement both ZADD and ZINCRBY here the code
4580 * needs to handle the two different conditions. It's all about setting
4581 * '*score', that is, the new score to set, to the right value. */
4582 score
= zmalloc(sizeof(double));
4586 /* Read the old score. If the element was not present, start from 0 */
4587 de
= dictFind(zs
->dict
,ele
);
4589 double *oldscore
= dictGetEntryVal(de
);
4590 *score
= *oldscore
+ scoreval
;
4598 /* What follows is a simple remove and re-insert operation that is common
4599 * to both ZADD and ZINCRBY... */
4600 if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) {
4601 /* case 1: New element */
4602 incrRefCount(ele
); /* added to hash */
4603 zslInsert(zs
->zsl
,*score
,ele
);
4604 incrRefCount(ele
); /* added to skiplist */
4607 addReplyDouble(c
,*score
);
4609 addReply(c
,shared
.cone
);
4614 /* case 2: Score update operation */
4615 de
= dictFind(zs
->dict
,ele
);
4616 redisAssert(de
!= NULL
);
4617 oldscore
= dictGetEntryVal(de
);
4618 if (*score
!= *oldscore
) {
4621 /* Remove and insert the element in the skip list with new score */
4622 deleted
= zslDelete(zs
->zsl
,*oldscore
,ele
);
4623 redisAssert(deleted
!= 0);
4624 zslInsert(zs
->zsl
,*score
,ele
);
4626 /* Update the score in the hash table */
4627 dictReplace(zs
->dict
,ele
,score
);
4633 addReplyDouble(c
,*score
);
4635 addReply(c
,shared
.czero
);
4639 static void zaddCommand(redisClient
*c
) {
4642 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4643 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0);
4646 static void zincrbyCommand(redisClient
*c
) {
4649 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4650 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1);
4653 static void zremCommand(redisClient
*c
) {
4657 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4658 if (zsetobj
== NULL
) {
4659 addReply(c
,shared
.czero
);
4665 if (zsetobj
->type
!= REDIS_ZSET
) {
4666 addReply(c
,shared
.wrongtypeerr
);
4670 de
= dictFind(zs
->dict
,c
->argv
[2]);
4672 addReply(c
,shared
.czero
);
4675 /* Delete from the skiplist */
4676 oldscore
= dictGetEntryVal(de
);
4677 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
4678 redisAssert(deleted
!= 0);
4680 /* Delete from the hash table */
4681 dictDelete(zs
->dict
,c
->argv
[2]);
4682 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4684 addReply(c
,shared
.cone
);
/* Removes elements of the sorted set at key argv[1] by score range.
 * min (argv[2]) and max (argv[3]) are parsed with strtod() and a NULL end
 * pointer, so malformed numbers are not rejected here.
 *
 * Replies shared.czero when the key does not exist and shared.wrongtypeerr
 * when the value is not a REDIS_ZSET. Otherwise the work is delegated to
 * zslDeleteRange(), which also receives the dict; its return value is added
 * to server.dirty and sent back as an integer reply (":%lu"). The dict is
 * shrunk when htNeedsResize() reports it is worth it. */
4688 static void zremrangebyscoreCommand(redisClient
*c
) {
4689 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4690 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4694 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4695 if (zsetobj
== NULL
) {
4696 addReply(c
,shared
.czero
);
4700 if (zsetobj
->type
!= REDIS_ZSET
) {
4701 addReply(c
,shared
.wrongtypeerr
);
4705 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
4706 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4707 server
.dirty
+= deleted
;
4708 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
/* Shared implementation behind zrangeCommand() (reverse=0) and
 * zrevrangeCommand() (reverse=1): replies with the elements of the sorted
 * set at key argv[1] whose rank is between start (argv[2]) and end (argv[3]).
 *
 * - Indexes are parsed with atoi(); negative values are counted from the end
 *   of the set (llen+index) and then clamped to 0.
 * - An optional 5th argument "withscores" (case-insensitive) is accepted;
 *   any other argument list with argc >= 5 gets shared.syntaxerr.
 * - shared.nullmultibulk, shared.wrongtypeerr and shared.emptymultibulk are
 *   the early-exit replies visible in the body.
 * - The multi-bulk header length is rangelen, doubled when withscores is set.
 * - Iteration follows ln->backward when 'reverse' is set, ln->forward[0]
 *   otherwise. */
4712 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
4714 int start
= atoi(c
->argv
[2]->ptr
);
4715 int end
= atoi(c
->argv
[3]->ptr
);
4718 if (c
->argc
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) {
4720 } else if (c
->argc
>= 5) {
4721 addReply(c
,shared
.syntaxerr
);
4725 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4727 addReply(c
,shared
.nullmultibulk
);
4729 if (o
->type
!= REDIS_ZSET
) {
4730 addReply(c
,shared
.wrongtypeerr
);
4732 zset
*zsetobj
= o
->ptr
;
4733 zskiplist
*zsl
= zsetobj
->zsl
;
4736 int llen
= zsl
->length
;
4740 /* convert negative indexes */
4741 if (start
< 0) start
= llen
+start
;
4742 if (end
< 0) end
= llen
+end
;
4743 if (start
< 0) start
= 0;
4744 if (end
< 0) end
= 0;
4746 /* indexes sanity checks */
4747 if (start
> end
|| start
>= llen
) {
4748 /* Out of range start or start > end result in empty list */
4749 addReply(c
,shared
.emptymultibulk
);
4752 if (end
>= llen
) end
= llen
-1;
4753 rangelen
= (end
-start
)+1;
4755 /* Return the result in form of a multi-bulk reply */
4761 ln
= zsl
->header
->forward
[0];
4763 ln
= ln
->forward
[0];
4766 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",
4767 withscores
? (rangelen
*2) : rangelen
));
4768 for (j
= 0; j
< rangelen
; j
++) {
4770 addReplyBulkLen(c
,ele
);
4772 addReply(c
,shared
.crlf
);
4774 addReplyDouble(c
,ln
->score
);
4775 ln
= reverse
? ln
->backward
: ln
->forward
[0];
/* Thin wrapper: runs zrangeGenericCommand() with reverse=0. */
4781 static void zrangeCommand(redisClient
*c
) {
4782 zrangeGenericCommand(c
,0);
/* Thin wrapper: runs zrangeGenericCommand() with reverse=1. */
4785 static void zrevrangeCommand(redisClient
*c
) {
4786 zrangeGenericCommand(c
,1);
/* Replies with the elements of the sorted set at key argv[1] whose score is
 * between min (argv[2]) and max (argv[3]), both parsed with strtod().
 *
 * Accepted forms: exactly 4 arguments, or 7 arguments where argv[4] is
 * "limit" (case-insensitive) followed by offset (argv[5]) and count
 * (argv[6]), both parsed with atoi(). Any other argc produces the
 * "-ERR wrong number of arguments" reply; a 7-argument form without "limit"
 * produces shared.syntaxerr. A negative offset is clamped to 0; limit
 * defaults to -1 and is only decremented while positive, so a negative
 * value never stops the loop.
 *
 * The walk starts at zslFirstWithScore(zsl,min) and continues while
 * ln->score <= max. Because the number of matches is unknown up front, a
 * placeholder string object (lenobj) is created with a NULL ptr and its ptr
 * is filled in with the "*<rangelen>\r\n" header once the loop is done. */
4789 static void zrangebyscoreCommand(redisClient
*c
) {
4791 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4792 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4793 int offset
= 0, limit
= -1;
4795 if (c
->argc
!= 4 && c
->argc
!= 7) {
4797 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4799 } else if (c
->argc
== 7 && strcasecmp(c
->argv
[4]->ptr
,"limit")) {
4800 addReply(c
,shared
.syntaxerr
);
4802 } else if (c
->argc
== 7) {
4803 offset
= atoi(c
->argv
[5]->ptr
);
4804 limit
= atoi(c
->argv
[6]->ptr
);
4805 if (offset
< 0) offset
= 0;
4808 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4810 addReply(c
,shared
.nullmultibulk
);
4812 if (o
->type
!= REDIS_ZSET
) {
4813 addReply(c
,shared
.wrongtypeerr
);
4815 zset
*zsetobj
= o
->ptr
;
4816 zskiplist
*zsl
= zsetobj
->zsl
;
4819 unsigned int rangelen
= 0;
4821 /* Get the first node with the score >= min */
4822 ln
= zslFirstWithScore(zsl
,min
);
4824 /* No element matching the specified interval */
4825 addReply(c
,shared
.emptymultibulk
);
4829 /* We don't know in advance how many matching elements there
4830 * are in the list, so we push this object that will represent
4831 * the multi-bulk length in the output buffer, and will "fix"
4833 lenobj
= createObject(REDIS_STRING
,NULL
);
4835 decrRefCount(lenobj
);
4837 while(ln
&& ln
->score
<= max
) {
4840 ln
= ln
->forward
[0];
4843 if (limit
== 0) break;
4845 addReplyBulkLen(c
,ele
);
4847 addReply(c
,shared
.crlf
);
4848 ln
= ln
->forward
[0];
4850 if (limit
> 0) limit
--;
4852 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
/* Replies with the cardinality of the sorted set at key argv[1], read from
 * zs->zsl->length and formatted as an integer reply (":%lu").
 * shared.czero is replied on the path right after the key lookup
 * (presumably: key missing) and shared.wrongtypeerr when the value is not a
 * REDIS_ZSET. */
4857 static void zcardCommand(redisClient
*c
) {
4861 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4863 addReply(c
,shared
.czero
);
4866 if (o
->type
!= REDIS_ZSET
) {
4867 addReply(c
,shared
.wrongtypeerr
);
4870 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",zs
->zsl
->length
));
/* Replies with the score of member argv[2] in the sorted set at key argv[1].
 * The score is looked up in the zset's dict (not the skiplist) and sent with
 * addReplyDouble(). shared.nullbulk is replied on two paths -- after the key
 * lookup and after the member lookup (presumably the "not found" cases; the
 * guards are not visible in this listing) -- and shared.wrongtypeerr when
 * the value is not a REDIS_ZSET. */
4875 static void zscoreCommand(redisClient
*c
) {
4879 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4881 addReply(c
,shared
.nullbulk
);
4884 if (o
->type
!= REDIS_ZSET
) {
4885 addReply(c
,shared
.wrongtypeerr
);
4890 de
= dictFind(zs
->dict
,c
->argv
[2]);
4892 addReply(c
,shared
.nullbulk
);
4894 double *score
= dictGetEntryVal(de
);
4896 addReplyDouble(c
,*score
);
4902 /* ========================= Non type-specific commands ==================== */
/* Empties the database currently selected by the client: both the key dict
 * and the expires dict are cleared. server.dirty is increased by the number
 * of keys *before* the dict is emptied (afterwards the size would be 0).
 * Always replies shared.ok. */
4904 static void flushdbCommand(redisClient
*c
) {
4905 server
.dirty
+= dictSize(c
->db
->dict
);
4906 dictEmpty(c
->db
->dict
);
4907 dictEmpty(c
->db
->expires
);
4908 addReply(c
,shared
.ok
);
/* Calls emptyDb() and adds its return value to server.dirty, queues the
 * shared.ok reply, then calls rdbSave() on server.dbfilename. The return
 * value of rdbSave() is not checked here. */
4911 static void flushallCommand(redisClient
*c
) {
4912 server
.dirty
+= emptyDb();
4913 addReply(c
,shared
.ok
);
4914 rdbSave(server
.dbfilename
);
/* Allocates a redisSortOperation with zmalloc() and stores 'pattern' in it.
 * The pattern pointer is stored as-is: no reference count change is visible
 * here, so the operation borrows the object from its caller. */
4918 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
4919 redisSortOperation
*so
= zmalloc(sizeof(*so
));
4921 so
->pattern
= pattern
;
/* Helper used by SORT (BY / GET patterns).
 *
 * The key name is assembled as prefix + subst + postfix, where prefix and
 * postfix are the parts of 'pattern' before and after the '*' found by
 * strchr(). It is built in a fixed-size stack buffer (REDIS_SORTKEY_MAX+1
 * bytes) and wrapped in a static string object, then looked up with
 * lookupKeyRead().
 *
 * 'subst' is replaced by getDecodedObject(subst) and released with
 * decrRefCount() on the visible exit paths.
 * NOTE(review): the over-length path (combined length > REDIS_SORTKEY_MAX)
 * returns NULL right after the decode without a visible decrRefCount(subst)
 * -- looks like a reference leak; confirm against the full source. */
4925 /* Return the value associated to the key with a name obtained
4926 * substituting the first occurrence of '*' in 'pattern' with 'subst' */
4927 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
4931 int prefixlen
, sublen
, postfixlen
;
4932 /* Exploit the internal sds representation to create an sds string allocated on the stack in order to make this function faster */
4936 char buf
[REDIS_SORTKEY_MAX
+1];
4939 /* If the pattern is "#" return the substitution object itself in order
4940 * to implement the "SORT ... GET #" feature. */
4941 spat
= pattern
->ptr
;
4942 if (spat
[0] == '#' && spat
[1] == '\0') {
4946 /* The substitution object may be specially encoded. If so we create
4947 * a decoded object on the fly. Otherwise getDecodedObject will just
4948 * increment the ref count, that we'll decrement later. */
4949 subst
= getDecodedObject(subst
);
4952 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
4953 p
= strchr(spat
,'*');
4955 decrRefCount(subst
);
4960 sublen
= sdslen(ssub
);
4961 postfixlen
= sdslen(spat
)-(prefixlen
+1);
4962 memcpy(keyname
.buf
,spat
,prefixlen
);
4963 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
4964 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
4965 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
4966 keyname
.len
= prefixlen
+sublen
+postfixlen
;
4968 initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(long)*2))
4969 decrRefCount(subst
);
4971 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4972 return lookupKeyRead(db
,&keyobj
);
/* Comparator with the qsort() signature; s1 and s2 point to redisSortObject
 * entries. Behaviour is driven by three fields of the global 'server':
 *  - sort_alpha == 0: compare the precomputed u.score values;
 *  - sort_alpha != 0 and sort_bypattern != 0: compare u.cmpobj strings with
 *    strcoll(), with dedicated handling when at least one of them is NULL;
 *  - sort_alpha != 0 and sort_bypattern == 0: compare the decoded element
 *    objects themselves with strcoll().
 * The result is negated when server.sort_desc is set. */
4975 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4976 * the additional parameter is not standard but a BSD-specific we have to
4977 * pass sorting parameters via the global 'server' structure */
4978 static int sortCompare(const void *s1
, const void *s2
) {
4979 const redisSortObject
*so1
= s1
, *so2
= s2
;
4982 if (!server
.sort_alpha
) {
4983 /* Numeric sorting. Here it's trivial as we precomputed scores */
4984 if (so1
->u
.score
> so2
->u
.score
) {
4986 } else if (so1
->u
.score
< so2
->u
.score
) {
4992 /* Alphanumeric sorting */
4993 if (server
.sort_bypattern
) {
4994 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
4995 /* At least one compare object is NULL */
4996 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
4998 else if (so1
->u
.cmpobj
== NULL
)
5003 /* We have both the objects, use strcoll */
5004 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
5007 /* Compare elements directly */
5010 dec1
= getDecodedObject(so1
->obj
);
5011 dec2
= getDecodedObject(so2
->obj
);
5012 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
5017 return server
.sort_desc
? -cmp
: cmp
;
/* SORT implementation. Overview of the phases visible in the body:
 *
 * 1. Look up argv[1]; reply shared.nullmultibulk if missing and
 *    shared.wrongtypeerr unless it is a set, list or sorted set.
 * 2. Parse the options (case-insensitive): ASC, DESC, ALPHA,
 *    LIMIT <start> <count>, STORE <key>, BY <pattern>, GET <pattern>.
 *    A BY pattern without '*' sets 'dontsort'. Anything else releases the
 *    resources taken so far and replies shared.syntaxerr.
 * 3. Load every element into 'vector' (one redisSortObject per element).
 * 4. Unless 'dontsort' is set, fill u.cmpobj / u.score for each entry from
 *    the BY key (lookupKeyByPattern) or from the element itself; integer
 *    encoded objects are converted by casting their ptr to long.
 * 5. Clamp the LIMIT window to the vector and sort with qsort(), or with
 *    pqsort() when a BY pattern is used together with a partial window.
 * 6. Either send the window to the client as a multi-bulk reply, or (STORE)
 *    build a list object and install it under 'storekey' with dictReplace().
 * 7. Release sortval, the operations list and the cmpobj references taken
 *    in step 4.
 *
 * The sort parameters are handed to sortCompare() through the global
 * 'server' struct (sort_desc, sort_alpha, sort_bypattern). */
5020 /* The SORT command is the most complex command in Redis. Warning: this code
5021 * is optimized for speed and a bit less for readability */
5022 static void sortCommand(redisClient
*c
) {
5025 int desc
= 0, alpha
= 0;
5026 int limit_start
= 0, limit_count
= -1, start
, end
;
5027 int j
, dontsort
= 0, vectorlen
;
5028 int getop
= 0; /* GET operation counter */
5029 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
5030 redisSortObject
*vector
; /* Resulting vector to sort */
5032 /* Lookup the key to sort. It must be of the right types */
5033 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
5034 if (sortval
== NULL
) {
5035 addReply(c
,shared
.nullmultibulk
);
5038 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
5039 sortval
->type
!= REDIS_ZSET
)
5041 addReply(c
,shared
.wrongtypeerr
);
5045 /* Create a list of operations to perform for every sorted element.
5046 * Operations can be GET/DEL/INCR/DECR */
5047 operations
= listCreate();
5048 listSetFreeMethod(operations
,zfree
);
5051 /* Now we need to protect sortval incrementing its count, in the future
5052 * SORT may have options able to overwrite/delete keys during the sorting
5053 * and the sorted key itself may get destroyed */
5054 incrRefCount(sortval
);
5056 /* The SORT command has an SQL-alike syntax, parse it */
5057 while(j
< c
->argc
) {
5058 int leftargs
= c
->argc
-j
-1;
5059 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
5061 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
5063 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
5065 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
5066 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
5067 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
5069 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
5070 storekey
= c
->argv
[j
+1];
5072 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
5073 sortby
= c
->argv
[j
+1];
5074 /* If the BY pattern does not contain '*', i.e. it is constant,
5075 * we don't need to sort nor to lookup the weight keys. */
5076 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
5078 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
5079 listAddNodeTail(operations
,createSortOperation(
5080 REDIS_SORT_GET
,c
->argv
[j
+1]));
5084 decrRefCount(sortval
);
5085 listRelease(operations
);
5086 addReply(c
,shared
.syntaxerr
);
5092 /* Load the sorting vector with all the objects to sort */
5093 switch(sortval
->type
) {
5094 case REDIS_LIST
: vectorlen
= listLength((list
*)sortval
->ptr
); break;
5095 case REDIS_SET
: vectorlen
= dictSize((dict
*)sortval
->ptr
); break;
5096 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
5097 default: vectorlen
= 0; redisAssert(0); /* Avoid GCC warning */
5099 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
5102 if (sortval
->type
== REDIS_LIST
) {
5103 list
*list
= sortval
->ptr
;
5107 while((ln
= listYield(list
))) {
5108 robj
*ele
= ln
->value
;
5109 vector
[j
].obj
= ele
;
5110 vector
[j
].u
.score
= 0;
5111 vector
[j
].u
.cmpobj
= NULL
;
5119 if (sortval
->type
== REDIS_SET
) {
5122 zset
*zs
= sortval
->ptr
;
5126 di
= dictGetIterator(set
);
5127 while((setele
= dictNext(di
)) != NULL
) {
5128 vector
[j
].obj
= dictGetEntryKey(setele
);
5129 vector
[j
].u
.score
= 0;
5130 vector
[j
].u
.cmpobj
= NULL
;
5133 dictReleaseIterator(di
);
5135 redisAssert(j
== vectorlen
);
5137 /* Now it's time to load the right scores in the sorting vector */
5138 if (dontsort
== 0) {
5139 for (j
= 0; j
< vectorlen
; j
++) {
5143 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
5144 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
5146 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
5148 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
5149 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
5151 /* Don't need to decode the object if it's
5152 * integer-encoded (the only encoding supported) so
5153 * far. We can just cast it */
5154 if (byval
->encoding
== REDIS_ENCODING_INT
) {
5155 vector
[j
].u
.score
= (long)byval
->ptr
;
5157 redisAssert(1 != 1);
5162 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
5163 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
5165 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
5166 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
5168 redisAssert(1 != 1);
5175 /* We are ready to sort the vector... perform a bit of sanity check
5176 * on the LIMIT option too. We'll use a partial version of quicksort. */
5177 start
= (limit_start
< 0) ? 0 : limit_start
;
5178 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
5179 if (start
>= vectorlen
) {
5180 start
= vectorlen
-1;
5183 if (end
>= vectorlen
) end
= vectorlen
-1;
5185 if (dontsort
== 0) {
5186 server
.sort_desc
= desc
;
5187 server
.sort_alpha
= alpha
;
5188 server
.sort_bypattern
= sortby
? 1 : 0;
5189 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
5190 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
5192 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
5195 /* Send command output to the output buffer, performing the specified
5196 * GET/DEL/INCR/DECR operations if any. */
5197 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
5198 if (storekey
== NULL
) {
5199 /* STORE option not specified, sent the sorting result to client */
5200 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
5201 for (j
= start
; j
<= end
; j
++) {
5204 addReplyBulkLen(c
,vector
[j
].obj
);
5205 addReply(c
,vector
[j
].obj
);
5206 addReply(c
,shared
.crlf
);
5208 listRewind(operations
);
5209 while((ln
= listYield(operations
))) {
5210 redisSortOperation
*sop
= ln
->value
;
5211 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5214 if (sop
->type
== REDIS_SORT_GET
) {
5215 if (!val
|| val
->type
!= REDIS_STRING
) {
5216 addReply(c
,shared
.nullbulk
);
5218 addReplyBulkLen(c
,val
);
5220 addReply(c
,shared
.crlf
);
5223 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5228 robj
*listObject
= createListObject();
5229 list
*listPtr
= (list
*) listObject
->ptr
;
5231 /* STORE option specified, set the sorting result as a List object */
5232 for (j
= start
; j
<= end
; j
++) {
5235 listAddNodeTail(listPtr
,vector
[j
].obj
);
5236 incrRefCount(vector
[j
].obj
);
5238 listRewind(operations
);
5239 while((ln
= listYield(operations
))) {
5240 redisSortOperation
*sop
= ln
->value
;
5241 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5244 if (sop
->type
== REDIS_SORT_GET
) {
5245 if (!val
|| val
->type
!= REDIS_STRING
) {
5246 listAddNodeTail(listPtr
,createStringObject("",0));
5248 listAddNodeTail(listPtr
,val
);
5252 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5256 if (dictReplace(c
->db
->dict
,storekey
,listObject
)) {
5257 incrRefCount(storekey
);
5259 /* Note: we add 1 because the DB is dirty anyway since even if the
5260 * SORT result is empty a new key is set and maybe the old content
5262 server
.dirty
+= 1+outputlen
;
5263 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",outputlen
));
5267 decrRefCount(sortval
);
5268 listRelease(operations
);
5269 for (j
= 0; j
< vectorlen
; j
++) {
5270 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
5271 decrRefCount(vector
[j
].u
.cmpobj
);
/* Builds and returns an sds string made of "name:value\r\n" lines.
 * - The first sdscatprintf() emits the general server fields (version,
 *   uptime, client/slave counts, memory, persistence state, counters, role).
 *   connected_clients is reported as clients minus slaves.
 * - When server.masterhost is set, the master_* replication fields are
 *   appended; master_last_io_seconds_ago is -1 when server.master is NULL.
 * - Finally one "dbN:keys=..,expires=.." line is appended for every DB that
 *   has at least one key or one volatile key. */
5276 /* Create the string returned by the INFO command. This is decoupled
5277 * from the INFO command itself as we need to report the same information
5278 * on memory corruption problems. */
5279 static sds
genRedisInfoString(void) {
5281 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5284 info
= sdscatprintf(sdsempty(),
5285 "redis_version:%s\r\n"
5287 "multiplexing_api:%s\r\n"
5288 "uptime_in_seconds:%ld\r\n"
5289 "uptime_in_days:%ld\r\n"
5290 "connected_clients:%d\r\n"
5291 "connected_slaves:%d\r\n"
5292 "blocked_clients:%d\r\n"
5293 "used_memory:%zu\r\n"
5294 "changes_since_last_save:%lld\r\n"
5295 "bgsave_in_progress:%d\r\n"
5296 "last_save_time:%ld\r\n"
5297 "bgrewriteaof_in_progress:%d\r\n"
5298 "total_connections_received:%lld\r\n"
5299 "total_commands_processed:%lld\r\n"
5302 (sizeof(long) == 8) ? "64" : "32",
5306 listLength(server
.clients
)-listLength(server
.slaves
),
5307 listLength(server
.slaves
),
5308 server
.blockedclients
,
5311 server
.bgsavechildpid
!= -1,
5313 server
.bgrewritechildpid
!= -1,
5314 server
.stat_numconnections
,
5315 server
.stat_numcommands
,
5316 server
.masterhost
== NULL
? "master" : "slave"
5318 if (server
.masterhost
) {
5319 info
= sdscatprintf(info
,
5320 "master_host:%s\r\n"
5321 "master_port:%d\r\n"
5322 "master_link_status:%s\r\n"
5323 "master_last_io_seconds_ago:%d\r\n"
5326 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
5328 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
5331 for (j
= 0; j
< server
.dbnum
; j
++) {
5332 long long keys
, vkeys
;
5334 keys
= dictSize(server
.db
[j
].dict
);
5335 vkeys
= dictSize(server
.db
[j
].expires
);
5336 if (keys
|| vkeys
) {
5337 info
= sdscatprintf(info
, "db%d:keys=%lld,expires=%lld\r\n",
/* Sends the output of genRedisInfoString() as a bulk reply: the
 * "$<length>\r\n" header, the info payload itself, then a trailing CRLF.
 * The info sds is handed to addReplySds() and is not freed here. */
5344 static void infoCommand(redisClient
*c
) {
5345 sds info
= genRedisInfoString();
5346 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
5347 (unsigned long)sdslen(info
)));
5348 addReplySds(c
,info
);
5349 addReply(c
,shared
.crlf
);
/* Turns the client into a monitor: sets both REDIS_SLAVE and REDIS_MONITOR
 * in c->flags, appends the client to server.monitors and replies shared.ok.
 * The command is silently ignored (no reply at all) when REDIS_SLAVE is
 * already set on the client. */
5352 static void monitorCommand(redisClient
*c
) {
5353 /* ignore MONITOR if already slave or in monitor mode */
5354 if (c
->flags
& REDIS_SLAVE
) return;
5356 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
5358 listAddNodeTail(server
.monitors
,c
);
5359 addReply(c
,shared
.ok
);
5362 /* ================================= Expire ================================= */
/* Removes the expire entry of 'key' from db->expires with dictDelete() and
 * branches on whether the delete returned DICT_OK (the return statements of
 * both branches are not visible in this listing). */
5363 static int removeExpire(redisDb
*db
, robj
*key
) {
5364 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
/* Registers 'when' as the expire time of 'key' by adding an entry to
 * db->expires. The time_t value is stored directly in the dict value slot
 * (cast to void*), not as a pointer to allocated memory. The function
 * branches on dictAdd() returning DICT_ERR -- presumably "the key already
 * has an expire"; confirm against dictAdd(). */
5371 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
5372 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
/* The expire time is read back by casting the dict entry value to time_t,
 * which matches how setExpire() stores it. The dictSize() test avoids the
 * lookup altogether when no key in this DB has an expire. */
5380 /* Return the expire time of the specified key, or -1 if no expire
5381 * is associated with this key (i.e. the key is non volatile) */
5382 static time_t getExpire(redisDb
*db
, robj
*key
) {
5385 /* No expire? return ASAP */
5386 if (dictSize(db
->expires
) == 0 ||
5387 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
5389 return (time_t) dictGetEntryVal(de
);
/* Deletes 'key' if it has an expire time that is already in the past.
 * Returns 0 when the key has no expire entry or when time(NULL) <= when
 * (so the key is still valid during the exact second of its expire time).
 * Otherwise the entry is removed from db->expires and the key from
 * db->dict; the return value is 1 if the delete from db->dict succeeded,
 * 0 otherwise. */
5392 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
5396 /* No expire? return ASAP */
5397 if (dictSize(db
->expires
) == 0 ||
5398 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5400 /* Lookup the expire */
5401 when
= (time_t) dictGetEntryVal(de
);
5402 if (time(NULL
) <= when
) return 0;
5404 /* Delete the key */
5405 dictDelete(db
->expires
,key
);
5406 return dictDelete(db
->dict
,key
) == DICT_OK
;
/* Deletes 'key' if it has an expire entry at all, regardless of whether the
 * expire time has been reached (unlike expireIfNeeded(), no time comparison
 * is made). Returns 0 when the key has no expire entry; otherwise removes
 * it from db->expires and db->dict and returns 1 if the delete from
 * db->dict succeeded, 0 otherwise. */
5409 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
5412 /* No expire? return ASAP */
5413 if (dictSize(db
->expires
) == 0 ||
5414 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5416 /* Delete the key */
5418 dictDelete(db
->expires
,key
);
5419 return dictDelete(db
->dict
,key
) == DICT_OK
;
/* Common body of expireCommand() and expireatCommand(); 'seconds' is a
 * timeout relative to now.
 *
 * Flows visible in the body:
 *  - shared.czero right after the key lookup in c->db->dict (presumably the
 *    "no such key" path; the guard is not visible in this listing);
 *  - deleteKey() + server.dirty++ + shared.cone (presumably taken for a
 *    non-positive 'seconds'; TODO confirm, the guard is not visible);
 *  - otherwise the expire is set to time(NULL)+seconds via setExpire():
 *    shared.cone when it returns non-zero, shared.czero on the other
 *    visible reply path. */
5422 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
5425 de
= dictFind(c
->db
->dict
,key
);
5427 addReply(c
,shared
.czero
);
5431 if (deleteKey(c
->db
,key
)) server
.dirty
++;
5432 addReply(c
, shared
.cone
);
5435 time_t when
= time(NULL
)+seconds
;
5436 if (setExpire(c
->db
,key
,when
)) {
5437 addReply(c
,shared
.cone
);
5440 addReply(c
,shared
.czero
);
/* Parses argv[2] as a base-10 number of seconds with strtol() and passes it,
 * together with key argv[1], to expireGenericCommand(). The strtol() end
 * pointer is NULL, so non-numeric input is not rejected here. */
5446 static void expireCommand(redisClient
*c
) {
5447 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
/* Parses argv[2] as a base-10 absolute time with strtol() and converts it to
 * a relative timeout by subtracting time(NULL) before calling
 * expireGenericCommand(), which works with relative seconds. */
5450 static void expireatCommand(redisClient
*c
) {
5451 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
/* Replies with an integer (":%d"): the remaining time to live of key
 * argv[1], computed as getExpire() minus time(NULL) and truncated to int.
 * A negative result is normalised to -1. */
5454 static void ttlCommand(redisClient
*c
) {
5458 expire
= getExpire(c
->db
,c
->argv
[1]);
5460 ttl
= (int) (expire
-time(NULL
));
5461 if (ttl
< 0) ttl
= -1;
5463 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
5466 /* ================================ MULTI/EXEC ============================== */
/* Resets the queue to "empty": a NULL commands array and a zero count.
 * Nothing is freed here. */
5468 /* Client state initialization for MULTI/EXEC */
5469 static void initClientMultiState(redisClient
*c
) {
5470 c
->mstate
.commands
= NULL
;
5471 c
->mstate
.count
= 0;
/* For every queued command, drops the reference held on each of its argv
 * objects (the counterpart of the incrRefCount() calls done when the
 * command was queued), then frees the commands array itself. The mstate
 * fields are not reset here. */
5474 /* Release all the resources associated with MULTI/EXEC state */
5475 static void freeClientMultiState(redisClient
*c
) {
5478 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5480 multiCmd
*mc
= c
->mstate
.commands
+j
;
5482 for (i
= 0; i
< mc
->argc
; i
++)
5483 decrRefCount(mc
->argv
[i
]);
5486 zfree(c
->mstate
.commands
);
/* Grows c->mstate.commands by one slot with zrealloc() and fills the new
 * slot (mc) for 'cmd'. The client's current argument vector is copied into
 * a freshly allocated array and every argument's reference count is
 * incremented, so the queued command keeps its arguments alive
 * independently of c->argv. */
5489 /* Add a new command into the MULTI commands queue */
5490 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
) {
5494 c
->mstate
.commands
= zrealloc(c
->mstate
.commands
,
5495 sizeof(multiCmd
)*(c
->mstate
.count
+1));
5496 mc
= c
->mstate
.commands
+c
->mstate
.count
;
5499 mc
->argv
= zmalloc(sizeof(robj
*)*c
->argc
);
5500 memcpy(mc
->argv
,c
->argv
,sizeof(robj
*)*c
->argc
);
5501 for (j
= 0; j
< c
->argc
; j
++)
5502 incrRefCount(mc
->argv
[j
]);
/* Marks the client as being inside a transaction by setting REDIS_MULTI in
 * c->flags, and replies shared.ok. No check for an already-set flag is made
 * here. */
5506 static void multiCommand(redisClient
*c
) {
5507 c
->flags
|= REDIS_MULTI
;
5508 addReply(c
,shared
.ok
);
/* Executes the commands queued since MULTI.
 * Replies "-ERR EXEC without MULTI" when REDIS_MULTI is not set. Otherwise
 * it emits a multi-bulk header with the number of queued commands and runs
 * each one through call(), temporarily pointing c->argv / c->argc at the
 * queued command's arguments. The original argv/argc are restored
 * afterwards, the queue is released and re-initialised, and the REDIS_MULTI
 * flag is cleared. */
5511 static void execCommand(redisClient
*c
) {
5516 if (!(c
->flags
& REDIS_MULTI
)) {
5517 addReplySds(c
,sdsnew("-ERR EXEC without MULTI\r\n"));
5521 orig_argv
= c
->argv
;
5522 orig_argc
= c
->argc
;
5523 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->mstate
.count
));
5524 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5525 c
->argc
= c
->mstate
.commands
[j
].argc
;
5526 c
->argv
= c
->mstate
.commands
[j
].argv
;
5527 call(c
,c
->mstate
.commands
[j
].cmd
);
5529 c
->argv
= orig_argv
;
5530 c
->argc
= orig_argc
;
5531 freeClientMultiState(c
);
5532 initClientMultiState(c
);
5533 c
->flags
&= (~REDIS_MULTI
);
5536 /* =========================== Blocking Operations ========================= */
5538 /* Currently Redis blocking operations support is limited to list POP ops,
5539 * so the current implementation is not fully generic, but it is also not
5540 * completely specific so it will not require a rewrite to support new
5541 * kind of blocking operations in the future.
5543 * Still it's important to note that list blocking operations can be already
5544 * used as a notification mechanism in order to implement other blocking
5545 * operations at application level, so there must be a very strong evidence
5546 * of usefulness and generality before new blocking operations are implemented.
5548 * This is how the current blocking POP works, we use BLPOP as example:
5549 * - If the user calls BLPOP and the key exists and contains a non empty list
5550 * then LPOP is called instead. So BLPOP is semantically the same as LPOP
5551 * if there is no need to block.
5552 * - If instead BLPOP is called and the key does not exist or the list is
5553 * empty we need to block. In order to do so we remove the notification for
5554 * new data to read in the client socket (so that we'll not serve new
5555 * requests if the blocking request is not served). Also we put the client
5556 * in a dictionary (db->blockingkeys) mapping keys to a list of clients
5557 * blocking for this keys.
5558 * - If a PUSH operation against a key with blocked clients waiting is
5559 * performed, we serve the first in the list: basically instead to push
5560 * the new element inside the list we return it to the (first / oldest)
5561 * blocking client, unblock the client, and remove it from the list.
5563 * The above comment and the source code should be enough in order to understand
5564 * the implementation and modify / fix it later.
/* Puts client 'c' in blocked state waiting on 'numkeys' keys.
 * - The keys are recorded in the client (c->blockingkeys, with one extra
 *   reference per key) together with their count and the timeout value.
 * - For every key the client is appended to the list of waiting clients
 *   stored in c->db->blockingkeys; when a new dict entry is added for the
 *   key, another reference to the key is taken for the dict.
 * - Finally REDIS_BLOCKED is set, the client's AE_READABLE file event is
 *   removed (no new queries are read while blocked) and
 *   server.blockedclients is incremented. */
5567 /* Set a client in blocking mode for the specified key, with the specified
5569 static void blockForKeys(redisClient
*c
, robj
**keys
, int numkeys
, time_t timeout
) {
5574 c
->blockingkeys
= zmalloc(sizeof(robj
*)*numkeys
);
5575 c
->blockingkeysnum
= numkeys
;
5576 c
->blockingto
= timeout
;
5577 for (j
= 0; j
< numkeys
; j
++) {
5578 /* Add the key in the client structure, to map clients -> keys */
5579 c
->blockingkeys
[j
] = keys
[j
];
5580 incrRefCount(keys
[j
]);
5582 /* And in the other "side", to map keys -> clients */
5583 de
= dictFind(c
->db
->blockingkeys
,keys
[j
]);
5587 /* For every key we take a list of clients blocked for it */
5589 retval
= dictAdd(c
->db
->blockingkeys
,keys
[j
],l
);
5590 incrRefCount(keys
[j
]);
5591 assert(retval
== DICT_OK
);
5593 l
= dictGetEntryVal(de
);
5595 listAddNodeTail(l
,c
);
5597 /* Mark the client as a blocked client */
5598 c
->flags
|= REDIS_BLOCKED
;
5599 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
5600 server
.blockedclients
++;
/* Reverses blockForKeys(): for every key the client was waiting on, the
 * client is removed from that key's waiting list (and the dict entry is
 * deleted once the list is empty), and the client's reference to the key
 * is dropped. Then the client-side bookkeeping is cleared (blockingkeys
 * freed and set to NULL, REDIS_BLOCKED cleared, server.blockedclients
 * decremented), the AE_READABLE handler is re-installed and any data
 * already sitting in c->querybuf is processed immediately.
 * Asserts that c->blockingkeys is not NULL on entry. */
5603 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
5604 static void unblockClient(redisClient
*c
) {
5609 assert(c
->blockingkeys
!= NULL
);
5610 /* The client may wait for multiple keys, so unblock it for every key. */
5611 for (j
= 0; j
< c
->blockingkeysnum
; j
++) {
5612 /* Remove this client from the list of clients waiting for this key. */
5613 de
= dictFind(c
->db
->blockingkeys
,c
->blockingkeys
[j
]);
5615 l
= dictGetEntryVal(de
);
5616 listDelNode(l
,listSearchKey(l
,c
));
5617 /* If the list is empty we need to remove it to avoid wasting memory */
5618 if (listLength(l
) == 0)
5619 dictDelete(c
->db
->blockingkeys
,c
->blockingkeys
[j
]);
5620 decrRefCount(c
->blockingkeys
[j
]);
5622 /* Cleanup the client structure */
5623 zfree(c
->blockingkeys
);
5624 c
->blockingkeys
= NULL
;
5625 c
->flags
&= (~REDIS_BLOCKED
);
5626 server
.blockedclients
--;
5627 /* Ok now we are ready to get read events from socket, note that we
5628 * can't trap errors here as it's possible that unblockClients() is
5629 * called from freeClient() itself, and the only thing we can do
5630 * if we failed to register the READABLE event is to kill the client.
5631 * Still the following function should never fail in the real world as
5632 * we are sure the file descriptor is sane, and we exit on out of mem. */
5633 aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
, readQueryFromClient
, c
);
5634 /* As a final step we want to process data if there is some command waiting
5635 * in the input buffer. Note that this is safe even if unblockClient()
5636 * gets called from freeClient() because freeClient() will be smart
5637 * enough to call this function *after* c->querybuf was set to NULL. */
5638 if (c
->querybuf
&& sdslen(c
->querybuf
) > 0) processInputBuffer(c
);
/* Implementation notes: the waiting clients for 'key' are looked up in
 * c->db->blockingkeys; when there is no entry the function returns 0
 * immediately. Otherwise the receiver is sent a two-element multi-bulk
 * reply ("*2": the key name followed by the pushed element) and is then
 * unblocked with unblockClient(). */
5641 /* This should be called from any function PUSHing into lists.
5642 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5643 * 'ele' is the element pushed.
5645 * If the function returns 0 there was no client waiting for a list push
5648 * If the function returns 1 there was a client waiting for a list push
5649 * against this key, the element was passed to this client thus it's not
5650 * needed to actually add it to the list and the caller should return asap. */
5651 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
) {
5652 struct dictEntry
*de
;
5653 redisClient
*receiver
;
5657 de
= dictFind(c
->db
->blockingkeys
,key
);
5658 if (de
== NULL
) return 0;
5659 l
= dictGetEntryVal(de
);
5662 receiver
= ln
->value
;
5664 addReplySds(receiver
,sdsnew("*2\r\n"));
5665 addReplyBulkLen(receiver
,key
);
5666 addReply(receiver
,key
);
5667 addReply(receiver
,shared
.crlf
);
5668 addReplyBulkLen(receiver
,ele
);
5669 addReply(receiver
,ele
);
5670 addReply(receiver
,shared
.crlf
);
5671 unblockClient(receiver
);
/* Shared implementation of blpopCommand() and brpopCommand(); 'where' is
 * passed through unchanged to popGenericCommand().
 *
 * argv[1] .. argv[argc-2] are keys and argv[argc-1] is the timeout. The
 * keys are scanned in order: a non-list value gets shared.wrongtypeerr,
 * and the first non-empty list is served immediately by temporarily
 * swapping the client's argument vector for a two-element one and calling
 * popGenericCommand(), after emitting the "*2" header and the key name.
 *
 * If no key can be served, the timeout is parsed with strtol(); a positive
 * value is turned into an absolute time by adding time(NULL), while a
 * value <= 0 is passed on unchanged. The client is then blocked on all
 * the keys with blockForKeys(). */
5675 /* Blocking RPOP/LPOP */
5676 static void blockingPopGenericCommand(redisClient
*c
, int where
) {
5681 for (j
= 1; j
< c
->argc
-1; j
++) {
5682 o
= lookupKeyWrite(c
->db
,c
->argv
[j
]);
5684 if (o
->type
!= REDIS_LIST
) {
5685 addReply(c
,shared
.wrongtypeerr
);
5688 list
*list
= o
->ptr
;
5689 if (listLength(list
) != 0) {
5690 /* If the list contains elements fall back to the usual
5691 * non-blocking POP operation */
5692 robj
*argv
[2], **orig_argv
;
5695 /* We need to alter the command arguments before to call
5696 * popGenericCommand() as the command takes a single key. */
5697 orig_argv
= c
->argv
;
5698 orig_argc
= c
->argc
;
5699 argv
[1] = c
->argv
[j
];
5703 /* Also the return value is different, we need to output
5704 * the multi bulk reply header and the key name. The
5705 * "real" command will add the last element (the value)
5706 * for us. If this sounds like a hack to you it's just
5707 * because it is... */
5708 addReplySds(c
,sdsnew("*2\r\n"));
5709 addReplyBulkLen(c
,argv
[1]);
5710 addReply(c
,argv
[1]);
5711 addReply(c
,shared
.crlf
);
5712 popGenericCommand(c
,where
);
5714 /* Fix the client structure with the original stuff */
5715 c
->argv
= orig_argv
;
5716 c
->argc
= orig_argc
;
5722 /* If the list is empty or the key does not exists we must block */
5723 timeout
= strtol(c
->argv
[c
->argc
-1]->ptr
,NULL
,10);
5724 if (timeout
> 0) timeout
+= time(NULL
);
5725 blockForKeys(c
,c
->argv
+1,c
->argc
-2,timeout
);
/* Thin wrapper: blockingPopGenericCommand() with where=REDIS_HEAD. */
5728 static void blpopCommand(redisClient
*c
) {
5729 blockingPopGenericCommand(c
,REDIS_HEAD
);
/* Thin wrapper: blockingPopGenericCommand() with where=REDIS_TAIL. */
5732 static void brpopCommand(redisClient
*c
) {
5733 blockingPopGenericCommand(c
,REDIS_TAIL
);
5736 /* =============================== Replication ============================= */
/* Synchronous write helper: writes from 'ptr' to 'fd', waiting for
 * writability with aeWait() in slices of 1000 (presumably milliseconds;
 * confirm against aeWait()). 'ret' is initialised to the requested size.
 * Returns -1 as soon as write() fails; the elapsed time since the call
 * started is compared against 'timeout' (seconds, as it is compared with a
 * time() difference). */
5738 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5739 ssize_t nwritten
, ret
= size
;
5740 time_t start
= time(NULL
);
5744 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
5745 nwritten
= write(fd
,ptr
,size
);
5746 if (nwritten
== -1) return -1;
5750 if ((time(NULL
)-start
) > timeout
) {
/* Synchronous read helper, the mirror image of syncWrite(): reads from 'fd'
 * into 'ptr', waiting for readability with aeWait() in slices of 1000
 * (presumably milliseconds; confirm against aeWait()). Returns -1 as soon
 * as read() fails; the elapsed time since the call started is compared
 * against 'timeout' (seconds). 'totread' starts at 0 and presumably
 * accumulates the bytes read. */
5758 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5759 ssize_t nread
, totread
= 0;
5760 time_t start
= time(NULL
);
5764 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
5765 nread
= read(fd
,ptr
,size
);
5766 if (nread
== -1) return -1;
5771 if ((time(NULL
)-start
) > timeout
) {
/* Reads a line from 'fd' one byte at a time through syncRead(), returning
 * -1 if a single-byte read fails. When at least one byte was stored and the
 * byte preceding the current position is '\r', it is overwritten with a NUL
 * so that a CRLF-terminated line comes back without the carriage return. */
5779 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5786 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
5789 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
/* Handles a replica asking for a full synchronisation.
 *
 * - Ignored (no reply) when the client already has REDIS_SLAVE set.
 * - Refused with an error when the client still has queued replies, because
 *   the reply list must start empty to record only the changes made after
 *   the snapshot.
 * - If a background save is already running, the slaves list is searched
 *   for one in REDIS_REPL_WAIT_BGSAVE_END state: if found, its reply list
 *   is duplicated into this client, which joins the same BGSAVE; otherwise
 *   the client is put in REDIS_REPL_WAIT_BGSAVE_START to wait for the next
 *   one.
 * - If no background save is running, one is started with
 *   rdbSaveBackground(); failure to start is reported to the client.
 * - Finally REDIS_SLAVE is set and the client is added to server.slaves. */
5800 static void syncCommand(redisClient
*c
) {
5801 /* ignore SYNC if already slave or in monitor mode */
5802 if (c
->flags
& REDIS_SLAVE
) return;
5804 /* SYNC can't be issued when the server has pending data to send to
5805 * the client about already issued commands. We need a fresh reply
5806 * buffer registering the differences between the BGSAVE and the current
5807 * dataset, so that we can copy to other slaves if needed. */
5808 if (listLength(c
->reply
) != 0) {
5809 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5813 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
5814 /* Here we need to check if there is a background saving operation
5815 * in progress, or if it is required to start one */
5816 if (server
.bgsavechildpid
!= -1) {
5817 /* Ok a background save is in progress. Let's check if it is a good
5818 * one for replication, i.e. if there is another slave that is
5819 * registering differences since the server forked to save */
5823 listRewind(server
.slaves
);
5824 while((ln
= listYield(server
.slaves
))) {
5826 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
5829 /* Perfect, the server is already registering differences for
5830 * another slave. Set the right state, and copy the buffer. */
5831 listRelease(c
->reply
);
5832 c
->reply
= listDup(slave
->reply
);
5833 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5834 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
5836 /* No way, we need to wait for the next BGSAVE in order to
5837 * register differences */
5838 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5839 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
5842 /* Ok we don't have a BGSAVE in progress, let's start one */
5843 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
5844 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5845 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
5846 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
5849 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5852 c
->flags
|= REDIS_SLAVE
;
5854 listAddNodeTail(server
.slaves
,c
);
/* File event handler (el, fd, privdata, mask signature); 'privdata' is the
 * slave client and 'mask' is unused. Each invocation sends one chunk of the
 * DB dump file to the slave:
 *
 * - On the first call (slave->repldboff == 0) a "$<size>\r\n" bulk header
 *   is written first.
 * - Then up to REDIS_IOBUF_LEN bytes are read from slave->repldbfd at
 *   offset repldboff and written to the socket; repldboff advances by the
 *   number of bytes actually written, so a short write is retried from the
 *   right offset on the next call.
 * - When repldboff reaches repldbsize the dump fd is closed, the state
 *   becomes REDIS_REPL_ONLINE and the AE_WRITABLE handler is replaced with
 *   sendReplyToClient so the queued replies start flowing to the slave.
 *
 * NOTE(review): the return value of lseek() is not checked. */
5858 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
5859 redisClient
*slave
= privdata
;
5861 REDIS_NOTUSED(mask
);
5862 char buf
[REDIS_IOBUF_LEN
];
5863 ssize_t nwritten
, buflen
;
5865 if (slave
->repldboff
== 0) {
5866 /* Write the bulk write count before to transfer the DB. In theory here
5867 * we don't know how much room there is in the output buffer of the
5868 * socket, but in practice SO_SNDLOWAT (the minimum count for output
5869 * operations) will never be smaller than the few bytes we need. */
5872 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5874 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
5882 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
5883 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
5885 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
5886 (buflen
== 0) ? "premature EOF" : strerror(errno
));
5890 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
5891 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
5896 slave
->repldboff
+= nwritten
;
5897 if (slave
->repldboff
== slave
->repldbsize
) {
5898 close(slave
->repldbfd
);
5899 slave
->repldbfd
= -1;
5900 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5901 slave
->replstate
= REDIS_REPL_ONLINE
;
5902 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
5903 sendReplyToClient
, slave
) == AE_ERR
) {
5907 addReplySds(slave
,sdsempty());
5908 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
5912 /* This function is called at the end of every background saving.
5913 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5914 * otherwise REDIS_ERR is passed to the function.
5916 * The goal of this function is to handle slaves waiting for a successful
5917 * background saving in order to perform non-blocking synchronization. */
5918 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
5920 int startbgsave
= 0;
5922 listRewind(server
.slaves
);
5923 while((ln
= listYield(server
.slaves
))) {
5924 redisClient
*slave
= ln
->value
;
5926 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
5928 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5929 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
5930 struct redis_stat buf
;
5932 if (bgsaveerr
!= REDIS_OK
) {
5934 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
5937 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
5938 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
5940 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
5943 slave
->repldboff
= 0;
5944 slave
->repldbsize
= buf
.st_size
;
5945 slave
->replstate
= REDIS_REPL_SEND_BULK
;
5946 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5947 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
5954 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5955 listRewind(server
.slaves
);
5956 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
5957 while((ln
= listYield(server
.slaves
))) {
5958 redisClient
*slave
= ln
->value
;
5960 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
5967 static int syncWithMaster(void) {
5968 char buf
[1024], tmpfile
[256], authcmd
[1024];
5970 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
5974 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
5979 /* AUTH with the master if required. */
5980 if(server
.masterauth
) {
5981 snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
);
5982 if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) {
5984 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",
5988 /* Read the AUTH result. */
5989 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5991 redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s",
5995 if (buf
[0] != '+') {
5997 redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?");
6002 /* Issue the SYNC command */
6003 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
6005 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
6009 /* Read the bulk write count */
6010 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
6012 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
6016 if (buf
[0] != '$') {
6018 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
6021 dumpsize
= atoi(buf
+1);
6022 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
6023 /* Read the bulk write data on a temp file */
6024 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
6025 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
6028 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
6032 int nread
, nwritten
;
6034 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
6036 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
6042 nwritten
= write(dfd
,buf
,nread
);
6043 if (nwritten
== -1) {
6044 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
6052 if (rename(tmpfile
,server
.dbfilename
) == -1) {
6053 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
6059 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6060 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
6064 server
.master
= createClient(fd
);
6065 server
.master
->flags
|= REDIS_MASTER
;
6066 server
.master
->authenticated
= 1;
6067 server
.replstate
= REDIS_REPL_CONNECTED
;
6071 static void slaveofCommand(redisClient
*c
) {
6072 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
6073 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
6074 if (server
.masterhost
) {
6075 sdsfree(server
.masterhost
);
6076 server
.masterhost
= NULL
;
6077 if (server
.master
) freeClient(server
.master
);
6078 server
.replstate
= REDIS_REPL_NONE
;
6079 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
6082 sdsfree(server
.masterhost
);
6083 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
6084 server
.masterport
= atoi(c
->argv
[2]->ptr
);
6085 if (server
.master
) freeClient(server
.master
);
6086 server
.replstate
= REDIS_REPL_CONNECT
;
6087 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
6088 server
.masterhost
, server
.masterport
);
6090 addReply(c
,shared
.ok
);
6093 /* ============================ Maxmemory directive ======================== */
6095 /* This function gets called when 'maxmemory' is set on the config file to limit
6096 * the max memory used by the server, and we are out of memory.
6097 * This function will try to, in order:
6099 * - Free objects from the free list
6100 * - Try to remove keys with an EXPIRE set
6102 * If it is not possible to free enough memory to reach used-memory < maxmemory,
6103 * the server will start refusing commands that will enlarge even more the
6106 static void freeMemoryIfNeeded(void) {
6107 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
6108 if (listLength(server
.objfreelist
)) {
6111 listNode
*head
= listFirst(server
.objfreelist
);
6112 o
= listNodeValue(head
);
6113 listDelNode(server
.objfreelist
,head
);
6116 int j
, k
, freed
= 0;
6118 for (j
= 0; j
< server
.dbnum
; j
++) {
6120 robj
*minkey
= NULL
;
6121 struct dictEntry
*de
;
6123 if (dictSize(server
.db
[j
].expires
)) {
6125 /* From a sample of three keys drop the one nearest to
6126 * the natural expire */
6127 for (k
= 0; k
< 3; k
++) {
6130 de
= dictGetRandomKey(server
.db
[j
].expires
);
6131 t
= (time_t) dictGetEntryVal(de
);
6132 if (minttl
== -1 || t
< minttl
) {
6133 minkey
= dictGetEntryKey(de
);
6137 deleteKey(server
.db
+j
,minkey
);
6140 if (!freed
) return; /* nothing to free... */
6145 /* ============================== Append Only file ========================== */
6147 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
6148 sds buf
= sdsempty();
6154 /* The DB this command was targeting is not the same as the one selected
6155 * by the last command we appended, so a SELECT command must be issued first. */
6156 if (dictid
!= server
.appendseldb
) {
6159 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
6160 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
6161 (unsigned long)strlen(seldb
),seldb
);
6162 server
.appendseldb
= dictid
;
6165 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6166 * EXPIREs into EXPIREATs calls */
6167 if (cmd
->proc
== expireCommand
) {
6170 tmpargv
[0] = createStringObject("EXPIREAT",8);
6171 tmpargv
[1] = argv
[1];
6172 incrRefCount(argv
[1]);
6173 when
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10);
6174 tmpargv
[2] = createObject(REDIS_STRING
,
6175 sdscatprintf(sdsempty(),"%ld",when
));
6179 /* Append the actual command */
6180 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
6181 for (j
= 0; j
< argc
; j
++) {
6184 o
= getDecodedObject(o
);
6185 buf
= sdscatprintf(buf
,"$%lu\r\n",(unsigned long)sdslen(o
->ptr
));
6186 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
6187 buf
= sdscatlen(buf
,"\r\n",2);
6191 /* Free the objects from the modified argv for EXPIREAT */
6192 if (cmd
->proc
== expireCommand
) {
6193 for (j
= 0; j
< 3; j
++)
6194 decrRefCount(argv
[j
]);
6197 /* We want to perform a single write. This should be guaranteed atomic
6198 * at least if the filesystem we are writing is a real physical one.
6199 * While this will save us against the server being killed I don't think
6200 * there is much to do about the whole server stopping for power problems
6202 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
6203 if (nwritten
!= (signed)sdslen(buf
)) {
6204 /* Ooops, we are in trouble. The best thing to do for now is
6205 * to simply exit instead of giving the illusion that everything is
6206 * working as expected. */
6207 if (nwritten
== -1) {
6208 redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
));
6210 redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
));
6214 /* If a background append only file rewriting is in progress we want to
6215 * accumulate the differences between the child DB and the current one
6216 * in a buffer, so that when the child process will do its work we
6217 * can append the differences to the new append only file. */
6218 if (server
.bgrewritechildpid
!= -1)
6219 server
.bgrewritebuf
= sdscatlen(server
.bgrewritebuf
,buf
,sdslen(buf
));
6223 if (server
.appendfsync
== APPENDFSYNC_ALWAYS
||
6224 (server
.appendfsync
== APPENDFSYNC_EVERYSEC
&&
6225 now
-server
.lastfsync
> 1))
6227 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
6228 server
.lastfsync
= now
;
6232 /* In Redis commands are always executed in the context of a client, so in
6233 * order to load the append only file we need to create a fake client. */
6234 static struct redisClient
*createFakeClient(void) {
6235 struct redisClient
*c
= zmalloc(sizeof(*c
));
6239 c
->querybuf
= sdsempty();
6243 /* We set the fake client as a slave waiting for the synchronization
6244 * so that Redis will not try to send replies to this client. */
6245 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
6246 c
->reply
= listCreate();
6247 listSetFreeMethod(c
->reply
,decrRefCount
);
6248 listSetDupMethod(c
->reply
,dupClientReplyValue
);
6252 static void freeFakeClient(struct redisClient
*c
) {
6253 sdsfree(c
->querybuf
);
6254 listRelease(c
->reply
);
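6258 /* Replay the append log file. On success REDIS_OK is returned. On non fatal
6259 * error (the append only file is zero-length) REDIS_ERR is returned. On
6260 * fatal error an error message is logged and the program exits.
 *
 * NOTE(review): fileno(fp) appears to be evaluated before the "can't open"
 * check on fp, so a failed fopen() would hand a NULL stream to fileno() --
 * confirm and reorder the checks. */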
6261 int loadAppendOnlyFile(char *filename
) {
6262 struct redisClient
*fakeClient
;
6263 FILE *fp
= fopen(filename
,"r");
6264 struct redis_stat sb
;
6266 if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size
== 0)
6270 redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
));
6274 fakeClient
= createFakeClient();
6281 struct redisCommand
*cmd
;
6283 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) {
6289 if (buf
[0] != '*') goto fmterr
;
6291 argv
= zmalloc(sizeof(robj
*)*argc
);
6292 for (j
= 0; j
< argc
; j
++) {
6293 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
;
6294 if (buf
[0] != '$') goto fmterr
;
6295 len
= strtol(buf
+1,NULL
,10);
6296 argsds
= sdsnewlen(NULL
,len
);
6297 if (len
&& fread(argsds
,len
,1,fp
) == 0) goto fmterr
;
6298 argv
[j
] = createObject(REDIS_STRING
,argsds
);
6299 if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */
6302 /* Command lookup */
6303 cmd
= lookupCommand(argv
[0]->ptr
);
6305 redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
);
6308 /* Try object sharing and encoding */
6309 if (server
.shareobjects
) {
6311 for(j
= 1; j
< argc
; j
++)
6312 argv
[j
] = tryObjectSharing(argv
[j
]);
6314 if (cmd
->flags
& REDIS_CMD_BULK
)
6315 tryObjectEncoding(argv
[argc
-1]);
6316 /* Run the command in the context of a fake client */
6317 fakeClient
->argc
= argc
;
6318 fakeClient
->argv
= argv
;
6319 cmd
->proc(fakeClient
);
6320 /* Discard the reply objects list from the fake client */
6321 while(listLength(fakeClient
->reply
))
6322 listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
));
6323 /* Clean up, ready for the next command */
6324 for (j
= 0; j
< argc
; j
++) decrRefCount(argv
[j
]);
6328 freeFakeClient(fakeClient
);
6333 redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file");
6335 redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
));
6339 redisLog(REDIS_WARNING
,"Bad file format reading the append only file");
6343 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6344 static int fwriteBulk(FILE *fp
, robj
*obj
) {
6346 obj
= getDecodedObject(obj
);
6347 snprintf(buf
,sizeof(buf
),"$%ld\r\n",(long)sdslen(obj
->ptr
));
6348 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) goto err
;
6349 if (sdslen(obj
->ptr
) && fwrite(obj
->ptr
,sdslen(obj
->ptr
),1,fp
) == 0)
6351 if (fwrite("\r\n",2,1,fp
) == 0) goto err
;
/* Emit the double 'd' on 'fp' as a bulk item: "$<count>\r\n<payload>\r\n".
 * The payload is 'd' formatted with "%.17g"; <count> is the payload length
 * in bytes, not counting the trailing CRLF.
 *
 * Returns 0 as soon as one of the two fwrite() calls fails, 1 when both the
 * header and the payload were handed to the stream. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char header[128], payload[128];
    size_t paylen;

    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    paylen = strlen(payload);
    /* The announced count excludes the 2 bytes of the trailing CRLF. */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)paylen-2);
    if (fwrite(header,strlen(header),1,fp) == 0 ||
        fwrite(payload,paylen,1,fp) == 0)
    {
        return 0;
    }
    return 1;
}
/* Emit the long 'l' on 'fp' as a bulk item: "$<count>\r\n<payload>\r\n".
 * The payload is the decimal representation of 'l' ("%ld"); <count> is its
 * length in bytes, not counting the trailing CRLF.
 *
 * Returns 0 if writing the header or the payload fails, 1 otherwise. */
static int fwriteBulkLong(FILE *fp, long l) {
    char header[128], digits[128];
    size_t total;

    snprintf(digits,sizeof(digits),"%ld\r\n",l);
    total = strlen(digits);
    /* 'total' includes the CRLF terminator, which is not part of the count. */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)total-2);
    if (fwrite(header,strlen(header),1,fp) == 0) return 0;
    return (fwrite(digits,total,1,fp) == 0) ? 0 : 1;
}
6381 /* Write a sequence of commands able to fully rebuild the dataset into
6382 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6383 static int rewriteAppendOnlyFile(char *filename
) {
6384 dictIterator
*di
= NULL
;
6389 time_t now
= time(NULL
);
6391 /* Note that we have to use a different temp name here compared to the
6392 * one used by rewriteAppendOnlyFileBackground() function. */
6393 snprintf(tmpfile
,256,"temp-rewriteaof-%d.aof", (int) getpid());
6394 fp
= fopen(tmpfile
,"w");
6396 redisLog(REDIS_WARNING
, "Failed rewriting the append only file: %s", strerror(errno
));
6399 for (j
= 0; j
< server
.dbnum
; j
++) {
6400 char selectcmd
[] = "*2\r\n$6\r\nSELECT\r\n";
6401 redisDb
*db
= server
.db
+j
;
6403 if (dictSize(d
) == 0) continue;
6404 di
= dictGetIterator(d
);
6410 /* SELECT the new DB */
6411 if (fwrite(selectcmd
,sizeof(selectcmd
)-1,1,fp
) == 0) goto werr
;
6412 if (fwriteBulkLong(fp
,j
) == 0) goto werr
;
6414 /* Iterate this DB writing every entry */
6415 while((de
= dictNext(di
)) != NULL
) {
6416 robj
*key
= dictGetEntryKey(de
);
6417 robj
*o
= dictGetEntryVal(de
);
6418 time_t expiretime
= getExpire(db
,key
);
6420 /* Save the key and associated value */
6421 if (o
->type
== REDIS_STRING
) {
6422 /* Emit a SET command */
6423 char cmd
[]="*3\r\n$3\r\nSET\r\n";
6424 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6426 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6427 if (fwriteBulk(fp
,o
) == 0) goto werr
;
6428 } else if (o
->type
== REDIS_LIST
) {
6429 /* Emit the RPUSHes needed to rebuild the list */
6430 list
*list
= o
->ptr
;
6434 while((ln
= listYield(list
))) {
6435 char cmd
[]="*3\r\n$5\r\nRPUSH\r\n";
6436 robj
*eleobj
= listNodeValue(ln
);
6438 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6439 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6440 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6442 } else if (o
->type
== REDIS_SET
) {
6443 /* Emit the SADDs needed to rebuild the set */
6445 dictIterator
*di
= dictGetIterator(set
);
6448 while((de
= dictNext(di
)) != NULL
) {
6449 char cmd
[]="*3\r\n$4\r\nSADD\r\n";
6450 robj
*eleobj
= dictGetEntryKey(de
);
6452 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6453 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6454 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6456 dictReleaseIterator(di
);
6457 } else if (o
->type
== REDIS_ZSET
) {
6458 /* Emit the ZADDs needed to rebuild the sorted set */
6460 dictIterator
*di
= dictGetIterator(zs
->dict
);
6463 while((de
= dictNext(di
)) != NULL
) {
6464 char cmd
[]="*4\r\n$4\r\nZADD\r\n";
6465 robj
*eleobj
= dictGetEntryKey(de
);
6466 double *score
= dictGetEntryVal(de
);
6468 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6469 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6470 if (fwriteBulkDouble(fp
,*score
) == 0) goto werr
;
6471 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6473 dictReleaseIterator(di
);
6475 redisAssert(0 != 0);
6477 /* Save the expire time */
6478 if (expiretime
!= -1) {
6479 char cmd
[]="*3\r\n$8\r\nEXPIREAT\r\n";
6480 /* If this key is already expired skip it */
6481 if (expiretime
< now
) continue;
6482 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6483 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6484 if (fwriteBulkLong(fp
,expiretime
) == 0) goto werr
;
6487 dictReleaseIterator(di
);
6490 /* Make sure data will not remain on the OS's output buffers */
6495 /* Use RENAME to make sure the DB file is changed atomically only
6496 * if the generated DB file is ok. */
6497 if (rename(tmpfile
,filename
) == -1) {
6498 redisLog(REDIS_WARNING
,"Error moving temp append only file on the final destination: %s", strerror(errno
));
6502 redisLog(REDIS_NOTICE
,"SYNC append only file rewrite performed");
6508 redisLog(REDIS_WARNING
,"Write error writing append only file on disk: %s", strerror(errno
));
6509 if (di
) dictReleaseIterator(di
);
6513 /* This is how rewriting of the append only file in background works:
6515 * 1) The user calls BGREWRITEAOF
6516 * 2) Redis calls this function, that forks():
6517 * 2a) the child rewrites the append only file in a temp file.
6518 * 2b) the parent accumulates differences in server.bgrewritebuf.
6519 * 3) When the child has finished '2a' it exits.
6520 * 4) The parent will trap the exit code, if it's OK, will append the
6521 * data accumulated into server.bgrewritebuf into the temp file, and
6522 * finally will rename(2) the temp file in the actual file name.
6523 * Then the new file is reopened as the new append only file. Profit!
6525 static int rewriteAppendOnlyFileBackground(void) {
6528 if (server
.bgrewritechildpid
!= -1) return REDIS_ERR
;
6529 if ((childpid
= fork()) == 0) {
6534 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6535 if (rewriteAppendOnlyFile(tmpfile
) == REDIS_OK
) {
6542 if (childpid
== -1) {
6543 redisLog(REDIS_WARNING
,
6544 "Can't rewrite append only file in background: fork: %s",
6548 redisLog(REDIS_NOTICE
,
6549 "Background append only file rewriting started by pid %d",childpid
);
6550 server
.bgrewritechildpid
= childpid
;
6551 /* We set appendseldb to -1 in order to force the next call to the
6552 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6553 * accumulated by the parent into server.bgrewritebuf will start
6554 * with a SELECT statement and it will be safe to merge. */
6555 server
.appendseldb
= -1;
6558 return REDIS_OK
; /* unreached */
6561 static void bgrewriteaofCommand(redisClient
*c
) {
6562 if (server
.bgrewritechildpid
!= -1) {
6563 addReplySds(c
,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6566 if (rewriteAppendOnlyFileBackground() == REDIS_OK
) {
6567 char *status
= "+Background append only file rewriting started\r\n";
6568 addReplySds(c
,sdsnew(status
));
6570 addReply(c
,shared
.err
);
6574 static void aofRemoveTempFile(pid_t childpid
) {
6577 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) childpid
);
6581 /* =============================== Virtual Memory =========================== */
6582 static void vmInit(void) {
6585 server
.vm_fp
= fopen("/tmp/redisvm","w+b");
6586 if (server
.vm_fp
== NULL
) {
6587 redisLog(REDIS_WARNING
,"Impossible to open the swap file. Exiting.");
6590 server
.vm_fd
= fileno(server
.vm_fp
);
6591 server
.vm_next_page
= 0;
6592 server
.vm_near_pages
= 0;
6593 totsize
= server
.vm_pages
*server
.vm_page_size
;
6594 redisLog(REDIS_NOTICE
,"Allocating %lld bytes of swap file",totsize
);
6595 if (ftruncate(server
.vm_fd
,totsize
) == -1) {
6596 redisLog(REDIS_WARNING
,"Can't ftruncate swap file: %s. Exiting.",
6600 redisLog(REDIS_NOTICE
,"Swap file allocated with success");
6602 /* Try to remove the swap file, so the OS will really delete it from the
6603 * file system when Redis exits. */
6604 unlink("/tmp/redisvm");
6607 /* ================================= Debugging ============================== */
6609 static void debugCommand(redisClient
*c
) {
6610 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
6612 } else if (!strcasecmp(c
->argv
[1]->ptr
,"reload")) {
6613 if (rdbSave(server
.dbfilename
) != REDIS_OK
) {
6614 addReply(c
,shared
.err
);
6618 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6619 addReply(c
,shared
.err
);
6622 redisLog(REDIS_WARNING
,"DB reloaded by DEBUG RELOAD");
6623 addReply(c
,shared
.ok
);
6624 } else if (!strcasecmp(c
->argv
[1]->ptr
,"loadaof")) {
6626 if (loadAppendOnlyFile(server
.appendfilename
) != REDIS_OK
) {
6627 addReply(c
,shared
.err
);
6630 redisLog(REDIS_WARNING
,"Append Only File loaded by DEBUG LOADAOF");
6631 addReply(c
,shared
.ok
);
6632 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
6633 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
6637 addReply(c
,shared
.nokeyerr
);
6640 key
= dictGetEntryKey(de
);
6641 val
= dictGetEntryVal(de
);
6642 addReplySds(c
,sdscatprintf(sdsempty(),
6643 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d serializedlength:%lld\r\n",
6644 (void*)key
, key
->refcount
, (void*)val
, val
->refcount
,
6645 val
->encoding
, rdbSavedObjectLen(val
)));
6647 addReplySds(c
,sdsnew(
6648 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6652 static void _redisAssert(char *estr
) {
6653 redisLog(REDIS_WARNING
,"=== ASSERTION FAILED ===");
6654 redisLog(REDIS_WARNING
,"==> %s\n",estr
);
6655 #ifdef HAVE_BACKTRACE
6656 redisLog(REDIS_WARNING
,"(forcing SIGSEGV in order to print the stack trace)");
6661 /* =================================== Main! ================================ */
6664 int linuxOvercommitMemoryValue(void) {
6665 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
6669 if (fgets(buf
,64,fp
) == NULL
) {
6678 void linuxOvercommitMemoryWarning(void) {
6679 if (linuxOvercommitMemoryValue() == 0) {
6680 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6683 #endif /* __linux__ */
6685 static void daemonize(void) {
6689 if (fork() != 0) exit(0); /* parent exits */
6690 printf("New pid: %d\n", getpid());
6691 setsid(); /* create a new session */
6693 /* Every output goes to /dev/null. If Redis is daemonized but
6694 * the 'logfile' is set to 'stdout' in the configuration file
6695 * it will not log at all. */
6696 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
6697 dup2(fd
, STDIN_FILENO
);
6698 dup2(fd
, STDOUT_FILENO
);
6699 dup2(fd
, STDERR_FILENO
);
6700 if (fd
> STDERR_FILENO
) close(fd
);
6702 /* Try to write the pid file */
6703 fp
= fopen(server
.pidfile
,"w");
6705 fprintf(fp
,"%d\n",getpid());
6710 int main(int argc
, char **argv
) {
6713 resetServerSaveParams();
6714 loadServerConfig(argv
[1]);
6715 } else if (argc
> 2) {
6716 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
6719 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6721 if (server
.daemonize
) daemonize();
6723 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
6725 linuxOvercommitMemoryWarning();
6727 if (server
.appendonly
) {
6728 if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
)
6729 redisLog(REDIS_NOTICE
,"DB loaded from append only file");
6731 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
6732 redisLog(REDIS_NOTICE
,"DB loaded from disk");
6734 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
6735 acceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
6736 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
6738 aeDeleteEventLoop(server
.el
);
6742 /* ============================= Backtrace support ========================= */
6744 #ifdef HAVE_BACKTRACE
6745 static char *findFuncName(void *pointer
, unsigned long *offset
);
6747 static void *getMcontextEip(ucontext_t
*uc
) {
6748 #if defined(__FreeBSD__)
6749 return (void*) uc
->uc_mcontext
.mc_eip
;
6750 #elif defined(__dietlibc__)
6751 return (void*) uc
->uc_mcontext
.eip
;
6752 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6754 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6756 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6758 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6759 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6760 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6762 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6764 #elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
6765 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
]; /* Linux 32/64 bit */
6766 #elif defined(__ia64__) /* Linux IA64 */
6767 return (void*) uc
->uc_mcontext
.sc_ip
;
6773 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
6775 char **messages
= NULL
;
6776 int i
, trace_size
= 0;
6777 unsigned long offset
=0;
6778 ucontext_t
*uc
= (ucontext_t
*) secret
;
6780 REDIS_NOTUSED(info
);
6782 redisLog(REDIS_WARNING
,
6783 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
6784 infostring
= genRedisInfoString();
6785 redisLog(REDIS_WARNING
, "%s",infostring
);
6786 /* It's not safe to sdsfree() the returned string under memory
6787 * corruption conditions. Let it leak as we are going to abort */
6789 trace_size
= backtrace(trace
, 100);
6790 /* overwrite sigaction with caller's address */
6791 if (getMcontextEip(uc
) != NULL
) {
6792 trace
[1] = getMcontextEip(uc
);
6794 messages
= backtrace_symbols(trace
, trace_size
);
6796 for (i
=1; i
<trace_size
; ++i
) {
6797 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
6799 p
= strchr(messages
[i
],'+');
6800 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
6801 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
6803 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
6806 /* free(messages); Don't call free() with possibly corrupted memory. */
6810 static void setupSigSegvAction(void) {
6811 struct sigaction act
;
6813 sigemptyset (&act
.sa_mask
);
6814 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6815 * is used. Otherwise, sa_handler is used */
6816 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
6817 act
.sa_sigaction
= segvHandler
;
6818 sigaction (SIGSEGV
, &act
, NULL
);
6819 sigaction (SIGBUS
, &act
, NULL
);
6820 sigaction (SIGFPE
, &act
, NULL
);
6821 sigaction (SIGILL
, &act
, NULL
);
6822 sigaction (SIGBUS
, &act
, NULL
);
6826 #include "staticsymbols.h"
6827 /* This function tries to convert a pointer into a function name. It's used in
6828 * order to provide a backtrace under segmentation fault that's able to
6829 * display functions declared as static (otherwise the backtrace is useless). */
6830 static char *findFuncName(void *pointer
, unsigned long *offset
){
6832 unsigned long off
, minoff
= 0;
6834 /* Try to match against the Symbol with the smallest offset */
6835 for (i
=0; symsTable
[i
].pointer
; i
++) {
6836 unsigned long lp
= (unsigned long) pointer
;
6838 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
6839 off
=lp
-symsTable
[i
].pointer
;
6840 if (ret
< 0 || off
< minoff
) {
6846 if (ret
== -1) return NULL
;
6848 return symsTable
[ret
].name
;
6850 #else /* HAVE_BACKTRACE */
6851 static void setupSigSegvAction(void) {
6853 #endif /* HAVE_BACKTRACE */