2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.3.2"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
64 #include "solarisfixes.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
95 /* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
107 this flags will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short this commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
113 #define REDIS_STRING 0
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
128 /* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpreter the length:
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
139 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining two bits specify a special encoding for the object
149 * accordingly to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
155 /* Virtual memory object->where field. */
156 #define REDIS_VM_MEMORY 0 /* The object is on memory */
157 #define REDIS_VM_SWAPPED 1 /* The object is on disk */
158 #define REDIS_VM_SWAPPING 2 /* Redis is swapping this object on disk */
159 #define REDIS_VM_LOADING 3 /* Redis is loading this object from disk */
161 /* Virtual memory static configuration stuff.
162 * Check vmFindContiguousPages() to know more about this magic numbers. */
163 #define REDIS_VM_MAX_NEAR_PAGES 65536
164 #define REDIS_VM_MAX_RANDOM_JUMP 4096
167 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
168 #define REDIS_SLAVE 2 /* This client is a slave server */
169 #define REDIS_MASTER 4 /* This client is a master server */
170 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
171 #define REDIS_MULTI 16 /* This client is in a MULTI context */
172 #define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */
174 /* Slave replication state - slave side */
175 #define REDIS_REPL_NONE 0 /* No active replication */
176 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
177 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
179 /* Slave replication state - from the point of view of master
180 * Note that in SEND_BULK and ONLINE state the slave receives new updates
181 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
182 * to start the next background saving in order to send updates to it. */
183 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
184 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
185 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
186 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
188 /* List related stuff */
192 /* Sort operations */
193 #define REDIS_SORT_GET 0
194 #define REDIS_SORT_ASC 1
195 #define REDIS_SORT_DESC 2
196 #define REDIS_SORTKEY_MAX 1024
199 #define REDIS_DEBUG 0
200 #define REDIS_NOTICE 1
201 #define REDIS_WARNING 2
203 /* Anti-warning macro... */
204 #define REDIS_NOTUSED(V) ((void) V)
206 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
207 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
209 /* Append only defines */
210 #define APPENDFSYNC_NO 0
211 #define APPENDFSYNC_ALWAYS 1
212 #define APPENDFSYNC_EVERYSEC 2
214 /* We can print the stacktrace, so our assert is defined this way: */
215 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
216 static void _redisAssert(char *estr
);
218 /*================================= Data types ============================== */
220 /* A redis object, that is a type able to hold a string / list / set */
222 /* The VM object structure */
223 struct redisObjectVM
{
224 off_t offset
; /* the page at witch the object is stored on disk */
225 int pages
; /* number of pages used on disk */
228 /* The actual Redis Object */
229 typedef struct redisObject
{
232 unsigned char encoding
;
233 unsigned char storage
; /* where? REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
234 unsigned char notused
;
236 /* VM fields, this are only allocated if VM is active, otherwise the
237 * object allocation function will just allocate
238 * sizeof(redisObjct) minus sizeof(redisObjectVM), so using
239 * Redis without VM active will not have any overhead. */
240 struct redisObjectVM vm
;
243 /* Macro used to initalize a Redis object allocated on the stack.
244 * Note that this macro is taken near the structure definition to make sure
245 * we'll update it when the structure is changed, to avoid bugs like
246 * bug #85 introduced exactly in this way. */
247 #define initStaticStringObject(_var,_ptr) do { \
249 _var.type = REDIS_STRING; \
250 _var.encoding = REDIS_ENCODING_RAW; \
254 typedef struct redisDb
{
255 dict
*dict
; /* The keyspace for this DB */
256 dict
*expires
; /* Timeout of keys with a timeout set */
257 dict
*blockingkeys
; /* Keys with clients waiting for data (BLPOP) */
261 /* Client MULTI/EXEC state */
262 typedef struct multiCmd
{
265 struct redisCommand
*cmd
;
268 typedef struct multiState
{
269 multiCmd
*commands
; /* Array of MULTI commands */
270 int count
; /* Total number of MULTI commands */
273 /* With multiplexing we need to take per-clinet state.
274 * Clients are taken in a liked list. */
275 typedef struct redisClient
{
280 robj
**argv
, **mbargv
;
282 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
283 int multibulk
; /* multi bulk command format active */
286 time_t lastinteraction
; /* time of the last interaction, used for timeout */
287 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
289 int slaveseldb
; /* slave selected db, if this client is a slave */
290 int authenticated
; /* when requirepass is non-NULL */
291 int replstate
; /* replication state if this is a slave */
292 int repldbfd
; /* replication DB file descriptor */
293 long repldboff
; /* replication DB file offset */
294 off_t repldbsize
; /* replication DB file size */
295 multiState mstate
; /* MULTI/EXEC state */
296 robj
**blockingkeys
; /* The key we waiting to terminate a blocking
297 * operation such as BLPOP. Otherwise NULL. */
298 int blockingkeysnum
; /* Number of blocking keys */
299 time_t blockingto
; /* Blocking operation timeout. If UNIX current time
300 * is >= blockingto then the operation timed out. */
308 /* Global server state structure */
313 dict
*sharingpool
; /* Poll used for object sharing */
314 unsigned int sharingpoolsize
;
315 long long dirty
; /* changes to DB from the last save */
317 list
*slaves
, *monitors
;
318 char neterr
[ANET_ERR_LEN
];
320 int cronloops
; /* number of times the cron function run */
321 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
322 time_t lastsave
; /* Unix time of last save succeeede */
323 size_t usedmemory
; /* Used memory in megabytes */
324 /* Fields used only for stats */
325 time_t stat_starttime
; /* server start time */
326 long long stat_numcommands
; /* number of processed commands */
327 long long stat_numconnections
; /* number of connections received */
340 pid_t bgsavechildpid
;
341 pid_t bgrewritechildpid
;
342 sds bgrewritebuf
; /* buffer taken by parent during oppend only rewrite */
343 struct saveparam
*saveparams
;
348 char *appendfilename
;
352 /* Replication related */
357 redisClient
*master
; /* client that is master for this slave */
359 unsigned int maxclients
;
360 unsigned long maxmemory
;
361 unsigned int blockedclients
;
362 /* Sort parameters - qsort_r() is only available under BSD so we
363 * have to take this state global, in order to pass it to sortCompare() */
367 /* Virtual memory configuration */
372 /* Virtual memory state */
375 off_t vm_next_page
; /* Next probably empty page */
376 off_t vm_near_pages
; /* Number of pages allocated sequentially */
377 unsigned char *vm_bitmap
; /* Bitmap of free/used pages */
380 typedef void redisCommandProc(redisClient
*c
);
381 struct redisCommand
{
383 redisCommandProc
*proc
;
388 struct redisFunctionSym
{
390 unsigned long pointer
;
393 typedef struct _redisSortObject
{
401 typedef struct _redisSortOperation
{
404 } redisSortOperation
;
406 /* ZSETs use a specialized version of Skiplists */
408 typedef struct zskiplistNode
{
409 struct zskiplistNode
**forward
;
410 struct zskiplistNode
*backward
;
415 typedef struct zskiplist
{
416 struct zskiplistNode
*header
, *tail
;
417 unsigned long length
;
421 typedef struct zset
{
426 /* Our shared "common" objects */
428 struct sharedObjectsStruct
{
429 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
430 *colon
, *nullbulk
, *nullmultibulk
, *queued
,
431 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
432 *outofrangeerr
, *plus
,
433 *select0
, *select1
, *select2
, *select3
, *select4
,
434 *select5
, *select6
, *select7
, *select8
, *select9
;
437 /* Global vars that are actally used as constants. The following double
438 * values are used for double on-disk serialization, and are initialized
439 * at runtime to avoid strange compiler optimizations. */
441 static double R_Zero
, R_PosInf
, R_NegInf
, R_Nan
;
443 /*================================ Prototypes =============================== */
445 static void freeStringObject(robj
*o
);
446 static void freeListObject(robj
*o
);
447 static void freeSetObject(robj
*o
);
448 static void decrRefCount(void *o
);
449 static robj
*createObject(int type
, void *ptr
);
450 static void freeClient(redisClient
*c
);
451 static int rdbLoad(char *filename
);
452 static void addReply(redisClient
*c
, robj
*obj
);
453 static void addReplySds(redisClient
*c
, sds s
);
454 static void incrRefCount(robj
*o
);
455 static int rdbSaveBackground(char *filename
);
456 static robj
*createStringObject(char *ptr
, size_t len
);
457 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
458 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
459 static int syncWithMaster(void);
460 static robj
*tryObjectSharing(robj
*o
);
461 static int tryObjectEncoding(robj
*o
);
462 static robj
*getDecodedObject(robj
*o
);
463 static int removeExpire(redisDb
*db
, robj
*key
);
464 static int expireIfNeeded(redisDb
*db
, robj
*key
);
465 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
466 static int deleteKey(redisDb
*db
, robj
*key
);
467 static time_t getExpire(redisDb
*db
, robj
*key
);
468 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
469 static void updateSlavesWaitingBgsave(int bgsaveerr
);
470 static void freeMemoryIfNeeded(void);
471 static int processCommand(redisClient
*c
);
472 static void setupSigSegvAction(void);
473 static void rdbRemoveTempFile(pid_t childpid
);
474 static void aofRemoveTempFile(pid_t childpid
);
475 static size_t stringObjectLen(robj
*o
);
476 static void processInputBuffer(redisClient
*c
);
477 static zskiplist
*zslCreate(void);
478 static void zslFree(zskiplist
*zsl
);
479 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
480 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
481 static void initClientMultiState(redisClient
*c
);
482 static void freeClientMultiState(redisClient
*c
);
483 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
);
484 static void unblockClient(redisClient
*c
);
485 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
);
486 static void vmInit(void);
488 static void authCommand(redisClient
*c
);
489 static void pingCommand(redisClient
*c
);
490 static void echoCommand(redisClient
*c
);
491 static void setCommand(redisClient
*c
);
492 static void setnxCommand(redisClient
*c
);
493 static void getCommand(redisClient
*c
);
494 static void delCommand(redisClient
*c
);
495 static void existsCommand(redisClient
*c
);
496 static void incrCommand(redisClient
*c
);
497 static void decrCommand(redisClient
*c
);
498 static void incrbyCommand(redisClient
*c
);
499 static void decrbyCommand(redisClient
*c
);
500 static void selectCommand(redisClient
*c
);
501 static void randomkeyCommand(redisClient
*c
);
502 static void keysCommand(redisClient
*c
);
503 static void dbsizeCommand(redisClient
*c
);
504 static void lastsaveCommand(redisClient
*c
);
505 static void saveCommand(redisClient
*c
);
506 static void bgsaveCommand(redisClient
*c
);
507 static void bgrewriteaofCommand(redisClient
*c
);
508 static void shutdownCommand(redisClient
*c
);
509 static void moveCommand(redisClient
*c
);
510 static void renameCommand(redisClient
*c
);
511 static void renamenxCommand(redisClient
*c
);
512 static void lpushCommand(redisClient
*c
);
513 static void rpushCommand(redisClient
*c
);
514 static void lpopCommand(redisClient
*c
);
515 static void rpopCommand(redisClient
*c
);
516 static void llenCommand(redisClient
*c
);
517 static void lindexCommand(redisClient
*c
);
518 static void lrangeCommand(redisClient
*c
);
519 static void ltrimCommand(redisClient
*c
);
520 static void typeCommand(redisClient
*c
);
521 static void lsetCommand(redisClient
*c
);
522 static void saddCommand(redisClient
*c
);
523 static void sremCommand(redisClient
*c
);
524 static void smoveCommand(redisClient
*c
);
525 static void sismemberCommand(redisClient
*c
);
526 static void scardCommand(redisClient
*c
);
527 static void spopCommand(redisClient
*c
);
528 static void srandmemberCommand(redisClient
*c
);
529 static void sinterCommand(redisClient
*c
);
530 static void sinterstoreCommand(redisClient
*c
);
531 static void sunionCommand(redisClient
*c
);
532 static void sunionstoreCommand(redisClient
*c
);
533 static void sdiffCommand(redisClient
*c
);
534 static void sdiffstoreCommand(redisClient
*c
);
535 static void syncCommand(redisClient
*c
);
536 static void flushdbCommand(redisClient
*c
);
537 static void flushallCommand(redisClient
*c
);
538 static void sortCommand(redisClient
*c
);
539 static void lremCommand(redisClient
*c
);
540 static void rpoplpushcommand(redisClient
*c
);
541 static void infoCommand(redisClient
*c
);
542 static void mgetCommand(redisClient
*c
);
543 static void monitorCommand(redisClient
*c
);
544 static void expireCommand(redisClient
*c
);
545 static void expireatCommand(redisClient
*c
);
546 static void getsetCommand(redisClient
*c
);
547 static void ttlCommand(redisClient
*c
);
548 static void slaveofCommand(redisClient
*c
);
549 static void debugCommand(redisClient
*c
);
550 static void msetCommand(redisClient
*c
);
551 static void msetnxCommand(redisClient
*c
);
552 static void zaddCommand(redisClient
*c
);
553 static void zincrbyCommand(redisClient
*c
);
554 static void zrangeCommand(redisClient
*c
);
555 static void zrangebyscoreCommand(redisClient
*c
);
556 static void zrevrangeCommand(redisClient
*c
);
557 static void zcardCommand(redisClient
*c
);
558 static void zremCommand(redisClient
*c
);
559 static void zscoreCommand(redisClient
*c
);
560 static void zremrangebyscoreCommand(redisClient
*c
);
561 static void multiCommand(redisClient
*c
);
562 static void execCommand(redisClient
*c
);
563 static void blpopCommand(redisClient
*c
);
564 static void brpopCommand(redisClient
*c
);
566 /*================================= Globals ================================= */
569 static struct redisServer server
; /* server global state */
570 static struct redisCommand cmdTable
[] = {
571 {"get",getCommand
,2,REDIS_CMD_INLINE
},
572 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
573 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
574 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
575 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
576 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
577 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
578 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
579 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
580 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
581 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
582 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
583 {"brpop",brpopCommand
,-3,REDIS_CMD_INLINE
},
584 {"blpop",blpopCommand
,-3,REDIS_CMD_INLINE
},
585 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
586 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
587 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
588 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
589 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
590 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
591 {"rpoplpush",rpoplpushcommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
592 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
593 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
594 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
595 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
596 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
597 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
598 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
599 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
600 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
601 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
602 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
603 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
604 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
605 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
606 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
607 {"zincrby",zincrbyCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
608 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
609 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
610 {"zrange",zrangeCommand
,-4,REDIS_CMD_INLINE
},
611 {"zrangebyscore",zrangebyscoreCommand
,-4,REDIS_CMD_INLINE
},
612 {"zrevrange",zrevrangeCommand
,-4,REDIS_CMD_INLINE
},
613 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
614 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
615 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
616 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
617 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
618 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
619 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
620 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
621 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
622 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
623 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
624 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
625 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
626 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
627 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
628 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
629 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
630 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
631 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
632 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
633 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
634 {"bgrewriteaof",bgrewriteaofCommand
,1,REDIS_CMD_INLINE
},
635 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
636 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
637 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
638 {"multi",multiCommand
,1,REDIS_CMD_INLINE
},
639 {"exec",execCommand
,1,REDIS_CMD_INLINE
},
640 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
641 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
642 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
643 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
644 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
645 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
646 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
647 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
648 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
652 /*============================ Utility functions ============================ */
654 /* Glob-style pattern matching. */
655 int stringmatchlen(const char *pattern
, int patternLen
,
656 const char *string
, int stringLen
, int nocase
)
661 while (pattern
[1] == '*') {
666 return 1; /* match */
668 if (stringmatchlen(pattern
+1, patternLen
-1,
669 string
, stringLen
, nocase
))
670 return 1; /* match */
674 return 0; /* no match */
678 return 0; /* no match */
688 not = pattern
[0] == '^';
695 if (pattern
[0] == '\\') {
698 if (pattern
[0] == string
[0])
700 } else if (pattern
[0] == ']') {
702 } else if (patternLen
== 0) {
706 } else if (pattern
[1] == '-' && patternLen
>= 3) {
707 int start
= pattern
[0];
708 int end
= pattern
[2];
716 start
= tolower(start
);
722 if (c
>= start
&& c
<= end
)
726 if (pattern
[0] == string
[0])
729 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
739 return 0; /* no match */
745 if (patternLen
>= 2) {
752 if (pattern
[0] != string
[0])
753 return 0; /* no match */
755 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
756 return 0; /* no match */
764 if (stringLen
== 0) {
765 while(*pattern
== '*') {
772 if (patternLen
== 0 && stringLen
== 0)
777 static void redisLog(int level
, const char *fmt
, ...) {
781 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
785 if (level
>= server
.verbosity
) {
791 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
792 fprintf(fp
,"%s %c ",buf
,c
[level
]);
793 vfprintf(fp
, fmt
, ap
);
799 if (server
.logfile
) fclose(fp
);
802 /*====================== Hash table type implementation ==================== */
804 /* This is an hash table type that uses the SDS dynamic strings libary as
805 * keys and radis objects as values (objects can hold SDS strings,
808 static void dictVanillaFree(void *privdata
, void *val
)
810 DICT_NOTUSED(privdata
);
814 static void dictListDestructor(void *privdata
, void *val
)
816 DICT_NOTUSED(privdata
);
817 listRelease((list
*)val
);
820 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
824 DICT_NOTUSED(privdata
);
826 l1
= sdslen((sds
)key1
);
827 l2
= sdslen((sds
)key2
);
828 if (l1
!= l2
) return 0;
829 return memcmp(key1
, key2
, l1
) == 0;
832 static void dictRedisObjectDestructor(void *privdata
, void *val
)
834 DICT_NOTUSED(privdata
);
839 static int dictObjKeyCompare(void *privdata
, const void *key1
,
842 const robj
*o1
= key1
, *o2
= key2
;
843 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
846 static unsigned int dictObjHash(const void *key
) {
848 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
851 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
854 robj
*o1
= (robj
*) key1
, *o2
= (robj
*) key2
;
857 o1
= getDecodedObject(o1
);
858 o2
= getDecodedObject(o2
);
859 cmp
= sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
865 static unsigned int dictEncObjHash(const void *key
) {
866 robj
*o
= (robj
*) key
;
868 o
= getDecodedObject(o
);
869 unsigned int hash
= dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
874 static dictType setDictType
= {
875 dictEncObjHash
, /* hash function */
878 dictEncObjKeyCompare
, /* key compare */
879 dictRedisObjectDestructor
, /* key destructor */
880 NULL
/* val destructor */
883 static dictType zsetDictType
= {
884 dictEncObjHash
, /* hash function */
887 dictEncObjKeyCompare
, /* key compare */
888 dictRedisObjectDestructor
, /* key destructor */
889 dictVanillaFree
/* val destructor of malloc(sizeof(double)) */
892 static dictType hashDictType
= {
893 dictObjHash
, /* hash function */
896 dictObjKeyCompare
, /* key compare */
897 dictRedisObjectDestructor
, /* key destructor */
898 dictRedisObjectDestructor
/* val destructor */
901 /* Keylist hash table type has unencoded redis objects as keys and
902 * lists as values. It's used for blocking operations (BLPOP) */
903 static dictType keylistDictType
= {
904 dictObjHash
, /* hash function */
907 dictObjKeyCompare
, /* key compare */
908 dictRedisObjectDestructor
, /* key destructor */
909 dictListDestructor
/* val destructor */
912 /* ========================= Random utility functions ======================= */
914 /* Redis generally does not try to recover from out of memory conditions
915 * when allocating objects or strings, it is not clear if it will be possible
916 * to report this condition to the client since the networking layer itself
917 * is based on heap allocation for send buffers, so we simply abort.
918 * At least the code will be simpler to read... */
919 static void oom(const char *msg
) {
920 redisLog(REDIS_WARNING
, "%s: Out of memory\n",msg
);
925 /* ====================== Redis server networking stuff ===================== */
926 static void closeTimedoutClients(void) {
929 time_t now
= time(NULL
);
931 listRewind(server
.clients
);
932 while ((ln
= listYield(server
.clients
)) != NULL
) {
933 c
= listNodeValue(ln
);
934 if (server
.maxidletime
&&
935 !(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
936 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
937 (now
- c
->lastinteraction
> server
.maxidletime
))
939 redisLog(REDIS_DEBUG
,"Closing idle client");
941 } else if (c
->flags
& REDIS_BLOCKED
) {
942 if (c
->blockingto
!= 0 && c
->blockingto
< now
) {
943 addReply(c
,shared
.nullmultibulk
);
950 static int htNeedsResize(dict
*dict
) {
951 long long size
, used
;
953 size
= dictSlots(dict
);
954 used
= dictSize(dict
);
955 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
956 (used
*100/size
< REDIS_HT_MINFILL
));
959 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
960 * we resize the hash table to save memory */
961 static void tryResizeHashTables(void) {
964 for (j
= 0; j
< server
.dbnum
; j
++) {
965 if (htNeedsResize(server
.db
[j
].dict
)) {
966 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
967 dictResize(server
.db
[j
].dict
);
968 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
970 if (htNeedsResize(server
.db
[j
].expires
))
971 dictResize(server
.db
[j
].expires
);
975 /* A background saving child (BGSAVE) terminated its work. Handle this. */
976 void backgroundSaveDoneHandler(int statloc
) {
977 int exitcode
= WEXITSTATUS(statloc
);
978 int bysignal
= WIFSIGNALED(statloc
);
980 if (!bysignal
&& exitcode
== 0) {
981 redisLog(REDIS_NOTICE
,
982 "Background saving terminated with success");
984 server
.lastsave
= time(NULL
);
985 } else if (!bysignal
&& exitcode
!= 0) {
986 redisLog(REDIS_WARNING
, "Background saving error");
988 redisLog(REDIS_WARNING
,
989 "Background saving terminated by signal");
990 rdbRemoveTempFile(server
.bgsavechildpid
);
992 server
.bgsavechildpid
= -1;
993 /* Possibly there are slaves waiting for a BGSAVE in order to be served
994 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
995 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
998 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
1000 void backgroundRewriteDoneHandler(int statloc
) {
1001 int exitcode
= WEXITSTATUS(statloc
);
1002 int bysignal
= WIFSIGNALED(statloc
);
1004 if (!bysignal
&& exitcode
== 0) {
1008 redisLog(REDIS_NOTICE
,
1009 "Background append only file rewriting terminated with success");
1010 /* Now it's time to flush the differences accumulated by the parent */
1011 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) server
.bgrewritechildpid
);
1012 fd
= open(tmpfile
,O_WRONLY
|O_APPEND
);
1014 redisLog(REDIS_WARNING
, "Not able to open the temp append only file produced by the child: %s", strerror(errno
));
1017 /* Flush our data... */
1018 if (write(fd
,server
.bgrewritebuf
,sdslen(server
.bgrewritebuf
)) !=
1019 (signed) sdslen(server
.bgrewritebuf
)) {
1020 redisLog(REDIS_WARNING
, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno
));
1024 redisLog(REDIS_NOTICE
,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server
.bgrewritebuf
));
1025 /* Now our work is to rename the temp file into the stable file. And
1026 * switch the file descriptor used by the server for append only. */
1027 if (rename(tmpfile
,server
.appendfilename
) == -1) {
1028 redisLog(REDIS_WARNING
,"Can't rename the temp append only file into the stable one: %s", strerror(errno
));
1032 /* Mission completed... almost */
1033 redisLog(REDIS_NOTICE
,"Append only file successfully rewritten.");
1034 if (server
.appendfd
!= -1) {
1035 /* If append only is actually enabled... */
1036 close(server
.appendfd
);
1037 server
.appendfd
= fd
;
1039 server
.appendseldb
= -1; /* Make sure it will issue SELECT */
1040 redisLog(REDIS_NOTICE
,"The new append only file was selected for future appends.");
1042 /* If append only is disabled we just generate a dump in this
1043 * format. Why not? */
1046 } else if (!bysignal
&& exitcode
!= 0) {
1047 redisLog(REDIS_WARNING
, "Background append only file rewriting error");
1049 redisLog(REDIS_WARNING
,
1050 "Background append only file rewriting terminated by signal");
1053 sdsfree(server
.bgrewritebuf
);
1054 server
.bgrewritebuf
= sdsempty();
1055 aofRemoveTempFile(server
.bgrewritechildpid
);
1056 server
.bgrewritechildpid
= -1;
1059 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
1060 int j
, loops
= server
.cronloops
++;
1061 REDIS_NOTUSED(eventLoop
);
1063 REDIS_NOTUSED(clientData
);
1065 /* Update the global state with the amount of used memory */
1066 server
.usedmemory
= zmalloc_used_memory();
1068 /* Show some info about non-empty databases */
1069 for (j
= 0; j
< server
.dbnum
; j
++) {
1070 long long size
, used
, vkeys
;
1072 size
= dictSlots(server
.db
[j
].dict
);
1073 used
= dictSize(server
.db
[j
].dict
);
1074 vkeys
= dictSize(server
.db
[j
].expires
);
1075 if (!(loops
% 5) && (used
|| vkeys
)) {
1076 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
1077 /* dictPrintStats(server.dict); */
1081 /* We don't want to resize the hash tables while a bacground saving
1082 * is in progress: the saving child is created using fork() that is
1083 * implemented with a copy-on-write semantic in most modern systems, so
1084 * if we resize the HT while there is the saving child at work actually
1085 * a lot of memory movements in the parent will cause a lot of pages
1087 if (server
.bgsavechildpid
== -1) tryResizeHashTables();
1089 /* Show information about connected clients */
1091 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1092 listLength(server
.clients
)-listLength(server
.slaves
),
1093 listLength(server
.slaves
),
1095 dictSize(server
.sharingpool
));
1098 /* Close connections of timedout clients */
1099 if ((server
.maxidletime
&& !(loops
% 10)) || server
.blockedclients
)
1100 closeTimedoutClients();
1102 /* Check if a background saving or AOF rewrite in progress terminated */
1103 if (server
.bgsavechildpid
!= -1 || server
.bgrewritechildpid
!= -1) {
1107 if ((pid
= wait3(&statloc
,WNOHANG
,NULL
)) != 0) {
1108 if (pid
== server
.bgsavechildpid
) {
1109 backgroundSaveDoneHandler(statloc
);
1111 backgroundRewriteDoneHandler(statloc
);
1115 /* If there is not a background saving in progress check if
1116 * we have to save now */
1117 time_t now
= time(NULL
);
1118 for (j
= 0; j
< server
.saveparamslen
; j
++) {
1119 struct saveparam
*sp
= server
.saveparams
+j
;
1121 if (server
.dirty
>= sp
->changes
&&
1122 now
-server
.lastsave
> sp
->seconds
) {
1123 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
1124 sp
->changes
, sp
->seconds
);
1125 rdbSaveBackground(server
.dbfilename
);
1131 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1132 * will use few CPU cycles if there are few expiring keys, otherwise
1133 * it will get more aggressive to avoid that too much memory is used by
1134 * keys that can be removed from the keyspace. */
1135 for (j
= 0; j
< server
.dbnum
; j
++) {
1137 redisDb
*db
= server
.db
+j
;
1139 /* Continue to expire if at the end of the cycle more than 25%
1140 * of the keys were expired. */
1142 int num
= dictSize(db
->expires
);
1143 time_t now
= time(NULL
);
1146 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
1147 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
1152 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
1153 t
= (time_t) dictGetEntryVal(de
);
1155 deleteKey(db
,dictGetEntryKey(de
));
1159 } while (expired
> REDIS_EXPIRELOOKUPS_PER_CRON
/4);
1162 /* Check if we should connect to a MASTER */
1163 if (server
.replstate
== REDIS_REPL_CONNECT
) {
1164 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
1165 if (syncWithMaster() == REDIS_OK
) {
1166 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
1172 static void createSharedObjects(void) {
1173 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
1174 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
1175 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
1176 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
1177 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
1178 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
1179 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
1180 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
1181 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
1182 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
1183 shared
.queued
= createObject(REDIS_STRING
,sdsnew("+QUEUED\r\n"));
1184 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
1185 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1186 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
1187 "-ERR no such key\r\n"));
1188 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
1189 "-ERR syntax error\r\n"));
1190 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
1191 "-ERR source and destination objects are the same\r\n"));
1192 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
1193 "-ERR index out of range\r\n"));
1194 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
1195 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
1196 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
1197 shared
.select0
= createStringObject("select 0\r\n",10);
1198 shared
.select1
= createStringObject("select 1\r\n",10);
1199 shared
.select2
= createStringObject("select 2\r\n",10);
1200 shared
.select3
= createStringObject("select 3\r\n",10);
1201 shared
.select4
= createStringObject("select 4\r\n",10);
1202 shared
.select5
= createStringObject("select 5\r\n",10);
1203 shared
.select6
= createStringObject("select 6\r\n",10);
1204 shared
.select7
= createStringObject("select 7\r\n",10);
1205 shared
.select8
= createStringObject("select 8\r\n",10);
1206 shared
.select9
= createStringObject("select 9\r\n",10);
1209 static void appendServerSaveParams(time_t seconds
, int changes
) {
1210 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1211 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1212 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1213 server
.saveparamslen
++;
1216 static void resetServerSaveParams() {
1217 zfree(server
.saveparams
);
1218 server
.saveparams
= NULL
;
1219 server
.saveparamslen
= 0;
1222 static void initServerConfig() {
1223 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1224 server
.port
= REDIS_SERVERPORT
;
1225 server
.verbosity
= REDIS_DEBUG
;
1226 server
.maxidletime
= REDIS_MAXIDLETIME
;
1227 server
.saveparams
= NULL
;
1228 server
.logfile
= NULL
; /* NULL = log on standard output */
1229 server
.bindaddr
= NULL
;
1230 server
.glueoutputbuf
= 1;
1231 server
.daemonize
= 0;
1232 server
.appendonly
= 0;
1233 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1234 server
.lastfsync
= time(NULL
);
1235 server
.appendfd
= -1;
1236 server
.appendseldb
= -1; /* Make sure the first time will not match */
1237 server
.pidfile
= "/var/run/redis.pid";
1238 server
.dbfilename
= "dump.rdb";
1239 server
.appendfilename
= "appendonly.aof";
1240 server
.requirepass
= NULL
;
1241 server
.shareobjects
= 0;
1242 server
.rdbcompression
= 1;
1243 server
.sharingpoolsize
= 1024;
1244 server
.maxclients
= 0;
1245 server
.blockedclients
= 0;
1246 server
.maxmemory
= 0;
1247 server
.vm_enabled
= 0;
1248 server
.vm_page_size
= 256; /* 256 bytes per page */
1249 server
.vm_pages
= 1024*1024*100; /* 104 millions of pages */
1250 server
.vm_max_memory
= 1024LL*1024*1024*1; /* 1 GB of RAM */
1252 resetServerSaveParams();
1254 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1255 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1256 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1257 /* Replication related */
1259 server
.masterauth
= NULL
;
1260 server
.masterhost
= NULL
;
1261 server
.masterport
= 6379;
1262 server
.master
= NULL
;
1263 server
.replstate
= REDIS_REPL_NONE
;
1265 /* Double constants initialization */
1267 R_PosInf
= 1.0/R_Zero
;
1268 R_NegInf
= -1.0/R_Zero
;
1269 R_Nan
= R_Zero
/R_Zero
;
1272 static void initServer() {
1275 signal(SIGHUP
, SIG_IGN
);
1276 signal(SIGPIPE
, SIG_IGN
);
1277 setupSigSegvAction();
1279 server
.clients
= listCreate();
1280 server
.slaves
= listCreate();
1281 server
.monitors
= listCreate();
1282 server
.objfreelist
= listCreate();
1283 createSharedObjects();
1284 server
.el
= aeCreateEventLoop();
1285 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1286 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
1287 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1288 if (server
.fd
== -1) {
1289 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
1292 for (j
= 0; j
< server
.dbnum
; j
++) {
1293 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1294 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1295 server
.db
[j
].blockingkeys
= dictCreate(&keylistDictType
,NULL
);
1296 server
.db
[j
].id
= j
;
1298 server
.cronloops
= 0;
1299 server
.bgsavechildpid
= -1;
1300 server
.bgrewritechildpid
= -1;
1301 server
.bgrewritebuf
= sdsempty();
1302 server
.lastsave
= time(NULL
);
1304 server
.usedmemory
= 0;
1305 server
.stat_numcommands
= 0;
1306 server
.stat_numconnections
= 0;
1307 server
.stat_starttime
= time(NULL
);
1308 aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
);
1310 if (server
.appendonly
) {
1311 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644);
1312 if (server
.appendfd
== -1) {
1313 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1319 if (server
.vm_enabled
) vmInit();
1322 /* Empty the whole database */
1323 static long long emptyDb() {
1325 long long removed
= 0;
1327 for (j
= 0; j
< server
.dbnum
; j
++) {
1328 removed
+= dictSize(server
.db
[j
].dict
);
1329 dictEmpty(server
.db
[j
].dict
);
1330 dictEmpty(server
.db
[j
].expires
);
1335 static int yesnotoi(char *s
) {
1336 if (!strcasecmp(s
,"yes")) return 1;
1337 else if (!strcasecmp(s
,"no")) return 0;
1341 /* I agree, this is a very rudimental way to load a configuration...
1342 will improve later if the config gets more complex */
1343 static void loadServerConfig(char *filename
) {
1345 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1349 if (filename
[0] == '-' && filename
[1] == '\0')
1352 if ((fp
= fopen(filename
,"r")) == NULL
) {
1353 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1358 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1364 line
= sdstrim(line
," \t\r\n");
1366 /* Skip comments and blank lines*/
1367 if (line
[0] == '#' || line
[0] == '\0') {
1372 /* Split into arguments */
1373 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1374 sdstolower(argv
[0]);
1376 /* Execute config directives */
1377 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1378 server
.maxidletime
= atoi(argv
[1]);
1379 if (server
.maxidletime
< 0) {
1380 err
= "Invalid timeout value"; goto loaderr
;
1382 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1383 server
.port
= atoi(argv
[1]);
1384 if (server
.port
< 1 || server
.port
> 65535) {
1385 err
= "Invalid port"; goto loaderr
;
1387 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1388 server
.bindaddr
= zstrdup(argv
[1]);
1389 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1390 int seconds
= atoi(argv
[1]);
1391 int changes
= atoi(argv
[2]);
1392 if (seconds
< 1 || changes
< 0) {
1393 err
= "Invalid save parameters"; goto loaderr
;
1395 appendServerSaveParams(seconds
,changes
);
1396 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1397 if (chdir(argv
[1]) == -1) {
1398 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1399 argv
[1], strerror(errno
));
1402 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1403 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1404 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1405 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1407 err
= "Invalid log level. Must be one of debug, notice, warning";
1410 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1413 server
.logfile
= zstrdup(argv
[1]);
1414 if (!strcasecmp(server
.logfile
,"stdout")) {
1415 zfree(server
.logfile
);
1416 server
.logfile
= NULL
;
1418 if (server
.logfile
) {
1419 /* Test if we are able to open the file. The server will not
1420 * be able to abort just for this problem later... */
1421 logfp
= fopen(server
.logfile
,"a");
1422 if (logfp
== NULL
) {
1423 err
= sdscatprintf(sdsempty(),
1424 "Can't open the log file: %s", strerror(errno
));
1429 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1430 server
.dbnum
= atoi(argv
[1]);
1431 if (server
.dbnum
< 1) {
1432 err
= "Invalid number of databases"; goto loaderr
;
1434 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1435 server
.maxclients
= atoi(argv
[1]);
1436 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1437 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1438 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1439 server
.masterhost
= sdsnew(argv
[1]);
1440 server
.masterport
= atoi(argv
[2]);
1441 server
.replstate
= REDIS_REPL_CONNECT
;
1442 } else if (!strcasecmp(argv
[0],"masterauth") && argc
== 2) {
1443 server
.masterauth
= zstrdup(argv
[1]);
1444 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1445 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1446 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1448 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1449 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1450 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1452 } else if (!strcasecmp(argv
[0],"rdbcompression") && argc
== 2) {
1453 if ((server
.rdbcompression
= yesnotoi(argv
[1])) == -1) {
1454 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1456 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1457 server
.sharingpoolsize
= atoi(argv
[1]);
1458 if (server
.sharingpoolsize
< 1) {
1459 err
= "invalid object sharing pool size"; goto loaderr
;
1461 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1462 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1463 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1465 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1466 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1467 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1469 } else if (!strcasecmp(argv
[0],"appendfsync") && argc
== 2) {
1470 if (!strcasecmp(argv
[1],"no")) {
1471 server
.appendfsync
= APPENDFSYNC_NO
;
1472 } else if (!strcasecmp(argv
[1],"always")) {
1473 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1474 } else if (!strcasecmp(argv
[1],"everysec")) {
1475 server
.appendfsync
= APPENDFSYNC_EVERYSEC
;
1477 err
= "argument must be 'no', 'always' or 'everysec'";
1480 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1481 server
.requirepass
= zstrdup(argv
[1]);
1482 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1483 server
.pidfile
= zstrdup(argv
[1]);
1484 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1485 server
.dbfilename
= zstrdup(argv
[1]);
1486 } else if (!strcasecmp(argv
[0],"vm-enabled") && argc
== 2) {
1487 if ((server
.vm_enabled
= yesnotoi(argv
[1])) == -1) {
1488 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1491 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1493 for (j
= 0; j
< argc
; j
++)
1498 if (fp
!= stdin
) fclose(fp
);
1502 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1503 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1504 fprintf(stderr
, ">>> '%s'\n", line
);
1505 fprintf(stderr
, "%s\n", err
);
1509 static void freeClientArgv(redisClient
*c
) {
1512 for (j
= 0; j
< c
->argc
; j
++)
1513 decrRefCount(c
->argv
[j
]);
1514 for (j
= 0; j
< c
->mbargc
; j
++)
1515 decrRefCount(c
->mbargv
[j
]);
1520 static void freeClient(redisClient
*c
) {
1523 /* Note that if the client we are freeing is blocked into a blocking
1524 * call, we have to set querybuf to NULL *before* to call unblockClient()
1525 * to avoid processInputBuffer() will get called. Also it is important
1526 * to remove the file events after this, because this call adds
1527 * the READABLE event. */
1528 sdsfree(c
->querybuf
);
1530 if (c
->flags
& REDIS_BLOCKED
)
1533 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1534 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1535 listRelease(c
->reply
);
1538 ln
= listSearchKey(server
.clients
,c
);
1539 redisAssert(ln
!= NULL
);
1540 listDelNode(server
.clients
,ln
);
1541 if (c
->flags
& REDIS_SLAVE
) {
1542 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1544 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1545 ln
= listSearchKey(l
,c
);
1546 redisAssert(ln
!= NULL
);
1549 if (c
->flags
& REDIS_MASTER
) {
1550 server
.master
= NULL
;
1551 server
.replstate
= REDIS_REPL_CONNECT
;
1555 freeClientMultiState(c
);
1559 #define GLUEREPLY_UP_TO (1024)
1560 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1562 char buf
[GLUEREPLY_UP_TO
];
1566 listRewind(c
->reply
);
1567 while((ln
= listYield(c
->reply
))) {
1571 objlen
= sdslen(o
->ptr
);
1572 if (copylen
+ objlen
<= GLUEREPLY_UP_TO
) {
1573 memcpy(buf
+copylen
,o
->ptr
,objlen
);
1575 listDelNode(c
->reply
,ln
);
1577 if (copylen
== 0) return;
1581 /* Now the output buffer is empty, add the new single element */
1582 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,copylen
));
1583 listAddNodeHead(c
->reply
,o
);
1586 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1587 redisClient
*c
= privdata
;
1588 int nwritten
= 0, totwritten
= 0, objlen
;
1591 REDIS_NOTUSED(mask
);
1593 /* Use writev() if we have enough buffers to send */
1594 if (!server
.glueoutputbuf
&&
1595 listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD
&&
1596 !(c
->flags
& REDIS_MASTER
))
1598 sendReplyToClientWritev(el
, fd
, privdata
, mask
);
1602 while(listLength(c
->reply
)) {
1603 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1604 glueReplyBuffersIfNeeded(c
);
1606 o
= listNodeValue(listFirst(c
->reply
));
1607 objlen
= sdslen(o
->ptr
);
1610 listDelNode(c
->reply
,listFirst(c
->reply
));
1614 if (c
->flags
& REDIS_MASTER
) {
1615 /* Don't reply to a master */
1616 nwritten
= objlen
- c
->sentlen
;
1618 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1619 if (nwritten
<= 0) break;
1621 c
->sentlen
+= nwritten
;
1622 totwritten
+= nwritten
;
1623 /* If we fully sent the object on head go to the next one */
1624 if (c
->sentlen
== objlen
) {
1625 listDelNode(c
->reply
,listFirst(c
->reply
));
1628 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1629 * bytes, in a single threaded server it's a good idea to serve
1630 * other clients as well, even if a very large request comes from
1631 * super fast link that is always able to accept data (in real world
1632 * scenario think about 'KEYS *' against the loopback interfae) */
1633 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1635 if (nwritten
== -1) {
1636 if (errno
== EAGAIN
) {
1639 redisLog(REDIS_DEBUG
,
1640 "Error writing to client: %s", strerror(errno
));
1645 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1646 if (listLength(c
->reply
) == 0) {
1648 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1652 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
)
1654 redisClient
*c
= privdata
;
1655 int nwritten
= 0, totwritten
= 0, objlen
, willwrite
;
1657 struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
];
1658 int offset
, ion
= 0;
1660 REDIS_NOTUSED(mask
);
1663 while (listLength(c
->reply
)) {
1664 offset
= c
->sentlen
;
1668 /* fill-in the iov[] array */
1669 for(node
= listFirst(c
->reply
); node
; node
= listNextNode(node
)) {
1670 o
= listNodeValue(node
);
1671 objlen
= sdslen(o
->ptr
);
1673 if (totwritten
+ objlen
- offset
> REDIS_MAX_WRITE_PER_EVENT
)
1676 if(ion
== REDIS_WRITEV_IOVEC_COUNT
)
1677 break; /* no more iovecs */
1679 iov
[ion
].iov_base
= ((char*)o
->ptr
) + offset
;
1680 iov
[ion
].iov_len
= objlen
- offset
;
1681 willwrite
+= objlen
- offset
;
1682 offset
= 0; /* just for the first item */
1689 /* write all collected blocks at once */
1690 if((nwritten
= writev(fd
, iov
, ion
)) < 0) {
1691 if (errno
!= EAGAIN
) {
1692 redisLog(REDIS_DEBUG
,
1693 "Error writing to client: %s", strerror(errno
));
1700 totwritten
+= nwritten
;
1701 offset
= c
->sentlen
;
1703 /* remove written robjs from c->reply */
1704 while (nwritten
&& listLength(c
->reply
)) {
1705 o
= listNodeValue(listFirst(c
->reply
));
1706 objlen
= sdslen(o
->ptr
);
1708 if(nwritten
>= objlen
- offset
) {
1709 listDelNode(c
->reply
, listFirst(c
->reply
));
1710 nwritten
-= objlen
- offset
;
1714 c
->sentlen
+= nwritten
;
1722 c
->lastinteraction
= time(NULL
);
1724 if (listLength(c
->reply
) == 0) {
1726 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1730 static struct redisCommand
*lookupCommand(char *name
) {
1732 while(cmdTable
[j
].name
!= NULL
) {
1733 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1739 /* resetClient prepare the client to process the next command */
1740 static void resetClient(redisClient
*c
) {
1746 /* Call() is the core of Redis execution of a command */
1747 static void call(redisClient
*c
, struct redisCommand
*cmd
) {
1750 dirty
= server
.dirty
;
1752 if (server
.appendonly
&& server
.dirty
-dirty
)
1753 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1754 if (server
.dirty
-dirty
&& listLength(server
.slaves
))
1755 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1756 if (listLength(server
.monitors
))
1757 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1758 server
.stat_numcommands
++;
1761 /* If this function gets called we already read a whole
1762 * command, argments are in the client argv/argc fields.
1763 * processCommand() execute the command or prepare the
1764 * server for a bulk read from the client.
1766 * If 1 is returned the client is still alive and valid and
1767 * and other operations can be performed by the caller. Otherwise
1768 * if 0 is returned the client was destroied (i.e. after QUIT). */
1769 static int processCommand(redisClient
*c
) {
1770 struct redisCommand
*cmd
;
1772 /* Free some memory if needed (maxmemory setting) */
1773 if (server
.maxmemory
) freeMemoryIfNeeded();
1775 /* Handle the multi bulk command type. This is an alternative protocol
1776 * supported by Redis in order to receive commands that are composed of
1777 * multiple binary-safe "bulk" arguments. The latency of processing is
1778 * a bit higher but this allows things like multi-sets, so if this
1779 * protocol is used only for MSET and similar commands this is a big win. */
1780 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1781 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1782 if (c
->multibulk
<= 0) {
1786 decrRefCount(c
->argv
[c
->argc
-1]);
1790 } else if (c
->multibulk
) {
1791 if (c
->bulklen
== -1) {
1792 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1793 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1797 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1798 decrRefCount(c
->argv
[0]);
1799 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1801 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1806 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1810 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1811 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1815 if (c
->multibulk
== 0) {
1819 /* Here we need to swap the multi-bulk argc/argv with the
1820 * normal argc/argv of the client structure. */
1822 c
->argv
= c
->mbargv
;
1823 c
->mbargv
= auxargv
;
1826 c
->argc
= c
->mbargc
;
1827 c
->mbargc
= auxargc
;
1829 /* We need to set bulklen to something different than -1
1830 * in order for the code below to process the command without
1831 * to try to read the last argument of a bulk command as
1832 * a special argument. */
1834 /* continue below and process the command */
1841 /* -- end of multi bulk commands processing -- */
1843 /* The QUIT command is handled as a special case. Normal command
1844 * procs are unable to close the client connection safely */
1845 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1849 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1852 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1853 (char*)c
->argv
[0]->ptr
));
1856 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1857 (c
->argc
< -cmd
->arity
)) {
1859 sdscatprintf(sdsempty(),
1860 "-ERR wrong number of arguments for '%s' command\r\n",
1864 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1865 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1868 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1869 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1871 decrRefCount(c
->argv
[c
->argc
-1]);
1872 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1874 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1879 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1880 /* It is possible that the bulk read is already in the
1881 * buffer. Check this condition and handle it accordingly.
1882 * This is just a fast path, alternative to call processInputBuffer().
1883 * It's a good idea since the code is small and this condition
1884 * happens most of the times. */
1885 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1886 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1888 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1893 /* Let's try to share objects on the command arguments vector */
1894 if (server
.shareobjects
) {
1896 for(j
= 1; j
< c
->argc
; j
++)
1897 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1899 /* Let's try to encode the bulk object to save space. */
1900 if (cmd
->flags
& REDIS_CMD_BULK
)
1901 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1903 /* Check if the user is authenticated */
1904 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1905 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1910 /* Exec the command */
1911 if (c
->flags
& REDIS_MULTI
&& cmd
->proc
!= execCommand
) {
1912 queueMultiCommand(c
,cmd
);
1913 addReply(c
,shared
.queued
);
1918 /* Prepare the client for the next command */
1919 if (c
->flags
& REDIS_CLOSE
) {
1927 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1931 /* (args*2)+1 is enough room for args, spaces, newlines */
1932 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1934 if (argc
<= REDIS_STATIC_ARGS
) {
1937 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1940 for (j
= 0; j
< argc
; j
++) {
1941 if (j
!= 0) outv
[outc
++] = shared
.space
;
1942 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1945 lenobj
= createObject(REDIS_STRING
,
1946 sdscatprintf(sdsempty(),"%lu\r\n",
1947 (unsigned long) stringObjectLen(argv
[j
])));
1948 lenobj
->refcount
= 0;
1949 outv
[outc
++] = lenobj
;
1951 outv
[outc
++] = argv
[j
];
1953 outv
[outc
++] = shared
.crlf
;
1955 /* Increment all the refcounts at start and decrement at end in order to
1956 * be sure to free objects if there is no slave in a replication state
1957 * able to be feed with commands */
1958 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1960 while((ln
= listYield(slaves
))) {
1961 redisClient
*slave
= ln
->value
;
1963 /* Don't feed slaves that are still waiting for BGSAVE to start */
1964 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1966 /* Feed all the other slaves, MONITORs and so on */
1967 if (slave
->slaveseldb
!= dictid
) {
1971 case 0: selectcmd
= shared
.select0
; break;
1972 case 1: selectcmd
= shared
.select1
; break;
1973 case 2: selectcmd
= shared
.select2
; break;
1974 case 3: selectcmd
= shared
.select3
; break;
1975 case 4: selectcmd
= shared
.select4
; break;
1976 case 5: selectcmd
= shared
.select5
; break;
1977 case 6: selectcmd
= shared
.select6
; break;
1978 case 7: selectcmd
= shared
.select7
; break;
1979 case 8: selectcmd
= shared
.select8
; break;
1980 case 9: selectcmd
= shared
.select9
; break;
1982 selectcmd
= createObject(REDIS_STRING
,
1983 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1984 selectcmd
->refcount
= 0;
1987 addReply(slave
,selectcmd
);
1988 slave
->slaveseldb
= dictid
;
1990 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1992 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1993 if (outv
!= static_outv
) zfree(outv
);
1996 static void processInputBuffer(redisClient
*c
) {
1998 /* Before to process the input buffer, make sure the client is not
1999 * waitig for a blocking operation such as BLPOP. Note that the first
2000 * iteration the client is never blocked, otherwise the processInputBuffer
2001 * would not be called at all, but after the execution of the first commands
2002 * in the input buffer the client may be blocked, and the "goto again"
2003 * will try to reiterate. The following line will make it return asap. */
2004 if (c
->flags
& REDIS_BLOCKED
) return;
2005 if (c
->bulklen
== -1) {
2006 /* Read the first line of the query */
2007 char *p
= strchr(c
->querybuf
,'\n');
2014 query
= c
->querybuf
;
2015 c
->querybuf
= sdsempty();
2016 querylen
= 1+(p
-(query
));
2017 if (sdslen(query
) > querylen
) {
2018 /* leave data after the first line of the query in the buffer */
2019 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
2021 *p
= '\0'; /* remove "\n" */
2022 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
2023 sdsupdatelen(query
);
2025 /* Now we can split the query in arguments */
2026 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
2029 if (c
->argv
) zfree(c
->argv
);
2030 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
2032 for (j
= 0; j
< argc
; j
++) {
2033 if (sdslen(argv
[j
])) {
2034 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
2042 /* Execute the command. If the client is still valid
2043 * after processCommand() return and there is something
2044 * on the query buffer try to process the next command. */
2045 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
2047 /* Nothing to process, argc == 0. Just process the query
2048 * buffer if it's not empty or return to the caller */
2049 if (sdslen(c
->querybuf
)) goto again
;
2052 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
2053 redisLog(REDIS_DEBUG
, "Client protocol error");
2058 /* Bulk read handling. Note that if we are at this point
2059 the client already sent a command terminated with a newline,
2060 we are reading the bulk data that is actually the last
2061 argument of the command. */
2062 int qbl
= sdslen(c
->querybuf
);
2064 if (c
->bulklen
<= qbl
) {
2065 /* Copy everything but the final CRLF as final argument */
2066 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
2068 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
2069 /* Process the command. If the client is still valid after
2070 * the processing and there is more data in the buffer
2071 * try to parse it. */
2072 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
2078 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2079 redisClient
*c
= (redisClient
*) privdata
;
2080 char buf
[REDIS_IOBUF_LEN
];
2083 REDIS_NOTUSED(mask
);
2085 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
2087 if (errno
== EAGAIN
) {
2090 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
2094 } else if (nread
== 0) {
2095 redisLog(REDIS_DEBUG
, "Client closed connection");
2100 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
2101 c
->lastinteraction
= time(NULL
);
2105 processInputBuffer(c
);
2108 static int selectDb(redisClient
*c
, int id
) {
2109 if (id
< 0 || id
>= server
.dbnum
)
2111 c
->db
= &server
.db
[id
];
2115 static void *dupClientReplyValue(void *o
) {
2116 incrRefCount((robj
*)o
);
2120 static redisClient
*createClient(int fd
) {
2121 redisClient
*c
= zmalloc(sizeof(*c
));
2123 anetNonBlock(NULL
,fd
);
2124 anetTcpNoDelay(NULL
,fd
);
2125 if (!c
) return NULL
;
2128 c
->querybuf
= sdsempty();
2137 c
->lastinteraction
= time(NULL
);
2138 c
->authenticated
= 0;
2139 c
->replstate
= REDIS_REPL_NONE
;
2140 c
->reply
= listCreate();
2141 c
->blockingkeys
= NULL
;
2142 c
->blockingkeysnum
= 0;
2143 listSetFreeMethod(c
->reply
,decrRefCount
);
2144 listSetDupMethod(c
->reply
,dupClientReplyValue
);
2145 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
2146 readQueryFromClient
, c
) == AE_ERR
) {
2150 listAddNodeTail(server
.clients
,c
);
2151 initClientMultiState(c
);
2155 static void addReply(redisClient
*c
, robj
*obj
) {
2156 if (listLength(c
->reply
) == 0 &&
2157 (c
->replstate
== REDIS_REPL_NONE
||
2158 c
->replstate
== REDIS_REPL_ONLINE
) &&
2159 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
2160 sendReplyToClient
, c
) == AE_ERR
) return;
2161 listAddNodeTail(c
->reply
,getDecodedObject(obj
));
2164 static void addReplySds(redisClient
*c
, sds s
) {
2165 robj
*o
= createObject(REDIS_STRING
,s
);
2170 static void addReplyDouble(redisClient
*c
, double d
) {
2173 snprintf(buf
,sizeof(buf
),"%.17g",d
);
2174 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2175 (unsigned long) strlen(buf
),buf
));
2178 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
2181 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
2182 len
= sdslen(obj
->ptr
);
2184 long n
= (long)obj
->ptr
;
2186 /* Compute how many bytes will take this integer as a radix 10 string */
2192 while((n
= n
/10) != 0) {
2196 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len
));
2199 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2204 REDIS_NOTUSED(mask
);
2205 REDIS_NOTUSED(privdata
);
2207 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
2208 if (cfd
== AE_ERR
) {
2209 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
2212 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
2213 if ((c
= createClient(cfd
)) == NULL
) {
2214 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
2215 close(cfd
); /* May be already closed, just ingore errors */
2218 /* If maxclient directive is set and this is one client more... close the
2219 * connection. Note that we create the client instead to check before
2220 * for this condition, since now the socket is already set in nonblocking
2221 * mode and we can send an error for free using the Kernel I/O */
2222 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
2223 char *err
= "-ERR max number of clients reached\r\n";
2225 /* That's a best effort error message, don't check write errors */
2226 if (write(c
->fd
,err
,strlen(err
)) == -1) {
2227 /* Nothing to do, Just to avoid the warning... */
2232 server
.stat_numconnections
++;
2235 /* ======================= Redis objects implementation ===================== */
2237 static robj
*createObject(int type
, void *ptr
) {
2240 if (listLength(server
.objfreelist
)) {
2241 listNode
*head
= listFirst(server
.objfreelist
);
2242 o
= listNodeValue(head
);
2243 listDelNode(server
.objfreelist
,head
);
2245 if (server
.vm_enabled
) {
2246 o
= zmalloc(sizeof(*o
));
2248 o
= zmalloc(sizeof(*o
)-sizeof(struct redisObjectVM
));
2252 o
->encoding
= REDIS_ENCODING_RAW
;
2258 static robj
*createStringObject(char *ptr
, size_t len
) {
2259 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
2262 static robj
*createListObject(void) {
2263 list
*l
= listCreate();
2265 listSetFreeMethod(l
,decrRefCount
);
2266 return createObject(REDIS_LIST
,l
);
2269 static robj
*createSetObject(void) {
2270 dict
*d
= dictCreate(&setDictType
,NULL
);
2271 return createObject(REDIS_SET
,d
);
2274 static robj
*createZsetObject(void) {
2275 zset
*zs
= zmalloc(sizeof(*zs
));
2277 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
2278 zs
->zsl
= zslCreate();
2279 return createObject(REDIS_ZSET
,zs
);
2282 static void freeStringObject(robj
*o
) {
2283 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2288 static void freeListObject(robj
*o
) {
2289 listRelease((list
*) o
->ptr
);
2292 static void freeSetObject(robj
*o
) {
2293 dictRelease((dict
*) o
->ptr
);
2296 static void freeZsetObject(robj
*o
) {
2299 dictRelease(zs
->dict
);
2304 static void freeHashObject(robj
*o
) {
2305 dictRelease((dict
*) o
->ptr
);
2308 static void incrRefCount(robj
*o
) {
2310 #ifdef DEBUG_REFCOUNT
2311 if (o
->type
== REDIS_STRING
)
2312 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
2316 static void decrRefCount(void *obj
) {
2319 #ifdef DEBUG_REFCOUNT
2320 if (o
->type
== REDIS_STRING
)
2321 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
2323 if (--(o
->refcount
) == 0) {
2325 case REDIS_STRING
: freeStringObject(o
); break;
2326 case REDIS_LIST
: freeListObject(o
); break;
2327 case REDIS_SET
: freeSetObject(o
); break;
2328 case REDIS_ZSET
: freeZsetObject(o
); break;
2329 case REDIS_HASH
: freeHashObject(o
); break;
2330 default: redisAssert(0 != 0); break;
2332 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2333 !listAddNodeHead(server
.objfreelist
,o
))
2338 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2339 dictEntry
*de
= dictFind(db
->dict
,key
);
2340 return de
? dictGetEntryVal(de
) : NULL
;
2343 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2344 expireIfNeeded(db
,key
);
2345 return lookupKey(db
,key
);
2348 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2349 deleteIfVolatile(db
,key
);
2350 return lookupKey(db
,key
);
2353 static int deleteKey(redisDb
*db
, robj
*key
) {
2356 /* We need to protect key from destruction: after the first dictDelete()
2357 * it may happen that 'key' is no longer valid if we don't increment
2358 * it's count. This may happen when we get the object reference directly
2359 * from the hash table with dictRandomKey() or dict iterators */
2361 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2362 retval
= dictDelete(db
->dict
,key
);
2365 return retval
== DICT_OK
;
2368 /* Try to share an object against the shared objects pool */
2369 static robj
*tryObjectSharing(robj
*o
) {
2370 struct dictEntry
*de
;
2373 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2375 redisAssert(o
->type
== REDIS_STRING
);
2376 de
= dictFind(server
.sharingpool
,o
);
2378 robj
*shared
= dictGetEntryKey(de
);
2380 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2381 dictGetEntryVal(de
) = (void*) c
;
2382 incrRefCount(shared
);
2386 /* Here we are using a stream algorihtm: Every time an object is
2387 * shared we increment its count, everytime there is a miss we
2388 * recrement the counter of a random object. If this object reaches
2389 * zero we remove the object and put the current object instead. */
2390 if (dictSize(server
.sharingpool
) >=
2391 server
.sharingpoolsize
) {
2392 de
= dictGetRandomKey(server
.sharingpool
);
2393 redisAssert(de
!= NULL
);
2394 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2395 dictGetEntryVal(de
) = (void*) c
;
2397 dictDelete(server
.sharingpool
,de
->key
);
2400 c
= 0; /* If the pool is empty we want to add this object */
2405 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2406 redisAssert(retval
== DICT_OK
);
2413 /* Check if the nul-terminated string 's' can be represented by a long
2414 * (that is, is a number that fits into long without any other space or
2415 * character before or after the digits).
2417 * If so, the function returns REDIS_OK and *longval is set to the value
2418 * of the number. Otherwise REDIS_ERR is returned */
2419 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2420 char buf
[32], *endptr
;
2424 value
= strtol(s
, &endptr
, 10);
2425 if (endptr
[0] != '\0') return REDIS_ERR
;
2426 slen
= snprintf(buf
,32,"%ld",value
);
2428 /* If the number converted back into a string is not identical
2429 * then it's not possible to encode the string as integer */
2430 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2431 if (longval
) *longval
= value
;
2435 /* Try to encode a string object in order to save space */
2436 static int tryObjectEncoding(robj
*o
) {
2440 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2441 return REDIS_ERR
; /* Already encoded */
2443 /* It's not save to encode shared objects: shared objects can be shared
2444 * everywhere in the "object space" of Redis. Encoded objects can only
2445 * appear as "values" (and not, for instance, as keys) */
2446 if (o
->refcount
> 1) return REDIS_ERR
;
2448 /* Currently we try to encode only strings */
2449 redisAssert(o
->type
== REDIS_STRING
);
2451 /* Check if we can represent this string as a long integer */
2452 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2454 /* Ok, this object can be encoded */
2455 o
->encoding
= REDIS_ENCODING_INT
;
2457 o
->ptr
= (void*) value
;
2461 /* Get a decoded version of an encoded object (returned as a new object).
2462 * If the object is already raw-encoded just increment the ref count. */
2463 static robj
*getDecodedObject(robj
*o
) {
2466 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2470 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2473 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2474 dec
= createStringObject(buf
,strlen(buf
));
2477 redisAssert(1 != 1);
2481 /* Compare two string objects via strcmp() or alike.
2482 * Note that the objects may be integer-encoded. In such a case we
2483 * use snprintf() to get a string representation of the numbers on the stack
2484 * and compare the strings, it's much faster than calling getDecodedObject().
2486 * Important note: if objects are not integer encoded, but binary-safe strings,
2487 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2489 static int compareStringObjects(robj
*a
, robj
*b
) {
2490 redisAssert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2491 char bufa
[128], bufb
[128], *astr
, *bstr
;
2494 if (a
== b
) return 0;
2495 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2496 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2502 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2503 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2509 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2512 static size_t stringObjectLen(robj
*o
) {
2513 redisAssert(o
->type
== REDIS_STRING
);
2514 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2515 return sdslen(o
->ptr
);
2519 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2523 /*============================ RDB saving/loading =========================== */
2525 static int rdbSaveType(FILE *fp
, unsigned char type
) {
2526 if (fwrite(&type
,1,1,fp
) == 0) return -1;
2530 static int rdbSaveTime(FILE *fp
, time_t t
) {
2531 int32_t t32
= (int32_t) t
;
2532 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
2536 /* check rdbLoadLen() comments for more info */
2537 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2538 unsigned char buf
[2];
2541 /* Save a 6 bit len */
2542 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2543 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2544 } else if (len
< (1<<14)) {
2545 /* Save a 14 bit len */
2546 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2548 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2550 /* Save a 32 bit len */
2551 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2552 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2554 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2559 /* String objects in the form "2391" "-100" without any space and with a
2560 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2561 * encoded as integers to save space */
2562 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2564 char *endptr
, buf
[32];
2566 /* Check if it's possible to encode this value as a number */
2567 value
= strtoll(s
, &endptr
, 10);
2568 if (endptr
[0] != '\0') return 0;
2569 snprintf(buf
,32,"%lld",value
);
2571 /* If the number converted back into a string is not identical
2572 * then it's not possible to encode the string as integer */
2573 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2575 /* Finally check if it fits in our ranges */
2576 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2577 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2578 enc
[1] = value
&0xFF;
2580 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2581 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2582 enc
[1] = value
&0xFF;
2583 enc
[2] = (value
>>8)&0xFF;
2585 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2586 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2587 enc
[1] = value
&0xFF;
2588 enc
[2] = (value
>>8)&0xFF;
2589 enc
[3] = (value
>>16)&0xFF;
2590 enc
[4] = (value
>>24)&0xFF;
2597 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2598 unsigned int comprlen
, outlen
;
2602 /* We require at least four bytes compression for this to be worth it */
2603 outlen
= sdslen(obj
->ptr
)-4;
2604 if (outlen
<= 0) return 0;
2605 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2606 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2607 if (comprlen
== 0) {
2611 /* Data compressed! Let's save it on disk */
2612 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2613 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2614 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2615 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2616 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2625 /* Save a string objet as [len][data] on disk. If the object is a string
2626 * representation of an integer value we try to safe it in a special form */
2627 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2631 len
= sdslen(obj
->ptr
);
2633 /* Try integer encoding */
2635 unsigned char buf
[5];
2636 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2637 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2642 /* Try LZF compression - under 20 bytes it's unable to compress even
2643 * aaaaaaaaaaaaaaaaaa so skip it */
2644 if (server
.rdbcompression
&& len
> 20) {
2647 retval
= rdbSaveLzfStringObject(fp
,obj
);
2648 if (retval
== -1) return -1;
2649 if (retval
> 0) return 0;
2650 /* retval == 0 means data can't be compressed, save the old way */
2653 /* Store verbatim */
2654 if (rdbSaveLen(fp
,len
) == -1) return -1;
2655 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2659 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2660 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2663 obj
= getDecodedObject(obj
);
2664 retval
= rdbSaveStringObjectRaw(fp
,obj
);
2669 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
2670 * 8 bit integer specifing the length of the representation.
2671 * This 8 bit integer has special values in order to specify the following
2677 static int rdbSaveDoubleValue(FILE *fp
, double val
) {
2678 unsigned char buf
[128];
2684 } else if (!isfinite(val
)) {
2686 buf
[0] = (val
< 0) ? 255 : 254;
2688 snprintf((char*)buf
+1,sizeof(buf
)-1,"%.17g",val
);
2689 buf
[0] = strlen((char*)buf
+1);
2692 if (fwrite(buf
,len
,1,fp
) == 0) return -1;
2696 /* Save a Redis object. */
2697 static int rdbSaveObject(FILE *fp
, robj
*o
) {
2698 if (o
->type
== REDIS_STRING
) {
2699 /* Save a string value */
2700 if (rdbSaveStringObject(fp
,o
) == -1) return -1;
2701 } else if (o
->type
== REDIS_LIST
) {
2702 /* Save a list value */
2703 list
*list
= o
->ptr
;
2707 if (rdbSaveLen(fp
,listLength(list
)) == -1) return -1;
2708 while((ln
= listYield(list
))) {
2709 robj
*eleobj
= listNodeValue(ln
);
2711 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2713 } else if (o
->type
== REDIS_SET
) {
2714 /* Save a set value */
2716 dictIterator
*di
= dictGetIterator(set
);
2719 if (rdbSaveLen(fp
,dictSize(set
)) == -1) return -1;
2720 while((de
= dictNext(di
)) != NULL
) {
2721 robj
*eleobj
= dictGetEntryKey(de
);
2723 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2725 dictReleaseIterator(di
);
2726 } else if (o
->type
== REDIS_ZSET
) {
2727 /* Save a set value */
2729 dictIterator
*di
= dictGetIterator(zs
->dict
);
2732 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) return -1;
2733 while((de
= dictNext(di
)) != NULL
) {
2734 robj
*eleobj
= dictGetEntryKey(de
);
2735 double *score
= dictGetEntryVal(de
);
2737 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2738 if (rdbSaveDoubleValue(fp
,*score
) == -1) return -1;
2740 dictReleaseIterator(di
);
2742 redisAssert(0 != 0);
2747 /* Return the length the object will have on disk if saved with
2748 * the rdbSaveObject() function. Currently we use a trick to get
2749 * this length with very little changes to the code. In the future
2750 * we could switch to a faster solution. */
2751 static off_t
rdbSavedObjectLen(robj
*o
) {
2752 static FILE *fp
= NULL
;
2754 if (fp
== NULL
) fp
= fopen("/dev/null","w");
2758 assert(rdbSaveObject(fp
,o
) != 1);
2762 /* Return the number of pages required to save this object in the swap file */
2763 static off_t
rdbSavedObjectPages(robj
*o
) {
2764 off_t bytes
= rdbSavedObjectLen(o
);
2766 return (bytes
+(server
.vm_page_size
-1))/server
.vm_page_size
;
2769 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2770 static int rdbSave(char *filename
) {
2771 dictIterator
*di
= NULL
;
2776 time_t now
= time(NULL
);
2778 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2779 fp
= fopen(tmpfile
,"w");
2781 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2784 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2785 for (j
= 0; j
< server
.dbnum
; j
++) {
2786 redisDb
*db
= server
.db
+j
;
2788 if (dictSize(d
) == 0) continue;
2789 di
= dictGetIterator(d
);
2795 /* Write the SELECT DB opcode */
2796 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2797 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2799 /* Iterate this DB writing every entry */
2800 while((de
= dictNext(di
)) != NULL
) {
2801 robj
*key
= dictGetEntryKey(de
);
2802 robj
*o
= dictGetEntryVal(de
);
2803 time_t expiretime
= getExpire(db
,key
);
2805 /* Save the expire time */
2806 if (expiretime
!= -1) {
2807 /* If this key is already expired skip it */
2808 if (expiretime
< now
) continue;
2809 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2810 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2812 /* Save the key and associated value */
2813 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2814 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2815 /* Save the actual value */
2816 if (rdbSaveObject(fp
,o
) == -1) goto werr
;
2818 dictReleaseIterator(di
);
2821 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2823 /* Make sure data will not remain on the OS's output buffers */
2828 /* Use RENAME to make sure the DB file is changed atomically only
2829 * if the generate DB file is ok. */
2830 if (rename(tmpfile
,filename
) == -1) {
2831 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
2835 redisLog(REDIS_NOTICE
,"DB saved on disk");
2837 server
.lastsave
= time(NULL
);
2843 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2844 if (di
) dictReleaseIterator(di
);
2848 static int rdbSaveBackground(char *filename
) {
2851 if (server
.bgsavechildpid
!= -1) return REDIS_ERR
;
2852 if ((childpid
= fork()) == 0) {
2855 if (rdbSave(filename
) == REDIS_OK
) {
2862 if (childpid
== -1) {
2863 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2867 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2868 server
.bgsavechildpid
= childpid
;
2871 return REDIS_OK
; /* unreached */
2874 static void rdbRemoveTempFile(pid_t childpid
) {
2877 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
2881 static int rdbLoadType(FILE *fp
) {
2883 if (fread(&type
,1,1,fp
) == 0) return -1;
2887 static time_t rdbLoadTime(FILE *fp
) {
2889 if (fread(&t32
,4,1,fp
) == 0) return -1;
2890 return (time_t) t32
;
2893 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2894 * of this file for a description of how this are stored on disk.
2896 * isencoded is set to 1 if the readed length is not actually a length but
2897 * an "encoding type", check the above comments for more info */
2898 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2899 unsigned char buf
[2];
2902 if (isencoded
) *isencoded
= 0;
2904 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2909 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2910 type
= (buf
[0]&0xC0)>>6;
2911 if (type
== REDIS_RDB_6BITLEN
) {
2912 /* Read a 6 bit len */
2914 } else if (type
== REDIS_RDB_ENCVAL
) {
2915 /* Read a 6 bit len encoding type */
2916 if (isencoded
) *isencoded
= 1;
2918 } else if (type
== REDIS_RDB_14BITLEN
) {
2919 /* Read a 14 bit len */
2920 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2921 return ((buf
[0]&0x3F)<<8)|buf
[1];
2923 /* Read a 32 bit len */
2924 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2930 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2931 unsigned char enc
[4];
2934 if (enctype
== REDIS_RDB_ENC_INT8
) {
2935 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2936 val
= (signed char)enc
[0];
2937 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2939 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2940 v
= enc
[0]|(enc
[1]<<8);
2942 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2944 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2945 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2948 val
= 0; /* anti-warning */
2951 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2954 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2955 unsigned int len
, clen
;
2956 unsigned char *c
= NULL
;
2959 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2960 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2961 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2962 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2963 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2964 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2966 return createObject(REDIS_STRING
,val
);
2973 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2978 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2981 case REDIS_RDB_ENC_INT8
:
2982 case REDIS_RDB_ENC_INT16
:
2983 case REDIS_RDB_ENC_INT32
:
2984 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2985 case REDIS_RDB_ENC_LZF
:
2986 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2992 if (len
== REDIS_RDB_LENERR
) return NULL
;
2993 val
= sdsnewlen(NULL
,len
);
2994 if (len
&& fread(val
,len
,1,fp
) == 0) {
2998 return tryObjectSharing(createObject(REDIS_STRING
,val
));
3001 /* For information about double serialization check rdbSaveDoubleValue() */
3002 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
3006 if (fread(&len
,1,1,fp
) == 0) return -1;
3008 case 255: *val
= R_NegInf
; return 0;
3009 case 254: *val
= R_PosInf
; return 0;
3010 case 253: *val
= R_Nan
; return 0;
3012 if (fread(buf
,len
,1,fp
) == 0) return -1;
3014 sscanf(buf
, "%lg", val
);
3019 static int rdbLoad(char *filename
) {
3021 robj
*keyobj
= NULL
;
3023 int type
, retval
, rdbver
;
3024 dict
*d
= server
.db
[0].dict
;
3025 redisDb
*db
= server
.db
+0;
3027 time_t expiretime
= -1, now
= time(NULL
);
3029 fp
= fopen(filename
,"r");
3030 if (!fp
) return REDIS_ERR
;
3031 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
3033 if (memcmp(buf
,"REDIS",5) != 0) {
3035 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
3038 rdbver
= atoi(buf
+5);
3041 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
3048 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
3049 if (type
== REDIS_EXPIRETIME
) {
3050 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
3051 /* We read the time so we need to read the object type again */
3052 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
3054 if (type
== REDIS_EOF
) break;
3055 /* Handle SELECT DB opcode as a special case */
3056 if (type
== REDIS_SELECTDB
) {
3057 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
3059 if (dbid
>= (unsigned)server
.dbnum
) {
3060 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
3063 db
= server
.db
+dbid
;
3068 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
3070 if (type
== REDIS_STRING
) {
3071 /* Read string value */
3072 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
3073 tryObjectEncoding(o
);
3074 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
3075 /* Read list/set value */
3078 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
3080 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
3081 /* Load every single element of the list/set */
3085 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
3086 tryObjectEncoding(ele
);
3087 if (type
== REDIS_LIST
) {
3088 listAddNodeTail((list
*)o
->ptr
,ele
);
3090 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
3093 } else if (type
== REDIS_ZSET
) {
3094 /* Read list/set value */
3098 if ((zsetlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
3100 o
= createZsetObject();
3102 /* Load every single element of the list/set */
3105 double *score
= zmalloc(sizeof(double));
3107 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
3108 tryObjectEncoding(ele
);
3109 if (rdbLoadDoubleValue(fp
,score
) == -1) goto eoferr
;
3110 dictAdd(zs
->dict
,ele
,score
);
3111 zslInsert(zs
->zsl
,*score
,ele
);
3112 incrRefCount(ele
); /* added to skiplist */
3115 redisAssert(0 != 0);
3117 /* Add the new object in the hash table */
3118 retval
= dictAdd(d
,keyobj
,o
);
3119 if (retval
== DICT_ERR
) {
3120 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
3123 /* Set the expire time if needed */
3124 if (expiretime
!= -1) {
3125 setExpire(db
,keyobj
,expiretime
);
3126 /* Delete this key if already expired */
3127 if (expiretime
< now
) deleteKey(db
,keyobj
);
3135 eoferr
: /* unexpected end of file is handled here with a fatal exit */
3136 if (keyobj
) decrRefCount(keyobj
);
3137 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
3139 return REDIS_ERR
; /* Just to avoid warning */
3142 /*================================== Commands =============================== */
3144 static void authCommand(redisClient
*c
) {
3145 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
3146 c
->authenticated
= 1;
3147 addReply(c
,shared
.ok
);
3149 c
->authenticated
= 0;
3150 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3154 static void pingCommand(redisClient
*c
) {
3155 addReply(c
,shared
.pong
);
3158 static void echoCommand(redisClient
*c
) {
3159 addReplyBulkLen(c
,c
->argv
[1]);
3160 addReply(c
,c
->argv
[1]);
3161 addReply(c
,shared
.crlf
);
3164 /*=================================== Strings =============================== */
3166 static void setGenericCommand(redisClient
*c
, int nx
) {
3169 if (nx
) deleteIfVolatile(c
->db
,c
->argv
[1]);
3170 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3171 if (retval
== DICT_ERR
) {
3173 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3174 incrRefCount(c
->argv
[2]);
3176 addReply(c
,shared
.czero
);
3180 incrRefCount(c
->argv
[1]);
3181 incrRefCount(c
->argv
[2]);
3184 removeExpire(c
->db
,c
->argv
[1]);
3185 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3188 static void setCommand(redisClient
*c
) {
3189 setGenericCommand(c
,0);
3192 static void setnxCommand(redisClient
*c
) {
3193 setGenericCommand(c
,1);
3196 static int getGenericCommand(redisClient
*c
) {
3197 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3200 addReply(c
,shared
.nullbulk
);
3203 if (o
->type
!= REDIS_STRING
) {
3204 addReply(c
,shared
.wrongtypeerr
);
3207 addReplyBulkLen(c
,o
);
3209 addReply(c
,shared
.crlf
);
3215 static void getCommand(redisClient
*c
) {
3216 getGenericCommand(c
);
3219 static void getsetCommand(redisClient
*c
) {
3220 if (getGenericCommand(c
) == REDIS_ERR
) return;
3221 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
3222 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3224 incrRefCount(c
->argv
[1]);
3226 incrRefCount(c
->argv
[2]);
3228 removeExpire(c
->db
,c
->argv
[1]);
3231 static void mgetCommand(redisClient
*c
) {
3234 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
3235 for (j
= 1; j
< c
->argc
; j
++) {
3236 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
3238 addReply(c
,shared
.nullbulk
);
3240 if (o
->type
!= REDIS_STRING
) {
3241 addReply(c
,shared
.nullbulk
);
3243 addReplyBulkLen(c
,o
);
3245 addReply(c
,shared
.crlf
);
3251 static void msetGenericCommand(redisClient
*c
, int nx
) {
3252 int j
, busykeys
= 0;
3254 if ((c
->argc
% 2) == 0) {
3255 addReplySds(c
,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3258 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3259 * set nothing at all if at least one already key exists. */
3261 for (j
= 1; j
< c
->argc
; j
+= 2) {
3262 if (lookupKeyWrite(c
->db
,c
->argv
[j
]) != NULL
) {
3268 addReply(c
, shared
.czero
);
3272 for (j
= 1; j
< c
->argc
; j
+= 2) {
3275 tryObjectEncoding(c
->argv
[j
+1]);
3276 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3277 if (retval
== DICT_ERR
) {
3278 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3279 incrRefCount(c
->argv
[j
+1]);
3281 incrRefCount(c
->argv
[j
]);
3282 incrRefCount(c
->argv
[j
+1]);
3284 removeExpire(c
->db
,c
->argv
[j
]);
3286 server
.dirty
+= (c
->argc
-1)/2;
3287 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3290 static void msetCommand(redisClient
*c
) {
3291 msetGenericCommand(c
,0);
3294 static void msetnxCommand(redisClient
*c
) {
3295 msetGenericCommand(c
,1);
3298 static void incrDecrCommand(redisClient
*c
, long long incr
) {
3303 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3307 if (o
->type
!= REDIS_STRING
) {
3312 if (o
->encoding
== REDIS_ENCODING_RAW
)
3313 value
= strtoll(o
->ptr
, &eptr
, 10);
3314 else if (o
->encoding
== REDIS_ENCODING_INT
)
3315 value
= (long)o
->ptr
;
3317 redisAssert(1 != 1);
3322 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
3323 tryObjectEncoding(o
);
3324 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
3325 if (retval
== DICT_ERR
) {
3326 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
3327 removeExpire(c
->db
,c
->argv
[1]);
3329 incrRefCount(c
->argv
[1]);
3332 addReply(c
,shared
.colon
);
3334 addReply(c
,shared
.crlf
);
3337 static void incrCommand(redisClient
*c
) {
3338 incrDecrCommand(c
,1);
3341 static void decrCommand(redisClient
*c
) {
3342 incrDecrCommand(c
,-1);
3345 static void incrbyCommand(redisClient
*c
) {
3346 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3347 incrDecrCommand(c
,incr
);
3350 static void decrbyCommand(redisClient
*c
) {
3351 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3352 incrDecrCommand(c
,-incr
);
3355 /* ========================= Type agnostic commands ========================= */
3357 static void delCommand(redisClient
*c
) {
3360 for (j
= 1; j
< c
->argc
; j
++) {
3361 if (deleteKey(c
->db
,c
->argv
[j
])) {
3368 addReply(c
,shared
.czero
);
3371 addReply(c
,shared
.cone
);
3374 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
3379 static void existsCommand(redisClient
*c
) {
3380 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
3383 static void selectCommand(redisClient
*c
) {
3384 int id
= atoi(c
->argv
[1]->ptr
);
3386 if (selectDb(c
,id
) == REDIS_ERR
) {
3387 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
3389 addReply(c
,shared
.ok
);
3393 static void randomkeyCommand(redisClient
*c
) {
3397 de
= dictGetRandomKey(c
->db
->dict
);
3398 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
3401 addReply(c
,shared
.plus
);
3402 addReply(c
,shared
.crlf
);
3404 addReply(c
,shared
.plus
);
3405 addReply(c
,dictGetEntryKey(de
));
3406 addReply(c
,shared
.crlf
);
3410 static void keysCommand(redisClient
*c
) {
3413 sds pattern
= c
->argv
[1]->ptr
;
3414 int plen
= sdslen(pattern
);
3415 unsigned long numkeys
= 0, keyslen
= 0;
3416 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3418 di
= dictGetIterator(c
->db
->dict
);
3420 decrRefCount(lenobj
);
3421 while((de
= dictNext(di
)) != NULL
) {
3422 robj
*keyobj
= dictGetEntryKey(de
);
3424 sds key
= keyobj
->ptr
;
3425 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3426 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3427 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3429 addReply(c
,shared
.space
);
3432 keyslen
+= sdslen(key
);
3436 dictReleaseIterator(di
);
3437 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3438 addReply(c
,shared
.crlf
);
3441 static void dbsizeCommand(redisClient
*c
) {
3443 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
3446 static void lastsaveCommand(redisClient
*c
) {
3448 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
3451 static void typeCommand(redisClient
*c
) {
3455 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3460 case REDIS_STRING
: type
= "+string"; break;
3461 case REDIS_LIST
: type
= "+list"; break;
3462 case REDIS_SET
: type
= "+set"; break;
3463 case REDIS_ZSET
: type
= "+zset"; break;
3464 default: type
= "unknown"; break;
3467 addReplySds(c
,sdsnew(type
));
3468 addReply(c
,shared
.crlf
);
3471 static void saveCommand(redisClient
*c
) {
3472 if (server
.bgsavechildpid
!= -1) {
3473 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3476 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3477 addReply(c
,shared
.ok
);
3479 addReply(c
,shared
.err
);
3483 static void bgsaveCommand(redisClient
*c
) {
3484 if (server
.bgsavechildpid
!= -1) {
3485 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3488 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3489 char *status
= "+Background saving started\r\n";
3490 addReplySds(c
,sdsnew(status
));
3492 addReply(c
,shared
.err
);
3496 static void shutdownCommand(redisClient
*c
) {
3497 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
3498 /* Kill the saving child if there is a background saving in progress.
3499 We want to avoid race conditions, for instance our saving child may
3500 overwrite the synchronous saving did by SHUTDOWN. */
3501 if (server
.bgsavechildpid
!= -1) {
3502 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3503 kill(server
.bgsavechildpid
,SIGKILL
);
3504 rdbRemoveTempFile(server
.bgsavechildpid
);
3506 if (server
.appendonly
) {
3507 /* Append only file: fsync() the AOF and exit */
3508 fsync(server
.appendfd
);
3511 /* Snapshotting. Perform a SYNC SAVE and exit */
3512 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3513 if (server
.daemonize
)
3514 unlink(server
.pidfile
);
3515 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3516 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3519 /* Ooops.. error saving! The best we can do is to continue operating.
3520 * Note that if there was a background saving process, in the next
3521 * cron() Redis will be notified that the background saving aborted,
3522 * handling special stuff like slaves pending for synchronization... */
3523 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3524 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3529 static void renameGenericCommand(redisClient
*c
, int nx
) {
3532 /* To use the same key as src and dst is probably an error */
3533 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3534 addReply(c
,shared
.sameobjecterr
);
3538 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3540 addReply(c
,shared
.nokeyerr
);
3544 deleteIfVolatile(c
->db
,c
->argv
[2]);
3545 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3548 addReply(c
,shared
.czero
);
3551 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3553 incrRefCount(c
->argv
[2]);
3555 deleteKey(c
->db
,c
->argv
[1]);
3557 addReply(c
,nx
? shared
.cone
: shared
.ok
);
3560 static void renameCommand(redisClient
*c
) {
3561 renameGenericCommand(c
,0);
3564 static void renamenxCommand(redisClient
*c
) {
3565 renameGenericCommand(c
,1);
3568 static void moveCommand(redisClient
*c
) {
3573 /* Obtain source and target DB pointers */
3576 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3577 addReply(c
,shared
.outofrangeerr
);
3581 selectDb(c
,srcid
); /* Back to the source DB */
3583 /* If the user is moving using as target the same
3584 * DB as the source DB it is probably an error. */
3586 addReply(c
,shared
.sameobjecterr
);
3590 /* Check if the element exists and get a reference */
3591 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3593 addReply(c
,shared
.czero
);
3597 /* Try to add the element to the target DB */
3598 deleteIfVolatile(dst
,c
->argv
[1]);
3599 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3600 addReply(c
,shared
.czero
);
3603 incrRefCount(c
->argv
[1]);
3606 /* OK! key moved, free the entry in the source DB */
3607 deleteKey(src
,c
->argv
[1]);
3609 addReply(c
,shared
.cone
);
3612 /* =================================== Lists ================================ */
3613 static void pushGenericCommand(redisClient
*c
, int where
) {
3617 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3619 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3620 addReply(c
,shared
.ok
);
3623 lobj
= createListObject();
3625 if (where
== REDIS_HEAD
) {
3626 listAddNodeHead(list
,c
->argv
[2]);
3628 listAddNodeTail(list
,c
->argv
[2]);
3630 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
3631 incrRefCount(c
->argv
[1]);
3632 incrRefCount(c
->argv
[2]);
3634 if (lobj
->type
!= REDIS_LIST
) {
3635 addReply(c
,shared
.wrongtypeerr
);
3638 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3639 addReply(c
,shared
.ok
);
3643 if (where
== REDIS_HEAD
) {
3644 listAddNodeHead(list
,c
->argv
[2]);
3646 listAddNodeTail(list
,c
->argv
[2]);
3648 incrRefCount(c
->argv
[2]);
3651 addReply(c
,shared
.ok
);
3654 static void lpushCommand(redisClient
*c
) {
3655 pushGenericCommand(c
,REDIS_HEAD
);
3658 static void rpushCommand(redisClient
*c
) {
3659 pushGenericCommand(c
,REDIS_TAIL
);
3662 static void llenCommand(redisClient
*c
) {
3666 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3668 addReply(c
,shared
.czero
);
3671 if (o
->type
!= REDIS_LIST
) {
3672 addReply(c
,shared
.wrongtypeerr
);
3675 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3680 static void lindexCommand(redisClient
*c
) {
3682 int index
= atoi(c
->argv
[2]->ptr
);
3684 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3686 addReply(c
,shared
.nullbulk
);
3688 if (o
->type
!= REDIS_LIST
) {
3689 addReply(c
,shared
.wrongtypeerr
);
3691 list
*list
= o
->ptr
;
3694 ln
= listIndex(list
, index
);
3696 addReply(c
,shared
.nullbulk
);
3698 robj
*ele
= listNodeValue(ln
);
3699 addReplyBulkLen(c
,ele
);
3701 addReply(c
,shared
.crlf
);
3707 static void lsetCommand(redisClient
*c
) {
3709 int index
= atoi(c
->argv
[2]->ptr
);
3711 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3713 addReply(c
,shared
.nokeyerr
);
3715 if (o
->type
!= REDIS_LIST
) {
3716 addReply(c
,shared
.wrongtypeerr
);
3718 list
*list
= o
->ptr
;
3721 ln
= listIndex(list
, index
);
3723 addReply(c
,shared
.outofrangeerr
);
3725 robj
*ele
= listNodeValue(ln
);
3728 listNodeValue(ln
) = c
->argv
[3];
3729 incrRefCount(c
->argv
[3]);
3730 addReply(c
,shared
.ok
);
3737 static void popGenericCommand(redisClient
*c
, int where
) {
3740 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3742 addReply(c
,shared
.nullbulk
);
3744 if (o
->type
!= REDIS_LIST
) {
3745 addReply(c
,shared
.wrongtypeerr
);
3747 list
*list
= o
->ptr
;
3750 if (where
== REDIS_HEAD
)
3751 ln
= listFirst(list
);
3753 ln
= listLast(list
);
3756 addReply(c
,shared
.nullbulk
);
3758 robj
*ele
= listNodeValue(ln
);
3759 addReplyBulkLen(c
,ele
);
3761 addReply(c
,shared
.crlf
);
3762 listDelNode(list
,ln
);
3769 static void lpopCommand(redisClient
*c
) {
3770 popGenericCommand(c
,REDIS_HEAD
);
3773 static void rpopCommand(redisClient
*c
) {
3774 popGenericCommand(c
,REDIS_TAIL
);
3777 static void lrangeCommand(redisClient
*c
) {
3779 int start
= atoi(c
->argv
[2]->ptr
);
3780 int end
= atoi(c
->argv
[3]->ptr
);
3782 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3784 addReply(c
,shared
.nullmultibulk
);
3786 if (o
->type
!= REDIS_LIST
) {
3787 addReply(c
,shared
.wrongtypeerr
);
3789 list
*list
= o
->ptr
;
3791 int llen
= listLength(list
);
3795 /* convert negative indexes */
3796 if (start
< 0) start
= llen
+start
;
3797 if (end
< 0) end
= llen
+end
;
3798 if (start
< 0) start
= 0;
3799 if (end
< 0) end
= 0;
3801 /* indexes sanity checks */
3802 if (start
> end
|| start
>= llen
) {
3803 /* Out of range start or start > end result in empty list */
3804 addReply(c
,shared
.emptymultibulk
);
3807 if (end
>= llen
) end
= llen
-1;
3808 rangelen
= (end
-start
)+1;
3810 /* Return the result in form of a multi-bulk reply */
3811 ln
= listIndex(list
, start
);
3812 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3813 for (j
= 0; j
< rangelen
; j
++) {
3814 ele
= listNodeValue(ln
);
3815 addReplyBulkLen(c
,ele
);
3817 addReply(c
,shared
.crlf
);
3824 static void ltrimCommand(redisClient
*c
) {
3826 int start
= atoi(c
->argv
[2]->ptr
);
3827 int end
= atoi(c
->argv
[3]->ptr
);
3829 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3831 addReply(c
,shared
.ok
);
3833 if (o
->type
!= REDIS_LIST
) {
3834 addReply(c
,shared
.wrongtypeerr
);
3836 list
*list
= o
->ptr
;
3838 int llen
= listLength(list
);
3839 int j
, ltrim
, rtrim
;
3841 /* convert negative indexes */
3842 if (start
< 0) start
= llen
+start
;
3843 if (end
< 0) end
= llen
+end
;
3844 if (start
< 0) start
= 0;
3845 if (end
< 0) end
= 0;
3847 /* indexes sanity checks */
3848 if (start
> end
|| start
>= llen
) {
3849 /* Out of range start or start > end result in empty list */
3853 if (end
>= llen
) end
= llen
-1;
3858 /* Remove list elements to perform the trim */
3859 for (j
= 0; j
< ltrim
; j
++) {
3860 ln
= listFirst(list
);
3861 listDelNode(list
,ln
);
3863 for (j
= 0; j
< rtrim
; j
++) {
3864 ln
= listLast(list
);
3865 listDelNode(list
,ln
);
3868 addReply(c
,shared
.ok
);
3873 static void lremCommand(redisClient
*c
) {
3876 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3878 addReply(c
,shared
.czero
);
3880 if (o
->type
!= REDIS_LIST
) {
3881 addReply(c
,shared
.wrongtypeerr
);
3883 list
*list
= o
->ptr
;
3884 listNode
*ln
, *next
;
3885 int toremove
= atoi(c
->argv
[2]->ptr
);
3890 toremove
= -toremove
;
3893 ln
= fromtail
? list
->tail
: list
->head
;
3895 robj
*ele
= listNodeValue(ln
);
3897 next
= fromtail
? ln
->prev
: ln
->next
;
3898 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3899 listDelNode(list
,ln
);
3902 if (toremove
&& removed
== toremove
) break;
3906 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3911 /* This is the semantic of this command:
3912 * RPOPLPUSH srclist dstlist:
3913 * IF LLEN(srclist) > 0
3914 * element = RPOP srclist
3915 * LPUSH dstlist element
3922 * The idea is to be able to get an element from a list in a reliable way
3923 * since the element is not just returned but pushed against another list
3924 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3926 static void rpoplpushcommand(redisClient
*c
) {
3929 sobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3931 addReply(c
,shared
.nullbulk
);
3933 if (sobj
->type
!= REDIS_LIST
) {
3934 addReply(c
,shared
.wrongtypeerr
);
3936 list
*srclist
= sobj
->ptr
;
3937 listNode
*ln
= listLast(srclist
);
3940 addReply(c
,shared
.nullbulk
);
3942 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3943 robj
*ele
= listNodeValue(ln
);
3946 if (dobj
&& dobj
->type
!= REDIS_LIST
) {
3947 addReply(c
,shared
.wrongtypeerr
);
3951 /* Add the element to the target list (unless it's directly
3952 * passed to some BLPOP-ing client */
3953 if (!handleClientsWaitingListPush(c
,c
->argv
[2],ele
)) {
3955 /* Create the list if the key does not exist */
3956 dobj
= createListObject();
3957 dictAdd(c
->db
->dict
,c
->argv
[2],dobj
);
3958 incrRefCount(c
->argv
[2]);
3960 dstlist
= dobj
->ptr
;
3961 listAddNodeHead(dstlist
,ele
);
3965 /* Send the element to the client as reply as well */
3966 addReplyBulkLen(c
,ele
);
3968 addReply(c
,shared
.crlf
);
3970 /* Finally remove the element from the source list */
3971 listDelNode(srclist
,ln
);
3979 /* ==================================== Sets ================================ */
3981 static void saddCommand(redisClient
*c
) {
3984 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3986 set
= createSetObject();
3987 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3988 incrRefCount(c
->argv
[1]);
3990 if (set
->type
!= REDIS_SET
) {
3991 addReply(c
,shared
.wrongtypeerr
);
3995 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3996 incrRefCount(c
->argv
[2]);
3998 addReply(c
,shared
.cone
);
4000 addReply(c
,shared
.czero
);
4004 static void sremCommand(redisClient
*c
) {
4007 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4009 addReply(c
,shared
.czero
);
4011 if (set
->type
!= REDIS_SET
) {
4012 addReply(c
,shared
.wrongtypeerr
);
4015 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
4017 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
4018 addReply(c
,shared
.cone
);
4020 addReply(c
,shared
.czero
);
4025 static void smoveCommand(redisClient
*c
) {
4026 robj
*srcset
, *dstset
;
4028 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4029 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
4031 /* If the source key does not exist return 0, if it's of the wrong type
4033 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
4034 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
4037 /* Error if the destination key is not a set as well */
4038 if (dstset
&& dstset
->type
!= REDIS_SET
) {
4039 addReply(c
,shared
.wrongtypeerr
);
4042 /* Remove the element from the source set */
4043 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
4044 /* Key not found in the src set! return zero */
4045 addReply(c
,shared
.czero
);
4049 /* Add the element to the destination set */
4051 dstset
= createSetObject();
4052 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
4053 incrRefCount(c
->argv
[2]);
4055 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
4056 incrRefCount(c
->argv
[3]);
4057 addReply(c
,shared
.cone
);
4060 static void sismemberCommand(redisClient
*c
) {
4063 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4065 addReply(c
,shared
.czero
);
4067 if (set
->type
!= REDIS_SET
) {
4068 addReply(c
,shared
.wrongtypeerr
);
4071 if (dictFind(set
->ptr
,c
->argv
[2]))
4072 addReply(c
,shared
.cone
);
4074 addReply(c
,shared
.czero
);
4078 static void scardCommand(redisClient
*c
) {
4082 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4084 addReply(c
,shared
.czero
);
4087 if (o
->type
!= REDIS_SET
) {
4088 addReply(c
,shared
.wrongtypeerr
);
4091 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4097 static void spopCommand(redisClient
*c
) {
4101 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4103 addReply(c
,shared
.nullbulk
);
4105 if (set
->type
!= REDIS_SET
) {
4106 addReply(c
,shared
.wrongtypeerr
);
4109 de
= dictGetRandomKey(set
->ptr
);
4111 addReply(c
,shared
.nullbulk
);
4113 robj
*ele
= dictGetEntryKey(de
);
4115 addReplyBulkLen(c
,ele
);
4117 addReply(c
,shared
.crlf
);
4118 dictDelete(set
->ptr
,ele
);
4119 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
4125 static void srandmemberCommand(redisClient
*c
) {
4129 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4131 addReply(c
,shared
.nullbulk
);
4133 if (set
->type
!= REDIS_SET
) {
4134 addReply(c
,shared
.wrongtypeerr
);
4137 de
= dictGetRandomKey(set
->ptr
);
4139 addReply(c
,shared
.nullbulk
);
4141 robj
*ele
= dictGetEntryKey(de
);
4143 addReplyBulkLen(c
,ele
);
4145 addReply(c
,shared
.crlf
);
4150 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
4151 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
4153 return dictSize(*d1
)-dictSize(*d2
);
4156 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, unsigned long setsnum
, robj
*dstkey
) {
4157 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4160 robj
*lenobj
= NULL
, *dstset
= NULL
;
4161 unsigned long j
, cardinality
= 0;
4163 for (j
= 0; j
< setsnum
; j
++) {
4167 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4168 lookupKeyRead(c
->db
,setskeys
[j
]);
4172 if (deleteKey(c
->db
,dstkey
))
4174 addReply(c
,shared
.czero
);
4176 addReply(c
,shared
.nullmultibulk
);
4180 if (setobj
->type
!= REDIS_SET
) {
4182 addReply(c
,shared
.wrongtypeerr
);
4185 dv
[j
] = setobj
->ptr
;
4187 /* Sort sets from the smallest to largest, this will improve our
4188 * algorithm's performace */
4189 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
4191 /* The first thing we should output is the total number of elements...
4192 * since this is a multi-bulk write, but at this stage we don't know
4193 * the intersection set size, so we use a trick, append an empty object
4194 * to the output list and save the pointer to later modify it with the
4197 lenobj
= createObject(REDIS_STRING
,NULL
);
4199 decrRefCount(lenobj
);
4201 /* If we have a target key where to store the resulting set
4202 * create this key with an empty set inside */
4203 dstset
= createSetObject();
4206 /* Iterate all the elements of the first (smallest) set, and test
4207 * the element against all the other sets, if at least one set does
4208 * not include the element it is discarded */
4209 di
= dictGetIterator(dv
[0]);
4211 while((de
= dictNext(di
)) != NULL
) {
4214 for (j
= 1; j
< setsnum
; j
++)
4215 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
4217 continue; /* at least one set does not contain the member */
4218 ele
= dictGetEntryKey(de
);
4220 addReplyBulkLen(c
,ele
);
4222 addReply(c
,shared
.crlf
);
4225 dictAdd(dstset
->ptr
,ele
,NULL
);
4229 dictReleaseIterator(di
);
4232 /* Store the resulting set into the target */
4233 deleteKey(c
->db
,dstkey
);
4234 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4235 incrRefCount(dstkey
);
4239 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%lu\r\n",cardinality
);
4241 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4242 dictSize((dict
*)dstset
->ptr
)));
4248 static void sinterCommand(redisClient
*c
) {
4249 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
4252 static void sinterstoreCommand(redisClient
*c
) {
4253 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
4256 #define REDIS_OP_UNION 0
4257 #define REDIS_OP_DIFF 1
4259 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
4260 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4263 robj
*dstset
= NULL
;
4264 int j
, cardinality
= 0;
4266 for (j
= 0; j
< setsnum
; j
++) {
4270 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4271 lookupKeyRead(c
->db
,setskeys
[j
]);
4276 if (setobj
->type
!= REDIS_SET
) {
4278 addReply(c
,shared
.wrongtypeerr
);
4281 dv
[j
] = setobj
->ptr
;
4284 /* We need a temp set object to store our union. If the dstkey
4285 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4286 * this set object will be the resulting object to set into the target key*/
4287 dstset
= createSetObject();
4289 /* Iterate all the elements of all the sets, add every element a single
4290 * time to the result set */
4291 for (j
= 0; j
< setsnum
; j
++) {
4292 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
4293 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
4295 di
= dictGetIterator(dv
[j
]);
4297 while((de
= dictNext(di
)) != NULL
) {
4300 /* dictAdd will not add the same element multiple times */
4301 ele
= dictGetEntryKey(de
);
4302 if (op
== REDIS_OP_UNION
|| j
== 0) {
4303 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
4307 } else if (op
== REDIS_OP_DIFF
) {
4308 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
4313 dictReleaseIterator(di
);
4315 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
4318 /* Output the content of the resulting set, if not in STORE mode */
4320 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
4321 di
= dictGetIterator(dstset
->ptr
);
4322 while((de
= dictNext(di
)) != NULL
) {
4325 ele
= dictGetEntryKey(de
);
4326 addReplyBulkLen(c
,ele
);
4328 addReply(c
,shared
.crlf
);
4330 dictReleaseIterator(di
);
4332 /* If we have a target key where to store the resulting set
4333 * create this key with the result set inside */
4334 deleteKey(c
->db
,dstkey
);
4335 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4336 incrRefCount(dstkey
);
4341 decrRefCount(dstset
);
4343 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4344 dictSize((dict
*)dstset
->ptr
)));
4350 static void sunionCommand(redisClient
*c
) {
4351 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
4354 static void sunionstoreCommand(redisClient
*c
) {
4355 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
4358 static void sdiffCommand(redisClient
*c
) {
4359 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
4362 static void sdiffstoreCommand(redisClient
*c
) {
4363 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
4366 /* ==================================== ZSets =============================== */
4368 /* ZSETs are ordered sets using two data structures to hold the same elements
4369 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4372 * The elements are added to an hash table mapping Redis objects to scores.
4373 * At the same time the elements are added to a skip list mapping scores
4374 * to Redis objects (so objects are sorted by scores in this "view"). */
4376 /* This skiplist implementation is almost a C translation of the original
4377 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4378 * Alternative to Balanced Trees", modified in three ways:
4379 * a) this implementation allows for repeated values.
4380 * b) the comparison is not just by key (our 'score') but by satellite data.
4381 * c) there is a back pointer, so it's a doubly linked list with the back
4382 * pointers being only at "level 1". This allows to traverse the list
4383 * from tail to head, useful for ZREVRANGE. */
4385 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
4386 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
4388 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
4394 static zskiplist
*zslCreate(void) {
4398 zsl
= zmalloc(sizeof(*zsl
));
4401 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
4402 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
4403 zsl
->header
->forward
[j
] = NULL
;
4404 zsl
->header
->backward
= NULL
;
4409 static void zslFreeNode(zskiplistNode
*node
) {
4410 decrRefCount(node
->obj
);
4411 zfree(node
->forward
);
4415 static void zslFree(zskiplist
*zsl
) {
4416 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
4418 zfree(zsl
->header
->forward
);
4421 next
= node
->forward
[0];
4428 static int zslRandomLevel(void) {
4430 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
4435 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
4436 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4440 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4441 while (x
->forward
[i
] &&
4442 (x
->forward
[i
]->score
< score
||
4443 (x
->forward
[i
]->score
== score
&&
4444 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4448 /* we assume the key is not already inside, since we allow duplicated
4449 * scores, and the re-insertion of score and redis object should never
4450 * happpen since the caller of zslInsert() should test in the hash table
4451 * if the element is already inside or not. */
4452 level
= zslRandomLevel();
4453 if (level
> zsl
->level
) {
4454 for (i
= zsl
->level
; i
< level
; i
++)
4455 update
[i
] = zsl
->header
;
4458 x
= zslCreateNode(level
,score
,obj
);
4459 for (i
= 0; i
< level
; i
++) {
4460 x
->forward
[i
] = update
[i
]->forward
[i
];
4461 update
[i
]->forward
[i
] = x
;
4463 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
4465 x
->forward
[0]->backward
= x
;
4471 /* Delete an element with matching score/object from the skiplist. */
4472 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
4473 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4477 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4478 while (x
->forward
[i
] &&
4479 (x
->forward
[i
]->score
< score
||
4480 (x
->forward
[i
]->score
== score
&&
4481 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4485 /* We may have multiple elements with the same score, what we need
4486 * is to find the element with both the right score and object. */
4488 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
4489 for (i
= 0; i
< zsl
->level
; i
++) {
4490 if (update
[i
]->forward
[i
] != x
) break;
4491 update
[i
]->forward
[i
] = x
->forward
[i
];
4493 if (x
->forward
[0]) {
4494 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4497 zsl
->tail
= x
->backward
;
4500 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4505 return 0; /* not found */
4507 return 0; /* not found */
4510 /* Delete all the elements with score between min and max from the skiplist.
4511 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4512 * Note that this function takes the reference to the hash table view of the
4513 * sorted set, in order to remove the elements from the hash table too. */
4514 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4515 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4516 unsigned long removed
= 0;
4520 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4521 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
4525 /* We may have multiple elements with the same score, what we need
4526 * is to find the element with both the right score and object. */
4528 while (x
&& x
->score
<= max
) {
4529 zskiplistNode
*next
;
4531 for (i
= 0; i
< zsl
->level
; i
++) {
4532 if (update
[i
]->forward
[i
] != x
) break;
4533 update
[i
]->forward
[i
] = x
->forward
[i
];
4535 if (x
->forward
[0]) {
4536 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4539 zsl
->tail
= x
->backward
;
4541 next
= x
->forward
[0];
4542 dictDelete(dict
,x
->obj
);
4544 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4550 return removed
; /* not found */
4553 /* Find the first node having a score equal or greater than the specified one.
4554 * Returns NULL if there is no match. */
4555 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4560 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4561 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
4564 /* We may have multiple elements with the same score, what we need
4565 * is to find the element with both the right score and object. */
4566 return x
->forward
[0];
4569 /* The actual Z-commands implementations */
4571 /* This generic command implements both ZADD and ZINCRBY.
4572 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4573 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4574 static void zaddGenericCommand(redisClient
*c
, robj
*key
, robj
*ele
, double scoreval
, int doincrement
) {
4579 zsetobj
= lookupKeyWrite(c
->db
,key
);
4580 if (zsetobj
== NULL
) {
4581 zsetobj
= createZsetObject();
4582 dictAdd(c
->db
->dict
,key
,zsetobj
);
4585 if (zsetobj
->type
!= REDIS_ZSET
) {
4586 addReply(c
,shared
.wrongtypeerr
);
4592 /* Ok now since we implement both ZADD and ZINCRBY here the code
4593 * needs to handle the two different conditions. It's all about setting
4594 * '*score', that is, the new score to set, to the right value. */
4595 score
= zmalloc(sizeof(double));
4599 /* Read the old score. If the element was not present starts from 0 */
4600 de
= dictFind(zs
->dict
,ele
);
4602 double *oldscore
= dictGetEntryVal(de
);
4603 *score
= *oldscore
+ scoreval
;
4611 /* What follows is a simple remove and re-insert operation that is common
4612 * to both ZADD and ZINCRBY... */
4613 if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) {
4614 /* case 1: New element */
4615 incrRefCount(ele
); /* added to hash */
4616 zslInsert(zs
->zsl
,*score
,ele
);
4617 incrRefCount(ele
); /* added to skiplist */
4620 addReplyDouble(c
,*score
);
4622 addReply(c
,shared
.cone
);
4627 /* case 2: Score update operation */
4628 de
= dictFind(zs
->dict
,ele
);
4629 redisAssert(de
!= NULL
);
4630 oldscore
= dictGetEntryVal(de
);
4631 if (*score
!= *oldscore
) {
4634 /* Remove and insert the element in the skip list with new score */
4635 deleted
= zslDelete(zs
->zsl
,*oldscore
,ele
);
4636 redisAssert(deleted
!= 0);
4637 zslInsert(zs
->zsl
,*score
,ele
);
4639 /* Update the score in the hash table */
4640 dictReplace(zs
->dict
,ele
,score
);
4646 addReplyDouble(c
,*score
);
4648 addReply(c
,shared
.czero
);
4652 static void zaddCommand(redisClient
*c
) {
4655 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4656 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0);
4659 static void zincrbyCommand(redisClient
*c
) {
4662 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4663 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1);
4666 static void zremCommand(redisClient
*c
) {
4670 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4671 if (zsetobj
== NULL
) {
4672 addReply(c
,shared
.czero
);
4678 if (zsetobj
->type
!= REDIS_ZSET
) {
4679 addReply(c
,shared
.wrongtypeerr
);
4683 de
= dictFind(zs
->dict
,c
->argv
[2]);
4685 addReply(c
,shared
.czero
);
4688 /* Delete from the skiplist */
4689 oldscore
= dictGetEntryVal(de
);
4690 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
4691 redisAssert(deleted
!= 0);
4693 /* Delete from the hash table */
4694 dictDelete(zs
->dict
,c
->argv
[2]);
4695 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4697 addReply(c
,shared
.cone
);
4701 static void zremrangebyscoreCommand(redisClient
*c
) {
4702 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4703 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4707 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4708 if (zsetobj
== NULL
) {
4709 addReply(c
,shared
.czero
);
4713 if (zsetobj
->type
!= REDIS_ZSET
) {
4714 addReply(c
,shared
.wrongtypeerr
);
4718 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
4719 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4720 server
.dirty
+= deleted
;
4721 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
4725 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
4727 int start
= atoi(c
->argv
[2]->ptr
);
4728 int end
= atoi(c
->argv
[3]->ptr
);
4731 if (c
->argc
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) {
4733 } else if (c
->argc
>= 5) {
4734 addReply(c
,shared
.syntaxerr
);
4738 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4740 addReply(c
,shared
.nullmultibulk
);
4742 if (o
->type
!= REDIS_ZSET
) {
4743 addReply(c
,shared
.wrongtypeerr
);
4745 zset
*zsetobj
= o
->ptr
;
4746 zskiplist
*zsl
= zsetobj
->zsl
;
4749 int llen
= zsl
->length
;
4753 /* convert negative indexes */
4754 if (start
< 0) start
= llen
+start
;
4755 if (end
< 0) end
= llen
+end
;
4756 if (start
< 0) start
= 0;
4757 if (end
< 0) end
= 0;
4759 /* indexes sanity checks */
4760 if (start
> end
|| start
>= llen
) {
4761 /* Out of range start or start > end result in empty list */
4762 addReply(c
,shared
.emptymultibulk
);
4765 if (end
>= llen
) end
= llen
-1;
4766 rangelen
= (end
-start
)+1;
4768 /* Return the result in form of a multi-bulk reply */
4774 ln
= zsl
->header
->forward
[0];
4776 ln
= ln
->forward
[0];
4779 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",
4780 withscores
? (rangelen
*2) : rangelen
));
4781 for (j
= 0; j
< rangelen
; j
++) {
4783 addReplyBulkLen(c
,ele
);
4785 addReply(c
,shared
.crlf
);
4787 addReplyDouble(c
,ln
->score
);
4788 ln
= reverse
? ln
->backward
: ln
->forward
[0];
4794 static void zrangeCommand(redisClient
*c
) {
4795 zrangeGenericCommand(c
,0);
4798 static void zrevrangeCommand(redisClient
*c
) {
4799 zrangeGenericCommand(c
,1);
4802 static void zrangebyscoreCommand(redisClient
*c
) {
4804 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4805 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4806 int offset
= 0, limit
= -1;
4808 if (c
->argc
!= 4 && c
->argc
!= 7) {
4810 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4812 } else if (c
->argc
== 7 && strcasecmp(c
->argv
[4]->ptr
,"limit")) {
4813 addReply(c
,shared
.syntaxerr
);
4815 } else if (c
->argc
== 7) {
4816 offset
= atoi(c
->argv
[5]->ptr
);
4817 limit
= atoi(c
->argv
[6]->ptr
);
4818 if (offset
< 0) offset
= 0;
4821 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4823 addReply(c
,shared
.nullmultibulk
);
4825 if (o
->type
!= REDIS_ZSET
) {
4826 addReply(c
,shared
.wrongtypeerr
);
4828 zset
*zsetobj
= o
->ptr
;
4829 zskiplist
*zsl
= zsetobj
->zsl
;
4832 unsigned int rangelen
= 0;
4834 /* Get the first node with the score >= min */
4835 ln
= zslFirstWithScore(zsl
,min
);
4837 /* No element matching the speciifed interval */
4838 addReply(c
,shared
.emptymultibulk
);
4842 /* We don't know in advance how many matching elements there
4843 * are in the list, so we push this object that will represent
4844 * the multi-bulk length in the output buffer, and will "fix"
4846 lenobj
= createObject(REDIS_STRING
,NULL
);
4848 decrRefCount(lenobj
);
4850 while(ln
&& ln
->score
<= max
) {
4853 ln
= ln
->forward
[0];
4856 if (limit
== 0) break;
4858 addReplyBulkLen(c
,ele
);
4860 addReply(c
,shared
.crlf
);
4861 ln
= ln
->forward
[0];
4863 if (limit
> 0) limit
--;
4865 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
4870 static void zcardCommand(redisClient
*c
) {
4874 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4876 addReply(c
,shared
.czero
);
4879 if (o
->type
!= REDIS_ZSET
) {
4880 addReply(c
,shared
.wrongtypeerr
);
4883 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",zs
->zsl
->length
));
4888 static void zscoreCommand(redisClient
*c
) {
4892 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4894 addReply(c
,shared
.nullbulk
);
4897 if (o
->type
!= REDIS_ZSET
) {
4898 addReply(c
,shared
.wrongtypeerr
);
4903 de
= dictFind(zs
->dict
,c
->argv
[2]);
4905 addReply(c
,shared
.nullbulk
);
4907 double *score
= dictGetEntryVal(de
);
4909 addReplyDouble(c
,*score
);
4915 /* ========================= Non type-specific commands ==================== */
4917 static void flushdbCommand(redisClient
*c
) {
4918 server
.dirty
+= dictSize(c
->db
->dict
);
4919 dictEmpty(c
->db
->dict
);
4920 dictEmpty(c
->db
->expires
);
4921 addReply(c
,shared
.ok
);
4924 static void flushallCommand(redisClient
*c
) {
4925 server
.dirty
+= emptyDb();
4926 addReply(c
,shared
.ok
);
4927 rdbSave(server
.dbfilename
);
4931 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
4932 redisSortOperation
*so
= zmalloc(sizeof(*so
));
4934 so
->pattern
= pattern
;
4938 /* Return the value associated to the key with a name obtained
4939 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4940 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
4944 int prefixlen
, sublen
, postfixlen
;
4945 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4949 char buf
[REDIS_SORTKEY_MAX
+1];
4952 /* If the pattern is "#" return the substitution object itself in order
4953 * to implement the "SORT ... GET #" feature. */
4954 spat
= pattern
->ptr
;
4955 if (spat
[0] == '#' && spat
[1] == '\0') {
4959 /* The substitution object may be specially encoded. If so we create
4960 * a decoded object on the fly. Otherwise getDecodedObject will just
4961 * increment the ref count, that we'll decrement later. */
4962 subst
= getDecodedObject(subst
);
4965 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
4966 p
= strchr(spat
,'*');
4968 decrRefCount(subst
);
4973 sublen
= sdslen(ssub
);
4974 postfixlen
= sdslen(spat
)-(prefixlen
+1);
4975 memcpy(keyname
.buf
,spat
,prefixlen
);
4976 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
4977 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
4978 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
4979 keyname
.len
= prefixlen
+sublen
+postfixlen
;
4981 initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(long)*2))
4982 decrRefCount(subst
);
4984 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4985 return lookupKeyRead(db
,&keyobj
);
4988 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4989 * the additional parameter is not standard but a BSD-specific we have to
4990 * pass sorting parameters via the global 'server' structure */
4991 static int sortCompare(const void *s1
, const void *s2
) {
4992 const redisSortObject
*so1
= s1
, *so2
= s2
;
4995 if (!server
.sort_alpha
) {
4996 /* Numeric sorting. Here it's trivial as we precomputed scores */
4997 if (so1
->u
.score
> so2
->u
.score
) {
4999 } else if (so1
->u
.score
< so2
->u
.score
) {
5005 /* Alphanumeric sorting */
5006 if (server
.sort_bypattern
) {
5007 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
5008 /* At least one compare object is NULL */
5009 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
5011 else if (so1
->u
.cmpobj
== NULL
)
5016 /* We have both the objects, use strcoll */
5017 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
5020 /* Compare elements directly */
5023 dec1
= getDecodedObject(so1
->obj
);
5024 dec2
= getDecodedObject(so2
->obj
);
5025 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
5030 return server
.sort_desc
? -cmp
: cmp
;
5033 /* The SORT command is the most complex command in Redis. Warning: this code
5034 * is optimized for speed and a bit less for readability */
5035 static void sortCommand(redisClient
*c
) {
5038 int desc
= 0, alpha
= 0;
5039 int limit_start
= 0, limit_count
= -1, start
, end
;
5040 int j
, dontsort
= 0, vectorlen
;
5041 int getop
= 0; /* GET operation counter */
5042 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
5043 redisSortObject
*vector
; /* Resulting vector to sort */
5045 /* Lookup the key to sort. It must be of the right types */
5046 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
5047 if (sortval
== NULL
) {
5048 addReply(c
,shared
.nullmultibulk
);
5051 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
5052 sortval
->type
!= REDIS_ZSET
)
5054 addReply(c
,shared
.wrongtypeerr
);
5058 /* Create a list of operations to perform for every sorted element.
5059 * Operations can be GET/DEL/INCR/DECR */
5060 operations
= listCreate();
5061 listSetFreeMethod(operations
,zfree
);
5064 /* Now we need to protect sortval incrementing its count, in the future
5065 * SORT may have options able to overwrite/delete keys during the sorting
5066 * and the sorted key itself may get destroied */
5067 incrRefCount(sortval
);
5069 /* The SORT command has an SQL-alike syntax, parse it */
5070 while(j
< c
->argc
) {
5071 int leftargs
= c
->argc
-j
-1;
5072 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
5074 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
5076 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
5078 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
5079 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
5080 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
5082 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
5083 storekey
= c
->argv
[j
+1];
5085 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
5086 sortby
= c
->argv
[j
+1];
5087 /* If the BY pattern does not contain '*', i.e. it is constant,
5088 * we don't need to sort nor to lookup the weight keys. */
5089 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
5091 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
5092 listAddNodeTail(operations
,createSortOperation(
5093 REDIS_SORT_GET
,c
->argv
[j
+1]));
5097 decrRefCount(sortval
);
5098 listRelease(operations
);
5099 addReply(c
,shared
.syntaxerr
);
5105 /* Load the sorting vector with all the objects to sort */
5106 switch(sortval
->type
) {
5107 case REDIS_LIST
: vectorlen
= listLength((list
*)sortval
->ptr
); break;
5108 case REDIS_SET
: vectorlen
= dictSize((dict
*)sortval
->ptr
); break;
5109 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
5110 default: vectorlen
= 0; redisAssert(0); /* Avoid GCC warning */
5112 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
5115 if (sortval
->type
== REDIS_LIST
) {
5116 list
*list
= sortval
->ptr
;
5120 while((ln
= listYield(list
))) {
5121 robj
*ele
= ln
->value
;
5122 vector
[j
].obj
= ele
;
5123 vector
[j
].u
.score
= 0;
5124 vector
[j
].u
.cmpobj
= NULL
;
5132 if (sortval
->type
== REDIS_SET
) {
5135 zset
*zs
= sortval
->ptr
;
5139 di
= dictGetIterator(set
);
5140 while((setele
= dictNext(di
)) != NULL
) {
5141 vector
[j
].obj
= dictGetEntryKey(setele
);
5142 vector
[j
].u
.score
= 0;
5143 vector
[j
].u
.cmpobj
= NULL
;
5146 dictReleaseIterator(di
);
5148 redisAssert(j
== vectorlen
);
5150 /* Now it's time to load the right scores in the sorting vector */
5151 if (dontsort
== 0) {
5152 for (j
= 0; j
< vectorlen
; j
++) {
5156 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
5157 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
5159 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
5161 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
5162 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
5164 /* Don't need to decode the object if it's
5165 * integer-encoded (the only encoding supported) so
5166 * far. We can just cast it */
5167 if (byval
->encoding
== REDIS_ENCODING_INT
) {
5168 vector
[j
].u
.score
= (long)byval
->ptr
;
5170 redisAssert(1 != 1);
5175 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
5176 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
5178 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
5179 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
5181 redisAssert(1 != 1);
5188 /* We are ready to sort the vector... perform a bit of sanity check
5189 * on the LIMIT option too. We'll use a partial version of quicksort. */
5190 start
= (limit_start
< 0) ? 0 : limit_start
;
5191 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
5192 if (start
>= vectorlen
) {
5193 start
= vectorlen
-1;
5196 if (end
>= vectorlen
) end
= vectorlen
-1;
5198 if (dontsort
== 0) {
5199 server
.sort_desc
= desc
;
5200 server
.sort_alpha
= alpha
;
5201 server
.sort_bypattern
= sortby
? 1 : 0;
5202 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
5203 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
5205 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
5208 /* Send command output to the output buffer, performing the specified
5209 * GET/DEL/INCR/DECR operations if any. */
5210 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
5211 if (storekey
== NULL
) {
5212 /* STORE option not specified, sent the sorting result to client */
5213 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
5214 for (j
= start
; j
<= end
; j
++) {
5217 addReplyBulkLen(c
,vector
[j
].obj
);
5218 addReply(c
,vector
[j
].obj
);
5219 addReply(c
,shared
.crlf
);
5221 listRewind(operations
);
5222 while((ln
= listYield(operations
))) {
5223 redisSortOperation
*sop
= ln
->value
;
5224 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5227 if (sop
->type
== REDIS_SORT_GET
) {
5228 if (!val
|| val
->type
!= REDIS_STRING
) {
5229 addReply(c
,shared
.nullbulk
);
5231 addReplyBulkLen(c
,val
);
5233 addReply(c
,shared
.crlf
);
5236 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5241 robj
*listObject
= createListObject();
5242 list
*listPtr
= (list
*) listObject
->ptr
;
5244 /* STORE option specified, set the sorting result as a List object */
5245 for (j
= start
; j
<= end
; j
++) {
5248 listAddNodeTail(listPtr
,vector
[j
].obj
);
5249 incrRefCount(vector
[j
].obj
);
5251 listRewind(operations
);
5252 while((ln
= listYield(operations
))) {
5253 redisSortOperation
*sop
= ln
->value
;
5254 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5257 if (sop
->type
== REDIS_SORT_GET
) {
5258 if (!val
|| val
->type
!= REDIS_STRING
) {
5259 listAddNodeTail(listPtr
,createStringObject("",0));
5261 listAddNodeTail(listPtr
,val
);
5265 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5269 if (dictReplace(c
->db
->dict
,storekey
,listObject
)) {
5270 incrRefCount(storekey
);
5272 /* Note: we add 1 because the DB is dirty anyway since even if the
5273 * SORT result is empty a new key is set and maybe the old content
5275 server
.dirty
+= 1+outputlen
;
5276 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",outputlen
));
5280 decrRefCount(sortval
);
5281 listRelease(operations
);
5282 for (j
= 0; j
< vectorlen
; j
++) {
5283 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
5284 decrRefCount(vector
[j
].u
.cmpobj
);
5289 /* Create the string returned by the INFO command. This is decoupled
5290 * by the INFO command itself as we need to report the same information
5291 * on memory corruption problems. */
5292 static sds
genRedisInfoString(void) {
5294 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5297 info
= sdscatprintf(sdsempty(),
5298 "redis_version:%s\r\n"
5300 "multiplexing_api:%s\r\n"
5301 "uptime_in_seconds:%ld\r\n"
5302 "uptime_in_days:%ld\r\n"
5303 "connected_clients:%d\r\n"
5304 "connected_slaves:%d\r\n"
5305 "blocked_clients:%d\r\n"
5306 "used_memory:%zu\r\n"
5307 "changes_since_last_save:%lld\r\n"
5308 "bgsave_in_progress:%d\r\n"
5309 "last_save_time:%ld\r\n"
5310 "bgrewriteaof_in_progress:%d\r\n"
5311 "total_connections_received:%lld\r\n"
5312 "total_commands_processed:%lld\r\n"
5315 (sizeof(long) == 8) ? "64" : "32",
5319 listLength(server
.clients
)-listLength(server
.slaves
),
5320 listLength(server
.slaves
),
5321 server
.blockedclients
,
5324 server
.bgsavechildpid
!= -1,
5326 server
.bgrewritechildpid
!= -1,
5327 server
.stat_numconnections
,
5328 server
.stat_numcommands
,
5329 server
.masterhost
== NULL
? "master" : "slave"
5331 if (server
.masterhost
) {
5332 info
= sdscatprintf(info
,
5333 "master_host:%s\r\n"
5334 "master_port:%d\r\n"
5335 "master_link_status:%s\r\n"
5336 "master_last_io_seconds_ago:%d\r\n"
5339 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
5341 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
5344 for (j
= 0; j
< server
.dbnum
; j
++) {
5345 long long keys
, vkeys
;
5347 keys
= dictSize(server
.db
[j
].dict
);
5348 vkeys
= dictSize(server
.db
[j
].expires
);
5349 if (keys
|| vkeys
) {
5350 info
= sdscatprintf(info
, "db%d:keys=%lld,expires=%lld\r\n",
5357 static void infoCommand(redisClient
*c
) {
5358 sds info
= genRedisInfoString();
5359 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
5360 (unsigned long)sdslen(info
)));
5361 addReplySds(c
,info
);
5362 addReply(c
,shared
.crlf
);
5365 static void monitorCommand(redisClient
*c
) {
5366 /* ignore MONITOR if aleady slave or in monitor mode */
5367 if (c
->flags
& REDIS_SLAVE
) return;
5369 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
5371 listAddNodeTail(server
.monitors
,c
);
5372 addReply(c
,shared
.ok
);
5375 /* ================================= Expire ================================= */
5376 static int removeExpire(redisDb
*db
, robj
*key
) {
5377 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
5384 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
5385 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
5393 /* Return the expire time of the specified key, or -1 if no expire
5394 * is associated with this key (i.e. the key is non volatile) */
5395 static time_t getExpire(redisDb
*db
, robj
*key
) {
5398 /* No expire? return ASAP */
5399 if (dictSize(db
->expires
) == 0 ||
5400 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
5402 return (time_t) dictGetEntryVal(de
);
5405 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
5409 /* No expire? return ASAP */
5410 if (dictSize(db
->expires
) == 0 ||
5411 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5413 /* Lookup the expire */
5414 when
= (time_t) dictGetEntryVal(de
);
5415 if (time(NULL
) <= when
) return 0;
5417 /* Delete the key */
5418 dictDelete(db
->expires
,key
);
5419 return dictDelete(db
->dict
,key
) == DICT_OK
;
5422 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
5425 /* No expire? return ASAP */
5426 if (dictSize(db
->expires
) == 0 ||
5427 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5429 /* Delete the key */
5431 dictDelete(db
->expires
,key
);
5432 return dictDelete(db
->dict
,key
) == DICT_OK
;
5435 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
5438 de
= dictFind(c
->db
->dict
,key
);
5440 addReply(c
,shared
.czero
);
5444 if (deleteKey(c
->db
,key
)) server
.dirty
++;
5445 addReply(c
, shared
.cone
);
5448 time_t when
= time(NULL
)+seconds
;
5449 if (setExpire(c
->db
,key
,when
)) {
5450 addReply(c
,shared
.cone
);
5453 addReply(c
,shared
.czero
);
5459 static void expireCommand(redisClient
*c
) {
5460 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
5463 static void expireatCommand(redisClient
*c
) {
5464 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
5467 static void ttlCommand(redisClient
*c
) {
5471 expire
= getExpire(c
->db
,c
->argv
[1]);
5473 ttl
= (int) (expire
-time(NULL
));
5474 if (ttl
< 0) ttl
= -1;
5476 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
5479 /* ================================ MULTI/EXEC ============================== */
5481 /* Client state initialization for MULTI/EXEC */
5482 static void initClientMultiState(redisClient
*c
) {
5483 c
->mstate
.commands
= NULL
;
5484 c
->mstate
.count
= 0;
5487 /* Release all the resources associated with MULTI/EXEC state */
5488 static void freeClientMultiState(redisClient
*c
) {
5491 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5493 multiCmd
*mc
= c
->mstate
.commands
+j
;
5495 for (i
= 0; i
< mc
->argc
; i
++)
5496 decrRefCount(mc
->argv
[i
]);
5499 zfree(c
->mstate
.commands
);
5502 /* Add a new command into the MULTI commands queue */
5503 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
) {
5507 c
->mstate
.commands
= zrealloc(c
->mstate
.commands
,
5508 sizeof(multiCmd
)*(c
->mstate
.count
+1));
5509 mc
= c
->mstate
.commands
+c
->mstate
.count
;
5512 mc
->argv
= zmalloc(sizeof(robj
*)*c
->argc
);
5513 memcpy(mc
->argv
,c
->argv
,sizeof(robj
*)*c
->argc
);
5514 for (j
= 0; j
< c
->argc
; j
++)
5515 incrRefCount(mc
->argv
[j
]);
5519 static void multiCommand(redisClient
*c
) {
5520 c
->flags
|= REDIS_MULTI
;
5521 addReply(c
,shared
.ok
);
5524 static void execCommand(redisClient
*c
) {
5529 if (!(c
->flags
& REDIS_MULTI
)) {
5530 addReplySds(c
,sdsnew("-ERR EXEC without MULTI\r\n"));
5534 orig_argv
= c
->argv
;
5535 orig_argc
= c
->argc
;
5536 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->mstate
.count
));
5537 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5538 c
->argc
= c
->mstate
.commands
[j
].argc
;
5539 c
->argv
= c
->mstate
.commands
[j
].argv
;
5540 call(c
,c
->mstate
.commands
[j
].cmd
);
5542 c
->argv
= orig_argv
;
5543 c
->argc
= orig_argc
;
5544 freeClientMultiState(c
);
5545 initClientMultiState(c
);
5546 c
->flags
&= (~REDIS_MULTI
);
5549 /* =========================== Blocking Operations ========================= */
5551 /* Currently Redis blocking operations support is limited to list POP ops,
5552 * so the current implementation is not fully generic, but it is also not
5553 * completely specific so it will not require a rewrite to support new
5554 * kind of blocking operations in the future.
5556 * Still it's important to note that list blocking operations can be already
5557 * used as a notification mechanism in order to implement other blocking
5558 * operations at application level, so there must be a very strong evidence
5559 * of usefulness and generality before new blocking operations are implemented.
5561 * This is how the current blocking POP works, we use BLPOP as example:
5562 * - If the user calls BLPOP and the key exists and contains a non empty list
5563 * then LPOP is called instead. So BLPOP is semantically the same as LPOP
5564 * if there is not to block.
5565 * - If instead BLPOP is called and the key does not exists or the list is
5566 * empty we need to block. In order to do so we remove the notification for
5567 * new data to read in the client socket (so that we'll not serve new
5568 * requests if the blocking request is not served). Also we put the client
5569 * in a dictionary (db->blockingkeys) mapping keys to a list of clients
5570 * blocking for this keys.
5571 * - If a PUSH operation against a key with blocked clients waiting is
5572 * performed, we serve the first in the list: basically instead to push
5573 * the new element inside the list we return it to the (first / oldest)
5574 * blocking client, unblock the client, and remove it form the list.
5576 * The above comment and the source code should be enough in order to understand
5577 * the implementation and modify / fix it later.
5580 /* Set a client in blocking mode for the specified key, with the specified
5582 static void blockForKeys(redisClient
*c
, robj
**keys
, int numkeys
, time_t timeout
) {
5587 c
->blockingkeys
= zmalloc(sizeof(robj
*)*numkeys
);
5588 c
->blockingkeysnum
= numkeys
;
5589 c
->blockingto
= timeout
;
5590 for (j
= 0; j
< numkeys
; j
++) {
5591 /* Add the key in the client structure, to map clients -> keys */
5592 c
->blockingkeys
[j
] = keys
[j
];
5593 incrRefCount(keys
[j
]);
5595 /* And in the other "side", to map keys -> clients */
5596 de
= dictFind(c
->db
->blockingkeys
,keys
[j
]);
5600 /* For every key we take a list of clients blocked for it */
5602 retval
= dictAdd(c
->db
->blockingkeys
,keys
[j
],l
);
5603 incrRefCount(keys
[j
]);
5604 assert(retval
== DICT_OK
);
5606 l
= dictGetEntryVal(de
);
5608 listAddNodeTail(l
,c
);
5610 /* Mark the client as a blocked client */
5611 c
->flags
|= REDIS_BLOCKED
;
5612 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
5613 server
.blockedclients
++;
5616 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
5617 static void unblockClient(redisClient
*c
) {
5622 assert(c
->blockingkeys
!= NULL
);
5623 /* The client may wait for multiple keys, so unblock it for every key. */
5624 for (j
= 0; j
< c
->blockingkeysnum
; j
++) {
5625 /* Remove this client from the list of clients waiting for this key. */
5626 de
= dictFind(c
->db
->blockingkeys
,c
->blockingkeys
[j
]);
5628 l
= dictGetEntryVal(de
);
5629 listDelNode(l
,listSearchKey(l
,c
));
5630 /* If the list is empty we need to remove it to avoid wasting memory */
5631 if (listLength(l
) == 0)
5632 dictDelete(c
->db
->blockingkeys
,c
->blockingkeys
[j
]);
5633 decrRefCount(c
->blockingkeys
[j
]);
5635 /* Cleanup the client structure */
5636 zfree(c
->blockingkeys
);
5637 c
->blockingkeys
= NULL
;
5638 c
->flags
&= (~REDIS_BLOCKED
);
5639 server
.blockedclients
--;
5640 /* Ok now we are ready to get read events from socket, note that we
5641 * can't trap errors here as it's possible that unblockClients() is
5642 * called from freeClient() itself, and the only thing we can do
5643 * if we failed to register the READABLE event is to kill the client.
5644 * Still the following function should never fail in the real world as
5645 * we are sure the file descriptor is sane, and we exit on out of mem. */
5646 aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
, readQueryFromClient
, c
);
5647 /* As a final step we want to process data if there is some command waiting
5648 * in the input buffer. Note that this is safe even if unblockClient()
5649 * gets called from freeClient() because freeClient() will be smart
5650 * enough to call this function *after* c->querybuf was set to NULL. */
5651 if (c
->querybuf
&& sdslen(c
->querybuf
) > 0) processInputBuffer(c
);
5654 /* This should be called from any function PUSHing into lists.
5655 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5656 * 'ele' is the element pushed.
5658 * If the function returns 0 there was no client waiting for a list push
5661 * If the function returns 1 there was a client waiting for a list push
5662 * against this key, the element was passed to this client thus it's not
5663 * needed to actually add it to the list and the caller should return asap. */
5664 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
) {
5665 struct dictEntry
*de
;
5666 redisClient
*receiver
;
5670 de
= dictFind(c
->db
->blockingkeys
,key
);
5671 if (de
== NULL
) return 0;
5672 l
= dictGetEntryVal(de
);
5675 receiver
= ln
->value
;
5677 addReplySds(receiver
,sdsnew("*2\r\n"));
5678 addReplyBulkLen(receiver
,key
);
5679 addReply(receiver
,key
);
5680 addReply(receiver
,shared
.crlf
);
5681 addReplyBulkLen(receiver
,ele
);
5682 addReply(receiver
,ele
);
5683 addReply(receiver
,shared
.crlf
);
5684 unblockClient(receiver
);
5688 /* Blocking RPOP/LPOP */
5689 static void blockingPopGenericCommand(redisClient
*c
, int where
) {
5694 for (j
= 1; j
< c
->argc
-1; j
++) {
5695 o
= lookupKeyWrite(c
->db
,c
->argv
[j
]);
5697 if (o
->type
!= REDIS_LIST
) {
5698 addReply(c
,shared
.wrongtypeerr
);
5701 list
*list
= o
->ptr
;
5702 if (listLength(list
) != 0) {
5703 /* If the list contains elements fall back to the usual
5704 * non-blocking POP operation */
5705 robj
*argv
[2], **orig_argv
;
5708 /* We need to alter the command arguments before to call
5709 * popGenericCommand() as the command takes a single key. */
5710 orig_argv
= c
->argv
;
5711 orig_argc
= c
->argc
;
5712 argv
[1] = c
->argv
[j
];
5716 /* Also the return value is different, we need to output
5717 * the multi bulk reply header and the key name. The
5718 * "real" command will add the last element (the value)
5719 * for us. If this souds like an hack to you it's just
5720 * because it is... */
5721 addReplySds(c
,sdsnew("*2\r\n"));
5722 addReplyBulkLen(c
,argv
[1]);
5723 addReply(c
,argv
[1]);
5724 addReply(c
,shared
.crlf
);
5725 popGenericCommand(c
,where
);
5727 /* Fix the client structure with the original stuff */
5728 c
->argv
= orig_argv
;
5729 c
->argc
= orig_argc
;
5735 /* If the list is empty or the key does not exists we must block */
5736 timeout
= strtol(c
->argv
[c
->argc
-1]->ptr
,NULL
,10);
5737 if (timeout
> 0) timeout
+= time(NULL
);
5738 blockForKeys(c
,c
->argv
+1,c
->argc
-2,timeout
);
5741 static void blpopCommand(redisClient
*c
) {
5742 blockingPopGenericCommand(c
,REDIS_HEAD
);
5745 static void brpopCommand(redisClient
*c
) {
5746 blockingPopGenericCommand(c
,REDIS_TAIL
);
5749 /* =============================== Replication ============================= */
5751 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5752 ssize_t nwritten
, ret
= size
;
5753 time_t start
= time(NULL
);
5757 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
5758 nwritten
= write(fd
,ptr
,size
);
5759 if (nwritten
== -1) return -1;
5763 if ((time(NULL
)-start
) > timeout
) {
5771 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5772 ssize_t nread
, totread
= 0;
5773 time_t start
= time(NULL
);
5777 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
5778 nread
= read(fd
,ptr
,size
);
5779 if (nread
== -1) return -1;
5784 if ((time(NULL
)-start
) > timeout
) {
5792 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5799 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
5802 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
5813 static void syncCommand(redisClient
*c
) {
5814 /* ignore SYNC if aleady slave or in monitor mode */
5815 if (c
->flags
& REDIS_SLAVE
) return;
5817 /* SYNC can't be issued when the server has pending data to send to
5818 * the client about already issued commands. We need a fresh reply
5819 * buffer registering the differences between the BGSAVE and the current
5820 * dataset, so that we can copy to other slaves if needed. */
5821 if (listLength(c
->reply
) != 0) {
5822 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5826 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
5827 /* Here we need to check if there is a background saving operation
5828 * in progress, or if it is required to start one */
5829 if (server
.bgsavechildpid
!= -1) {
5830 /* Ok a background save is in progress. Let's check if it is a good
5831 * one for replication, i.e. if there is another slave that is
5832 * registering differences since the server forked to save */
5836 listRewind(server
.slaves
);
5837 while((ln
= listYield(server
.slaves
))) {
5839 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
5842 /* Perfect, the server is already registering differences for
5843 * another slave. Set the right state, and copy the buffer. */
5844 listRelease(c
->reply
);
5845 c
->reply
= listDup(slave
->reply
);
5846 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5847 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
5849 /* No way, we need to wait for the next BGSAVE in order to
5850 * register differences */
5851 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5852 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
5855 /* Ok we don't have a BGSAVE in progress, let's start one */
5856 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
5857 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5858 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
5859 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
5862 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5865 c
->flags
|= REDIS_SLAVE
;
5867 listAddNodeTail(server
.slaves
,c
);
5871 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
5872 redisClient
*slave
= privdata
;
5874 REDIS_NOTUSED(mask
);
5875 char buf
[REDIS_IOBUF_LEN
];
5876 ssize_t nwritten
, buflen
;
5878 if (slave
->repldboff
== 0) {
5879 /* Write the bulk write count before to transfer the DB. In theory here
5880 * we don't know how much room there is in the output buffer of the
5881 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5882 * operations) will never be smaller than the few bytes we need. */
5885 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5887 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
5895 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
5896 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
5898 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
5899 (buflen
== 0) ? "premature EOF" : strerror(errno
));
5903 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
5904 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
5909 slave
->repldboff
+= nwritten
;
5910 if (slave
->repldboff
== slave
->repldbsize
) {
5911 close(slave
->repldbfd
);
5912 slave
->repldbfd
= -1;
5913 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5914 slave
->replstate
= REDIS_REPL_ONLINE
;
5915 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
5916 sendReplyToClient
, slave
) == AE_ERR
) {
5920 addReplySds(slave
,sdsempty());
5921 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
5925 /* This function is called at the end of every backgrond saving.
5926 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5927 * otherwise REDIS_ERR is passed to the function.
5929 * The goal of this function is to handle slaves waiting for a successful
5930 * background saving in order to perform non-blocking synchronization. */
5931 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
5933 int startbgsave
= 0;
5935 listRewind(server
.slaves
);
5936 while((ln
= listYield(server
.slaves
))) {
5937 redisClient
*slave
= ln
->value
;
5939 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
5941 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5942 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
5943 struct redis_stat buf
;
5945 if (bgsaveerr
!= REDIS_OK
) {
5947 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
5950 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
5951 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
5953 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
5956 slave
->repldboff
= 0;
5957 slave
->repldbsize
= buf
.st_size
;
5958 slave
->replstate
= REDIS_REPL_SEND_BULK
;
5959 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5960 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
5967 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5968 listRewind(server
.slaves
);
5969 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
5970 while((ln
= listYield(server
.slaves
))) {
5971 redisClient
*slave
= ln
->value
;
5973 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
5980 static int syncWithMaster(void) {
5981 char buf
[1024], tmpfile
[256], authcmd
[1024];
5983 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
5987 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
5992 /* AUTH with the master if required. */
5993 if(server
.masterauth
) {
5994 snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
);
5995 if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) {
5997 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",
6001 /* Read the AUTH result. */
6002 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
6004 redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s",
6008 if (buf
[0] != '+') {
6010 redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?");
6015 /* Issue the SYNC command */
6016 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
6018 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
6022 /* Read the bulk write count */
6023 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
6025 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
6029 if (buf
[0] != '$') {
6031 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
6034 dumpsize
= atoi(buf
+1);
6035 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
6036 /* Read the bulk write data on a temp file */
6037 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
6038 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
6041 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
6045 int nread
, nwritten
;
6047 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
6049 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
6055 nwritten
= write(dfd
,buf
,nread
);
6056 if (nwritten
== -1) {
6057 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
6065 if (rename(tmpfile
,server
.dbfilename
) == -1) {
6066 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
6072 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6073 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
6077 server
.master
= createClient(fd
);
6078 server
.master
->flags
|= REDIS_MASTER
;
6079 server
.master
->authenticated
= 1;
6080 server
.replstate
= REDIS_REPL_CONNECTED
;
6084 static void slaveofCommand(redisClient
*c
) {
6085 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
6086 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
6087 if (server
.masterhost
) {
6088 sdsfree(server
.masterhost
);
6089 server
.masterhost
= NULL
;
6090 if (server
.master
) freeClient(server
.master
);
6091 server
.replstate
= REDIS_REPL_NONE
;
6092 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
6095 sdsfree(server
.masterhost
);
6096 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
6097 server
.masterport
= atoi(c
->argv
[2]->ptr
);
6098 if (server
.master
) freeClient(server
.master
);
6099 server
.replstate
= REDIS_REPL_CONNECT
;
6100 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
6101 server
.masterhost
, server
.masterport
);
6103 addReply(c
,shared
.ok
);
6106 /* ============================ Maxmemory directive ======================== */
6108 /* This function gets called when 'maxmemory' is set on the config file to limit
6109 * the max memory used by the server, and we are out of memory.
6110 * This function will try to, in order:
6112 * - Free objects from the free list
6113 * - Try to remove keys with an EXPIRE set
6115 * It is not possible to free enough memory to reach used-memory < maxmemory
6116 * the server will start refusing commands that will enlarge even more the
6119 static void freeMemoryIfNeeded(void) {
6120 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
6121 if (listLength(server
.objfreelist
)) {
6124 listNode
*head
= listFirst(server
.objfreelist
);
6125 o
= listNodeValue(head
);
6126 listDelNode(server
.objfreelist
,head
);
6129 int j
, k
, freed
= 0;
6131 for (j
= 0; j
< server
.dbnum
; j
++) {
6133 robj
*minkey
= NULL
;
6134 struct dictEntry
*de
;
6136 if (dictSize(server
.db
[j
].expires
)) {
6138 /* From a sample of three keys drop the one nearest to
6139 * the natural expire */
6140 for (k
= 0; k
< 3; k
++) {
6143 de
= dictGetRandomKey(server
.db
[j
].expires
);
6144 t
= (time_t) dictGetEntryVal(de
);
6145 if (minttl
== -1 || t
< minttl
) {
6146 minkey
= dictGetEntryKey(de
);
6150 deleteKey(server
.db
+j
,minkey
);
6153 if (!freed
) return; /* nothing to free... */
6158 /* ============================== Append Only file ========================== */
6160 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
6161 sds buf
= sdsempty();
6167 /* The DB this command was targetting is not the same as the last command
6168 * we appendend. To issue a SELECT command is needed. */
6169 if (dictid
!= server
.appendseldb
) {
6172 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
6173 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
6174 (unsigned long)strlen(seldb
),seldb
);
6175 server
.appendseldb
= dictid
;
6178 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6179 * EXPIREs into EXPIREATs calls */
6180 if (cmd
->proc
== expireCommand
) {
6183 tmpargv
[0] = createStringObject("EXPIREAT",8);
6184 tmpargv
[1] = argv
[1];
6185 incrRefCount(argv
[1]);
6186 when
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10);
6187 tmpargv
[2] = createObject(REDIS_STRING
,
6188 sdscatprintf(sdsempty(),"%ld",when
));
6192 /* Append the actual command */
6193 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
6194 for (j
= 0; j
< argc
; j
++) {
6197 o
= getDecodedObject(o
);
6198 buf
= sdscatprintf(buf
,"$%lu\r\n",(unsigned long)sdslen(o
->ptr
));
6199 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
6200 buf
= sdscatlen(buf
,"\r\n",2);
6204 /* Free the objects from the modified argv for EXPIREAT */
6205 if (cmd
->proc
== expireCommand
) {
6206 for (j
= 0; j
< 3; j
++)
6207 decrRefCount(argv
[j
]);
6210 /* We want to perform a single write. This should be guaranteed atomic
6211 * at least if the filesystem we are writing is a real physical one.
6212 * While this will save us against the server being killed I don't think
6213 * there is much to do about the whole server stopping for power problems
6215 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
6216 if (nwritten
!= (signed)sdslen(buf
)) {
6217 /* Ooops, we are in troubles. The best thing to do for now is
6218 * to simply exit instead to give the illusion that everything is
6219 * working as expected. */
6220 if (nwritten
== -1) {
6221 redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
));
6223 redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
));
6227 /* If a background append only file rewriting is in progress we want to
6228 * accumulate the differences between the child DB and the current one
6229 * in a buffer, so that when the child process will do its work we
6230 * can append the differences to the new append only file. */
6231 if (server
.bgrewritechildpid
!= -1)
6232 server
.bgrewritebuf
= sdscatlen(server
.bgrewritebuf
,buf
,sdslen(buf
));
6236 if (server
.appendfsync
== APPENDFSYNC_ALWAYS
||
6237 (server
.appendfsync
== APPENDFSYNC_EVERYSEC
&&
6238 now
-server
.lastfsync
> 1))
6240 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
6241 server
.lastfsync
= now
;
6245 /* In Redis commands are always executed in the context of a client, so in
6246 * order to load the append only file we need to create a fake client. */
6247 static struct redisClient
*createFakeClient(void) {
6248 struct redisClient
*c
= zmalloc(sizeof(*c
));
6252 c
->querybuf
= sdsempty();
6256 /* We set the fake client as a slave waiting for the synchronization
6257 * so that Redis will not try to send replies to this client. */
6258 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
6259 c
->reply
= listCreate();
6260 listSetFreeMethod(c
->reply
,decrRefCount
);
6261 listSetDupMethod(c
->reply
,dupClientReplyValue
);
6265 static void freeFakeClient(struct redisClient
*c
) {
6266 sdsfree(c
->querybuf
);
6267 listRelease(c
->reply
);
6271 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
6272 * error (the append only file is zero-length) REDIS_ERR is returned. On
6273 * fatal error an error message is logged and the program exists. */
6274 int loadAppendOnlyFile(char *filename
) {
6275 struct redisClient
*fakeClient
;
6276 FILE *fp
= fopen(filename
,"r");
6277 struct redis_stat sb
;
6279 if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size
== 0)
6283 redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
));
6287 fakeClient
= createFakeClient();
6294 struct redisCommand
*cmd
;
6296 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) {
6302 if (buf
[0] != '*') goto fmterr
;
6304 argv
= zmalloc(sizeof(robj
*)*argc
);
6305 for (j
= 0; j
< argc
; j
++) {
6306 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
;
6307 if (buf
[0] != '$') goto fmterr
;
6308 len
= strtol(buf
+1,NULL
,10);
6309 argsds
= sdsnewlen(NULL
,len
);
6310 if (len
&& fread(argsds
,len
,1,fp
) == 0) goto fmterr
;
6311 argv
[j
] = createObject(REDIS_STRING
,argsds
);
6312 if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */
6315 /* Command lookup */
6316 cmd
= lookupCommand(argv
[0]->ptr
);
6318 redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
);
6321 /* Try object sharing and encoding */
6322 if (server
.shareobjects
) {
6324 for(j
= 1; j
< argc
; j
++)
6325 argv
[j
] = tryObjectSharing(argv
[j
]);
6327 if (cmd
->flags
& REDIS_CMD_BULK
)
6328 tryObjectEncoding(argv
[argc
-1]);
6329 /* Run the command in the context of a fake client */
6330 fakeClient
->argc
= argc
;
6331 fakeClient
->argv
= argv
;
6332 cmd
->proc(fakeClient
);
6333 /* Discard the reply objects list from the fake client */
6334 while(listLength(fakeClient
->reply
))
6335 listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
));
6336 /* Clean up, ready for the next command */
6337 for (j
= 0; j
< argc
; j
++) decrRefCount(argv
[j
]);
6341 freeFakeClient(fakeClient
);
6346 redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file");
6348 redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
));
6352 redisLog(REDIS_WARNING
,"Bad file format reading the append only file");
6356 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6357 static int fwriteBulk(FILE *fp
, robj
*obj
) {
6359 obj
= getDecodedObject(obj
);
6360 snprintf(buf
,sizeof(buf
),"$%ld\r\n",(long)sdslen(obj
->ptr
));
6361 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) goto err
;
6362 if (sdslen(obj
->ptr
) && fwrite(obj
->ptr
,sdslen(obj
->ptr
),1,fp
) == 0)
6364 if (fwrite("\r\n",2,1,fp
) == 0) goto err
;
/* Write a double value in bulk format $<count>\r\n<payload>\r\n.
 * %.17g is enough digits to round-trip any double. Returns 1 on success,
 * 0 on write error. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char payload[128], header[128];
    size_t payloadlen;

    /* The trailing CRLF is part of 'payload' but not of the bulk count. */
    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    payloadlen = strlen(payload);
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)payloadlen-2);
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,payloadlen,1,fp) != 0;
}
/* Write a long value in bulk format $<count>\r\n<payload>\r\n.
 * Returns 1 on success, 0 on write error. */
static int fwriteBulkLong(FILE *fp, long l) {
    char payload[128], header[128];
    size_t payloadlen;

    /* The trailing CRLF is part of 'payload' but not of the bulk count. */
    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    payloadlen = strlen(payload);
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)payloadlen-2);
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,payloadlen,1,fp) != 0;
}
6394 /* Write a sequence of commands able to fully rebuild the dataset into
6395 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6396 static int rewriteAppendOnlyFile(char *filename
) {
6397 dictIterator
*di
= NULL
;
6402 time_t now
= time(NULL
);
6404 /* Note that we have to use a different temp name here compared to the
6405 * one used by rewriteAppendOnlyFileBackground() function. */
6406 snprintf(tmpfile
,256,"temp-rewriteaof-%d.aof", (int) getpid());
6407 fp
= fopen(tmpfile
,"w");
6409 redisLog(REDIS_WARNING
, "Failed rewriting the append only file: %s", strerror(errno
));
6412 for (j
= 0; j
< server
.dbnum
; j
++) {
6413 char selectcmd
[] = "*2\r\n$6\r\nSELECT\r\n";
6414 redisDb
*db
= server
.db
+j
;
6416 if (dictSize(d
) == 0) continue;
6417 di
= dictGetIterator(d
);
6423 /* SELECT the new DB */
6424 if (fwrite(selectcmd
,sizeof(selectcmd
)-1,1,fp
) == 0) goto werr
;
6425 if (fwriteBulkLong(fp
,j
) == 0) goto werr
;
6427 /* Iterate this DB writing every entry */
6428 while((de
= dictNext(di
)) != NULL
) {
6429 robj
*key
= dictGetEntryKey(de
);
6430 robj
*o
= dictGetEntryVal(de
);
6431 time_t expiretime
= getExpire(db
,key
);
6433 /* Save the key and associated value */
6434 if (o
->type
== REDIS_STRING
) {
6435 /* Emit a SET command */
6436 char cmd
[]="*3\r\n$3\r\nSET\r\n";
6437 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6439 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6440 if (fwriteBulk(fp
,o
) == 0) goto werr
;
6441 } else if (o
->type
== REDIS_LIST
) {
6442 /* Emit the RPUSHes needed to rebuild the list */
6443 list
*list
= o
->ptr
;
6447 while((ln
= listYield(list
))) {
6448 char cmd
[]="*3\r\n$5\r\nRPUSH\r\n";
6449 robj
*eleobj
= listNodeValue(ln
);
6451 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6452 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6453 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6455 } else if (o
->type
== REDIS_SET
) {
6456 /* Emit the SADDs needed to rebuild the set */
6458 dictIterator
*di
= dictGetIterator(set
);
6461 while((de
= dictNext(di
)) != NULL
) {
6462 char cmd
[]="*3\r\n$4\r\nSADD\r\n";
6463 robj
*eleobj
= dictGetEntryKey(de
);
6465 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6466 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6467 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6469 dictReleaseIterator(di
);
6470 } else if (o
->type
== REDIS_ZSET
) {
6471 /* Emit the ZADDs needed to rebuild the sorted set */
6473 dictIterator
*di
= dictGetIterator(zs
->dict
);
6476 while((de
= dictNext(di
)) != NULL
) {
6477 char cmd
[]="*4\r\n$4\r\nZADD\r\n";
6478 robj
*eleobj
= dictGetEntryKey(de
);
6479 double *score
= dictGetEntryVal(de
);
6481 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6482 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6483 if (fwriteBulkDouble(fp
,*score
) == 0) goto werr
;
6484 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6486 dictReleaseIterator(di
);
6488 redisAssert(0 != 0);
6490 /* Save the expire time */
6491 if (expiretime
!= -1) {
6492 char cmd
[]="*3\r\n$8\r\nEXPIREAT\r\n";
6493 /* If this key is already expired skip it */
6494 if (expiretime
< now
) continue;
6495 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6496 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6497 if (fwriteBulkLong(fp
,expiretime
) == 0) goto werr
;
6500 dictReleaseIterator(di
);
6503 /* Make sure data will not remain on the OS's output buffers */
6508 /* Use RENAME to make sure the DB file is changed atomically only
6509 * if the generate DB file is ok. */
6510 if (rename(tmpfile
,filename
) == -1) {
6511 redisLog(REDIS_WARNING
,"Error moving temp append only file on the final destination: %s", strerror(errno
));
6515 redisLog(REDIS_NOTICE
,"SYNC append only file rewrite performed");
6521 redisLog(REDIS_WARNING
,"Write error writing append only file on disk: %s", strerror(errno
));
6522 if (di
) dictReleaseIterator(di
);
6526 /* This is how rewriting of the append only file in background works:
6528 * 1) The user calls BGREWRITEAOF
6529 * 2) Redis calls this function, that forks():
6530 * 2a) the child rewrite the append only file in a temp file.
6531 * 2b) the parent accumulates differences in server.bgrewritebuf.
6532 * 3) When the child finished '2a' exists.
6533 * 4) The parent will trap the exit code, if it's OK, will append the
6534 * data accumulated into server.bgrewritebuf into the temp file, and
6535 * finally will rename(2) the temp file in the actual file name.
6536 * The the new file is reopened as the new append only file. Profit!
6538 static int rewriteAppendOnlyFileBackground(void) {
6541 if (server
.bgrewritechildpid
!= -1) return REDIS_ERR
;
6542 if ((childpid
= fork()) == 0) {
6547 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6548 if (rewriteAppendOnlyFile(tmpfile
) == REDIS_OK
) {
6555 if (childpid
== -1) {
6556 redisLog(REDIS_WARNING
,
6557 "Can't rewrite append only file in background: fork: %s",
6561 redisLog(REDIS_NOTICE
,
6562 "Background append only file rewriting started by pid %d",childpid
);
6563 server
.bgrewritechildpid
= childpid
;
6564 /* We set appendseldb to -1 in order to force the next call to the
6565 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6566 * accumulated by the parent into server.bgrewritebuf will start
6567 * with a SELECT statement and it will be safe to merge. */
6568 server
.appendseldb
= -1;
6571 return REDIS_OK
; /* unreached */
6574 static void bgrewriteaofCommand(redisClient
*c
) {
6575 if (server
.bgrewritechildpid
!= -1) {
6576 addReplySds(c
,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6579 if (rewriteAppendOnlyFileBackground() == REDIS_OK
) {
6580 char *status
= "+Background append only file rewriting started\r\n";
6581 addReplySds(c
,sdsnew(status
));
6583 addReply(c
,shared
.err
);
/* Delete the temp file written by the background rewrite child 'childpid'
 * (same name pattern as rewriteAppendOnlyFileBackground() uses). */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof", (int) childpid);
    unlink(path);
}
6594 /* =============================== Virtual Memory =========================== */
6595 static void vmInit(void) {
6598 server
.vm_fp
= fopen("/tmp/redisvm","w+b");
6599 if (server
.vm_fp
== NULL
) {
6600 redisLog(REDIS_WARNING
,"Impossible to open the swap file. Exiting.");
6603 server
.vm_fd
= fileno(server
.vm_fp
);
6604 server
.vm_next_page
= 0;
6605 server
.vm_near_pages
= 0;
6606 totsize
= server
.vm_pages
*server
.vm_page_size
;
6607 redisLog(REDIS_NOTICE
,"Allocating %lld bytes of swap file",totsize
);
6608 if (ftruncate(server
.vm_fd
,totsize
) == -1) {
6609 redisLog(REDIS_WARNING
,"Can't ftruncate swap file: %s. Exiting.",
6613 redisLog(REDIS_NOTICE
,"Swap file allocated with success");
6615 server
.vm_bitmap
= zmalloc((server
.vm_near_pages
+7)/8);
6616 memset(server
.vm_bitmap
,0,(server
.vm_near_pages
+7)/8);
6617 /* Try to remove the swap file, so the OS will really delete it from the
6618 * file system when Redis exists. */
6619 unlink("/tmp/redisvm");
6622 /* Mark the page as used */
6623 static void vmMarkPageUsed(off_t page
) {
6624 off_t byte
= page
/8;
6626 server
.vm_bitmap
[byte
] |= 1<<bit
;
/* Mark 'count' contiguous pages as used, with 'page' being the first. */
static void vmMarkPagesUsed(off_t page, off_t count) {
    off_t j;

    /* page+j, not page+count: the old code marked the single page just
     * past the range 'count' times and left the range itself free. */
    for (j = 0; j < count; j++)
        vmMarkPageUsed(page+j);
}
6637 /* Mark the page as free */
6638 static void vmMarkPageFree(off_t page
) {
6639 off_t byte
= page
/8;
6641 server
.vm_bitmap
[byte
] &= ~(1<<bit
);
/* Mark 'count' contiguous pages as free, with 'page' being the first. */
static void vmMarkPagesFree(off_t page, off_t count) {
    off_t j;

    /* page+j, not page+count: free every page of the range. */
    for (j = 0; j < count; j++)
        vmMarkPageFree(page+j);
}
6652 /* Test if the page is free */
6653 static int vmFreePage(off_t page
) {
6654 off_t byte
= page
/8;
6656 return server
.vm_bitmap
[byte
] & bit
;
6659 /* Find N contiguous free pages storing the first page of the cluster in *first.
6660 * Returns 1 if it was able to find N contiguous pages, otherwise 0 is
6663 * This function uses a simple algorithm: we try to allocate
6664 * REDIS_VM_MAX_NEAR_PAGES sequentially, when we reach this limit we start
6665 * again from the start of the swap file searching for free spaces.
6667 * If it looks pretty clear that there are no free pages near our offset
6668 * we try to find less populated places doing a forward jump of
6669 * REDIS_VM_MAX_RANDOM_JUMP, then we start scanning again a few pages
6670 * without hurry, and then we jump again and so forth...
6672 * This function can be improved using a free list to avoid to guess
6673 * too much, since we could collect data about freed pages.
6675 * note: I implemented this function just after watching an episode of
6676 * Battlestar Galactica, where the hybrid was continuing to say "JUMP!"
6678 static int vmFindContiguousPages(off_t
*first
, int n
) {
6679 off_t base
, offset
= 0, since_jump
= 0, numfree
= 0;
6681 if (server
.vm_near_pages
== REDIS_VM_MAX_NEAR_PAGES
) {
6682 server
.vm_near_pages
= 0;
6683 server
.vm_next_page
= 0;
6685 server
.vm_near_pages
++; /* Yet another try for pages near to the old ones */
6686 base
= server
.vm_next_page
;
6688 while(offset
< server
.vm_pages
) {
6689 off_t
this = base
+offset
;
6691 /* If we overflow, restart from page zero */
6692 if (this >= server
.vm_pages
) {
6693 this -= server
.vm_pages
;
6695 /* Just overflowed, what we found on tail is no longer
6696 * interesting, as it's no longer contiguous. */
6700 if (vmFreePage(this)) {
6701 /* This is a free page */
6703 /* Already got N free pages? Return to the caller, with success */
6709 /* The current one is not a free page */
6713 /* Fast-forward if the current page is not free and we already
6714 * searched enough near this place. */
6716 if (!numfree
&& since_jump
>= REDIS_VM_MAX_RANDOM_JUMP
/4) {
6717 offset
+= random() % REDIS_VM_MAX_RANDOM_JUMP
;
6719 /* Note that even if we rewind after the jump, we are don't need
6720 * to make sure numfree is set to zero as we only jump *if* it
6721 * is set to zero. */
6723 /* Otherwise just check the next page */
6730 /* ================================= Debugging ============================== */
6732 static void debugCommand(redisClient
*c
) {
6733 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
6735 } else if (!strcasecmp(c
->argv
[1]->ptr
,"reload")) {
6736 if (rdbSave(server
.dbfilename
) != REDIS_OK
) {
6737 addReply(c
,shared
.err
);
6741 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6742 addReply(c
,shared
.err
);
6745 redisLog(REDIS_WARNING
,"DB reloaded by DEBUG RELOAD");
6746 addReply(c
,shared
.ok
);
6747 } else if (!strcasecmp(c
->argv
[1]->ptr
,"loadaof")) {
6749 if (loadAppendOnlyFile(server
.appendfilename
) != REDIS_OK
) {
6750 addReply(c
,shared
.err
);
6753 redisLog(REDIS_WARNING
,"Append Only File loaded by DEBUG LOADAOF");
6754 addReply(c
,shared
.ok
);
6755 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
6756 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
6760 addReply(c
,shared
.nokeyerr
);
6763 key
= dictGetEntryKey(de
);
6764 val
= dictGetEntryVal(de
);
6765 addReplySds(c
,sdscatprintf(sdsempty(),
6766 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d serializedlength:%lld\r\n",
6767 (void*)key
, key
->refcount
, (void*)val
, val
->refcount
,
6768 val
->encoding
, rdbSavedObjectLen(val
)));
6770 addReplySds(c
,sdsnew(
6771 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6775 static void _redisAssert(char *estr
) {
6776 redisLog(REDIS_WARNING
,"=== ASSERTION FAILED ===");
6777 redisLog(REDIS_WARNING
,"==> %s\n",estr
);
6778 #ifdef HAVE_BACKTRACE
6779 redisLog(REDIS_WARNING
,"(forcing SIGSEGV in order to print the stack trace)");
6784 /* =================================== Main! ================================ */
6787 int linuxOvercommitMemoryValue(void) {
6788 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
6792 if (fgets(buf
,64,fp
) == NULL
) {
6801 void linuxOvercommitMemoryWarning(void) {
6802 if (linuxOvercommitMemoryValue() == 0) {
6803 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6806 #endif /* __linux__ */
6808 static void daemonize(void) {
6812 if (fork() != 0) exit(0); /* parent exits */
6813 printf("New pid: %d\n", getpid());
6814 setsid(); /* create a new session */
6816 /* Every output goes to /dev/null. If Redis is daemonized but
6817 * the 'logfile' is set to 'stdout' in the configuration file
6818 * it will not log at all. */
6819 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
6820 dup2(fd
, STDIN_FILENO
);
6821 dup2(fd
, STDOUT_FILENO
);
6822 dup2(fd
, STDERR_FILENO
);
6823 if (fd
> STDERR_FILENO
) close(fd
);
6825 /* Try to write the pid file */
6826 fp
= fopen(server
.pidfile
,"w");
6828 fprintf(fp
,"%d\n",getpid());
6833 int main(int argc
, char **argv
) {
6836 resetServerSaveParams();
6837 loadServerConfig(argv
[1]);
6838 } else if (argc
> 2) {
6839 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
6842 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6844 if (server
.daemonize
) daemonize();
6846 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
6848 linuxOvercommitMemoryWarning();
6850 if (server
.appendonly
) {
6851 if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
)
6852 redisLog(REDIS_NOTICE
,"DB loaded from append only file");
6854 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
6855 redisLog(REDIS_NOTICE
,"DB loaded from disk");
6857 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
6858 acceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
6859 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
6861 aeDeleteEventLoop(server
.el
);
6865 /* ============================= Backtrace support ========================= */
6867 #ifdef HAVE_BACKTRACE
6868 static char *findFuncName(void *pointer
, unsigned long *offset
);
6870 static void *getMcontextEip(ucontext_t
*uc
) {
6871 #if defined(__FreeBSD__)
6872 return (void*) uc
->uc_mcontext
.mc_eip
;
6873 #elif defined(__dietlibc__)
6874 return (void*) uc
->uc_mcontext
.eip
;
6875 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6877 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6879 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6881 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6882 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6883 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
6885 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
6887 #elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
6888 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
]; /* Linux 32/64 bit */
6889 #elif defined(__ia64__) /* Linux IA64 */
6890 return (void*) uc
->uc_mcontext
.sc_ip
;
6896 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
6898 char **messages
= NULL
;
6899 int i
, trace_size
= 0;
6900 unsigned long offset
=0;
6901 ucontext_t
*uc
= (ucontext_t
*) secret
;
6903 REDIS_NOTUSED(info
);
6905 redisLog(REDIS_WARNING
,
6906 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
6907 infostring
= genRedisInfoString();
6908 redisLog(REDIS_WARNING
, "%s",infostring
);
6909 /* It's not safe to sdsfree() the returned string under memory
6910 * corruption conditions. Let it leak as we are going to abort */
6912 trace_size
= backtrace(trace
, 100);
6913 /* overwrite sigaction with caller's address */
6914 if (getMcontextEip(uc
) != NULL
) {
6915 trace
[1] = getMcontextEip(uc
);
6917 messages
= backtrace_symbols(trace
, trace_size
);
6919 for (i
=1; i
<trace_size
; ++i
) {
6920 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
6922 p
= strchr(messages
[i
],'+');
6923 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
6924 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
6926 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
6929 /* free(messages); Don't call free() with possibly corrupted memory. */
6933 static void setupSigSegvAction(void) {
6934 struct sigaction act
;
6936 sigemptyset (&act
.sa_mask
);
6937 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6938 * is used. Otherwise, sa_handler is used */
6939 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
6940 act
.sa_sigaction
= segvHandler
;
6941 sigaction (SIGSEGV
, &act
, NULL
);
6942 sigaction (SIGBUS
, &act
, NULL
);
6943 sigaction (SIGFPE
, &act
, NULL
);
6944 sigaction (SIGILL
, &act
, NULL
);
6945 sigaction (SIGBUS
, &act
, NULL
);
6949 #include "staticsymbols.h"
6950 /* This function try to convert a pointer into a function name. It's used in
6951 * oreder to provide a backtrace under segmentation fault that's able to
6952 * display functions declared as static (otherwise the backtrace is useless). */
6953 static char *findFuncName(void *pointer
, unsigned long *offset
){
6955 unsigned long off
, minoff
= 0;
6957 /* Try to match against the Symbol with the smallest offset */
6958 for (i
=0; symsTable
[i
].pointer
; i
++) {
6959 unsigned long lp
= (unsigned long) pointer
;
6961 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
6962 off
=lp
-symsTable
[i
].pointer
;
6963 if (ret
< 0 || off
< minoff
) {
6969 if (ret
== -1) return NULL
;
6971 return symsTable
[ret
].name
;
6973 #else /* HAVE_BACKTRACE */
static void setupSigSegvAction(void) {
    /* No backtrace support on this platform: keep the default dispositions. */
}
6976 #endif /* HAVE_BACKTRACE */