2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.001"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
62 #include "ae.h" /* Event driven programming library */
63 #include "sds.h" /* Dynamic safe strings */
64 #include "anet.h" /* Networking the easy way */
65 #include "dict.h" /* Hash tables */
66 #include "adlist.h" /* Linked lists */
67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
68 #include "lzf.h" /* LZF compression library */
69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
75 /* Static server configuration */
76 #define REDIS_SERVERPORT 6379 /* TCP port */
77 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
78 #define REDIS_IOBUF_LEN 1024
79 #define REDIS_LOADBUF_LEN 1024
80 #define REDIS_STATIC_ARGS 4
81 #define REDIS_DEFAULT_DBNUM 16
82 #define REDIS_CONFIGLINE_MAX 1024
83 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
84 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
85 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
86 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
87 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
89 /* Hash table parameters */
90 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
93 #define REDIS_CMD_BULK 1 /* Bulk write command */
94 #define REDIS_CMD_INLINE 2 /* Inline command */
95 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
96 this flags will return an error when the 'maxmemory' option is set in the
97 config file and the server is using more than maxmemory bytes of memory.
98 In short this commands are denied on low memory conditions. */
99 #define REDIS_CMD_DENYOOM 4
102 #define REDIS_STRING 0
107 /* Objects encoding */
108 #define REDIS_ENCODING_RAW 0 /* Raw representation */
109 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
111 /* Object types only used for dumping to disk */
112 #define REDIS_EXPIRETIME 253
113 #define REDIS_SELECTDB 254
114 #define REDIS_EOF 255
116 /* Defines related to the dump file format. To store 32 bits lengths for short
117 * keys requires a lot of space, so we check the most significant 2 bits of
118 * the first byte to interpreter the length:
120 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
121 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
122 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
123 * 11|000000 this means: specially encoded object will follow. The six bits
124 * number specify the kind of object that follows.
125 * See the REDIS_RDB_ENC_* defines.
127 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
128 * values, will fit inside. */
129 #define REDIS_RDB_6BITLEN 0
130 #define REDIS_RDB_14BITLEN 1
131 #define REDIS_RDB_32BITLEN 2
132 #define REDIS_RDB_ENCVAL 3
133 #define REDIS_RDB_LENERR UINT_MAX
135 /* When a length of a string object stored on disk has the first two bits
136 * set, the remaining two bits specify a special encoding for the object
137 * accordingly to the following defines: */
138 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
139 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
140 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
141 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
144 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
145 #define REDIS_SLAVE 2 /* This client is a slave server */
146 #define REDIS_MASTER 4 /* This client is a master server */
147 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
149 /* Slave replication state - slave side */
150 #define REDIS_REPL_NONE 0 /* No active replication */
151 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
152 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
154 /* Slave replication state - from the point of view of master
155 * Note that in SEND_BULK and ONLINE state the slave receives new updates
156 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
157 * to start the next background saving in order to send updates to it. */
158 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
159 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
160 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
161 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
163 /* List related stuff */
167 /* Sort operations */
168 #define REDIS_SORT_GET 0
169 #define REDIS_SORT_DEL 1
170 #define REDIS_SORT_INCR 2
171 #define REDIS_SORT_DECR 3
172 #define REDIS_SORT_ASC 4
173 #define REDIS_SORT_DESC 5
174 #define REDIS_SORTKEY_MAX 1024
177 #define REDIS_DEBUG 0
178 #define REDIS_NOTICE 1
179 #define REDIS_WARNING 2
181 /* Anti-warning macro... */
182 #define REDIS_NOTUSED(V) ((void) V)
185 /*================================= Data types ============================== */
187 /* A redis object, that is a type able to hold a string / list / set */
188 typedef struct redisObject
{
191 unsigned char encoding
;
192 unsigned char notused
[2];
196 typedef struct redisDb
{
202 /* With multiplexing we need to take per-clinet state.
203 * Clients are taken in a liked list. */
204 typedef struct redisClient
{
209 robj
**argv
, **mbargv
;
211 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
212 int multibulk
; /* multi bulk command format active */
215 time_t lastinteraction
; /* time of the last interaction, used for timeout */
216 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
217 int slaveseldb
; /* slave selected db, if this client is a slave */
218 int authenticated
; /* when requirepass is non-NULL */
219 int replstate
; /* replication state if this is a slave */
220 int repldbfd
; /* replication DB file descriptor */
221 long repldboff
; /* replication DB file offset */
222 off_t repldbsize
; /* replication DB file size */
230 /* Global server state structure */
236 unsigned int sharingpoolsize
;
237 long long dirty
; /* changes to DB from the last save */
239 list
*slaves
, *monitors
;
240 char neterr
[ANET_ERR_LEN
];
242 int cronloops
; /* number of times the cron function run */
243 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
244 time_t lastsave
; /* Unix time of last save succeeede */
245 size_t usedmemory
; /* Used memory in megabytes */
246 /* Fields used only for stats */
247 time_t stat_starttime
; /* server start time */
248 long long stat_numcommands
; /* number of processed commands */
249 long long stat_numconnections
; /* number of connections received */
257 int bgsaveinprogress
;
258 pid_t bgsavechildpid
;
259 struct saveparam
*saveparams
;
266 /* Replication related */
270 redisClient
*master
; /* client that is master for this slave */
272 unsigned int maxclients
;
273 unsigned long maxmemory
;
274 /* Sort parameters - qsort_r() is only available under BSD so we
275 * have to take this state global, in order to pass it to sortCompare() */
281 typedef void redisCommandProc(redisClient
*c
);
282 struct redisCommand
{
284 redisCommandProc
*proc
;
289 struct redisFunctionSym
{
291 unsigned long pointer
;
294 typedef struct _redisSortObject
{
302 typedef struct _redisSortOperation
{
305 } redisSortOperation
;
307 struct sharedObjectsStruct
{
308 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
309 *colon
, *nullbulk
, *nullmultibulk
,
310 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
311 *outofrangeerr
, *plus
,
312 *select0
, *select1
, *select2
, *select3
, *select4
,
313 *select5
, *select6
, *select7
, *select8
, *select9
;
316 /*================================ Prototypes =============================== */
318 static void freeStringObject(robj
*o
);
319 static void freeListObject(robj
*o
);
320 static void freeSetObject(robj
*o
);
321 static void decrRefCount(void *o
);
322 static robj
*createObject(int type
, void *ptr
);
323 static void freeClient(redisClient
*c
);
324 static int rdbLoad(char *filename
);
325 static void addReply(redisClient
*c
, robj
*obj
);
326 static void addReplySds(redisClient
*c
, sds s
);
327 static void incrRefCount(robj
*o
);
328 static int rdbSaveBackground(char *filename
);
329 static robj
*createStringObject(char *ptr
, size_t len
);
330 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
331 static int syncWithMaster(void);
332 static robj
*tryObjectSharing(robj
*o
);
333 static int tryObjectEncoding(robj
*o
);
334 static robj
*getDecodedObject(const robj
*o
);
335 static int removeExpire(redisDb
*db
, robj
*key
);
336 static int expireIfNeeded(redisDb
*db
, robj
*key
);
337 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
338 static int deleteKey(redisDb
*db
, robj
*key
);
339 static time_t getExpire(redisDb
*db
, robj
*key
);
340 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
341 static void updateSlavesWaitingBgsave(int bgsaveerr
);
342 static void freeMemoryIfNeeded(void);
343 static int processCommand(redisClient
*c
);
344 static void setupSigSegvAction(void);
345 static void rdbRemoveTempFile(pid_t childpid
);
346 static size_t stringObjectLen(robj
*o
);
347 static void processInputBuffer(redisClient
*c
);
349 static void authCommand(redisClient
*c
);
350 static void pingCommand(redisClient
*c
);
351 static void echoCommand(redisClient
*c
);
352 static void setCommand(redisClient
*c
);
353 static void setnxCommand(redisClient
*c
);
354 static void getCommand(redisClient
*c
);
355 static void delCommand(redisClient
*c
);
356 static void existsCommand(redisClient
*c
);
357 static void incrCommand(redisClient
*c
);
358 static void decrCommand(redisClient
*c
);
359 static void incrbyCommand(redisClient
*c
);
360 static void decrbyCommand(redisClient
*c
);
361 static void selectCommand(redisClient
*c
);
362 static void randomkeyCommand(redisClient
*c
);
363 static void keysCommand(redisClient
*c
);
364 static void dbsizeCommand(redisClient
*c
);
365 static void lastsaveCommand(redisClient
*c
);
366 static void saveCommand(redisClient
*c
);
367 static void bgsaveCommand(redisClient
*c
);
368 static void shutdownCommand(redisClient
*c
);
369 static void moveCommand(redisClient
*c
);
370 static void renameCommand(redisClient
*c
);
371 static void renamenxCommand(redisClient
*c
);
372 static void lpushCommand(redisClient
*c
);
373 static void rpushCommand(redisClient
*c
);
374 static void lpopCommand(redisClient
*c
);
375 static void rpopCommand(redisClient
*c
);
376 static void llenCommand(redisClient
*c
);
377 static void lindexCommand(redisClient
*c
);
378 static void lrangeCommand(redisClient
*c
);
379 static void ltrimCommand(redisClient
*c
);
380 static void typeCommand(redisClient
*c
);
381 static void lsetCommand(redisClient
*c
);
382 static void saddCommand(redisClient
*c
);
383 static void sremCommand(redisClient
*c
);
384 static void smoveCommand(redisClient
*c
);
385 static void sismemberCommand(redisClient
*c
);
386 static void scardCommand(redisClient
*c
);
387 static void spopCommand(redisClient
*c
);
388 static void sinterCommand(redisClient
*c
);
389 static void sinterstoreCommand(redisClient
*c
);
390 static void sunionCommand(redisClient
*c
);
391 static void sunionstoreCommand(redisClient
*c
);
392 static void sdiffCommand(redisClient
*c
);
393 static void sdiffstoreCommand(redisClient
*c
);
394 static void syncCommand(redisClient
*c
);
395 static void flushdbCommand(redisClient
*c
);
396 static void flushallCommand(redisClient
*c
);
397 static void sortCommand(redisClient
*c
);
398 static void lremCommand(redisClient
*c
);
399 static void infoCommand(redisClient
*c
);
400 static void mgetCommand(redisClient
*c
);
401 static void monitorCommand(redisClient
*c
);
402 static void expireCommand(redisClient
*c
);
403 static void getsetCommand(redisClient
*c
);
404 static void ttlCommand(redisClient
*c
);
405 static void slaveofCommand(redisClient
*c
);
406 static void debugCommand(redisClient
*c
);
407 static void msetCommand(redisClient
*c
);
408 static void msetnxCommand(redisClient
*c
);
410 /*================================= Globals ================================= */
413 static struct redisServer server
; /* server global state */
414 static struct redisCommand cmdTable
[] = {
415 {"get",getCommand
,2,REDIS_CMD_INLINE
},
416 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
417 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
418 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
419 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
420 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
421 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
422 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
423 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
424 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
425 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
426 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
427 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
428 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
429 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
430 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
431 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
432 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
433 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
434 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
435 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
436 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
437 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
438 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
439 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
440 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
441 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
442 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
443 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
444 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
445 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
446 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
447 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
448 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
449 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
450 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
451 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
452 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
453 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
454 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
455 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
456 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
457 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
458 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
459 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
460 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
461 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
462 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
463 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
464 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
465 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
466 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
467 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
468 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
469 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
470 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
471 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
472 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
473 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
474 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
475 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
478 /*============================ Utility functions ============================ */
480 /* Glob-style pattern matching. */
481 int stringmatchlen(const char *pattern
, int patternLen
,
482 const char *string
, int stringLen
, int nocase
)
487 while (pattern
[1] == '*') {
492 return 1; /* match */
494 if (stringmatchlen(pattern
+1, patternLen
-1,
495 string
, stringLen
, nocase
))
496 return 1; /* match */
500 return 0; /* no match */
504 return 0; /* no match */
514 not = pattern
[0] == '^';
521 if (pattern
[0] == '\\') {
524 if (pattern
[0] == string
[0])
526 } else if (pattern
[0] == ']') {
528 } else if (patternLen
== 0) {
532 } else if (pattern
[1] == '-' && patternLen
>= 3) {
533 int start
= pattern
[0];
534 int end
= pattern
[2];
542 start
= tolower(start
);
548 if (c
>= start
&& c
<= end
)
552 if (pattern
[0] == string
[0])
555 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
565 return 0; /* no match */
571 if (patternLen
>= 2) {
578 if (pattern
[0] != string
[0])
579 return 0; /* no match */
581 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
582 return 0; /* no match */
590 if (stringLen
== 0) {
591 while(*pattern
== '*') {
598 if (patternLen
== 0 && stringLen
== 0)
603 static void redisLog(int level
, const char *fmt
, ...) {
607 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
611 if (level
>= server
.verbosity
) {
617 strftime(buf
,64,"%d %b %H:%M:%S",gmtime(&now
));
618 fprintf(fp
,"%s %c ",buf
,c
[level
]);
619 vfprintf(fp
, fmt
, ap
);
625 if (server
.logfile
) fclose(fp
);
628 /*====================== Hash table type implementation ==================== */
630 /* This is an hash table type that uses the SDS dynamic strings libary as
631 * keys and radis objects as values (objects can hold SDS strings,
634 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
638 DICT_NOTUSED(privdata
);
640 l1
= sdslen((sds
)key1
);
641 l2
= sdslen((sds
)key2
);
642 if (l1
!= l2
) return 0;
643 return memcmp(key1
, key2
, l1
) == 0;
646 static void dictRedisObjectDestructor(void *privdata
, void *val
)
648 DICT_NOTUSED(privdata
);
653 static int dictObjKeyCompare(void *privdata
, const void *key1
,
656 const robj
*o1
= key1
, *o2
= key2
;
657 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
660 static unsigned int dictObjHash(const void *key
) {
662 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
665 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
668 const robj
*o1
= key1
, *o2
= key2
;
670 if (o1
->encoding
== REDIS_ENCODING_RAW
&&
671 o2
->encoding
== REDIS_ENCODING_RAW
)
672 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
677 dec1
= o1
->encoding
!= REDIS_ENCODING_RAW
?
678 getDecodedObject(o1
) : (robj
*)o1
;
679 dec2
= o2
->encoding
!= REDIS_ENCODING_RAW
?
680 getDecodedObject(o2
) : (robj
*)o2
;
681 cmp
= sdsDictKeyCompare(privdata
,dec1
->ptr
,dec2
->ptr
);
682 if (dec1
!= o1
) decrRefCount(dec1
);
683 if (dec2
!= o2
) decrRefCount(dec2
);
688 static unsigned int dictEncObjHash(const void *key
) {
691 if (o
->encoding
== REDIS_ENCODING_RAW
)
692 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
694 robj
*dec
= getDecodedObject(o
);
695 unsigned int hash
= dictGenHashFunction(dec
->ptr
, sdslen((sds
)dec
->ptr
));
701 static dictType setDictType
= {
702 dictEncObjHash
, /* hash function */
705 dictEncObjKeyCompare
, /* key compare */
706 dictRedisObjectDestructor
, /* key destructor */
707 NULL
/* val destructor */
710 static dictType hashDictType
= {
711 dictObjHash
, /* hash function */
714 dictObjKeyCompare
, /* key compare */
715 dictRedisObjectDestructor
, /* key destructor */
716 dictRedisObjectDestructor
/* val destructor */
719 /* ========================= Random utility functions ======================= */
721 /* Redis generally does not try to recover from out of memory conditions
722 * when allocating objects or strings, it is not clear if it will be possible
723 * to report this condition to the client since the networking layer itself
724 * is based on heap allocation for send buffers, so we simply abort.
725 * At least the code will be simpler to read... */
726 static void oom(const char *msg
) {
727 fprintf(stderr
, "%s: Out of memory\n",msg
);
733 /* ====================== Redis server networking stuff ===================== */
734 static void closeTimedoutClients(void) {
737 time_t now
= time(NULL
);
739 listRewind(server
.clients
);
740 while ((ln
= listYield(server
.clients
)) != NULL
) {
741 c
= listNodeValue(ln
);
742 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
743 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
744 (now
- c
->lastinteraction
> server
.maxidletime
)) {
745 redisLog(REDIS_DEBUG
,"Closing idle client");
751 static int htNeedsResize(dict
*dict
) {
752 long long size
, used
;
754 size
= dictSlots(dict
);
755 used
= dictSize(dict
);
756 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
757 (used
*100/size
< REDIS_HT_MINFILL
));
760 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
761 * we resize the hash table to save memory */
762 static void tryResizeHashTables(void) {
765 for (j
= 0; j
< server
.dbnum
; j
++) {
766 if (htNeedsResize(server
.db
[j
].dict
)) {
767 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
768 dictResize(server
.db
[j
].dict
);
769 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
771 if (htNeedsResize(server
.db
[j
].expires
))
772 dictResize(server
.db
[j
].expires
);
776 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
777 int j
, loops
= server
.cronloops
++;
778 REDIS_NOTUSED(eventLoop
);
780 REDIS_NOTUSED(clientData
);
782 /* Update the global state with the amount of used memory */
783 server
.usedmemory
= zmalloc_used_memory();
785 /* Show some info about non-empty databases */
786 for (j
= 0; j
< server
.dbnum
; j
++) {
787 long long size
, used
, vkeys
;
789 size
= dictSlots(server
.db
[j
].dict
);
790 used
= dictSize(server
.db
[j
].dict
);
791 vkeys
= dictSize(server
.db
[j
].expires
);
792 if (!(loops
% 5) && (used
|| vkeys
)) {
793 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
794 /* dictPrintStats(server.dict); */
798 /* We don't want to resize the hash tables while a bacground saving
799 * is in progress: the saving child is created using fork() that is
800 * implemented with a copy-on-write semantic in most modern systems, so
801 * if we resize the HT while there is the saving child at work actually
802 * a lot of memory movements in the parent will cause a lot of pages
804 if (!server
.bgsaveinprogress
) tryResizeHashTables();
806 /* Show information about connected clients */
808 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
809 listLength(server
.clients
)-listLength(server
.slaves
),
810 listLength(server
.slaves
),
812 dictSize(server
.sharingpool
));
815 /* Close connections of timedout clients */
816 if (server
.maxidletime
&& !(loops
% 10))
817 closeTimedoutClients();
819 /* Check if a background saving in progress terminated */
820 if (server
.bgsaveinprogress
) {
822 if (wait4(-1,&statloc
,WNOHANG
,NULL
)) {
823 int exitcode
= WEXITSTATUS(statloc
);
824 int bysignal
= WIFSIGNALED(statloc
);
826 if (!bysignal
&& exitcode
== 0) {
827 redisLog(REDIS_NOTICE
,
828 "Background saving terminated with success");
830 server
.lastsave
= time(NULL
);
831 } else if (!bysignal
&& exitcode
!= 0) {
832 redisLog(REDIS_WARNING
, "Background saving error");
834 redisLog(REDIS_WARNING
,
835 "Background saving terminated by signal");
836 rdbRemoveTempFile(server
.bgsavechildpid
);
838 server
.bgsaveinprogress
= 0;
839 server
.bgsavechildpid
= -1;
840 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
843 /* If there is not a background saving in progress check if
844 * we have to save now */
845 time_t now
= time(NULL
);
846 for (j
= 0; j
< server
.saveparamslen
; j
++) {
847 struct saveparam
*sp
= server
.saveparams
+j
;
849 if (server
.dirty
>= sp
->changes
&&
850 now
-server
.lastsave
> sp
->seconds
) {
851 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
852 sp
->changes
, sp
->seconds
);
853 rdbSaveBackground(server
.dbfilename
);
859 /* Try to expire a few timed out keys */
860 for (j
= 0; j
< server
.dbnum
; j
++) {
861 redisDb
*db
= server
.db
+j
;
862 int num
= dictSize(db
->expires
);
865 time_t now
= time(NULL
);
867 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
868 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
873 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
874 t
= (time_t) dictGetEntryVal(de
);
876 deleteKey(db
,dictGetEntryKey(de
));
882 /* Check if we should connect to a MASTER */
883 if (server
.replstate
== REDIS_REPL_CONNECT
) {
884 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
885 if (syncWithMaster() == REDIS_OK
) {
886 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
892 static void createSharedObjects(void) {
893 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
894 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
895 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
896 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
897 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
898 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
899 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
900 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
901 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
903 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
904 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
905 "-ERR Operation against a key holding the wrong kind of value\r\n"));
906 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
907 "-ERR no such key\r\n"));
908 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
909 "-ERR syntax error\r\n"));
910 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
911 "-ERR source and destination objects are the same\r\n"));
912 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
913 "-ERR index out of range\r\n"));
914 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
915 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
916 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
917 shared
.select0
= createStringObject("select 0\r\n",10);
918 shared
.select1
= createStringObject("select 1\r\n",10);
919 shared
.select2
= createStringObject("select 2\r\n",10);
920 shared
.select3
= createStringObject("select 3\r\n",10);
921 shared
.select4
= createStringObject("select 4\r\n",10);
922 shared
.select5
= createStringObject("select 5\r\n",10);
923 shared
.select6
= createStringObject("select 6\r\n",10);
924 shared
.select7
= createStringObject("select 7\r\n",10);
925 shared
.select8
= createStringObject("select 8\r\n",10);
926 shared
.select9
= createStringObject("select 9\r\n",10);
929 static void appendServerSaveParams(time_t seconds
, int changes
) {
930 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
931 if (server
.saveparams
== NULL
) oom("appendServerSaveParams");
932 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
933 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
934 server
.saveparamslen
++;
937 static void ResetServerSaveParams() {
938 zfree(server
.saveparams
);
939 server
.saveparams
= NULL
;
940 server
.saveparamslen
= 0;
943 static void initServerConfig() {
944 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
945 server
.port
= REDIS_SERVERPORT
;
946 server
.verbosity
= REDIS_DEBUG
;
947 server
.maxidletime
= REDIS_MAXIDLETIME
;
948 server
.saveparams
= NULL
;
949 server
.logfile
= NULL
; /* NULL = log on standard output */
950 server
.bindaddr
= NULL
;
951 server
.glueoutputbuf
= 1;
952 server
.daemonize
= 0;
953 server
.pidfile
= "/var/run/redis.pid";
954 server
.dbfilename
= "dump.rdb";
955 server
.requirepass
= NULL
;
956 server
.shareobjects
= 0;
957 server
.sharingpoolsize
= 1024;
958 server
.maxclients
= 0;
959 server
.maxmemory
= 0;
960 ResetServerSaveParams();
962 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
963 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
964 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
965 /* Replication related */
967 server
.masterhost
= NULL
;
968 server
.masterport
= 6379;
969 server
.master
= NULL
;
970 server
.replstate
= REDIS_REPL_NONE
;
973 static void initServer() {
976 signal(SIGHUP
, SIG_IGN
);
977 signal(SIGPIPE
, SIG_IGN
);
978 setupSigSegvAction();
980 server
.clients
= listCreate();
981 server
.slaves
= listCreate();
982 server
.monitors
= listCreate();
983 server
.objfreelist
= listCreate();
984 createSharedObjects();
985 server
.el
= aeCreateEventLoop();
986 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
987 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
988 if (!server
.db
|| !server
.clients
|| !server
.slaves
|| !server
.monitors
|| !server
.el
|| !server
.objfreelist
)
989 oom("server initialization"); /* Fatal OOM */
990 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
991 if (server
.fd
== -1) {
992 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
995 for (j
= 0; j
< server
.dbnum
; j
++) {
996 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
997 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1000 server
.cronloops
= 0;
1001 server
.bgsaveinprogress
= 0;
1002 server
.bgsavechildpid
= -1;
1003 server
.lastsave
= time(NULL
);
1005 server
.usedmemory
= 0;
1006 server
.stat_numcommands
= 0;
1007 server
.stat_numconnections
= 0;
1008 server
.stat_starttime
= time(NULL
);
1009 aeCreateTimeEvent(server
.el
, 1000, serverCron
, NULL
, NULL
);
1012 /* Empty the whole database */
1013 static long long emptyDb() {
1015 long long removed
= 0;
1017 for (j
= 0; j
< server
.dbnum
; j
++) {
1018 removed
+= dictSize(server
.db
[j
].dict
);
1019 dictEmpty(server
.db
[j
].dict
);
1020 dictEmpty(server
.db
[j
].expires
);
1025 static int yesnotoi(char *s
) {
1026 if (!strcasecmp(s
,"yes")) return 1;
1027 else if (!strcasecmp(s
,"no")) return 0;
1031 /* I agree, this is a very rudimental way to load a configuration...
1032 will improve later if the config gets more complex */
1033 static void loadServerConfig(char *filename
) {
1035 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1039 if (filename
[0] == '-' && filename
[1] == '\0')
1042 if ((fp
= fopen(filename
,"r")) == NULL
) {
1043 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1048 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1054 line
= sdstrim(line
," \t\r\n");
1056 /* Skip comments and blank lines*/
1057 if (line
[0] == '#' || line
[0] == '\0') {
1062 /* Split into arguments */
1063 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1064 sdstolower(argv
[0]);
1066 /* Execute config directives */
1067 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1068 server
.maxidletime
= atoi(argv
[1]);
1069 if (server
.maxidletime
< 0) {
1070 err
= "Invalid timeout value"; goto loaderr
;
1072 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1073 server
.port
= atoi(argv
[1]);
1074 if (server
.port
< 1 || server
.port
> 65535) {
1075 err
= "Invalid port"; goto loaderr
;
1077 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1078 server
.bindaddr
= zstrdup(argv
[1]);
1079 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1080 int seconds
= atoi(argv
[1]);
1081 int changes
= atoi(argv
[2]);
1082 if (seconds
< 1 || changes
< 0) {
1083 err
= "Invalid save parameters"; goto loaderr
;
1085 appendServerSaveParams(seconds
,changes
);
1086 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1087 if (chdir(argv
[1]) == -1) {
1088 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1089 argv
[1], strerror(errno
));
1092 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1093 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1094 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1095 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1097 err
= "Invalid log level. Must be one of debug, notice, warning";
1100 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1103 server
.logfile
= zstrdup(argv
[1]);
1104 if (!strcasecmp(server
.logfile
,"stdout")) {
1105 zfree(server
.logfile
);
1106 server
.logfile
= NULL
;
1108 if (server
.logfile
) {
1109 /* Test if we are able to open the file. The server will not
1110 * be able to abort just for this problem later... */
1111 logfp
= fopen(server
.logfile
,"a");
1112 if (logfp
== NULL
) {
1113 err
= sdscatprintf(sdsempty(),
1114 "Can't open the log file: %s", strerror(errno
));
1119 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1120 server
.dbnum
= atoi(argv
[1]);
1121 if (server
.dbnum
< 1) {
1122 err
= "Invalid number of databases"; goto loaderr
;
1124 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1125 server
.maxclients
= atoi(argv
[1]);
1126 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1127 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1128 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1129 server
.masterhost
= sdsnew(argv
[1]);
1130 server
.masterport
= atoi(argv
[2]);
1131 server
.replstate
= REDIS_REPL_CONNECT
;
1132 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1133 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1134 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1136 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1137 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1138 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1140 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1141 server
.sharingpoolsize
= atoi(argv
[1]);
1142 if (server
.sharingpoolsize
< 1) {
1143 err
= "invalid object sharing pool size"; goto loaderr
;
1145 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1146 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1147 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1149 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1150 server
.requirepass
= zstrdup(argv
[1]);
1151 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1152 server
.pidfile
= zstrdup(argv
[1]);
1153 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1154 server
.dbfilename
= zstrdup(argv
[1]);
1156 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1158 for (j
= 0; j
< argc
; j
++)
1163 if (fp
!= stdin
) fclose(fp
);
1167 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1168 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1169 fprintf(stderr
, ">>> '%s'\n", line
);
1170 fprintf(stderr
, "%s\n", err
);
1174 static void freeClientArgv(redisClient
*c
) {
1177 for (j
= 0; j
< c
->argc
; j
++)
1178 decrRefCount(c
->argv
[j
]);
1179 for (j
= 0; j
< c
->mbargc
; j
++)
1180 decrRefCount(c
->mbargv
[j
]);
1185 static void freeClient(redisClient
*c
) {
1188 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1189 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1190 sdsfree(c
->querybuf
);
1191 listRelease(c
->reply
);
1194 ln
= listSearchKey(server
.clients
,c
);
1196 listDelNode(server
.clients
,ln
);
1197 if (c
->flags
& REDIS_SLAVE
) {
1198 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1200 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1201 ln
= listSearchKey(l
,c
);
1205 if (c
->flags
& REDIS_MASTER
) {
1206 server
.master
= NULL
;
1207 server
.replstate
= REDIS_REPL_CONNECT
;
1214 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1219 listRewind(c
->reply
);
1220 while((ln
= listYield(c
->reply
))) {
1222 totlen
+= sdslen(o
->ptr
);
1223 /* This optimization makes more sense if we don't have to copy
1225 if (totlen
> 1024) return;
1231 listRewind(c
->reply
);
1232 while((ln
= listYield(c
->reply
))) {
1234 memcpy(buf
+copylen
,o
->ptr
,sdslen(o
->ptr
));
1235 copylen
+= sdslen(o
->ptr
);
1236 listDelNode(c
->reply
,ln
);
1238 /* Now the output buffer is empty, add the new single element */
1239 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,totlen
));
1240 if (!listAddNodeTail(c
->reply
,o
)) oom("listAddNodeTail");
1244 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1245 redisClient
*c
= privdata
;
1246 int nwritten
= 0, totwritten
= 0, objlen
;
1249 REDIS_NOTUSED(mask
);
1251 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1252 glueReplyBuffersIfNeeded(c
);
1253 while(listLength(c
->reply
)) {
1254 o
= listNodeValue(listFirst(c
->reply
));
1255 objlen
= sdslen(o
->ptr
);
1258 listDelNode(c
->reply
,listFirst(c
->reply
));
1262 if (c
->flags
& REDIS_MASTER
) {
1263 /* Don't reply to a master */
1264 nwritten
= objlen
- c
->sentlen
;
1266 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1267 if (nwritten
<= 0) break;
1269 c
->sentlen
+= nwritten
;
1270 totwritten
+= nwritten
;
1271 /* If we fully sent the object on head go to the next one */
1272 if (c
->sentlen
== objlen
) {
1273 listDelNode(c
->reply
,listFirst(c
->reply
));
1276 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1277 * bytes, in a single threaded server it's a good idea to server
1278 * other clients as well, even if a very large request comes from
1279 * super fast link that is always able to accept data (in real world
1280 * terms think to 'KEYS *' against the loopback interfae) */
1281 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1283 if (nwritten
== -1) {
1284 if (errno
== EAGAIN
) {
1287 redisLog(REDIS_DEBUG
,
1288 "Error writing to client: %s", strerror(errno
));
1293 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1294 if (listLength(c
->reply
) == 0) {
1296 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1300 static struct redisCommand
*lookupCommand(char *name
) {
1302 while(cmdTable
[j
].name
!= NULL
) {
1303 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1309 /* resetClient prepare the client to process the next command */
1310 static void resetClient(redisClient
*c
) {
1316 /* If this function gets called we already read a whole
1317 * command, argments are in the client argv/argc fields.
1318 * processCommand() execute the command or prepare the
1319 * server for a bulk read from the client.
1321 * If 1 is returned the client is still alive and valid and
1322 * and other operations can be performed by the caller. Otherwise
1323 * if 0 is returned the client was destroied (i.e. after QUIT). */
1324 static int processCommand(redisClient
*c
) {
1325 struct redisCommand
*cmd
;
1328 /* Free some memory if needed (maxmemory setting) */
1329 if (server
.maxmemory
) freeMemoryIfNeeded();
1331 /* Handle the multi bulk command type. This is an alternative protocol
1332 * supported by Redis in order to receive commands that are composed of
1333 * multiple binary-safe "bulk" arguments. The latency of processing is
1334 * a bit higher but this allows things like multi-sets, so if this
1335 * protocol is used only for MSET and similar commands this is a big win. */
1336 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1337 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1338 if (c
->multibulk
<= 0) {
1342 decrRefCount(c
->argv
[c
->argc
-1]);
1346 } else if (c
->multibulk
) {
1347 if (c
->bulklen
== -1) {
1348 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1349 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1353 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1354 decrRefCount(c
->argv
[0]);
1355 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1357 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1362 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1366 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1367 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1371 if (c
->multibulk
== 0) {
1375 /* Here we need to swap the multi-bulk argc/argv with the
1376 * normal argc/argv of the client structure. */
1378 c
->argv
= c
->mbargv
;
1379 c
->mbargv
= auxargv
;
1382 c
->argc
= c
->mbargc
;
1383 c
->mbargc
= auxargc
;
1385 /* We need to set bulklen to something different than -1
1386 * in order for the code below to process the command without
1387 * to try to read the last argument of a bulk command as
1388 * a special argument. */
1390 /* continue below and process the command */
1397 /* -- end of multi bulk commands processing -- */
1399 /* The QUIT command is handled as a special case. Normal command
1400 * procs are unable to close the client connection safely */
1401 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1405 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1407 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1410 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1411 (c
->argc
< -cmd
->arity
)) {
1412 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
1415 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1416 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1419 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1420 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1422 decrRefCount(c
->argv
[c
->argc
-1]);
1423 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1425 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1430 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1431 /* It is possible that the bulk read is already in the
1432 * buffer. Check this condition and handle it accordingly.
1433 * This is just a fast path, alternative to call processInputBuffer().
1434 * It's a good idea since the code is small and this condition
1435 * happens most of the times. */
1436 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1437 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1439 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1444 /* Let's try to share objects on the command arguments vector */
1445 if (server
.shareobjects
) {
1447 for(j
= 1; j
< c
->argc
; j
++)
1448 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1450 /* Let's try to encode the bulk object to save space. */
1451 if (cmd
->flags
& REDIS_CMD_BULK
)
1452 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1454 /* Check if the user is authenticated */
1455 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1456 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1461 /* Exec the command */
1462 dirty
= server
.dirty
;
1464 if (server
.dirty
-dirty
!= 0 && listLength(server
.slaves
))
1465 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1466 if (listLength(server
.monitors
))
1467 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1468 server
.stat_numcommands
++;
1470 /* Prepare the client for the next command */
1471 if (c
->flags
& REDIS_CLOSE
) {
1479 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1483 /* (args*2)+1 is enough room for args, spaces, newlines */
1484 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1486 if (argc
<= REDIS_STATIC_ARGS
) {
1489 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1490 if (!outv
) oom("replicationFeedSlaves");
1493 for (j
= 0; j
< argc
; j
++) {
1494 if (j
!= 0) outv
[outc
++] = shared
.space
;
1495 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1498 lenobj
= createObject(REDIS_STRING
,
1499 sdscatprintf(sdsempty(),"%d\r\n",
1500 stringObjectLen(argv
[j
])));
1501 lenobj
->refcount
= 0;
1502 outv
[outc
++] = lenobj
;
1504 outv
[outc
++] = argv
[j
];
1506 outv
[outc
++] = shared
.crlf
;
1508 /* Increment all the refcounts at start and decrement at end in order to
1509 * be sure to free objects if there is no slave in a replication state
1510 * able to be feed with commands */
1511 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1513 while((ln
= listYield(slaves
))) {
1514 redisClient
*slave
= ln
->value
;
1516 /* Don't feed slaves that are still waiting for BGSAVE to start */
1517 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1519 /* Feed all the other slaves, MONITORs and so on */
1520 if (slave
->slaveseldb
!= dictid
) {
1524 case 0: selectcmd
= shared
.select0
; break;
1525 case 1: selectcmd
= shared
.select1
; break;
1526 case 2: selectcmd
= shared
.select2
; break;
1527 case 3: selectcmd
= shared
.select3
; break;
1528 case 4: selectcmd
= shared
.select4
; break;
1529 case 5: selectcmd
= shared
.select5
; break;
1530 case 6: selectcmd
= shared
.select6
; break;
1531 case 7: selectcmd
= shared
.select7
; break;
1532 case 8: selectcmd
= shared
.select8
; break;
1533 case 9: selectcmd
= shared
.select9
; break;
1535 selectcmd
= createObject(REDIS_STRING
,
1536 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1537 selectcmd
->refcount
= 0;
1540 addReply(slave
,selectcmd
);
1541 slave
->slaveseldb
= dictid
;
1543 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1545 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1546 if (outv
!= static_outv
) zfree(outv
);
1549 static void processInputBuffer(redisClient
*c
) {
1551 if (c
->bulklen
== -1) {
1552 /* Read the first line of the query */
1553 char *p
= strchr(c
->querybuf
,'\n');
1560 query
= c
->querybuf
;
1561 c
->querybuf
= sdsempty();
1562 querylen
= 1+(p
-(query
));
1563 if (sdslen(query
) > querylen
) {
1564 /* leave data after the first line of the query in the buffer */
1565 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1567 *p
= '\0'; /* remove "\n" */
1568 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1569 sdsupdatelen(query
);
1571 /* Now we can split the query in arguments */
1572 if (sdslen(query
) == 0) {
1573 /* Ignore empty query */
1577 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1578 if (argv
== NULL
) oom("sdssplitlen");
1581 if (c
->argv
) zfree(c
->argv
);
1582 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1583 if (c
->argv
== NULL
) oom("allocating arguments list for client");
1585 for (j
= 0; j
< argc
; j
++) {
1586 if (sdslen(argv
[j
])) {
1587 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1594 /* Execute the command. If the client is still valid
1595 * after processCommand() return and there is something
1596 * on the query buffer try to process the next command. */
1597 if (c
->argc
&& processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1599 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
1600 redisLog(REDIS_DEBUG
, "Client protocol error");
1605 /* Bulk read handling. Note that if we are at this point
1606 the client already sent a command terminated with a newline,
1607 we are reading the bulk data that is actually the last
1608 argument of the command. */
1609 int qbl
= sdslen(c
->querybuf
);
1611 if (c
->bulklen
<= qbl
) {
1612 /* Copy everything but the final CRLF as final argument */
1613 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1615 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1616 /* Process the command. If the client is still valid after
1617 * the processing and there is more data in the buffer
1618 * try to parse it. */
1619 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1625 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1626 redisClient
*c
= (redisClient
*) privdata
;
1627 char buf
[REDIS_IOBUF_LEN
];
1630 REDIS_NOTUSED(mask
);
1632 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1634 if (errno
== EAGAIN
) {
1637 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1641 } else if (nread
== 0) {
1642 redisLog(REDIS_DEBUG
, "Client closed connection");
1647 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1648 c
->lastinteraction
= time(NULL
);
1652 processInputBuffer(c
);
1655 static int selectDb(redisClient
*c
, int id
) {
1656 if (id
< 0 || id
>= server
.dbnum
)
1658 c
->db
= &server
.db
[id
];
1662 static void *dupClientReplyValue(void *o
) {
1663 incrRefCount((robj
*)o
);
1667 static redisClient
*createClient(int fd
) {
1668 redisClient
*c
= zmalloc(sizeof(*c
));
1670 anetNonBlock(NULL
,fd
);
1671 anetTcpNoDelay(NULL
,fd
);
1672 if (!c
) return NULL
;
1675 c
->querybuf
= sdsempty();
1684 c
->lastinteraction
= time(NULL
);
1685 c
->authenticated
= 0;
1686 c
->replstate
= REDIS_REPL_NONE
;
1687 if ((c
->reply
= listCreate()) == NULL
) oom("listCreate");
1688 listSetFreeMethod(c
->reply
,decrRefCount
);
1689 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1690 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1691 readQueryFromClient
, c
, NULL
) == AE_ERR
) {
1695 if (!listAddNodeTail(server
.clients
,c
)) oom("listAddNodeTail");
1699 static void addReply(redisClient
*c
, robj
*obj
) {
1700 if (listLength(c
->reply
) == 0 &&
1701 (c
->replstate
== REDIS_REPL_NONE
||
1702 c
->replstate
== REDIS_REPL_ONLINE
) &&
1703 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
1704 sendReplyToClient
, c
, NULL
) == AE_ERR
) return;
1705 if (obj
->encoding
!= REDIS_ENCODING_RAW
) {
1706 obj
= getDecodedObject(obj
);
1710 if (!listAddNodeTail(c
->reply
,obj
)) oom("listAddNodeTail");
1713 static void addReplySds(redisClient
*c
, sds s
) {
1714 robj
*o
= createObject(REDIS_STRING
,s
);
1719 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
1722 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
1723 len
= sdslen(obj
->ptr
);
1725 long n
= (long)obj
->ptr
;
1732 while((n
= n
/10) != 0) {
1736 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",len
));
1739 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1744 REDIS_NOTUSED(mask
);
1745 REDIS_NOTUSED(privdata
);
1747 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
1748 if (cfd
== AE_ERR
) {
1749 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
1752 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
1753 if ((c
= createClient(cfd
)) == NULL
) {
1754 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
1755 close(cfd
); /* May be already closed, just ingore errors */
1758 /* If maxclient directive is set and this is one client more... close the
1759 * connection. Note that we create the client instead to check before
1760 * for this condition, since now the socket is already set in nonblocking
1761 * mode and we can send an error for free using the Kernel I/O */
1762 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
1763 char *err
= "-ERR max number of clients reached\r\n";
1765 /* That's a best effort error message, don't check write errors */
1766 (void) write(c
->fd
,err
,strlen(err
));
1770 server
.stat_numconnections
++;
1773 /* ======================= Redis objects implementation ===================== */
1775 static robj
*createObject(int type
, void *ptr
) {
1778 if (listLength(server
.objfreelist
)) {
1779 listNode
*head
= listFirst(server
.objfreelist
);
1780 o
= listNodeValue(head
);
1781 listDelNode(server
.objfreelist
,head
);
1783 o
= zmalloc(sizeof(*o
));
1785 if (!o
) oom("createObject");
1787 o
->encoding
= REDIS_ENCODING_RAW
;
1793 static robj
*createStringObject(char *ptr
, size_t len
) {
1794 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
1797 static robj
*createListObject(void) {
1798 list
*l
= listCreate();
1800 if (!l
) oom("listCreate");
1801 listSetFreeMethod(l
,decrRefCount
);
1802 return createObject(REDIS_LIST
,l
);
1805 static robj
*createSetObject(void) {
1806 dict
*d
= dictCreate(&setDictType
,NULL
);
1807 if (!d
) oom("dictCreate");
1808 return createObject(REDIS_SET
,d
);
1811 static void freeStringObject(robj
*o
) {
1812 if (o
->encoding
== REDIS_ENCODING_RAW
) {
1817 static void freeListObject(robj
*o
) {
1818 listRelease((list
*) o
->ptr
);
1821 static void freeSetObject(robj
*o
) {
1822 dictRelease((dict
*) o
->ptr
);
1825 static void freeHashObject(robj
*o
) {
1826 dictRelease((dict
*) o
->ptr
);
1829 static void incrRefCount(robj
*o
) {
1831 #ifdef DEBUG_REFCOUNT
1832 if (o
->type
== REDIS_STRING
)
1833 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
1837 static void decrRefCount(void *obj
) {
1840 #ifdef DEBUG_REFCOUNT
1841 if (o
->type
== REDIS_STRING
)
1842 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
1844 if (--(o
->refcount
) == 0) {
1846 case REDIS_STRING
: freeStringObject(o
); break;
1847 case REDIS_LIST
: freeListObject(o
); break;
1848 case REDIS_SET
: freeSetObject(o
); break;
1849 case REDIS_HASH
: freeHashObject(o
); break;
1850 default: assert(0 != 0); break;
1852 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
1853 !listAddNodeHead(server
.objfreelist
,o
))
1858 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
1859 dictEntry
*de
= dictFind(db
->dict
,key
);
1860 return de
? dictGetEntryVal(de
) : NULL
;
1863 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
1864 expireIfNeeded(db
,key
);
1865 return lookupKey(db
,key
);
1868 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
1869 deleteIfVolatile(db
,key
);
1870 return lookupKey(db
,key
);
1873 static int deleteKey(redisDb
*db
, robj
*key
) {
1876 /* We need to protect key from destruction: after the first dictDelete()
1877 * it may happen that 'key' is no longer valid if we don't increment
1878 * it's count. This may happen when we get the object reference directly
1879 * from the hash table with dictRandomKey() or dict iterators */
1881 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
1882 retval
= dictDelete(db
->dict
,key
);
1885 return retval
== DICT_OK
;
1888 /* Try to share an object against the shared objects pool */
1889 static robj
*tryObjectSharing(robj
*o
) {
1890 struct dictEntry
*de
;
1893 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
1895 assert(o
->type
== REDIS_STRING
);
1896 de
= dictFind(server
.sharingpool
,o
);
1898 robj
*shared
= dictGetEntryKey(de
);
1900 c
= ((unsigned long) dictGetEntryVal(de
))+1;
1901 dictGetEntryVal(de
) = (void*) c
;
1902 incrRefCount(shared
);
1906 /* Here we are using a stream algorihtm: Every time an object is
1907 * shared we increment its count, everytime there is a miss we
1908 * recrement the counter of a random object. If this object reaches
1909 * zero we remove the object and put the current object instead. */
1910 if (dictSize(server
.sharingpool
) >=
1911 server
.sharingpoolsize
) {
1912 de
= dictGetRandomKey(server
.sharingpool
);
1914 c
= ((unsigned long) dictGetEntryVal(de
))-1;
1915 dictGetEntryVal(de
) = (void*) c
;
1917 dictDelete(server
.sharingpool
,de
->key
);
1920 c
= 0; /* If the pool is empty we want to add this object */
1925 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
1926 assert(retval
== DICT_OK
);
1933 /* Check if the nul-terminated string 's' can be represented by a long
1934 * (that is, is a number that fits into long without any other space or
1935 * character before or after the digits).
1937 * If so, the function returns REDIS_OK and *longval is set to the value
1938 * of the number. Otherwise REDIS_ERR is returned */
1939 static int isStringRepresentableAsLong(char *s
, long *longval
) {
1940 char buf
[32], *endptr
;
1944 value
= strtol(s
, &endptr
, 10);
1945 if (endptr
[0] != '\0') return REDIS_ERR
;
1946 slen
= snprintf(buf
,32,"%ld",value
);
1948 /* If the number converted back into a string is not identical
1949 * then it's not possible to encode the string as integer */
1950 if (strlen(buf
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
1951 if (longval
) *longval
= value
;
1955 /* Try to encode a string object in order to save space */
1956 static int tryObjectEncoding(robj
*o
) {
1960 if (o
->encoding
!= REDIS_ENCODING_RAW
)
1961 return REDIS_ERR
; /* Already encoded */
1963 /* It's not save to encode shared objects: shared objects can be shared
1964 * everywhere in the "object space" of Redis. Encoded objects can only
1965 * appear as "values" (and not, for instance, as keys) */
1966 if (o
->refcount
> 1) return REDIS_ERR
;
1968 /* Currently we try to encode only strings */
1969 assert(o
->type
== REDIS_STRING
);
1971 /* Check if we can represent this string as a long integer */
1972 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
1974 /* Ok, this object can be encoded */
1975 o
->encoding
= REDIS_ENCODING_INT
;
1977 o
->ptr
= (void*) value
;
1981 /* Get a decoded version of an encoded object (returned as a new object) */
1982 static robj
*getDecodedObject(const robj
*o
) {
1985 assert(o
->encoding
!= REDIS_ENCODING_RAW
);
1986 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
1989 snprintf(buf
,32,"%ld",(long)o
->ptr
);
1990 dec
= createStringObject(buf
,strlen(buf
));
1997 static int compareStringObjects(robj
*a
, robj
*b
) {
1998 assert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2000 if (a
->encoding
== REDIS_ENCODING_INT
&& b
->encoding
== REDIS_ENCODING_INT
){
2001 return (long)a
->ptr
- (long)b
->ptr
;
2007 if (a
->encoding
!= REDIS_ENCODING_RAW
) a
= getDecodedObject(a
);
2008 if (b
->encoding
!= REDIS_ENCODING_RAW
) b
= getDecodedObject(a
);
2009 retval
= sdscmp(a
->ptr
,b
->ptr
);
2016 static size_t stringObjectLen(robj
*o
) {
2017 assert(o
->type
== REDIS_STRING
);
2018 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2019 return sdslen(o
->ptr
);
2023 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2027 /*============================ DB saving/loading ============================ */
2029 static int rdbSaveType(FILE *fp
, unsigned char type
) {
2030 if (fwrite(&type
,1,1,fp
) == 0) return -1;
2034 static int rdbSaveTime(FILE *fp
, time_t t
) {
2035 int32_t t32
= (int32_t) t
;
2036 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
2040 /* check rdbLoadLen() comments for more info */
2041 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2042 unsigned char buf
[2];
2045 /* Save a 6 bit len */
2046 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2047 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2048 } else if (len
< (1<<14)) {
2049 /* Save a 14 bit len */
2050 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2052 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2054 /* Save a 32 bit len */
2055 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2056 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2058 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2063 /* String objects in the form "2391" "-100" without any space and with a
2064 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2065 * encoded as integers to save space */
2066 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2068 char *endptr
, buf
[32];
2070 /* Check if it's possible to encode this value as a number */
2071 value
= strtoll(s
, &endptr
, 10);
2072 if (endptr
[0] != '\0') return 0;
2073 snprintf(buf
,32,"%lld",value
);
2075 /* If the number converted back into a string is not identical
2076 * then it's not possible to encode the string as integer */
2077 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2079 /* Finally check if it fits in our ranges */
2080 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2081 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2082 enc
[1] = value
&0xFF;
2084 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2085 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2086 enc
[1] = value
&0xFF;
2087 enc
[2] = (value
>>8)&0xFF;
2089 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2090 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2091 enc
[1] = value
&0xFF;
2092 enc
[2] = (value
>>8)&0xFF;
2093 enc
[3] = (value
>>16)&0xFF;
2094 enc
[4] = (value
>>24)&0xFF;
2101 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2102 unsigned int comprlen
, outlen
;
2106 /* We require at least four bytes compression for this to be worth it */
2107 outlen
= sdslen(obj
->ptr
)-4;
2108 if (outlen
<= 0) return 0;
2109 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2110 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2111 if (comprlen
== 0) {
2115 /* Data compressed! Let's save it on disk */
2116 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2117 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2118 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2119 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2120 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2129 /* Save a string objet as [len][data] on disk. If the object is a string
2130 * representation of an integer value we try to safe it in a special form */
2131 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2135 len
= sdslen(obj
->ptr
);
2137 /* Try integer encoding */
2139 unsigned char buf
[5];
2140 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2141 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2146 /* Try LZF compression - under 20 bytes it's unable to compress even
2147 * aaaaaaaaaaaaaaaaaa so skip it */
2151 retval
= rdbSaveLzfStringObject(fp
,obj
);
2152 if (retval
== -1) return -1;
2153 if (retval
> 0) return 0;
2154 /* retval == 0 means data can't be compressed, save the old way */
2157 /* Store verbatim */
2158 if (rdbSaveLen(fp
,len
) == -1) return -1;
2159 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2163 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2164 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2168 if (obj
->encoding
!= REDIS_ENCODING_RAW
) {
2169 dec
= getDecodedObject(obj
);
2170 retval
= rdbSaveStringObjectRaw(fp
,dec
);
2174 return rdbSaveStringObjectRaw(fp
,obj
);
2178 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2179 static int rdbSave(char *filename
) {
2180 dictIterator
*di
= NULL
;
2185 time_t now
= time(NULL
);
2187 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2188 fp
= fopen(tmpfile
,"w");
2190 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2193 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2194 for (j
= 0; j
< server
.dbnum
; j
++) {
2195 redisDb
*db
= server
.db
+j
;
2197 if (dictSize(d
) == 0) continue;
2198 di
= dictGetIterator(d
);
2204 /* Write the SELECT DB opcode */
2205 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2206 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2208 /* Iterate this DB writing every entry */
2209 while((de
= dictNext(di
)) != NULL
) {
2210 robj
*key
= dictGetEntryKey(de
);
2211 robj
*o
= dictGetEntryVal(de
);
2212 time_t expiretime
= getExpire(db
,key
);
2214 /* Save the expire time */
2215 if (expiretime
!= -1) {
2216 /* If this key is already expired skip it */
2217 if (expiretime
< now
) continue;
2218 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2219 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2221 /* Save the key and associated value */
2222 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2223 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2224 if (o
->type
== REDIS_STRING
) {
2225 /* Save a string value */
2226 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
2227 } else if (o
->type
== REDIS_LIST
) {
2228 /* Save a list value */
2229 list
*list
= o
->ptr
;
2233 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
2234 while((ln
= listYield(list
))) {
2235 robj
*eleobj
= listNodeValue(ln
);
2237 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2239 } else if (o
->type
== REDIS_SET
) {
2240 /* Save a set value */
2242 dictIterator
*di
= dictGetIterator(set
);
2245 if (!set
) oom("dictGetIteraotr");
2246 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
2247 while((de
= dictNext(di
)) != NULL
) {
2248 robj
*eleobj
= dictGetEntryKey(de
);
2250 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2252 dictReleaseIterator(di
);
2257 dictReleaseIterator(di
);
2260 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2262 /* Make sure data will not remain on the OS's output buffers */
2267 /* Use RENAME to make sure the DB file is changed atomically only
2268 * if the generate DB file is ok. */
2269 if (rename(tmpfile
,filename
) == -1) {
2270 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destionation: %s", strerror(errno
));
2274 redisLog(REDIS_NOTICE
,"DB saved on disk");
2276 server
.lastsave
= time(NULL
);
2282 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2283 if (di
) dictReleaseIterator(di
);
2287 static int rdbSaveBackground(char *filename
) {
2290 if (server
.bgsaveinprogress
) return REDIS_ERR
;
2291 if ((childpid
= fork()) == 0) {
2294 if (rdbSave(filename
) == REDIS_OK
) {
2301 if (childpid
== -1) {
2302 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2306 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2307 server
.bgsaveinprogress
= 1;
2308 server
.bgsavechildpid
= childpid
;
2311 return REDIS_OK
; /* unreached */
2314 static void rdbRemoveTempFile(pid_t childpid
) {
2317 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
2321 static int rdbLoadType(FILE *fp
) {
2323 if (fread(&type
,1,1,fp
) == 0) return -1;
2327 static time_t rdbLoadTime(FILE *fp
) {
2329 if (fread(&t32
,4,1,fp
) == 0) return -1;
2330 return (time_t) t32
;
2333 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2334 * of this file for a description of how this are stored on disk.
2336 * isencoded is set to 1 if the readed length is not actually a length but
2337 * an "encoding type", check the above comments for more info */
2338 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2339 unsigned char buf
[2];
2342 if (isencoded
) *isencoded
= 0;
2344 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2349 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2350 type
= (buf
[0]&0xC0)>>6;
2351 if (type
== REDIS_RDB_6BITLEN
) {
2352 /* Read a 6 bit len */
2354 } else if (type
== REDIS_RDB_ENCVAL
) {
2355 /* Read a 6 bit len encoding type */
2356 if (isencoded
) *isencoded
= 1;
2358 } else if (type
== REDIS_RDB_14BITLEN
) {
2359 /* Read a 14 bit len */
2360 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2361 return ((buf
[0]&0x3F)<<8)|buf
[1];
2363 /* Read a 32 bit len */
2364 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2370 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2371 unsigned char enc
[4];
2374 if (enctype
== REDIS_RDB_ENC_INT8
) {
2375 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2376 val
= (signed char)enc
[0];
2377 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2379 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2380 v
= enc
[0]|(enc
[1]<<8);
2382 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2384 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2385 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2388 val
= 0; /* anti-warning */
2391 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2394 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2395 unsigned int len
, clen
;
2396 unsigned char *c
= NULL
;
2399 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2400 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2401 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2402 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2403 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2404 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2406 return createObject(REDIS_STRING
,val
);
2413 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2418 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2421 case REDIS_RDB_ENC_INT8
:
2422 case REDIS_RDB_ENC_INT16
:
2423 case REDIS_RDB_ENC_INT32
:
2424 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2425 case REDIS_RDB_ENC_LZF
:
2426 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2432 if (len
== REDIS_RDB_LENERR
) return NULL
;
2433 val
= sdsnewlen(NULL
,len
);
2434 if (len
&& fread(val
,len
,1,fp
) == 0) {
2438 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2441 static int rdbLoad(char *filename
) {
2443 robj
*keyobj
= NULL
;
2445 int type
, retval
, rdbver
;
2446 dict
*d
= server
.db
[0].dict
;
2447 redisDb
*db
= server
.db
+0;
2449 time_t expiretime
= -1, now
= time(NULL
);
2451 fp
= fopen(filename
,"r");
2452 if (!fp
) return REDIS_ERR
;
2453 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2455 if (memcmp(buf
,"REDIS",5) != 0) {
2457 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2460 rdbver
= atoi(buf
+5);
2463 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2470 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2471 if (type
== REDIS_EXPIRETIME
) {
2472 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2473 /* We read the time so we need to read the object type again */
2474 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2476 if (type
== REDIS_EOF
) break;
2477 /* Handle SELECT DB opcode as a special case */
2478 if (type
== REDIS_SELECTDB
) {
2479 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2481 if (dbid
>= (unsigned)server
.dbnum
) {
2482 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2485 db
= server
.db
+dbid
;
2490 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2492 if (type
== REDIS_STRING
) {
2493 /* Read string value */
2494 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2495 tryObjectEncoding(o
);
2496 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2497 /* Read list/set value */
2500 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2502 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2503 /* Load every single element of the list/set */
2507 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2508 tryObjectEncoding(ele
);
2509 if (type
== REDIS_LIST
) {
2510 if (!listAddNodeTail((list
*)o
->ptr
,ele
))
2511 oom("listAddNodeTail");
2513 if (dictAdd((dict
*)o
->ptr
,ele
,NULL
) == DICT_ERR
)
2520 /* Add the new object in the hash table */
2521 retval
= dictAdd(d
,keyobj
,o
);
2522 if (retval
== DICT_ERR
) {
2523 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2526 /* Set the expire time if needed */
2527 if (expiretime
!= -1) {
2528 setExpire(db
,keyobj
,expiretime
);
2529 /* Delete this key if already expired */
2530 if (expiretime
< now
) deleteKey(db
,keyobj
);
2538 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2539 if (keyobj
) decrRefCount(keyobj
);
2540 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2542 return REDIS_ERR
; /* Just to avoid warning */
2545 /*================================== Commands =============================== */
2547 static void authCommand(redisClient
*c
) {
2548 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2549 c
->authenticated
= 1;
2550 addReply(c
,shared
.ok
);
2552 c
->authenticated
= 0;
2553 addReply(c
,shared
.err
);
2557 static void pingCommand(redisClient
*c
) {
2558 addReply(c
,shared
.pong
);
2561 static void echoCommand(redisClient
*c
) {
2562 addReplyBulkLen(c
,c
->argv
[1]);
2563 addReply(c
,c
->argv
[1]);
2564 addReply(c
,shared
.crlf
);
2567 /*=================================== Strings =============================== */
2569 static void setGenericCommand(redisClient
*c
, int nx
) {
2572 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2573 if (retval
== DICT_ERR
) {
2575 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2576 incrRefCount(c
->argv
[2]);
2578 addReply(c
,shared
.czero
);
2582 incrRefCount(c
->argv
[1]);
2583 incrRefCount(c
->argv
[2]);
2586 removeExpire(c
->db
,c
->argv
[1]);
2587 addReply(c
, nx
? shared
.cone
: shared
.ok
);
2590 static void setCommand(redisClient
*c
) {
2591 setGenericCommand(c
,0);
2594 static void setnxCommand(redisClient
*c
) {
2595 setGenericCommand(c
,1);
2598 static void getCommand(redisClient
*c
) {
2599 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2602 addReply(c
,shared
.nullbulk
);
2604 if (o
->type
!= REDIS_STRING
) {
2605 addReply(c
,shared
.wrongtypeerr
);
2607 addReplyBulkLen(c
,o
);
2609 addReply(c
,shared
.crlf
);
2614 static void getsetCommand(redisClient
*c
) {
2616 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
2617 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2619 incrRefCount(c
->argv
[1]);
2621 incrRefCount(c
->argv
[2]);
2623 removeExpire(c
->db
,c
->argv
[1]);
2626 static void mgetCommand(redisClient
*c
) {
2629 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
2630 for (j
= 1; j
< c
->argc
; j
++) {
2631 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
2633 addReply(c
,shared
.nullbulk
);
2635 if (o
->type
!= REDIS_STRING
) {
2636 addReply(c
,shared
.nullbulk
);
2638 addReplyBulkLen(c
,o
);
2640 addReply(c
,shared
.crlf
);
2646 static void incrDecrCommand(redisClient
*c
, long long incr
) {
2651 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2655 if (o
->type
!= REDIS_STRING
) {
2660 if (o
->encoding
== REDIS_ENCODING_RAW
)
2661 value
= strtoll(o
->ptr
, &eptr
, 10);
2662 else if (o
->encoding
== REDIS_ENCODING_INT
)
2663 value
= (long)o
->ptr
;
2670 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
2671 tryObjectEncoding(o
);
2672 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
2673 if (retval
== DICT_ERR
) {
2674 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
2675 removeExpire(c
->db
,c
->argv
[1]);
2677 incrRefCount(c
->argv
[1]);
2680 addReply(c
,shared
.colon
);
2682 addReply(c
,shared
.crlf
);
2685 static void incrCommand(redisClient
*c
) {
2686 incrDecrCommand(c
,1);
2689 static void decrCommand(redisClient
*c
) {
2690 incrDecrCommand(c
,-1);
2693 static void incrbyCommand(redisClient
*c
) {
2694 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2695 incrDecrCommand(c
,incr
);
2698 static void decrbyCommand(redisClient
*c
) {
2699 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2700 incrDecrCommand(c
,-incr
);
2703 /* ========================= Type agnostic commands ========================= */
2705 static void delCommand(redisClient
*c
) {
2708 for (j
= 1; j
< c
->argc
; j
++) {
2709 if (deleteKey(c
->db
,c
->argv
[j
])) {
2716 addReply(c
,shared
.czero
);
2719 addReply(c
,shared
.cone
);
2722 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
2727 static void existsCommand(redisClient
*c
) {
2728 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
2731 static void selectCommand(redisClient
*c
) {
2732 int id
= atoi(c
->argv
[1]->ptr
);
2734 if (selectDb(c
,id
) == REDIS_ERR
) {
2735 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
2737 addReply(c
,shared
.ok
);
2741 static void randomkeyCommand(redisClient
*c
) {
2745 de
= dictGetRandomKey(c
->db
->dict
);
2746 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
2749 addReply(c
,shared
.plus
);
2750 addReply(c
,shared
.crlf
);
2752 addReply(c
,shared
.plus
);
2753 addReply(c
,dictGetEntryKey(de
));
2754 addReply(c
,shared
.crlf
);
2758 static void keysCommand(redisClient
*c
) {
2761 sds pattern
= c
->argv
[1]->ptr
;
2762 int plen
= sdslen(pattern
);
2763 int numkeys
= 0, keyslen
= 0;
2764 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
2766 di
= dictGetIterator(c
->db
->dict
);
2767 if (!di
) oom("dictGetIterator");
2769 decrRefCount(lenobj
);
2770 while((de
= dictNext(di
)) != NULL
) {
2771 robj
*keyobj
= dictGetEntryKey(de
);
2773 sds key
= keyobj
->ptr
;
2774 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
2775 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
2776 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
2778 addReply(c
,shared
.space
);
2781 keyslen
+= sdslen(key
);
2785 dictReleaseIterator(di
);
2786 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
2787 addReply(c
,shared
.crlf
);
2790 static void dbsizeCommand(redisClient
*c
) {
2792 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
2795 static void lastsaveCommand(redisClient
*c
) {
2797 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
2800 static void typeCommand(redisClient
*c
) {
2804 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2809 case REDIS_STRING
: type
= "+string"; break;
2810 case REDIS_LIST
: type
= "+list"; break;
2811 case REDIS_SET
: type
= "+set"; break;
2812 default: type
= "unknown"; break;
2815 addReplySds(c
,sdsnew(type
));
2816 addReply(c
,shared
.crlf
);
2819 static void saveCommand(redisClient
*c
) {
2820 if (server
.bgsaveinprogress
) {
2821 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
2824 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2825 addReply(c
,shared
.ok
);
2827 addReply(c
,shared
.err
);
2831 static void bgsaveCommand(redisClient
*c
) {
2832 if (server
.bgsaveinprogress
) {
2833 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
2836 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
2837 addReply(c
,shared
.ok
);
2839 addReply(c
,shared
.err
);
2843 static void shutdownCommand(redisClient
*c
) {
2844 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
2845 /* Kill the saving child if there is a background saving in progress.
2846 We want to avoid race conditions, for instance our saving child may
2847 overwrite the synchronous saving did by SHUTDOWN. */
2848 if (server
.bgsaveinprogress
) {
2849 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
2850 kill(server
.bgsavechildpid
,SIGKILL
);
2851 rdbRemoveTempFile(server
.bgsavechildpid
);
2854 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2855 if (server
.daemonize
)
2856 unlink(server
.pidfile
);
2857 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
2858 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
2861 /* Ooops.. error saving! The best we can do is to continue operating.
2862 * Note that if there was a background saving process, in the next
2863 * cron() Redis will be notified that the background saving aborted,
2864 * handling special stuff like slaves pending for synchronization... */
2865 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
2866 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2870 static void renameGenericCommand(redisClient
*c
, int nx
) {
2873 /* To use the same key as src and dst is probably an error */
2874 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
2875 addReply(c
,shared
.sameobjecterr
);
2879 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2881 addReply(c
,shared
.nokeyerr
);
2885 deleteIfVolatile(c
->db
,c
->argv
[2]);
2886 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
2889 addReply(c
,shared
.czero
);
2892 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
2894 incrRefCount(c
->argv
[2]);
2896 deleteKey(c
->db
,c
->argv
[1]);
2898 addReply(c
,nx
? shared
.cone
: shared
.ok
);
2901 static void renameCommand(redisClient
*c
) {
2902 renameGenericCommand(c
,0);
2905 static void renamenxCommand(redisClient
*c
) {
2906 renameGenericCommand(c
,1);
2909 static void moveCommand(redisClient
*c
) {
2914 /* Obtain source and target DB pointers */
2917 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
2918 addReply(c
,shared
.outofrangeerr
);
2922 selectDb(c
,srcid
); /* Back to the source DB */
2924 /* If the user is moving using as target the same
2925 * DB as the source DB it is probably an error. */
2927 addReply(c
,shared
.sameobjecterr
);
2931 /* Check if the element exists and get a reference */
2932 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2934 addReply(c
,shared
.czero
);
2938 /* Try to add the element to the target DB */
2939 deleteIfVolatile(dst
,c
->argv
[1]);
2940 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
2941 addReply(c
,shared
.czero
);
2944 incrRefCount(c
->argv
[1]);
2947 /* OK! key moved, free the entry in the source DB */
2948 deleteKey(src
,c
->argv
[1]);
2950 addReply(c
,shared
.cone
);
2953 /* =================================== Lists ================================ */
2954 static void pushGenericCommand(redisClient
*c
, int where
) {
2958 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2960 lobj
= createListObject();
2962 if (where
== REDIS_HEAD
) {
2963 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2965 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2967 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
2968 incrRefCount(c
->argv
[1]);
2969 incrRefCount(c
->argv
[2]);
2971 if (lobj
->type
!= REDIS_LIST
) {
2972 addReply(c
,shared
.wrongtypeerr
);
2976 if (where
== REDIS_HEAD
) {
2977 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2979 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2981 incrRefCount(c
->argv
[2]);
2984 addReply(c
,shared
.ok
);
2987 static void lpushCommand(redisClient
*c
) {
2988 pushGenericCommand(c
,REDIS_HEAD
);
2991 static void rpushCommand(redisClient
*c
) {
2992 pushGenericCommand(c
,REDIS_TAIL
);
2995 static void llenCommand(redisClient
*c
) {
2999 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3001 addReply(c
,shared
.czero
);
3004 if (o
->type
!= REDIS_LIST
) {
3005 addReply(c
,shared
.wrongtypeerr
);
3008 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3013 static void lindexCommand(redisClient
*c
) {
3015 int index
= atoi(c
->argv
[2]->ptr
);
3017 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3019 addReply(c
,shared
.nullbulk
);
3021 if (o
->type
!= REDIS_LIST
) {
3022 addReply(c
,shared
.wrongtypeerr
);
3024 list
*list
= o
->ptr
;
3027 ln
= listIndex(list
, index
);
3029 addReply(c
,shared
.nullbulk
);
3031 robj
*ele
= listNodeValue(ln
);
3032 addReplyBulkLen(c
,ele
);
3034 addReply(c
,shared
.crlf
);
3040 static void lsetCommand(redisClient
*c
) {
3042 int index
= atoi(c
->argv
[2]->ptr
);
3044 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3046 addReply(c
,shared
.nokeyerr
);
3048 if (o
->type
!= REDIS_LIST
) {
3049 addReply(c
,shared
.wrongtypeerr
);
3051 list
*list
= o
->ptr
;
3054 ln
= listIndex(list
, index
);
3056 addReply(c
,shared
.outofrangeerr
);
3058 robj
*ele
= listNodeValue(ln
);
3061 listNodeValue(ln
) = c
->argv
[3];
3062 incrRefCount(c
->argv
[3]);
3063 addReply(c
,shared
.ok
);
3070 static void popGenericCommand(redisClient
*c
, int where
) {
3073 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3075 addReply(c
,shared
.nullbulk
);
3077 if (o
->type
!= REDIS_LIST
) {
3078 addReply(c
,shared
.wrongtypeerr
);
3080 list
*list
= o
->ptr
;
3083 if (where
== REDIS_HEAD
)
3084 ln
= listFirst(list
);
3086 ln
= listLast(list
);
3089 addReply(c
,shared
.nullbulk
);
3091 robj
*ele
= listNodeValue(ln
);
3092 addReplyBulkLen(c
,ele
);
3094 addReply(c
,shared
.crlf
);
3095 listDelNode(list
,ln
);
3102 static void lpopCommand(redisClient
*c
) {
3103 popGenericCommand(c
,REDIS_HEAD
);
3106 static void rpopCommand(redisClient
*c
) {
3107 popGenericCommand(c
,REDIS_TAIL
);
3110 static void lrangeCommand(redisClient
*c
) {
3112 int start
= atoi(c
->argv
[2]->ptr
);
3113 int end
= atoi(c
->argv
[3]->ptr
);
3115 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3117 addReply(c
,shared
.nullmultibulk
);
3119 if (o
->type
!= REDIS_LIST
) {
3120 addReply(c
,shared
.wrongtypeerr
);
3122 list
*list
= o
->ptr
;
3124 int llen
= listLength(list
);
3128 /* convert negative indexes */
3129 if (start
< 0) start
= llen
+start
;
3130 if (end
< 0) end
= llen
+end
;
3131 if (start
< 0) start
= 0;
3132 if (end
< 0) end
= 0;
3134 /* indexes sanity checks */
3135 if (start
> end
|| start
>= llen
) {
3136 /* Out of range start or start > end result in empty list */
3137 addReply(c
,shared
.emptymultibulk
);
3140 if (end
>= llen
) end
= llen
-1;
3141 rangelen
= (end
-start
)+1;
3143 /* Return the result in form of a multi-bulk reply */
3144 ln
= listIndex(list
, start
);
3145 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3146 for (j
= 0; j
< rangelen
; j
++) {
3147 ele
= listNodeValue(ln
);
3148 addReplyBulkLen(c
,ele
);
3150 addReply(c
,shared
.crlf
);
3157 static void ltrimCommand(redisClient
*c
) {
3159 int start
= atoi(c
->argv
[2]->ptr
);
3160 int end
= atoi(c
->argv
[3]->ptr
);
3162 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3164 addReply(c
,shared
.nokeyerr
);
3166 if (o
->type
!= REDIS_LIST
) {
3167 addReply(c
,shared
.wrongtypeerr
);
3169 list
*list
= o
->ptr
;
3171 int llen
= listLength(list
);
3172 int j
, ltrim
, rtrim
;
3174 /* convert negative indexes */
3175 if (start
< 0) start
= llen
+start
;
3176 if (end
< 0) end
= llen
+end
;
3177 if (start
< 0) start
= 0;
3178 if (end
< 0) end
= 0;
3180 /* indexes sanity checks */
3181 if (start
> end
|| start
>= llen
) {
3182 /* Out of range start or start > end result in empty list */
3186 if (end
>= llen
) end
= llen
-1;
3191 /* Remove list elements to perform the trim */
3192 for (j
= 0; j
< ltrim
; j
++) {
3193 ln
= listFirst(list
);
3194 listDelNode(list
,ln
);
3196 for (j
= 0; j
< rtrim
; j
++) {
3197 ln
= listLast(list
);
3198 listDelNode(list
,ln
);
3201 addReply(c
,shared
.ok
);
3206 static void lremCommand(redisClient
*c
) {
3209 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3211 addReply(c
,shared
.czero
);
3213 if (o
->type
!= REDIS_LIST
) {
3214 addReply(c
,shared
.wrongtypeerr
);
3216 list
*list
= o
->ptr
;
3217 listNode
*ln
, *next
;
3218 int toremove
= atoi(c
->argv
[2]->ptr
);
3223 toremove
= -toremove
;
3226 ln
= fromtail
? list
->tail
: list
->head
;
3228 robj
*ele
= listNodeValue(ln
);
3230 next
= fromtail
? ln
->prev
: ln
->next
;
3231 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3232 listDelNode(list
,ln
);
3235 if (toremove
&& removed
== toremove
) break;
3239 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3244 /* ==================================== Sets ================================ */
3246 static void saddCommand(redisClient
*c
) {
3249 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3251 set
= createSetObject();
3252 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3253 incrRefCount(c
->argv
[1]);
3255 if (set
->type
!= REDIS_SET
) {
3256 addReply(c
,shared
.wrongtypeerr
);
3260 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3261 incrRefCount(c
->argv
[2]);
3263 addReply(c
,shared
.cone
);
3265 addReply(c
,shared
.czero
);
3269 static void sremCommand(redisClient
*c
) {
3272 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3274 addReply(c
,shared
.czero
);
3276 if (set
->type
!= REDIS_SET
) {
3277 addReply(c
,shared
.wrongtypeerr
);
3280 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
3282 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3283 addReply(c
,shared
.cone
);
3285 addReply(c
,shared
.czero
);
3290 static void smoveCommand(redisClient
*c
) {
3291 robj
*srcset
, *dstset
;
3293 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3294 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3296 /* If the source key does not exist return 0, if it's of the wrong type
3298 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
3299 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
3302 /* Error if the destination key is not a set as well */
3303 if (dstset
&& dstset
->type
!= REDIS_SET
) {
3304 addReply(c
,shared
.wrongtypeerr
);
3307 /* Remove the element from the source set */
3308 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
3309 /* Key not found in the src set! return zero */
3310 addReply(c
,shared
.czero
);
3314 /* Add the element to the destination set */
3316 dstset
= createSetObject();
3317 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
3318 incrRefCount(c
->argv
[2]);
3320 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
3321 incrRefCount(c
->argv
[3]);
3322 addReply(c
,shared
.cone
);
3325 static void sismemberCommand(redisClient
*c
) {
3328 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3330 addReply(c
,shared
.czero
);
3332 if (set
->type
!= REDIS_SET
) {
3333 addReply(c
,shared
.wrongtypeerr
);
3336 if (dictFind(set
->ptr
,c
->argv
[2]))
3337 addReply(c
,shared
.cone
);
3339 addReply(c
,shared
.czero
);
3343 static void scardCommand(redisClient
*c
) {
3347 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3349 addReply(c
,shared
.czero
);
3352 if (o
->type
!= REDIS_SET
) {
3353 addReply(c
,shared
.wrongtypeerr
);
3356 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3362 static void spopCommand(redisClient
*c
) {
3366 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3368 addReply(c
,shared
.nullbulk
);
3370 if (set
->type
!= REDIS_SET
) {
3371 addReply(c
,shared
.wrongtypeerr
);
3374 de
= dictGetRandomKey(set
->ptr
);
3376 addReply(c
,shared
.nullbulk
);
3378 robj
*ele
= dictGetEntryKey(de
);
3380 addReplyBulkLen(c
,ele
);
3382 addReply(c
,shared
.crlf
);
3383 dictDelete(set
->ptr
,ele
);
3384 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3390 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3391 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3393 return dictSize(*d1
)-dictSize(*d2
);
3396 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
) {
3397 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3400 robj
*lenobj
= NULL
, *dstset
= NULL
;
3401 int j
, cardinality
= 0;
3403 if (!dv
) oom("sinterGenericCommand");
3404 for (j
= 0; j
< setsnum
; j
++) {
3408 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3409 lookupKeyRead(c
->db
,setskeys
[j
]);
3413 deleteKey(c
->db
,dstkey
);
3414 addReply(c
,shared
.ok
);
3416 addReply(c
,shared
.nullmultibulk
);
3420 if (setobj
->type
!= REDIS_SET
) {
3422 addReply(c
,shared
.wrongtypeerr
);
3425 dv
[j
] = setobj
->ptr
;
3427 /* Sort sets from the smallest to largest, this will improve our
3428 * algorithm's performace */
3429 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3431 /* The first thing we should output is the total number of elements...
3432 * since this is a multi-bulk write, but at this stage we don't know
3433 * the intersection set size, so we use a trick, append an empty object
3434 * to the output list and save the pointer to later modify it with the
3437 lenobj
= createObject(REDIS_STRING
,NULL
);
3439 decrRefCount(lenobj
);
3441 /* If we have a target key where to store the resulting set
3442 * create this key with an empty set inside */
3443 dstset
= createSetObject();
3446 /* Iterate all the elements of the first (smallest) set, and test
3447 * the element against all the other sets, if at least one set does
3448 * not include the element it is discarded */
3449 di
= dictGetIterator(dv
[0]);
3450 if (!di
) oom("dictGetIterator");
3452 while((de
= dictNext(di
)) != NULL
) {
3455 for (j
= 1; j
< setsnum
; j
++)
3456 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
3458 continue; /* at least one set does not contain the member */
3459 ele
= dictGetEntryKey(de
);
3461 addReplyBulkLen(c
,ele
);
3463 addReply(c
,shared
.crlf
);
3466 dictAdd(dstset
->ptr
,ele
,NULL
);
3470 dictReleaseIterator(di
);
3473 /* Store the resulting set into the target */
3474 deleteKey(c
->db
,dstkey
);
3475 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3476 incrRefCount(dstkey
);
3480 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",cardinality
);
3482 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3483 dictSize((dict
*)dstset
->ptr
)));
3489 static void sinterCommand(redisClient
*c
) {
3490 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
3493 static void sinterstoreCommand(redisClient
*c
) {
3494 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
3497 #define REDIS_OP_UNION 0
3498 #define REDIS_OP_DIFF 1
3500 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
3501 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3504 robj
*dstset
= NULL
;
3505 int j
, cardinality
= 0;
3507 if (!dv
) oom("sunionDiffGenericCommand");
3508 for (j
= 0; j
< setsnum
; j
++) {
3512 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3513 lookupKeyRead(c
->db
,setskeys
[j
]);
3518 if (setobj
->type
!= REDIS_SET
) {
3520 addReply(c
,shared
.wrongtypeerr
);
3523 dv
[j
] = setobj
->ptr
;
3526 /* We need a temp set object to store our union. If the dstkey
3527 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3528 * this set object will be the resulting object to set into the target key*/
3529 dstset
= createSetObject();
3531 /* Iterate all the elements of all the sets, add every element a single
3532 * time to the result set */
3533 for (j
= 0; j
< setsnum
; j
++) {
3534 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
3535 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
3537 di
= dictGetIterator(dv
[j
]);
3538 if (!di
) oom("dictGetIterator");
3540 while((de
= dictNext(di
)) != NULL
) {
3543 /* dictAdd will not add the same element multiple times */
3544 ele
= dictGetEntryKey(de
);
3545 if (op
== REDIS_OP_UNION
|| j
== 0) {
3546 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
3550 } else if (op
== REDIS_OP_DIFF
) {
3551 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
3556 dictReleaseIterator(di
);
3558 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
3561 /* Output the content of the resulting set, if not in STORE mode */
3563 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
3564 di
= dictGetIterator(dstset
->ptr
);
3565 if (!di
) oom("dictGetIterator");
3566 while((de
= dictNext(di
)) != NULL
) {
3569 ele
= dictGetEntryKey(de
);
3570 addReplyBulkLen(c
,ele
);
3572 addReply(c
,shared
.crlf
);
3574 dictReleaseIterator(di
);
3576 /* If we have a target key where to store the resulting set
3577 * create this key with the result set inside */
3578 deleteKey(c
->db
,dstkey
);
3579 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3580 incrRefCount(dstkey
);
3585 decrRefCount(dstset
);
3587 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3588 dictSize((dict
*)dstset
->ptr
)));
3594 static void sunionCommand(redisClient
*c
) {
3595 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
3598 static void sunionstoreCommand(redisClient
*c
) {
3599 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
3602 static void sdiffCommand(redisClient
*c
) {
3603 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
3606 static void sdiffstoreCommand(redisClient
*c
) {
3607 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
3610 static void flushdbCommand(redisClient
*c
) {
3611 server
.dirty
+= dictSize(c
->db
->dict
);
3612 dictEmpty(c
->db
->dict
);
3613 dictEmpty(c
->db
->expires
);
3614 addReply(c
,shared
.ok
);
3617 static void flushallCommand(redisClient
*c
) {
3618 server
.dirty
+= emptyDb();
3619 addReply(c
,shared
.ok
);
3620 rdbSave(server
.dbfilename
);
3624 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
3625 redisSortOperation
*so
= zmalloc(sizeof(*so
));
3626 if (!so
) oom("createSortOperation");
3628 so
->pattern
= pattern
;
3632 /* Return the value associated to the key with a name obtained
3633 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3634 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
3638 int prefixlen
, sublen
, postfixlen
;
3639 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3643 char buf
[REDIS_SORTKEY_MAX
+1];
3646 if (subst
->encoding
== REDIS_ENCODING_RAW
)
3647 incrRefCount(subst
);
3649 subst
= getDecodedObject(subst
);
3652 spat
= pattern
->ptr
;
3654 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
3655 p
= strchr(spat
,'*');
3656 if (!p
) return NULL
;
3659 sublen
= sdslen(ssub
);
3660 postfixlen
= sdslen(spat
)-(prefixlen
+1);
3661 memcpy(keyname
.buf
,spat
,prefixlen
);
3662 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
3663 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
3664 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
3665 keyname
.len
= prefixlen
+sublen
+postfixlen
;
3667 keyobj
.refcount
= 1;
3668 keyobj
.type
= REDIS_STRING
;
3669 keyobj
.ptr
= ((char*)&keyname
)+(sizeof(long)*2);
3671 decrRefCount(subst
);
3673 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3674 return lookupKeyRead(db
,&keyobj
);
3677 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3678 * the additional parameter is not standard but a BSD-specific we have to
3679 * pass sorting parameters via the global 'server' structure */
3680 static int sortCompare(const void *s1
, const void *s2
) {
3681 const redisSortObject
*so1
= s1
, *so2
= s2
;
3684 if (!server
.sort_alpha
) {
3685 /* Numeric sorting. Here it's trivial as we precomputed scores */
3686 if (so1
->u
.score
> so2
->u
.score
) {
3688 } else if (so1
->u
.score
< so2
->u
.score
) {
3694 /* Alphanumeric sorting */
3695 if (server
.sort_bypattern
) {
3696 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
3697 /* At least one compare object is NULL */
3698 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
3700 else if (so1
->u
.cmpobj
== NULL
)
3705 /* We have both the objects, use strcoll */
3706 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
3709 /* Compare elements directly */
3710 if (so1
->obj
->encoding
== REDIS_ENCODING_RAW
&&
3711 so2
->obj
->encoding
== REDIS_ENCODING_RAW
) {
3712 cmp
= strcoll(so1
->obj
->ptr
,so2
->obj
->ptr
);
3716 dec1
= so1
->obj
->encoding
== REDIS_ENCODING_RAW
?
3717 so1
->obj
: getDecodedObject(so1
->obj
);
3718 dec2
= so2
->obj
->encoding
== REDIS_ENCODING_RAW
?
3719 so2
->obj
: getDecodedObject(so2
->obj
);
3720 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
3721 if (dec1
!= so1
->obj
) decrRefCount(dec1
);
3722 if (dec2
!= so2
->obj
) decrRefCount(dec2
);
3726 return server
.sort_desc
? -cmp
: cmp
;
3729 /* The SORT command is the most complex command in Redis. Warning: this code
3730 * is optimized for speed and a bit less for readability */
3731 static void sortCommand(redisClient
*c
) {
3734 int desc
= 0, alpha
= 0;
3735 int limit_start
= 0, limit_count
= -1, start
, end
;
3736 int j
, dontsort
= 0, vectorlen
;
3737 int getop
= 0; /* GET operation counter */
3738 robj
*sortval
, *sortby
= NULL
;
3739 redisSortObject
*vector
; /* Resulting vector to sort */
3741 /* Lookup the key to sort. It must be of the right types */
3742 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
3743 if (sortval
== NULL
) {
3744 addReply(c
,shared
.nokeyerr
);
3747 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
) {
3748 addReply(c
,shared
.wrongtypeerr
);
3752 /* Create a list of operations to perform for every sorted element.
3753 * Operations can be GET/DEL/INCR/DECR */
3754 operations
= listCreate();
3755 listSetFreeMethod(operations
,zfree
);
3758 /* Now we need to protect sortval incrementing its count, in the future
3759 * SORT may have options able to overwrite/delete keys during the sorting
3760 * and the sorted key itself may get destroied */
3761 incrRefCount(sortval
);
3763 /* The SORT command has an SQL-alike syntax, parse it */
3764 while(j
< c
->argc
) {
3765 int leftargs
= c
->argc
-j
-1;
3766 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
3768 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
3770 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
3772 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
3773 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
3774 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
3776 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
3777 sortby
= c
->argv
[j
+1];
3778 /* If the BY pattern does not contain '*', i.e. it is constant,
3779 * we don't need to sort nor to lookup the weight keys. */
3780 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
3782 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3783 listAddNodeTail(operations
,createSortOperation(
3784 REDIS_SORT_GET
,c
->argv
[j
+1]));
3787 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"del") && leftargs
>= 1) {
3788 listAddNodeTail(operations
,createSortOperation(
3789 REDIS_SORT_DEL
,c
->argv
[j
+1]));
3791 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"incr") && leftargs
>= 1) {
3792 listAddNodeTail(operations
,createSortOperation(
3793 REDIS_SORT_INCR
,c
->argv
[j
+1]));
3795 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3796 listAddNodeTail(operations
,createSortOperation(
3797 REDIS_SORT_DECR
,c
->argv
[j
+1]));
3800 decrRefCount(sortval
);
3801 listRelease(operations
);
3802 addReply(c
,shared
.syntaxerr
);
3808 /* Load the sorting vector with all the objects to sort */
3809 vectorlen
= (sortval
->type
== REDIS_LIST
) ?
3810 listLength((list
*)sortval
->ptr
) :
3811 dictSize((dict
*)sortval
->ptr
);
3812 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
3813 if (!vector
) oom("allocating objects vector for SORT");
3815 if (sortval
->type
== REDIS_LIST
) {
3816 list
*list
= sortval
->ptr
;
3820 while((ln
= listYield(list
))) {
3821 robj
*ele
= ln
->value
;
3822 vector
[j
].obj
= ele
;
3823 vector
[j
].u
.score
= 0;
3824 vector
[j
].u
.cmpobj
= NULL
;
3828 dict
*set
= sortval
->ptr
;
3832 di
= dictGetIterator(set
);
3833 if (!di
) oom("dictGetIterator");
3834 while((setele
= dictNext(di
)) != NULL
) {
3835 vector
[j
].obj
= dictGetEntryKey(setele
);
3836 vector
[j
].u
.score
= 0;
3837 vector
[j
].u
.cmpobj
= NULL
;
3840 dictReleaseIterator(di
);
3842 assert(j
== vectorlen
);
3844 /* Now it's time to load the right scores in the sorting vector */
3845 if (dontsort
== 0) {
3846 for (j
= 0; j
< vectorlen
; j
++) {
3850 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
3851 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
3853 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
3854 vector
[j
].u
.cmpobj
= byval
;
3855 incrRefCount(byval
);
3857 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
3860 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
3861 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
3863 if (byval
->encoding
== REDIS_ENCODING_INT
) {
3864 vector
[j
].u
.score
= (long)byval
->ptr
;
3871 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
3872 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
3874 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
3875 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
3884 /* We are ready to sort the vector... perform a bit of sanity check
3885 * on the LIMIT option too. We'll use a partial version of quicksort. */
3886 start
= (limit_start
< 0) ? 0 : limit_start
;
3887 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
3888 if (start
>= vectorlen
) {
3889 start
= vectorlen
-1;
3892 if (end
>= vectorlen
) end
= vectorlen
-1;
3894 if (dontsort
== 0) {
3895 server
.sort_desc
= desc
;
3896 server
.sort_alpha
= alpha
;
3897 server
.sort_bypattern
= sortby
? 1 : 0;
3898 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
3899 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
3901 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
3904 /* Send command output to the output buffer, performing the specified
3905 * GET/DEL/INCR/DECR operations if any. */
3906 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
3907 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
3908 for (j
= start
; j
<= end
; j
++) {
3911 addReplyBulkLen(c
,vector
[j
].obj
);
3912 addReply(c
,vector
[j
].obj
);
3913 addReply(c
,shared
.crlf
);
3915 listRewind(operations
);
3916 while((ln
= listYield(operations
))) {
3917 redisSortOperation
*sop
= ln
->value
;
3918 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
3921 if (sop
->type
== REDIS_SORT_GET
) {
3922 if (!val
|| val
->type
!= REDIS_STRING
) {
3923 addReply(c
,shared
.nullbulk
);
3925 addReplyBulkLen(c
,val
);
3927 addReply(c
,shared
.crlf
);
3929 } else if (sop
->type
== REDIS_SORT_DEL
) {
3936 decrRefCount(sortval
);
3937 listRelease(operations
);
3938 for (j
= 0; j
< vectorlen
; j
++) {
3939 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
3940 decrRefCount(vector
[j
].u
.cmpobj
);
3945 static void infoCommand(redisClient
*c
) {
3947 time_t uptime
= time(NULL
)-server
.stat_starttime
;
3950 info
= sdscatprintf(sdsempty(),
3951 "redis_version:%s\r\n"
3953 "uptime_in_seconds:%d\r\n"
3954 "uptime_in_days:%d\r\n"
3955 "connected_clients:%d\r\n"
3956 "connected_slaves:%d\r\n"
3957 "used_memory:%zu\r\n"
3958 "changes_since_last_save:%lld\r\n"
3959 "bgsave_in_progress:%d\r\n"
3960 "last_save_time:%d\r\n"
3961 "total_connections_received:%lld\r\n"
3962 "total_commands_processed:%lld\r\n"
3965 (sizeof(long) == 8) ? "64" : "32",
3968 listLength(server
.clients
)-listLength(server
.slaves
),
3969 listLength(server
.slaves
),
3972 server
.bgsaveinprogress
,
3974 server
.stat_numconnections
,
3975 server
.stat_numcommands
,
3976 server
.masterhost
== NULL
? "master" : "slave"
3978 if (server
.masterhost
) {
3979 info
= sdscatprintf(info
,
3980 "master_host:%s\r\n"
3981 "master_port:%d\r\n"
3982 "master_link_status:%s\r\n"
3983 "master_last_io_seconds_ago:%d\r\n"
3986 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
3988 (int)(time(NULL
)-server
.master
->lastinteraction
)
3991 for (j
= 0; j
< server
.dbnum
; j
++) {
3992 long long keys
, vkeys
;
3994 keys
= dictSize(server
.db
[j
].dict
);
3995 vkeys
= dictSize(server
.db
[j
].expires
);
3996 if (keys
|| vkeys
) {
3997 info
= sdscatprintf(info
, "db%d: keys=%lld,expires=%lld\r\n",
4001 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info
)));
4002 addReplySds(c
,info
);
4003 addReply(c
,shared
.crlf
);
4006 static void monitorCommand(redisClient
*c
) {
4007 /* ignore MONITOR if aleady slave or in monitor mode */
4008 if (c
->flags
& REDIS_SLAVE
) return;
4010 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
4012 if (!listAddNodeTail(server
.monitors
,c
)) oom("listAddNodeTail");
4013 addReply(c
,shared
.ok
);
4016 /* ================================= Expire ================================= */
4017 static int removeExpire(redisDb
*db
, robj
*key
) {
4018 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
4025 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
4026 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
4034 /* Return the expire time of the specified key, or -1 if no expire
4035 * is associated with this key (i.e. the key is non volatile) */
4036 static time_t getExpire(redisDb
*db
, robj
*key
) {
4039 /* No expire? return ASAP */
4040 if (dictSize(db
->expires
) == 0 ||
4041 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
4043 return (time_t) dictGetEntryVal(de
);
4046 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
4050 /* No expire? return ASAP */
4051 if (dictSize(db
->expires
) == 0 ||
4052 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
4054 /* Lookup the expire */
4055 when
= (time_t) dictGetEntryVal(de
);
4056 if (time(NULL
) <= when
) return 0;
4058 /* Delete the key */
4059 dictDelete(db
->expires
,key
);
4060 return dictDelete(db
->dict
,key
) == DICT_OK
;
4063 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
4066 /* No expire? return ASAP */
4067 if (dictSize(db
->expires
) == 0 ||
4068 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
4070 /* Delete the key */
4072 dictDelete(db
->expires
,key
);
4073 return dictDelete(db
->dict
,key
) == DICT_OK
;
4076 static void expireCommand(redisClient
*c
) {
4078 int seconds
= atoi(c
->argv
[2]->ptr
);
4080 de
= dictFind(c
->db
->dict
,c
->argv
[1]);
4082 addReply(c
,shared
.czero
);
4086 addReply(c
, shared
.czero
);
4089 time_t when
= time(NULL
)+seconds
;
4090 if (setExpire(c
->db
,c
->argv
[1],when
)) {
4091 addReply(c
,shared
.cone
);
4094 addReply(c
,shared
.czero
);
4100 static void ttlCommand(redisClient
*c
) {
4104 expire
= getExpire(c
->db
,c
->argv
[1]);
4106 ttl
= (int) (expire
-time(NULL
));
4107 if (ttl
< 0) ttl
= -1;
4109 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
4112 static void msetGenericCommand(redisClient
*c
, int nx
) {
4115 if ((c
->argc
% 2) == 0) {
4116 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
4119 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4120 * set nothing at all if at least one already key exists. */
4122 for (j
= 1; j
< c
->argc
; j
+= 2) {
4123 if (dictFind(c
->db
->dict
,c
->argv
[j
]) != NULL
) {
4124 addReply(c
, shared
.czero
);
4130 for (j
= 1; j
< c
->argc
; j
+= 2) {
4133 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
4134 if (retval
== DICT_ERR
) {
4135 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
4136 incrRefCount(c
->argv
[j
+1]);
4138 incrRefCount(c
->argv
[j
]);
4139 incrRefCount(c
->argv
[j
+1]);
4141 removeExpire(c
->db
,c
->argv
[j
]);
4143 server
.dirty
+= (c
->argc
-1)/2;
4144 addReply(c
, nx
? shared
.cone
: shared
.ok
);
4147 static void msetCommand(redisClient
*c
) {
4148 msetGenericCommand(c
,0);
4151 static void msetnxCommand(redisClient
*c
) {
4152 msetGenericCommand(c
,1);
4155 /* =============================== Replication ============================= */
4157 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
4158 ssize_t nwritten
, ret
= size
;
4159 time_t start
= time(NULL
);
4163 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
4164 nwritten
= write(fd
,ptr
,size
);
4165 if (nwritten
== -1) return -1;
4169 if ((time(NULL
)-start
) > timeout
) {
4177 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
4178 ssize_t nread
, totread
= 0;
4179 time_t start
= time(NULL
);
4183 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
4184 nread
= read(fd
,ptr
,size
);
4185 if (nread
== -1) return -1;
4190 if ((time(NULL
)-start
) > timeout
) {
4198 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
4205 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
4208 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
4219 static void syncCommand(redisClient
*c
) {
4220 /* ignore SYNC if aleady slave or in monitor mode */
4221 if (c
->flags
& REDIS_SLAVE
) return;
4223 /* SYNC can't be issued when the server has pending data to send to
4224 * the client about already issued commands. We need a fresh reply
4225 * buffer registering the differences between the BGSAVE and the current
4226 * dataset, so that we can copy to other slaves if needed. */
4227 if (listLength(c
->reply
) != 0) {
4228 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4232 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
4233 /* Here we need to check if there is a background saving operation
4234 * in progress, or if it is required to start one */
4235 if (server
.bgsaveinprogress
) {
4236 /* Ok a background save is in progress. Let's check if it is a good
4237 * one for replication, i.e. if there is another slave that is
4238 * registering differences since the server forked to save */
4242 listRewind(server
.slaves
);
4243 while((ln
= listYield(server
.slaves
))) {
4245 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
4248 /* Perfect, the server is already registering differences for
4249 * another slave. Set the right state, and copy the buffer. */
4250 listRelease(c
->reply
);
4251 c
->reply
= listDup(slave
->reply
);
4252 if (!c
->reply
) oom("listDup copying slave reply list");
4253 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4254 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
4256 /* No way, we need to wait for the next BGSAVE in order to
4257 * register differences */
4258 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
4259 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
4262 /* Ok we don't have a BGSAVE in progress, let's start one */
4263 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
4264 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
4265 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
4266 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
4269 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4272 c
->flags
|= REDIS_SLAVE
;
4274 if (!listAddNodeTail(server
.slaves
,c
)) oom("listAddNodeTail");
4278 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
4279 redisClient
*slave
= privdata
;
4281 REDIS_NOTUSED(mask
);
4282 char buf
[REDIS_IOBUF_LEN
];
4283 ssize_t nwritten
, buflen
;
4285 if (slave
->repldboff
== 0) {
4286 /* Write the bulk write count before to transfer the DB. In theory here
4287 * we don't know how much room there is in the output buffer of the
4288 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4289 * operations) will never be smaller than the few bytes we need. */
4292 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4294 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
4302 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
4303 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
4305 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
4306 (buflen
== 0) ? "premature EOF" : strerror(errno
));
4310 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
4311 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
4316 slave
->repldboff
+= nwritten
;
4317 if (slave
->repldboff
== slave
->repldbsize
) {
4318 close(slave
->repldbfd
);
4319 slave
->repldbfd
= -1;
4320 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
4321 slave
->replstate
= REDIS_REPL_ONLINE
;
4322 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
4323 sendReplyToClient
, slave
, NULL
) == AE_ERR
) {
4327 addReplySds(slave
,sdsempty());
4328 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
4332 /* This function is called at the end of every backgrond saving.
4333 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4334 * otherwise REDIS_ERR is passed to the function.
4336 * The goal of this function is to handle slaves waiting for a successful
4337 * background saving in order to perform non-blocking synchronization. */
4338 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
4340 int startbgsave
= 0;
4342 listRewind(server
.slaves
);
4343 while((ln
= listYield(server
.slaves
))) {
4344 redisClient
*slave
= ln
->value
;
4346 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
4348 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4349 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
4350 struct redis_stat buf
;
4352 if (bgsaveerr
!= REDIS_OK
) {
4354 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
4357 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
4358 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
4360 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
4363 slave
->repldboff
= 0;
4364 slave
->repldbsize
= buf
.st_size
;
4365 slave
->replstate
= REDIS_REPL_SEND_BULK
;
4366 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
4367 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
, NULL
) == AE_ERR
) {
4374 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
4375 listRewind(server
.slaves
);
4376 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
4377 while((ln
= listYield(server
.slaves
))) {
4378 redisClient
*slave
= ln
->value
;
4380 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
4387 static int syncWithMaster(void) {
4388 char buf
[1024], tmpfile
[256];
4390 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
4394 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
4398 /* Issue the SYNC command */
4399 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
4401 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
4405 /* Read the bulk write count */
4406 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
4408 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
4412 dumpsize
= atoi(buf
+1);
4413 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
4414 /* Read the bulk write data on a temp file */
4415 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
4416 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
4419 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
4423 int nread
, nwritten
;
4425 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
4427 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
4433 nwritten
= write(dfd
,buf
,nread
);
4434 if (nwritten
== -1) {
4435 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
4443 if (rename(tmpfile
,server
.dbfilename
) == -1) {
4444 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
4450 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
4451 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
4455 server
.master
= createClient(fd
);
4456 server
.master
->flags
|= REDIS_MASTER
;
4457 server
.replstate
= REDIS_REPL_CONNECTED
;
4461 static void slaveofCommand(redisClient
*c
) {
4462 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
4463 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
4464 if (server
.masterhost
) {
4465 sdsfree(server
.masterhost
);
4466 server
.masterhost
= NULL
;
4467 if (server
.master
) freeClient(server
.master
);
4468 server
.replstate
= REDIS_REPL_NONE
;
4469 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
4472 sdsfree(server
.masterhost
);
4473 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
4474 server
.masterport
= atoi(c
->argv
[2]->ptr
);
4475 if (server
.master
) freeClient(server
.master
);
4476 server
.replstate
= REDIS_REPL_CONNECT
;
4477 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
4478 server
.masterhost
, server
.masterport
);
4480 addReply(c
,shared
.ok
);
4483 /* ============================ Maxmemory directive ======================== */
4485 /* This function gets called when 'maxmemory' is set on the config file to limit
4486 * the max memory used by the server, and we are out of memory.
4487 * This function will try to, in order:
4489 * - Free objects from the free list
4490 * - Try to remove keys with an EXPIRE set
4492 * It is not possible to free enough memory to reach used-memory < maxmemory
4493 * the server will start refusing commands that will enlarge even more the
4496 static void freeMemoryIfNeeded(void) {
4497 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
4498 if (listLength(server
.objfreelist
)) {
4501 listNode
*head
= listFirst(server
.objfreelist
);
4502 o
= listNodeValue(head
);
4503 listDelNode(server
.objfreelist
,head
);
4506 int j
, k
, freed
= 0;
4508 for (j
= 0; j
< server
.dbnum
; j
++) {
4510 robj
*minkey
= NULL
;
4511 struct dictEntry
*de
;
4513 if (dictSize(server
.db
[j
].expires
)) {
4515 /* From a sample of three keys drop the one nearest to
4516 * the natural expire */
4517 for (k
= 0; k
< 3; k
++) {
4520 de
= dictGetRandomKey(server
.db
[j
].expires
);
4521 t
= (time_t) dictGetEntryVal(de
);
4522 if (minttl
== -1 || t
< minttl
) {
4523 minkey
= dictGetEntryKey(de
);
4527 deleteKey(server
.db
+j
,minkey
);
4530 if (!freed
) return; /* nothing to free... */
4535 /* ================================= Debugging ============================== */
4537 static void debugCommand(redisClient
*c
) {
4538 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
4540 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
4541 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
4545 addReply(c
,shared
.nokeyerr
);
4548 key
= dictGetEntryKey(de
);
4549 val
= dictGetEntryVal(de
);
4550 addReplySds(c
,sdscatprintf(sdsempty(),
4551 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4552 key
, key
->refcount
, val
, val
->refcount
, val
->encoding
));
4554 addReplySds(c
,sdsnew(
4555 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4559 #ifdef HAVE_BACKTRACE
4560 static struct redisFunctionSym symsTable
[] = {
4561 {"compareStringObjects", (unsigned long)compareStringObjects
},
4562 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong
},
4563 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare
},
4564 {"dictEncObjHash", (unsigned long)dictEncObjHash
},
4565 {"incrDecrCommand", (unsigned long)incrDecrCommand
},
4566 {"freeStringObject", (unsigned long)freeStringObject
},
4567 {"freeListObject", (unsigned long)freeListObject
},
4568 {"freeSetObject", (unsigned long)freeSetObject
},
4569 {"decrRefCount", (unsigned long)decrRefCount
},
4570 {"createObject", (unsigned long)createObject
},
4571 {"freeClient", (unsigned long)freeClient
},
4572 {"rdbLoad", (unsigned long)rdbLoad
},
4573 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject
},
4574 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw
},
4575 {"addReply", (unsigned long)addReply
},
4576 {"addReplySds", (unsigned long)addReplySds
},
4577 {"incrRefCount", (unsigned long)incrRefCount
},
4578 {"rdbSaveBackground", (unsigned long)rdbSaveBackground
},
4579 {"createStringObject", (unsigned long)createStringObject
},
4580 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves
},
4581 {"syncWithMaster", (unsigned long)syncWithMaster
},
4582 {"tryObjectSharing", (unsigned long)tryObjectSharing
},
4583 {"tryObjectEncoding", (unsigned long)tryObjectEncoding
},
4584 {"getDecodedObject", (unsigned long)getDecodedObject
},
4585 {"removeExpire", (unsigned long)removeExpire
},
4586 {"expireIfNeeded", (unsigned long)expireIfNeeded
},
4587 {"deleteIfVolatile", (unsigned long)deleteIfVolatile
},
4588 {"deleteKey", (unsigned long)deleteKey
},
4589 {"getExpire", (unsigned long)getExpire
},
4590 {"setExpire", (unsigned long)setExpire
},
4591 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave
},
4592 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded
},
4593 {"authCommand", (unsigned long)authCommand
},
4594 {"pingCommand", (unsigned long)pingCommand
},
4595 {"echoCommand", (unsigned long)echoCommand
},
4596 {"setCommand", (unsigned long)setCommand
},
4597 {"setnxCommand", (unsigned long)setnxCommand
},
4598 {"getCommand", (unsigned long)getCommand
},
4599 {"delCommand", (unsigned long)delCommand
},
4600 {"existsCommand", (unsigned long)existsCommand
},
4601 {"incrCommand", (unsigned long)incrCommand
},
4602 {"decrCommand", (unsigned long)decrCommand
},
4603 {"incrbyCommand", (unsigned long)incrbyCommand
},
4604 {"decrbyCommand", (unsigned long)decrbyCommand
},
4605 {"selectCommand", (unsigned long)selectCommand
},
4606 {"randomkeyCommand", (unsigned long)randomkeyCommand
},
4607 {"keysCommand", (unsigned long)keysCommand
},
4608 {"dbsizeCommand", (unsigned long)dbsizeCommand
},
4609 {"lastsaveCommand", (unsigned long)lastsaveCommand
},
4610 {"saveCommand", (unsigned long)saveCommand
},
4611 {"bgsaveCommand", (unsigned long)bgsaveCommand
},
4612 {"shutdownCommand", (unsigned long)shutdownCommand
},
4613 {"moveCommand", (unsigned long)moveCommand
},
4614 {"renameCommand", (unsigned long)renameCommand
},
4615 {"renamenxCommand", (unsigned long)renamenxCommand
},
4616 {"lpushCommand", (unsigned long)lpushCommand
},
4617 {"rpushCommand", (unsigned long)rpushCommand
},
4618 {"lpopCommand", (unsigned long)lpopCommand
},
4619 {"rpopCommand", (unsigned long)rpopCommand
},
4620 {"llenCommand", (unsigned long)llenCommand
},
4621 {"lindexCommand", (unsigned long)lindexCommand
},
4622 {"lrangeCommand", (unsigned long)lrangeCommand
},
4623 {"ltrimCommand", (unsigned long)ltrimCommand
},
4624 {"typeCommand", (unsigned long)typeCommand
},
4625 {"lsetCommand", (unsigned long)lsetCommand
},
4626 {"saddCommand", (unsigned long)saddCommand
},
4627 {"sremCommand", (unsigned long)sremCommand
},
4628 {"smoveCommand", (unsigned long)smoveCommand
},
4629 {"sismemberCommand", (unsigned long)sismemberCommand
},
4630 {"scardCommand", (unsigned long)scardCommand
},
4631 {"spopCommand", (unsigned long)spopCommand
},
4632 {"sinterCommand", (unsigned long)sinterCommand
},
4633 {"sinterstoreCommand", (unsigned long)sinterstoreCommand
},
4634 {"sunionCommand", (unsigned long)sunionCommand
},
4635 {"sunionstoreCommand", (unsigned long)sunionstoreCommand
},
4636 {"sdiffCommand", (unsigned long)sdiffCommand
},
4637 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand
},
4638 {"syncCommand", (unsigned long)syncCommand
},
4639 {"flushdbCommand", (unsigned long)flushdbCommand
},
4640 {"flushallCommand", (unsigned long)flushallCommand
},
4641 {"sortCommand", (unsigned long)sortCommand
},
4642 {"lremCommand", (unsigned long)lremCommand
},
4643 {"infoCommand", (unsigned long)infoCommand
},
4644 {"mgetCommand", (unsigned long)mgetCommand
},
4645 {"monitorCommand", (unsigned long)monitorCommand
},
4646 {"expireCommand", (unsigned long)expireCommand
},
4647 {"getsetCommand", (unsigned long)getsetCommand
},
4648 {"ttlCommand", (unsigned long)ttlCommand
},
4649 {"slaveofCommand", (unsigned long)slaveofCommand
},
4650 {"debugCommand", (unsigned long)debugCommand
},
4651 {"processCommand", (unsigned long)processCommand
},
4652 {"setupSigSegvAction", (unsigned long)setupSigSegvAction
},
4653 {"readQueryFromClient", (unsigned long)readQueryFromClient
},
4654 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile
},
4655 {"msetGenericCommand", (unsigned long)msetGenericCommand
},
4656 {"msetCommand", (unsigned long)msetCommand
},
4657 {"msetnxCommand", (unsigned long)msetnxCommand
},
4661 /* This function try to convert a pointer into a function name. It's used in
4662 * oreder to provide a backtrace under segmentation fault that's able to
4663 * display functions declared as static (otherwise the backtrace is useless). */
4664 static char *findFuncName(void *pointer
, unsigned long *offset
){
4666 unsigned long off
, minoff
= 0;
4668 /* Try to match against the Symbol with the smallest offset */
4669 for (i
=0; symsTable
[i
].pointer
; i
++) {
4670 unsigned long lp
= (unsigned long) pointer
;
4672 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
4673 off
=lp
-symsTable
[i
].pointer
;
4674 if (ret
< 0 || off
< minoff
) {
4680 if (ret
== -1) return NULL
;
4682 return symsTable
[ret
].name
;
4685 static void *getMcontextEip(ucontext_t
*uc
) {
4686 #if defined(__FreeBSD__)
4687 return (void*) uc
->uc_mcontext
.mc_eip
;
4688 #elif defined(__dietlibc__)
4689 return (void*) uc
->uc_mcontext
.eip
;
4690 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
4691 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
4692 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
4693 #ifdef _STRUCT_X86_THREAD_STATE64
4694 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
4696 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
4698 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
4699 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
];
4700 #elif defined(__ia64__) /* Linux IA64 */
4701 return (void*) uc
->uc_mcontext
.sc_ip
;
4707 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
4709 char **messages
= NULL
;
4710 int i
, trace_size
= 0;
4711 unsigned long offset
=0;
4712 time_t uptime
= time(NULL
)-server
.stat_starttime
;
4713 ucontext_t
*uc
= (ucontext_t
*) secret
;
4714 REDIS_NOTUSED(info
);
4716 redisLog(REDIS_WARNING
,
4717 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
4718 redisLog(REDIS_WARNING
, "%s", sdscatprintf(sdsempty(),
4719 "redis_version:%s; "
4720 "uptime_in_seconds:%d; "
4721 "connected_clients:%d; "
4722 "connected_slaves:%d; "
4724 "changes_since_last_save:%lld; "
4725 "bgsave_in_progress:%d; "
4726 "last_save_time:%d; "
4727 "total_connections_received:%lld; "
4728 "total_commands_processed:%lld; "
4732 listLength(server
.clients
)-listLength(server
.slaves
),
4733 listLength(server
.slaves
),
4736 server
.bgsaveinprogress
,
4738 server
.stat_numconnections
,
4739 server
.stat_numcommands
,
4740 server
.masterhost
== NULL
? "master" : "slave"
4743 trace_size
= backtrace(trace
, 100);
4744 /* overwrite sigaction with caller's address */
4745 if (getMcontextEip(uc
) != NULL
) {
4746 trace
[1] = getMcontextEip(uc
);
4748 messages
= backtrace_symbols(trace
, trace_size
);
4750 for (i
=1; i
<trace_size
; ++i
) {
4751 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
4753 p
= strchr(messages
[i
],'+');
4754 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
4755 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
4757 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
4764 static void setupSigSegvAction(void) {
4765 struct sigaction act
;
4767 sigemptyset (&act
.sa_mask
);
4768 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
4769 * is used. Otherwise, sa_handler is used */
4770 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
4771 act
.sa_sigaction
= segvHandler
;
4772 sigaction (SIGSEGV
, &act
, NULL
);
4773 sigaction (SIGBUS
, &act
, NULL
);
4774 sigaction (SIGFPE
, &act
, NULL
);
4775 sigaction (SIGILL
, &act
, NULL
);
4776 sigaction (SIGBUS
, &act
, NULL
);
4779 #else /* HAVE_BACKTRACE */
4780 static void setupSigSegvAction(void) {
4782 #endif /* HAVE_BACKTRACE */
4784 /* =================================== Main! ================================ */
4787 int linuxOvercommitMemoryValue(void) {
4788 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
4792 if (fgets(buf
,64,fp
) == NULL
) {
4801 void linuxOvercommitMemoryWarning(void) {
4802 if (linuxOvercommitMemoryValue() == 0) {
4803 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
4806 #endif /* __linux__ */
4808 static void daemonize(void) {
4812 if (fork() != 0) exit(0); /* parent exits */
4813 setsid(); /* create a new session */
4815 /* Every output goes to /dev/null. If Redis is daemonized but
4816 * the 'logfile' is set to 'stdout' in the configuration file
4817 * it will not log at all. */
4818 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
4819 dup2(fd
, STDIN_FILENO
);
4820 dup2(fd
, STDOUT_FILENO
);
4821 dup2(fd
, STDERR_FILENO
);
4822 if (fd
> STDERR_FILENO
) close(fd
);
4824 /* Try to write the pid file */
4825 fp
= fopen(server
.pidfile
,"w");
4827 fprintf(fp
,"%d\n",getpid());
4832 int main(int argc
, char **argv
) {
4835 ResetServerSaveParams();
4836 loadServerConfig(argv
[1]);
4837 } else if (argc
> 2) {
4838 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
4841 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4844 if (server
.daemonize
) daemonize();
4845 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
4847 linuxOvercommitMemoryWarning();
4849 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
4850 redisLog(REDIS_NOTICE
,"DB loaded from disk");
4851 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
4852 acceptHandler
, NULL
, NULL
) == AE_ERR
) oom("creating file event");
4853 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
4855 aeDeleteEventLoop(server
.el
);