2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.001"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
62 #include "ae.h" /* Event driven programming library */
63 #include "sds.h" /* Dynamic safe strings */
64 #include "anet.h" /* Networking the easy way */
65 #include "dict.h" /* Hash tables */
66 #include "adlist.h" /* Linked lists */
67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
68 #include "lzf.h" /* LZF compression library */
69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
75 /* Static server configuration */
76 #define REDIS_SERVERPORT 6379 /* TCP port */
77 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
78 #define REDIS_IOBUF_LEN 1024
79 #define REDIS_LOADBUF_LEN 1024
80 #define REDIS_STATIC_ARGS 4
81 #define REDIS_DEFAULT_DBNUM 16
82 #define REDIS_CONFIGLINE_MAX 1024
83 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
84 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
85 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
86 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
87 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
89 /* Hash table parameters */
90 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
93 #define REDIS_CMD_BULK 1 /* Bulk write command */
94 #define REDIS_CMD_INLINE 2 /* Inline command */
95 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
96 this flags will return an error when the 'maxmemory' option is set in the
97 config file and the server is using more than maxmemory bytes of memory.
98 In short this commands are denied on low memory conditions. */
99 #define REDIS_CMD_DENYOOM 4
102 #define REDIS_STRING 0
107 /* Objects encoding */
108 #define REDIS_ENCODING_RAW 0 /* Raw representation */
109 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
111 /* Object types only used for dumping to disk */
112 #define REDIS_EXPIRETIME 253
113 #define REDIS_SELECTDB 254
114 #define REDIS_EOF 255
116 /* Defines related to the dump file format. To store 32 bits lengths for short
117 * keys requires a lot of space, so we check the most significant 2 bits of
118 * the first byte to interpreter the length:
120 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
121 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
122 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
123 * 11|000000 this means: specially encoded object will follow. The six bits
124 * number specify the kind of object that follows.
125 * See the REDIS_RDB_ENC_* defines.
127 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
128 * values, will fit inside. */
129 #define REDIS_RDB_6BITLEN 0
130 #define REDIS_RDB_14BITLEN 1
131 #define REDIS_RDB_32BITLEN 2
132 #define REDIS_RDB_ENCVAL 3
133 #define REDIS_RDB_LENERR UINT_MAX
135 /* When a length of a string object stored on disk has the first two bits
136 * set, the remaining two bits specify a special encoding for the object
137 * accordingly to the following defines: */
138 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
139 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
140 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
141 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
144 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
145 #define REDIS_SLAVE 2 /* This client is a slave server */
146 #define REDIS_MASTER 4 /* This client is a master server */
147 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
149 /* Slave replication state - slave side */
150 #define REDIS_REPL_NONE 0 /* No active replication */
151 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
152 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
154 /* Slave replication state - from the point of view of master
155 * Note that in SEND_BULK and ONLINE state the slave receives new updates
156 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
157 * to start the next background saving in order to send updates to it. */
158 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
159 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
160 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
161 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
163 /* List related stuff */
167 /* Sort operations */
168 #define REDIS_SORT_GET 0
169 #define REDIS_SORT_DEL 1
170 #define REDIS_SORT_INCR 2
171 #define REDIS_SORT_DECR 3
172 #define REDIS_SORT_ASC 4
173 #define REDIS_SORT_DESC 5
174 #define REDIS_SORTKEY_MAX 1024
177 #define REDIS_DEBUG 0
178 #define REDIS_NOTICE 1
179 #define REDIS_WARNING 2
181 /* Anti-warning macro... */
182 #define REDIS_NOTUSED(V) ((void) V)
185 /*================================= Data types ============================== */
187 /* A redis object, that is a type able to hold a string / list / set */
188 typedef struct redisObject
{
191 unsigned char encoding
;
192 unsigned char notused
[2];
196 typedef struct redisDb
{
202 /* With multiplexing we need to take per-clinet state.
203 * Clients are taken in a liked list. */
204 typedef struct redisClient
{
209 robj
**argv
, **mbargv
;
211 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
212 int multibulk
; /* multi bulk command format active */
215 time_t lastinteraction
; /* time of the last interaction, used for timeout */
216 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
217 int slaveseldb
; /* slave selected db, if this client is a slave */
218 int authenticated
; /* when requirepass is non-NULL */
219 int replstate
; /* replication state if this is a slave */
220 int repldbfd
; /* replication DB file descriptor */
221 long repldboff
; /* replication DB file offset */
222 off_t repldbsize
; /* replication DB file size */
230 /* Global server state structure */
236 unsigned int sharingpoolsize
;
237 long long dirty
; /* changes to DB from the last save */
239 list
*slaves
, *monitors
;
240 char neterr
[ANET_ERR_LEN
];
242 int cronloops
; /* number of times the cron function run */
243 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
244 time_t lastsave
; /* Unix time of last save succeeede */
245 size_t usedmemory
; /* Used memory in megabytes */
246 /* Fields used only for stats */
247 time_t stat_starttime
; /* server start time */
248 long long stat_numcommands
; /* number of processed commands */
249 long long stat_numconnections
; /* number of connections received */
257 int bgsaveinprogress
;
258 pid_t bgsavechildpid
;
259 struct saveparam
*saveparams
;
266 /* Replication related */
270 redisClient
*master
; /* client that is master for this slave */
272 unsigned int maxclients
;
273 unsigned long maxmemory
;
274 /* Sort parameters - qsort_r() is only available under BSD so we
275 * have to take this state global, in order to pass it to sortCompare() */
281 typedef void redisCommandProc(redisClient
*c
);
282 struct redisCommand
{
284 redisCommandProc
*proc
;
289 struct redisFunctionSym
{
291 unsigned long pointer
;
294 typedef struct _redisSortObject
{
302 typedef struct _redisSortOperation
{
305 } redisSortOperation
;
307 struct sharedObjectsStruct
{
308 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
309 *colon
, *nullbulk
, *nullmultibulk
,
310 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
311 *outofrangeerr
, *plus
,
312 *select0
, *select1
, *select2
, *select3
, *select4
,
313 *select5
, *select6
, *select7
, *select8
, *select9
;
316 /*================================ Prototypes =============================== */
318 static void freeStringObject(robj
*o
);
319 static void freeListObject(robj
*o
);
320 static void freeSetObject(robj
*o
);
321 static void decrRefCount(void *o
);
322 static robj
*createObject(int type
, void *ptr
);
323 static void freeClient(redisClient
*c
);
324 static int rdbLoad(char *filename
);
325 static void addReply(redisClient
*c
, robj
*obj
);
326 static void addReplySds(redisClient
*c
, sds s
);
327 static void incrRefCount(robj
*o
);
328 static int rdbSaveBackground(char *filename
);
329 static robj
*createStringObject(char *ptr
, size_t len
);
330 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
331 static int syncWithMaster(void);
332 static robj
*tryObjectSharing(robj
*o
);
333 static int tryObjectEncoding(robj
*o
);
334 static robj
*getDecodedObject(const robj
*o
);
335 static int removeExpire(redisDb
*db
, robj
*key
);
336 static int expireIfNeeded(redisDb
*db
, robj
*key
);
337 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
338 static int deleteKey(redisDb
*db
, robj
*key
);
339 static time_t getExpire(redisDb
*db
, robj
*key
);
340 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
341 static void updateSlavesWaitingBgsave(int bgsaveerr
);
342 static void freeMemoryIfNeeded(void);
343 static int processCommand(redisClient
*c
);
344 static void setupSigSegvAction(void);
345 static void rdbRemoveTempFile(pid_t childpid
);
346 static size_t stringObjectLen(robj
*o
);
347 static void processInputBuffer(redisClient
*c
);
349 static void authCommand(redisClient
*c
);
350 static void pingCommand(redisClient
*c
);
351 static void echoCommand(redisClient
*c
);
352 static void setCommand(redisClient
*c
);
353 static void setnxCommand(redisClient
*c
);
354 static void getCommand(redisClient
*c
);
355 static void delCommand(redisClient
*c
);
356 static void existsCommand(redisClient
*c
);
357 static void incrCommand(redisClient
*c
);
358 static void decrCommand(redisClient
*c
);
359 static void incrbyCommand(redisClient
*c
);
360 static void decrbyCommand(redisClient
*c
);
361 static void selectCommand(redisClient
*c
);
362 static void randomkeyCommand(redisClient
*c
);
363 static void keysCommand(redisClient
*c
);
364 static void dbsizeCommand(redisClient
*c
);
365 static void lastsaveCommand(redisClient
*c
);
366 static void saveCommand(redisClient
*c
);
367 static void bgsaveCommand(redisClient
*c
);
368 static void shutdownCommand(redisClient
*c
);
369 static void moveCommand(redisClient
*c
);
370 static void renameCommand(redisClient
*c
);
371 static void renamenxCommand(redisClient
*c
);
372 static void lpushCommand(redisClient
*c
);
373 static void rpushCommand(redisClient
*c
);
374 static void lpopCommand(redisClient
*c
);
375 static void rpopCommand(redisClient
*c
);
376 static void llenCommand(redisClient
*c
);
377 static void lindexCommand(redisClient
*c
);
378 static void lrangeCommand(redisClient
*c
);
379 static void ltrimCommand(redisClient
*c
);
380 static void typeCommand(redisClient
*c
);
381 static void lsetCommand(redisClient
*c
);
382 static void saddCommand(redisClient
*c
);
383 static void sremCommand(redisClient
*c
);
384 static void smoveCommand(redisClient
*c
);
385 static void sismemberCommand(redisClient
*c
);
386 static void scardCommand(redisClient
*c
);
387 static void spopCommand(redisClient
*c
);
388 static void sinterCommand(redisClient
*c
);
389 static void sinterstoreCommand(redisClient
*c
);
390 static void sunionCommand(redisClient
*c
);
391 static void sunionstoreCommand(redisClient
*c
);
392 static void sdiffCommand(redisClient
*c
);
393 static void sdiffstoreCommand(redisClient
*c
);
394 static void syncCommand(redisClient
*c
);
395 static void flushdbCommand(redisClient
*c
);
396 static void flushallCommand(redisClient
*c
);
397 static void sortCommand(redisClient
*c
);
398 static void lremCommand(redisClient
*c
);
399 static void infoCommand(redisClient
*c
);
400 static void mgetCommand(redisClient
*c
);
401 static void monitorCommand(redisClient
*c
);
402 static void expireCommand(redisClient
*c
);
403 static void getsetCommand(redisClient
*c
);
404 static void ttlCommand(redisClient
*c
);
405 static void slaveofCommand(redisClient
*c
);
406 static void debugCommand(redisClient
*c
);
407 static void msetCommand(redisClient
*c
);
408 static void msetnxCommand(redisClient
*c
);
410 /*================================= Globals ================================= */
413 static struct redisServer server
; /* server global state */
414 static struct redisCommand cmdTable
[] = {
415 {"get",getCommand
,2,REDIS_CMD_INLINE
},
416 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
417 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
418 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
419 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
420 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
421 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
422 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
423 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
424 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
425 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
426 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
427 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
428 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
429 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
430 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
431 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
432 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
433 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
434 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
435 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
436 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
437 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
438 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
439 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
440 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
441 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
442 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
443 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
444 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
445 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
446 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
447 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
448 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
449 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
450 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
451 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
452 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
453 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
454 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
455 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
456 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
457 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
458 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
459 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
460 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
461 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
462 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
463 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
464 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
465 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
466 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
467 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
468 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
469 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
470 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
471 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
472 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
473 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
474 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
475 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
478 /*============================ Utility functions ============================ */
480 /* Glob-style pattern matching. */
481 int stringmatchlen(const char *pattern
, int patternLen
,
482 const char *string
, int stringLen
, int nocase
)
487 while (pattern
[1] == '*') {
492 return 1; /* match */
494 if (stringmatchlen(pattern
+1, patternLen
-1,
495 string
, stringLen
, nocase
))
496 return 1; /* match */
500 return 0; /* no match */
504 return 0; /* no match */
514 not = pattern
[0] == '^';
521 if (pattern
[0] == '\\') {
524 if (pattern
[0] == string
[0])
526 } else if (pattern
[0] == ']') {
528 } else if (patternLen
== 0) {
532 } else if (pattern
[1] == '-' && patternLen
>= 3) {
533 int start
= pattern
[0];
534 int end
= pattern
[2];
542 start
= tolower(start
);
548 if (c
>= start
&& c
<= end
)
552 if (pattern
[0] == string
[0])
555 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
565 return 0; /* no match */
571 if (patternLen
>= 2) {
578 if (pattern
[0] != string
[0])
579 return 0; /* no match */
581 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
582 return 0; /* no match */
590 if (stringLen
== 0) {
591 while(*pattern
== '*') {
598 if (patternLen
== 0 && stringLen
== 0)
603 static void redisLog(int level
, const char *fmt
, ...) {
607 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
611 if (level
>= server
.verbosity
) {
617 strftime(buf
,64,"%d %b %H:%M:%S",gmtime(&now
));
618 fprintf(fp
,"%s %c ",buf
,c
[level
]);
619 vfprintf(fp
, fmt
, ap
);
625 if (server
.logfile
) fclose(fp
);
628 /*====================== Hash table type implementation ==================== */
630 /* This is an hash table type that uses the SDS dynamic strings libary as
631 * keys and radis objects as values (objects can hold SDS strings,
634 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
638 DICT_NOTUSED(privdata
);
640 l1
= sdslen((sds
)key1
);
641 l2
= sdslen((sds
)key2
);
642 if (l1
!= l2
) return 0;
643 return memcmp(key1
, key2
, l1
) == 0;
646 static void dictRedisObjectDestructor(void *privdata
, void *val
)
648 DICT_NOTUSED(privdata
);
653 static int dictObjKeyCompare(void *privdata
, const void *key1
,
656 const robj
*o1
= key1
, *o2
= key2
;
657 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
660 static unsigned int dictObjHash(const void *key
) {
662 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
665 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
668 const robj
*o1
= key1
, *o2
= key2
;
670 if (o1
->encoding
== REDIS_ENCODING_RAW
&&
671 o2
->encoding
== REDIS_ENCODING_RAW
)
672 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
677 dec1
= o1
->encoding
!= REDIS_ENCODING_RAW
?
678 getDecodedObject(o1
) : (robj
*)o1
;
679 dec2
= o2
->encoding
!= REDIS_ENCODING_RAW
?
680 getDecodedObject(o2
) : (robj
*)o2
;
681 cmp
= sdsDictKeyCompare(privdata
,dec1
->ptr
,dec2
->ptr
);
682 if (dec1
!= o1
) decrRefCount(dec1
);
683 if (dec2
!= o2
) decrRefCount(dec2
);
688 static unsigned int dictEncObjHash(const void *key
) {
691 if (o
->encoding
== REDIS_ENCODING_RAW
)
692 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
694 robj
*dec
= getDecodedObject(o
);
695 unsigned int hash
= dictGenHashFunction(dec
->ptr
, sdslen((sds
)dec
->ptr
));
701 static dictType setDictType
= {
702 dictEncObjHash
, /* hash function */
705 dictEncObjKeyCompare
, /* key compare */
706 dictRedisObjectDestructor
, /* key destructor */
707 NULL
/* val destructor */
710 static dictType hashDictType
= {
711 dictObjHash
, /* hash function */
714 dictObjKeyCompare
, /* key compare */
715 dictRedisObjectDestructor
, /* key destructor */
716 dictRedisObjectDestructor
/* val destructor */
719 /* ========================= Random utility functions ======================= */
721 /* Redis generally does not try to recover from out of memory conditions
722 * when allocating objects or strings, it is not clear if it will be possible
723 * to report this condition to the client since the networking layer itself
724 * is based on heap allocation for send buffers, so we simply abort.
725 * At least the code will be simpler to read... */
726 static void oom(const char *msg
) {
727 fprintf(stderr
, "%s: Out of memory\n",msg
);
733 /* ====================== Redis server networking stuff ===================== */
734 static void closeTimedoutClients(void) {
737 time_t now
= time(NULL
);
739 listRewind(server
.clients
);
740 while ((ln
= listYield(server
.clients
)) != NULL
) {
741 c
= listNodeValue(ln
);
742 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
743 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
744 (now
- c
->lastinteraction
> server
.maxidletime
)) {
745 redisLog(REDIS_DEBUG
,"Closing idle client");
751 static int htNeedsResize(dict
*dict
) {
752 long long size
, used
;
754 size
= dictSlots(dict
);
755 used
= dictSize(dict
);
756 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
757 (used
*100/size
< REDIS_HT_MINFILL
));
760 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
761 * we resize the hash table to save memory */
762 static void tryResizeHashTables(void) {
765 for (j
= 0; j
< server
.dbnum
; j
++) {
766 if (htNeedsResize(server
.db
[j
].dict
)) {
767 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
768 dictResize(server
.db
[j
].dict
);
769 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
771 if (htNeedsResize(server
.db
[j
].expires
))
772 dictResize(server
.db
[j
].expires
);
776 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
777 int j
, loops
= server
.cronloops
++;
778 REDIS_NOTUSED(eventLoop
);
780 REDIS_NOTUSED(clientData
);
782 /* Update the global state with the amount of used memory */
783 server
.usedmemory
= zmalloc_used_memory();
785 /* Show some info about non-empty databases */
786 for (j
= 0; j
< server
.dbnum
; j
++) {
787 long long size
, used
, vkeys
;
789 size
= dictSlots(server
.db
[j
].dict
);
790 used
= dictSize(server
.db
[j
].dict
);
791 vkeys
= dictSize(server
.db
[j
].expires
);
792 if (!(loops
% 5) && (used
|| vkeys
)) {
793 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
794 /* dictPrintStats(server.dict); */
798 /* We don't want to resize the hash tables while a bacground saving
799 * is in progress: the saving child is created using fork() that is
800 * implemented with a copy-on-write semantic in most modern systems, so
801 * if we resize the HT while there is the saving child at work actually
802 * a lot of memory movements in the parent will cause a lot of pages
804 if (!server
.bgsaveinprogress
) tryResizeHashTables();
806 /* Show information about connected clients */
808 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
809 listLength(server
.clients
)-listLength(server
.slaves
),
810 listLength(server
.slaves
),
812 dictSize(server
.sharingpool
));
815 /* Close connections of timedout clients */
816 if (server
.maxidletime
&& !(loops
% 10))
817 closeTimedoutClients();
819 /* Check if a background saving in progress terminated */
820 if (server
.bgsaveinprogress
) {
822 if (wait4(-1,&statloc
,WNOHANG
,NULL
)) {
823 int exitcode
= WEXITSTATUS(statloc
);
824 int bysignal
= WIFSIGNALED(statloc
);
826 if (!bysignal
&& exitcode
== 0) {
827 redisLog(REDIS_NOTICE
,
828 "Background saving terminated with success");
830 server
.lastsave
= time(NULL
);
831 } else if (!bysignal
&& exitcode
!= 0) {
832 redisLog(REDIS_WARNING
, "Background saving error");
834 redisLog(REDIS_WARNING
,
835 "Background saving terminated by signal");
836 rdbRemoveTempFile(server
.bgsavechildpid
);
838 server
.bgsaveinprogress
= 0;
839 server
.bgsavechildpid
= -1;
840 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
843 /* If there is not a background saving in progress check if
844 * we have to save now */
845 time_t now
= time(NULL
);
846 for (j
= 0; j
< server
.saveparamslen
; j
++) {
847 struct saveparam
*sp
= server
.saveparams
+j
;
849 if (server
.dirty
>= sp
->changes
&&
850 now
-server
.lastsave
> sp
->seconds
) {
851 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
852 sp
->changes
, sp
->seconds
);
853 rdbSaveBackground(server
.dbfilename
);
859 /* Try to expire a few timed out keys */
860 for (j
= 0; j
< server
.dbnum
; j
++) {
861 redisDb
*db
= server
.db
+j
;
862 int num
= dictSize(db
->expires
);
865 time_t now
= time(NULL
);
867 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
868 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
873 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
874 t
= (time_t) dictGetEntryVal(de
);
876 deleteKey(db
,dictGetEntryKey(de
));
882 /* Check if we should connect to a MASTER */
883 if (server
.replstate
== REDIS_REPL_CONNECT
) {
884 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
885 if (syncWithMaster() == REDIS_OK
) {
886 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
892 static void createSharedObjects(void) {
893 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
894 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
895 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
896 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
897 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
898 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
899 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
900 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
901 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
903 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
904 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
905 "-ERR Operation against a key holding the wrong kind of value\r\n"));
906 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
907 "-ERR no such key\r\n"));
908 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
909 "-ERR syntax error\r\n"));
910 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
911 "-ERR source and destination objects are the same\r\n"));
912 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
913 "-ERR index out of range\r\n"));
914 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
915 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
916 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
917 shared
.select0
= createStringObject("select 0\r\n",10);
918 shared
.select1
= createStringObject("select 1\r\n",10);
919 shared
.select2
= createStringObject("select 2\r\n",10);
920 shared
.select3
= createStringObject("select 3\r\n",10);
921 shared
.select4
= createStringObject("select 4\r\n",10);
922 shared
.select5
= createStringObject("select 5\r\n",10);
923 shared
.select6
= createStringObject("select 6\r\n",10);
924 shared
.select7
= createStringObject("select 7\r\n",10);
925 shared
.select8
= createStringObject("select 8\r\n",10);
926 shared
.select9
= createStringObject("select 9\r\n",10);
929 static void appendServerSaveParams(time_t seconds
, int changes
) {
930 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
931 if (server
.saveparams
== NULL
) oom("appendServerSaveParams");
932 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
933 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
934 server
.saveparamslen
++;
937 static void ResetServerSaveParams() {
938 zfree(server
.saveparams
);
939 server
.saveparams
= NULL
;
940 server
.saveparamslen
= 0;
943 static void initServerConfig() {
944 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
945 server
.port
= REDIS_SERVERPORT
;
946 server
.verbosity
= REDIS_DEBUG
;
947 server
.maxidletime
= REDIS_MAXIDLETIME
;
948 server
.saveparams
= NULL
;
949 server
.logfile
= NULL
; /* NULL = log on standard output */
950 server
.bindaddr
= NULL
;
951 server
.glueoutputbuf
= 1;
952 server
.daemonize
= 0;
953 server
.pidfile
= "/var/run/redis.pid";
954 server
.dbfilename
= "dump.rdb";
955 server
.requirepass
= NULL
;
956 server
.shareobjects
= 0;
957 server
.sharingpoolsize
= 1024;
958 server
.maxclients
= 0;
959 server
.maxmemory
= 0;
960 ResetServerSaveParams();
962 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
963 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
964 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
965 /* Replication related */
967 server
.masterhost
= NULL
;
968 server
.masterport
= 6379;
969 server
.master
= NULL
;
970 server
.replstate
= REDIS_REPL_NONE
;
973 static void initServer() {
976 signal(SIGHUP
, SIG_IGN
);
977 signal(SIGPIPE
, SIG_IGN
);
978 setupSigSegvAction();
980 server
.clients
= listCreate();
981 server
.slaves
= listCreate();
982 server
.monitors
= listCreate();
983 server
.objfreelist
= listCreate();
984 createSharedObjects();
985 server
.el
= aeCreateEventLoop();
986 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
987 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
988 if (!server
.db
|| !server
.clients
|| !server
.slaves
|| !server
.monitors
|| !server
.el
|| !server
.objfreelist
)
989 oom("server initialization"); /* Fatal OOM */
990 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
991 if (server
.fd
== -1) {
992 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
995 for (j
= 0; j
< server
.dbnum
; j
++) {
996 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
997 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1000 server
.cronloops
= 0;
1001 server
.bgsaveinprogress
= 0;
1002 server
.bgsavechildpid
= -1;
1003 server
.lastsave
= time(NULL
);
1005 server
.usedmemory
= 0;
1006 server
.stat_numcommands
= 0;
1007 server
.stat_numconnections
= 0;
1008 server
.stat_starttime
= time(NULL
);
1009 aeCreateTimeEvent(server
.el
, 1000, serverCron
, NULL
, NULL
);
1012 /* Empty the whole database */
1013 static long long emptyDb() {
1015 long long removed
= 0;
1017 for (j
= 0; j
< server
.dbnum
; j
++) {
1018 removed
+= dictSize(server
.db
[j
].dict
);
1019 dictEmpty(server
.db
[j
].dict
);
1020 dictEmpty(server
.db
[j
].expires
);
1025 static int yesnotoi(char *s
) {
1026 if (!strcasecmp(s
,"yes")) return 1;
1027 else if (!strcasecmp(s
,"no")) return 0;
1031 /* I agree, this is a very rudimental way to load a configuration...
1032 will improve later if the config gets more complex */
1033 static void loadServerConfig(char *filename
) {
1035 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1039 if (filename
[0] == '-' && filename
[1] == '\0')
1042 if ((fp
= fopen(filename
,"r")) == NULL
) {
1043 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1048 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1054 line
= sdstrim(line
," \t\r\n");
1056 /* Skip comments and blank lines*/
1057 if (line
[0] == '#' || line
[0] == '\0') {
1062 /* Split into arguments */
1063 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1064 sdstolower(argv
[0]);
1066 /* Execute config directives */
1067 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1068 server
.maxidletime
= atoi(argv
[1]);
1069 if (server
.maxidletime
< 0) {
1070 err
= "Invalid timeout value"; goto loaderr
;
1072 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1073 server
.port
= atoi(argv
[1]);
1074 if (server
.port
< 1 || server
.port
> 65535) {
1075 err
= "Invalid port"; goto loaderr
;
1077 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1078 server
.bindaddr
= zstrdup(argv
[1]);
1079 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1080 int seconds
= atoi(argv
[1]);
1081 int changes
= atoi(argv
[2]);
1082 if (seconds
< 1 || changes
< 0) {
1083 err
= "Invalid save parameters"; goto loaderr
;
1085 appendServerSaveParams(seconds
,changes
);
1086 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1087 if (chdir(argv
[1]) == -1) {
1088 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1089 argv
[1], strerror(errno
));
1092 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1093 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1094 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1095 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1097 err
= "Invalid log level. Must be one of debug, notice, warning";
1100 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1103 server
.logfile
= zstrdup(argv
[1]);
1104 if (!strcasecmp(server
.logfile
,"stdout")) {
1105 zfree(server
.logfile
);
1106 server
.logfile
= NULL
;
1108 if (server
.logfile
) {
1109 /* Test if we are able to open the file. The server will not
1110 * be able to abort just for this problem later... */
1111 logfp
= fopen(server
.logfile
,"a");
1112 if (logfp
== NULL
) {
1113 err
= sdscatprintf(sdsempty(),
1114 "Can't open the log file: %s", strerror(errno
));
1119 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1120 server
.dbnum
= atoi(argv
[1]);
1121 if (server
.dbnum
< 1) {
1122 err
= "Invalid number of databases"; goto loaderr
;
1124 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1125 server
.maxclients
= atoi(argv
[1]);
1126 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1127 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1128 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1129 server
.masterhost
= sdsnew(argv
[1]);
1130 server
.masterport
= atoi(argv
[2]);
1131 server
.replstate
= REDIS_REPL_CONNECT
;
1132 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1133 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1134 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1136 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1137 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1138 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1140 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1141 server
.sharingpoolsize
= atoi(argv
[1]);
1142 if (server
.sharingpoolsize
< 1) {
1143 err
= "invalid object sharing pool size"; goto loaderr
;
1145 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1146 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1147 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1149 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1150 server
.requirepass
= zstrdup(argv
[1]);
1151 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1152 server
.pidfile
= zstrdup(argv
[1]);
1153 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1154 server
.dbfilename
= zstrdup(argv
[1]);
1156 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1158 for (j
= 0; j
< argc
; j
++)
1163 if (fp
!= stdin
) fclose(fp
);
1167 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1168 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1169 fprintf(stderr
, ">>> '%s'\n", line
);
1170 fprintf(stderr
, "%s\n", err
);
1174 static void freeClientArgv(redisClient
*c
) {
1177 for (j
= 0; j
< c
->argc
; j
++)
1178 decrRefCount(c
->argv
[j
]);
1179 for (j
= 0; j
< c
->mbargc
; j
++)
1180 decrRefCount(c
->mbargv
[j
]);
1185 static void freeClient(redisClient
*c
) {
1188 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1189 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1190 sdsfree(c
->querybuf
);
1191 listRelease(c
->reply
);
1194 ln
= listSearchKey(server
.clients
,c
);
1196 listDelNode(server
.clients
,ln
);
1197 if (c
->flags
& REDIS_SLAVE
) {
1198 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1200 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1201 ln
= listSearchKey(l
,c
);
1205 if (c
->flags
& REDIS_MASTER
) {
1206 server
.master
= NULL
;
1207 server
.replstate
= REDIS_REPL_CONNECT
;
1214 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1219 listRewind(c
->reply
);
1220 while((ln
= listYield(c
->reply
))) {
1222 totlen
+= sdslen(o
->ptr
);
1223 /* This optimization makes more sense if we don't have to copy
1225 if (totlen
> 1024) return;
1231 listRewind(c
->reply
);
1232 while((ln
= listYield(c
->reply
))) {
1234 memcpy(buf
+copylen
,o
->ptr
,sdslen(o
->ptr
));
1235 copylen
+= sdslen(o
->ptr
);
1236 listDelNode(c
->reply
,ln
);
1238 /* Now the output buffer is empty, add the new single element */
1239 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,totlen
));
1240 if (!listAddNodeTail(c
->reply
,o
)) oom("listAddNodeTail");
1244 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1245 redisClient
*c
= privdata
;
1246 int nwritten
= 0, totwritten
= 0, objlen
;
1249 REDIS_NOTUSED(mask
);
1251 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1252 glueReplyBuffersIfNeeded(c
);
1253 while(listLength(c
->reply
)) {
1254 o
= listNodeValue(listFirst(c
->reply
));
1255 objlen
= sdslen(o
->ptr
);
1258 listDelNode(c
->reply
,listFirst(c
->reply
));
1262 if (c
->flags
& REDIS_MASTER
) {
1263 /* Don't reply to a master */
1264 nwritten
= objlen
- c
->sentlen
;
1266 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1267 if (nwritten
<= 0) break;
1269 c
->sentlen
+= nwritten
;
1270 totwritten
+= nwritten
;
1271 /* If we fully sent the object on head go to the next one */
1272 if (c
->sentlen
== objlen
) {
1273 listDelNode(c
->reply
,listFirst(c
->reply
));
1276 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1277 * bytes, in a single threaded server it's a good idea to server
1278 * other clients as well, even if a very large request comes from
1279 * super fast link that is always able to accept data (in real world
1280 * terms think to 'KEYS *' against the loopback interfae) */
1281 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1283 if (nwritten
== -1) {
1284 if (errno
== EAGAIN
) {
1287 redisLog(REDIS_DEBUG
,
1288 "Error writing to client: %s", strerror(errno
));
1293 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1294 if (listLength(c
->reply
) == 0) {
1296 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1300 static struct redisCommand
*lookupCommand(char *name
) {
1302 while(cmdTable
[j
].name
!= NULL
) {
1303 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1309 /* resetClient prepare the client to process the next command */
1310 static void resetClient(redisClient
*c
) {
1316 /* If this function gets called we already read a whole
1317 * command, argments are in the client argv/argc fields.
1318 * processCommand() execute the command or prepare the
1319 * server for a bulk read from the client.
1321 * If 1 is returned the client is still alive and valid and
1322 * and other operations can be performed by the caller. Otherwise
1323 * if 0 is returned the client was destroied (i.e. after QUIT). */
1324 static int processCommand(redisClient
*c
) {
1325 struct redisCommand
*cmd
;
1328 /* Free some memory if needed (maxmemory setting) */
1329 if (server
.maxmemory
) freeMemoryIfNeeded();
1331 /* Handle the multi bulk command type. This is an alternative protocol
1332 * supported by Redis in order to receive commands that are composed of
1333 * multiple binary-safe "bulk" arguments. The latency of processing is
1334 * a bit higher but this allows things like multi-sets, so if this
1335 * protocol is used only for MSET and similar commands this is a big win. */
1336 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1337 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1338 if (c
->multibulk
<= 0) {
1342 decrRefCount(c
->argv
[c
->argc
-1]);
1346 } else if (c
->multibulk
) {
1347 if (c
->bulklen
== -1) {
1348 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1349 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1353 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1354 decrRefCount(c
->argv
[0]);
1355 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1357 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1362 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1364 if (sdslen(c->querybuf) > 0)
1365 processInputBuffer(c);
1370 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1371 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1375 if (c
->multibulk
== 0) {
1379 /* Here we need to swap the multi-bulk argc/argv with the
1380 * normal argc/argv of the client structure. */
1382 c
->argv
= c
->mbargv
;
1383 c
->mbargv
= auxargv
;
1386 c
->argc
= c
->mbargc
;
1387 c
->mbargc
= auxargc
;
1389 /* We need to set bulklen to something different than -1
1390 * in order for the code below to process the command without
1391 * to try to read the last argument of a bulk command as
1392 * a special argument. */
1394 /* continue below and process the command */
1398 if (sdslen(c->querybuf) > 0)
1399 processInputBuffer(c);
1405 /* -- end of multi bulk commands processing -- */
1407 /* The QUIT command is handled as a special case. Normal command
1408 * procs are unable to close the client connection safely */
1409 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1413 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1415 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1418 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1419 (c
->argc
< -cmd
->arity
)) {
1420 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
1423 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1424 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1427 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1428 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1430 decrRefCount(c
->argv
[c
->argc
-1]);
1431 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1433 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1438 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1439 /* It is possible that the bulk read is already in the
1440 * buffer. Check this condition and handle it accordingly */
1441 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1442 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1444 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1449 /* Let's try to share objects on the command arguments vector */
1450 if (server
.shareobjects
) {
1452 for(j
= 1; j
< c
->argc
; j
++)
1453 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1455 /* Let's try to encode the bulk object to save space. */
1456 if (cmd
->flags
& REDIS_CMD_BULK
)
1457 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1459 /* Check if the user is authenticated */
1460 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1461 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1466 /* Exec the command */
1467 dirty
= server
.dirty
;
1469 if (server
.dirty
-dirty
!= 0 && listLength(server
.slaves
))
1470 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1471 if (listLength(server
.monitors
))
1472 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1473 server
.stat_numcommands
++;
1475 /* Prepare the client for the next command */
1476 if (c
->flags
& REDIS_CLOSE
) {
1484 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1488 /* (args*2)+1 is enough room for args, spaces, newlines */
1489 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1491 if (argc
<= REDIS_STATIC_ARGS
) {
1494 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1495 if (!outv
) oom("replicationFeedSlaves");
1498 for (j
= 0; j
< argc
; j
++) {
1499 if (j
!= 0) outv
[outc
++] = shared
.space
;
1500 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1503 lenobj
= createObject(REDIS_STRING
,
1504 sdscatprintf(sdsempty(),"%d\r\n",
1505 stringObjectLen(argv
[j
])));
1506 lenobj
->refcount
= 0;
1507 outv
[outc
++] = lenobj
;
1509 outv
[outc
++] = argv
[j
];
1511 outv
[outc
++] = shared
.crlf
;
1513 /* Increment all the refcounts at start and decrement at end in order to
1514 * be sure to free objects if there is no slave in a replication state
1515 * able to be feed with commands */
1516 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1518 while((ln
= listYield(slaves
))) {
1519 redisClient
*slave
= ln
->value
;
1521 /* Don't feed slaves that are still waiting for BGSAVE to start */
1522 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1524 /* Feed all the other slaves, MONITORs and so on */
1525 if (slave
->slaveseldb
!= dictid
) {
1529 case 0: selectcmd
= shared
.select0
; break;
1530 case 1: selectcmd
= shared
.select1
; break;
1531 case 2: selectcmd
= shared
.select2
; break;
1532 case 3: selectcmd
= shared
.select3
; break;
1533 case 4: selectcmd
= shared
.select4
; break;
1534 case 5: selectcmd
= shared
.select5
; break;
1535 case 6: selectcmd
= shared
.select6
; break;
1536 case 7: selectcmd
= shared
.select7
; break;
1537 case 8: selectcmd
= shared
.select8
; break;
1538 case 9: selectcmd
= shared
.select9
; break;
1540 selectcmd
= createObject(REDIS_STRING
,
1541 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1542 selectcmd
->refcount
= 0;
1545 addReply(slave
,selectcmd
);
1546 slave
->slaveseldb
= dictid
;
1548 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1550 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1551 if (outv
!= static_outv
) zfree(outv
);
1554 static void processInputBuffer(redisClient
*c
) {
1556 if (c
->bulklen
== -1) {
1557 /* Read the first line of the query */
1558 char *p
= strchr(c
->querybuf
,'\n');
1565 query
= c
->querybuf
;
1566 c
->querybuf
= sdsempty();
1567 querylen
= 1+(p
-(query
));
1568 if (sdslen(query
) > querylen
) {
1569 /* leave data after the first line of the query in the buffer */
1570 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1572 *p
= '\0'; /* remove "\n" */
1573 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1574 sdsupdatelen(query
);
1576 /* Now we can split the query in arguments */
1577 if (sdslen(query
) == 0) {
1578 /* Ignore empty query */
1582 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1583 if (argv
== NULL
) oom("sdssplitlen");
1586 if (c
->argv
) zfree(c
->argv
);
1587 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1588 if (c
->argv
== NULL
) oom("allocating arguments list for client");
1590 for (j
= 0; j
< argc
; j
++) {
1591 if (sdslen(argv
[j
])) {
1592 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1599 /* Execute the command. If the client is still valid
1600 * after processCommand() return and there is something
1601 * on the query buffer try to process the next command. */
1602 if (c
->argc
&& processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1604 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
1605 redisLog(REDIS_DEBUG
, "Client protocol error");
1610 /* Bulk read handling. Note that if we are at this point
1611 the client already sent a command terminated with a newline,
1612 we are reading the bulk data that is actually the last
1613 argument of the command. */
1614 int qbl
= sdslen(c
->querybuf
);
1616 if (c
->bulklen
<= qbl
) {
1617 /* Copy everything but the final CRLF as final argument */
1618 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1620 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1621 /* Process the command. If the client is still valid after
1622 * the processing and there is more data in the buffer
1623 * try to parse it. */
1624 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1630 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1631 redisClient
*c
= (redisClient
*) privdata
;
1632 char buf
[REDIS_IOBUF_LEN
];
1635 REDIS_NOTUSED(mask
);
1637 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1639 if (errno
== EAGAIN
) {
1642 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1646 } else if (nread
== 0) {
1647 redisLog(REDIS_DEBUG
, "Client closed connection");
1652 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1653 c
->lastinteraction
= time(NULL
);
1657 processInputBuffer(c
);
1660 static int selectDb(redisClient
*c
, int id
) {
1661 if (id
< 0 || id
>= server
.dbnum
)
1663 c
->db
= &server
.db
[id
];
1667 static void *dupClientReplyValue(void *o
) {
1668 incrRefCount((robj
*)o
);
1672 static redisClient
*createClient(int fd
) {
1673 redisClient
*c
= zmalloc(sizeof(*c
));
1675 anetNonBlock(NULL
,fd
);
1676 anetTcpNoDelay(NULL
,fd
);
1677 if (!c
) return NULL
;
1680 c
->querybuf
= sdsempty();
1689 c
->lastinteraction
= time(NULL
);
1690 c
->authenticated
= 0;
1691 c
->replstate
= REDIS_REPL_NONE
;
1692 if ((c
->reply
= listCreate()) == NULL
) oom("listCreate");
1693 listSetFreeMethod(c
->reply
,decrRefCount
);
1694 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1695 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1696 readQueryFromClient
, c
, NULL
) == AE_ERR
) {
1700 if (!listAddNodeTail(server
.clients
,c
)) oom("listAddNodeTail");
1704 static void addReply(redisClient
*c
, robj
*obj
) {
1705 if (listLength(c
->reply
) == 0 &&
1706 (c
->replstate
== REDIS_REPL_NONE
||
1707 c
->replstate
== REDIS_REPL_ONLINE
) &&
1708 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
1709 sendReplyToClient
, c
, NULL
) == AE_ERR
) return;
1710 if (obj
->encoding
!= REDIS_ENCODING_RAW
) {
1711 obj
= getDecodedObject(obj
);
1715 if (!listAddNodeTail(c
->reply
,obj
)) oom("listAddNodeTail");
1718 static void addReplySds(redisClient
*c
, sds s
) {
1719 robj
*o
= createObject(REDIS_STRING
,s
);
1724 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
1727 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
1728 len
= sdslen(obj
->ptr
);
1730 long n
= (long)obj
->ptr
;
1737 while((n
= n
/10) != 0) {
1741 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",len
));
1744 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1749 REDIS_NOTUSED(mask
);
1750 REDIS_NOTUSED(privdata
);
1752 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
1753 if (cfd
== AE_ERR
) {
1754 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
1757 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
1758 if ((c
= createClient(cfd
)) == NULL
) {
1759 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
1760 close(cfd
); /* May be already closed, just ingore errors */
1763 /* If maxclient directive is set and this is one client more... close the
1764 * connection. Note that we create the client instead to check before
1765 * for this condition, since now the socket is already set in nonblocking
1766 * mode and we can send an error for free using the Kernel I/O */
1767 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
1768 char *err
= "-ERR max number of clients reached\r\n";
1770 /* That's a best effort error message, don't check write errors */
1771 (void) write(c
->fd
,err
,strlen(err
));
1775 server
.stat_numconnections
++;
1778 /* ======================= Redis objects implementation ===================== */
1780 static robj
*createObject(int type
, void *ptr
) {
1783 if (listLength(server
.objfreelist
)) {
1784 listNode
*head
= listFirst(server
.objfreelist
);
1785 o
= listNodeValue(head
);
1786 listDelNode(server
.objfreelist
,head
);
1788 o
= zmalloc(sizeof(*o
));
1790 if (!o
) oom("createObject");
1792 o
->encoding
= REDIS_ENCODING_RAW
;
1798 static robj
*createStringObject(char *ptr
, size_t len
) {
1799 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
1802 static robj
*createListObject(void) {
1803 list
*l
= listCreate();
1805 if (!l
) oom("listCreate");
1806 listSetFreeMethod(l
,decrRefCount
);
1807 return createObject(REDIS_LIST
,l
);
1810 static robj
*createSetObject(void) {
1811 dict
*d
= dictCreate(&setDictType
,NULL
);
1812 if (!d
) oom("dictCreate");
1813 return createObject(REDIS_SET
,d
);
1816 static void freeStringObject(robj
*o
) {
1817 if (o
->encoding
== REDIS_ENCODING_RAW
) {
1822 static void freeListObject(robj
*o
) {
1823 listRelease((list
*) o
->ptr
);
1826 static void freeSetObject(robj
*o
) {
1827 dictRelease((dict
*) o
->ptr
);
1830 static void freeHashObject(robj
*o
) {
1831 dictRelease((dict
*) o
->ptr
);
1834 static void incrRefCount(robj
*o
) {
1836 #ifdef DEBUG_REFCOUNT
1837 if (o
->type
== REDIS_STRING
)
1838 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
1842 static void decrRefCount(void *obj
) {
1845 #ifdef DEBUG_REFCOUNT
1846 if (o
->type
== REDIS_STRING
)
1847 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
1849 if (--(o
->refcount
) == 0) {
1851 case REDIS_STRING
: freeStringObject(o
); break;
1852 case REDIS_LIST
: freeListObject(o
); break;
1853 case REDIS_SET
: freeSetObject(o
); break;
1854 case REDIS_HASH
: freeHashObject(o
); break;
1855 default: assert(0 != 0); break;
1857 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
1858 !listAddNodeHead(server
.objfreelist
,o
))
1863 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
1864 dictEntry
*de
= dictFind(db
->dict
,key
);
1865 return de
? dictGetEntryVal(de
) : NULL
;
1868 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
1869 expireIfNeeded(db
,key
);
1870 return lookupKey(db
,key
);
1873 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
1874 deleteIfVolatile(db
,key
);
1875 return lookupKey(db
,key
);
1878 static int deleteKey(redisDb
*db
, robj
*key
) {
1881 /* We need to protect key from destruction: after the first dictDelete()
1882 * it may happen that 'key' is no longer valid if we don't increment
1883 * it's count. This may happen when we get the object reference directly
1884 * from the hash table with dictRandomKey() or dict iterators */
1886 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
1887 retval
= dictDelete(db
->dict
,key
);
1890 return retval
== DICT_OK
;
1893 /* Try to share an object against the shared objects pool */
1894 static robj
*tryObjectSharing(robj
*o
) {
1895 struct dictEntry
*de
;
1898 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
1900 assert(o
->type
== REDIS_STRING
);
1901 de
= dictFind(server
.sharingpool
,o
);
1903 robj
*shared
= dictGetEntryKey(de
);
1905 c
= ((unsigned long) dictGetEntryVal(de
))+1;
1906 dictGetEntryVal(de
) = (void*) c
;
1907 incrRefCount(shared
);
1911 /* Here we are using a stream algorihtm: Every time an object is
1912 * shared we increment its count, everytime there is a miss we
1913 * recrement the counter of a random object. If this object reaches
1914 * zero we remove the object and put the current object instead. */
1915 if (dictSize(server
.sharingpool
) >=
1916 server
.sharingpoolsize
) {
1917 de
= dictGetRandomKey(server
.sharingpool
);
1919 c
= ((unsigned long) dictGetEntryVal(de
))-1;
1920 dictGetEntryVal(de
) = (void*) c
;
1922 dictDelete(server
.sharingpool
,de
->key
);
1925 c
= 0; /* If the pool is empty we want to add this object */
1930 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
1931 assert(retval
== DICT_OK
);
1938 /* Check if the nul-terminated string 's' can be represented by a long
1939 * (that is, is a number that fits into long without any other space or
1940 * character before or after the digits).
1942 * If so, the function returns REDIS_OK and *longval is set to the value
1943 * of the number. Otherwise REDIS_ERR is returned */
1944 static int isStringRepresentableAsLong(char *s
, long *longval
) {
1945 char buf
[32], *endptr
;
1949 value
= strtol(s
, &endptr
, 10);
1950 if (endptr
[0] != '\0') return REDIS_ERR
;
1951 slen
= snprintf(buf
,32,"%ld",value
);
1953 /* If the number converted back into a string is not identical
1954 * then it's not possible to encode the string as integer */
1955 if (strlen(buf
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
1956 if (longval
) *longval
= value
;
1960 /* Try to encode a string object in order to save space */
1961 static int tryObjectEncoding(robj
*o
) {
1965 if (o
->encoding
!= REDIS_ENCODING_RAW
)
1966 return REDIS_ERR
; /* Already encoded */
1968 /* It's not save to encode shared objects: shared objects can be shared
1969 * everywhere in the "object space" of Redis. Encoded objects can only
1970 * appear as "values" (and not, for instance, as keys) */
1971 if (o
->refcount
> 1) return REDIS_ERR
;
1973 /* Currently we try to encode only strings */
1974 assert(o
->type
== REDIS_STRING
);
1976 /* Check if we can represent this string as a long integer */
1977 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
1979 /* Ok, this object can be encoded */
1980 o
->encoding
= REDIS_ENCODING_INT
;
1982 o
->ptr
= (void*) value
;
1986 /* Get a decoded version of an encoded object (returned as a new object) */
1987 static robj
*getDecodedObject(const robj
*o
) {
1990 assert(o
->encoding
!= REDIS_ENCODING_RAW
);
1991 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
1994 snprintf(buf
,32,"%ld",(long)o
->ptr
);
1995 dec
= createStringObject(buf
,strlen(buf
));
2002 static int compareStringObjects(robj
*a
, robj
*b
) {
2003 assert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2005 if (a
->encoding
== REDIS_ENCODING_INT
&& b
->encoding
== REDIS_ENCODING_INT
){
2006 return (long)a
->ptr
- (long)b
->ptr
;
2012 if (a
->encoding
!= REDIS_ENCODING_RAW
) a
= getDecodedObject(a
);
2013 if (b
->encoding
!= REDIS_ENCODING_RAW
) b
= getDecodedObject(a
);
2014 retval
= sdscmp(a
->ptr
,b
->ptr
);
2021 static size_t stringObjectLen(robj
*o
) {
2022 assert(o
->type
== REDIS_STRING
);
2023 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2024 return sdslen(o
->ptr
);
2028 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2032 /*============================ DB saving/loading ============================ */
2034 static int rdbSaveType(FILE *fp
, unsigned char type
) {
2035 if (fwrite(&type
,1,1,fp
) == 0) return -1;
2039 static int rdbSaveTime(FILE *fp
, time_t t
) {
2040 int32_t t32
= (int32_t) t
;
2041 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
2045 /* check rdbLoadLen() comments for more info */
2046 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2047 unsigned char buf
[2];
2050 /* Save a 6 bit len */
2051 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2052 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2053 } else if (len
< (1<<14)) {
2054 /* Save a 14 bit len */
2055 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2057 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2059 /* Save a 32 bit len */
2060 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2061 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2063 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2068 /* String objects in the form "2391" "-100" without any space and with a
2069 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2070 * encoded as integers to save space */
2071 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2073 char *endptr
, buf
[32];
2075 /* Check if it's possible to encode this value as a number */
2076 value
= strtoll(s
, &endptr
, 10);
2077 if (endptr
[0] != '\0') return 0;
2078 snprintf(buf
,32,"%lld",value
);
2080 /* If the number converted back into a string is not identical
2081 * then it's not possible to encode the string as integer */
2082 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2084 /* Finally check if it fits in our ranges */
2085 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2086 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2087 enc
[1] = value
&0xFF;
2089 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2090 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2091 enc
[1] = value
&0xFF;
2092 enc
[2] = (value
>>8)&0xFF;
2094 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2095 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2096 enc
[1] = value
&0xFF;
2097 enc
[2] = (value
>>8)&0xFF;
2098 enc
[3] = (value
>>16)&0xFF;
2099 enc
[4] = (value
>>24)&0xFF;
2106 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2107 unsigned int comprlen
, outlen
;
2111 /* We require at least four bytes compression for this to be worth it */
2112 outlen
= sdslen(obj
->ptr
)-4;
2113 if (outlen
<= 0) return 0;
2114 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2115 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2116 if (comprlen
== 0) {
2120 /* Data compressed! Let's save it on disk */
2121 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2122 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2123 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2124 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2125 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2134 /* Save a string objet as [len][data] on disk. If the object is a string
2135 * representation of an integer value we try to safe it in a special form */
2136 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2140 len
= sdslen(obj
->ptr
);
2142 /* Try integer encoding */
2144 unsigned char buf
[5];
2145 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2146 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2151 /* Try LZF compression - under 20 bytes it's unable to compress even
2152 * aaaaaaaaaaaaaaaaaa so skip it */
2156 retval
= rdbSaveLzfStringObject(fp
,obj
);
2157 if (retval
== -1) return -1;
2158 if (retval
> 0) return 0;
2159 /* retval == 0 means data can't be compressed, save the old way */
2162 /* Store verbatim */
2163 if (rdbSaveLen(fp
,len
) == -1) return -1;
2164 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2168 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2169 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2173 if (obj
->encoding
!= REDIS_ENCODING_RAW
) {
2174 dec
= getDecodedObject(obj
);
2175 retval
= rdbSaveStringObjectRaw(fp
,dec
);
2179 return rdbSaveStringObjectRaw(fp
,obj
);
2183 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2184 static int rdbSave(char *filename
) {
2185 dictIterator
*di
= NULL
;
2190 time_t now
= time(NULL
);
2192 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2193 fp
= fopen(tmpfile
,"w");
2195 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2198 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2199 for (j
= 0; j
< server
.dbnum
; j
++) {
2200 redisDb
*db
= server
.db
+j
;
2202 if (dictSize(d
) == 0) continue;
2203 di
= dictGetIterator(d
);
2209 /* Write the SELECT DB opcode */
2210 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2211 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2213 /* Iterate this DB writing every entry */
2214 while((de
= dictNext(di
)) != NULL
) {
2215 robj
*key
= dictGetEntryKey(de
);
2216 robj
*o
= dictGetEntryVal(de
);
2217 time_t expiretime
= getExpire(db
,key
);
2219 /* Save the expire time */
2220 if (expiretime
!= -1) {
2221 /* If this key is already expired skip it */
2222 if (expiretime
< now
) continue;
2223 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2224 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2226 /* Save the key and associated value */
2227 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2228 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2229 if (o
->type
== REDIS_STRING
) {
2230 /* Save a string value */
2231 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
2232 } else if (o
->type
== REDIS_LIST
) {
2233 /* Save a list value */
2234 list
*list
= o
->ptr
;
2238 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
2239 while((ln
= listYield(list
))) {
2240 robj
*eleobj
= listNodeValue(ln
);
2242 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2244 } else if (o
->type
== REDIS_SET
) {
2245 /* Save a set value */
2247 dictIterator
*di
= dictGetIterator(set
);
2250 if (!set
) oom("dictGetIteraotr");
2251 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
2252 while((de
= dictNext(di
)) != NULL
) {
2253 robj
*eleobj
= dictGetEntryKey(de
);
2255 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2257 dictReleaseIterator(di
);
2262 dictReleaseIterator(di
);
2265 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2267 /* Make sure data will not remain on the OS's output buffers */
2272 /* Use RENAME to make sure the DB file is changed atomically only
2273 * if the generate DB file is ok. */
2274 if (rename(tmpfile
,filename
) == -1) {
2275 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destionation: %s", strerror(errno
));
2279 redisLog(REDIS_NOTICE
,"DB saved on disk");
2281 server
.lastsave
= time(NULL
);
2287 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2288 if (di
) dictReleaseIterator(di
);
2292 static int rdbSaveBackground(char *filename
) {
2295 if (server
.bgsaveinprogress
) return REDIS_ERR
;
2296 if ((childpid
= fork()) == 0) {
2299 if (rdbSave(filename
) == REDIS_OK
) {
2306 if (childpid
== -1) {
2307 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2311 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2312 server
.bgsaveinprogress
= 1;
2313 server
.bgsavechildpid
= childpid
;
2316 return REDIS_OK
; /* unreached */
2319 static void rdbRemoveTempFile(pid_t childpid
) {
2322 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
2326 static int rdbLoadType(FILE *fp
) {
2328 if (fread(&type
,1,1,fp
) == 0) return -1;
2332 static time_t rdbLoadTime(FILE *fp
) {
2334 if (fread(&t32
,4,1,fp
) == 0) return -1;
2335 return (time_t) t32
;
2338 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2339 * of this file for a description of how this are stored on disk.
2341 * isencoded is set to 1 if the readed length is not actually a length but
2342 * an "encoding type", check the above comments for more info */
2343 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2344 unsigned char buf
[2];
2347 if (isencoded
) *isencoded
= 0;
2349 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2354 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2355 type
= (buf
[0]&0xC0)>>6;
2356 if (type
== REDIS_RDB_6BITLEN
) {
2357 /* Read a 6 bit len */
2359 } else if (type
== REDIS_RDB_ENCVAL
) {
2360 /* Read a 6 bit len encoding type */
2361 if (isencoded
) *isencoded
= 1;
2363 } else if (type
== REDIS_RDB_14BITLEN
) {
2364 /* Read a 14 bit len */
2365 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2366 return ((buf
[0]&0x3F)<<8)|buf
[1];
2368 /* Read a 32 bit len */
2369 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2375 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2376 unsigned char enc
[4];
2379 if (enctype
== REDIS_RDB_ENC_INT8
) {
2380 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2381 val
= (signed char)enc
[0];
2382 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2384 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2385 v
= enc
[0]|(enc
[1]<<8);
2387 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2389 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2390 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2393 val
= 0; /* anti-warning */
2396 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2399 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2400 unsigned int len
, clen
;
2401 unsigned char *c
= NULL
;
2404 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2405 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2406 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2407 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2408 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2409 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2411 return createObject(REDIS_STRING
,val
);
2418 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2423 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2426 case REDIS_RDB_ENC_INT8
:
2427 case REDIS_RDB_ENC_INT16
:
2428 case REDIS_RDB_ENC_INT32
:
2429 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2430 case REDIS_RDB_ENC_LZF
:
2431 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2437 if (len
== REDIS_RDB_LENERR
) return NULL
;
2438 val
= sdsnewlen(NULL
,len
);
2439 if (len
&& fread(val
,len
,1,fp
) == 0) {
2443 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2446 static int rdbLoad(char *filename
) {
2448 robj
*keyobj
= NULL
;
2450 int type
, retval
, rdbver
;
2451 dict
*d
= server
.db
[0].dict
;
2452 redisDb
*db
= server
.db
+0;
2454 time_t expiretime
= -1, now
= time(NULL
);
2456 fp
= fopen(filename
,"r");
2457 if (!fp
) return REDIS_ERR
;
2458 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2460 if (memcmp(buf
,"REDIS",5) != 0) {
2462 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2465 rdbver
= atoi(buf
+5);
2468 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2475 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2476 if (type
== REDIS_EXPIRETIME
) {
2477 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2478 /* We read the time so we need to read the object type again */
2479 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2481 if (type
== REDIS_EOF
) break;
2482 /* Handle SELECT DB opcode as a special case */
2483 if (type
== REDIS_SELECTDB
) {
2484 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2486 if (dbid
>= (unsigned)server
.dbnum
) {
2487 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2490 db
= server
.db
+dbid
;
2495 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2497 if (type
== REDIS_STRING
) {
2498 /* Read string value */
2499 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2500 tryObjectEncoding(o
);
2501 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2502 /* Read list/set value */
2505 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2507 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2508 /* Load every single element of the list/set */
2512 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2513 tryObjectEncoding(ele
);
2514 if (type
== REDIS_LIST
) {
2515 if (!listAddNodeTail((list
*)o
->ptr
,ele
))
2516 oom("listAddNodeTail");
2518 if (dictAdd((dict
*)o
->ptr
,ele
,NULL
) == DICT_ERR
)
2525 /* Add the new object in the hash table */
2526 retval
= dictAdd(d
,keyobj
,o
);
2527 if (retval
== DICT_ERR
) {
2528 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2531 /* Set the expire time if needed */
2532 if (expiretime
!= -1) {
2533 setExpire(db
,keyobj
,expiretime
);
2534 /* Delete this key if already expired */
2535 if (expiretime
< now
) deleteKey(db
,keyobj
);
2543 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2544 if (keyobj
) decrRefCount(keyobj
);
2545 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2547 return REDIS_ERR
; /* Just to avoid warning */
2550 /*================================== Commands =============================== */
2552 static void authCommand(redisClient
*c
) {
2553 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2554 c
->authenticated
= 1;
2555 addReply(c
,shared
.ok
);
2557 c
->authenticated
= 0;
2558 addReply(c
,shared
.err
);
2562 static void pingCommand(redisClient
*c
) {
2563 addReply(c
,shared
.pong
);
2566 static void echoCommand(redisClient
*c
) {
2567 addReplyBulkLen(c
,c
->argv
[1]);
2568 addReply(c
,c
->argv
[1]);
2569 addReply(c
,shared
.crlf
);
2572 /*=================================== Strings =============================== */
2574 static void setGenericCommand(redisClient
*c
, int nx
) {
2577 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2578 if (retval
== DICT_ERR
) {
2580 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2581 incrRefCount(c
->argv
[2]);
2583 addReply(c
,shared
.czero
);
2587 incrRefCount(c
->argv
[1]);
2588 incrRefCount(c
->argv
[2]);
2591 removeExpire(c
->db
,c
->argv
[1]);
2592 addReply(c
, nx
? shared
.cone
: shared
.ok
);
2595 static void setCommand(redisClient
*c
) {
2596 setGenericCommand(c
,0);
2599 static void setnxCommand(redisClient
*c
) {
2600 setGenericCommand(c
,1);
2603 static void getCommand(redisClient
*c
) {
2604 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2607 addReply(c
,shared
.nullbulk
);
2609 if (o
->type
!= REDIS_STRING
) {
2610 addReply(c
,shared
.wrongtypeerr
);
2612 addReplyBulkLen(c
,o
);
2614 addReply(c
,shared
.crlf
);
2619 static void getsetCommand(redisClient
*c
) {
2621 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
2622 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2624 incrRefCount(c
->argv
[1]);
2626 incrRefCount(c
->argv
[2]);
2628 removeExpire(c
->db
,c
->argv
[1]);
2631 static void mgetCommand(redisClient
*c
) {
2634 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
2635 for (j
= 1; j
< c
->argc
; j
++) {
2636 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
2638 addReply(c
,shared
.nullbulk
);
2640 if (o
->type
!= REDIS_STRING
) {
2641 addReply(c
,shared
.nullbulk
);
2643 addReplyBulkLen(c
,o
);
2645 addReply(c
,shared
.crlf
);
2651 static void incrDecrCommand(redisClient
*c
, long long incr
) {
2656 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2660 if (o
->type
!= REDIS_STRING
) {
2665 if (o
->encoding
== REDIS_ENCODING_RAW
)
2666 value
= strtoll(o
->ptr
, &eptr
, 10);
2667 else if (o
->encoding
== REDIS_ENCODING_INT
)
2668 value
= (long)o
->ptr
;
2675 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
2676 tryObjectEncoding(o
);
2677 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
2678 if (retval
== DICT_ERR
) {
2679 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
2680 removeExpire(c
->db
,c
->argv
[1]);
2682 incrRefCount(c
->argv
[1]);
2685 addReply(c
,shared
.colon
);
2687 addReply(c
,shared
.crlf
);
2690 static void incrCommand(redisClient
*c
) {
2691 incrDecrCommand(c
,1);
2694 static void decrCommand(redisClient
*c
) {
2695 incrDecrCommand(c
,-1);
2698 static void incrbyCommand(redisClient
*c
) {
2699 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2700 incrDecrCommand(c
,incr
);
2703 static void decrbyCommand(redisClient
*c
) {
2704 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2705 incrDecrCommand(c
,-incr
);
2708 /* ========================= Type agnostic commands ========================= */
2710 static void delCommand(redisClient
*c
) {
2713 for (j
= 1; j
< c
->argc
; j
++) {
2714 if (deleteKey(c
->db
,c
->argv
[j
])) {
2721 addReply(c
,shared
.czero
);
2724 addReply(c
,shared
.cone
);
2727 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
2732 static void existsCommand(redisClient
*c
) {
2733 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
2736 static void selectCommand(redisClient
*c
) {
2737 int id
= atoi(c
->argv
[1]->ptr
);
2739 if (selectDb(c
,id
) == REDIS_ERR
) {
2740 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
2742 addReply(c
,shared
.ok
);
2746 static void randomkeyCommand(redisClient
*c
) {
2750 de
= dictGetRandomKey(c
->db
->dict
);
2751 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
2754 addReply(c
,shared
.plus
);
2755 addReply(c
,shared
.crlf
);
2757 addReply(c
,shared
.plus
);
2758 addReply(c
,dictGetEntryKey(de
));
2759 addReply(c
,shared
.crlf
);
2763 static void keysCommand(redisClient
*c
) {
2766 sds pattern
= c
->argv
[1]->ptr
;
2767 int plen
= sdslen(pattern
);
2768 int numkeys
= 0, keyslen
= 0;
2769 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
2771 di
= dictGetIterator(c
->db
->dict
);
2772 if (!di
) oom("dictGetIterator");
2774 decrRefCount(lenobj
);
2775 while((de
= dictNext(di
)) != NULL
) {
2776 robj
*keyobj
= dictGetEntryKey(de
);
2778 sds key
= keyobj
->ptr
;
2779 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
2780 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
2781 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
2783 addReply(c
,shared
.space
);
2786 keyslen
+= sdslen(key
);
2790 dictReleaseIterator(di
);
2791 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
2792 addReply(c
,shared
.crlf
);
2795 static void dbsizeCommand(redisClient
*c
) {
2797 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
2800 static void lastsaveCommand(redisClient
*c
) {
2802 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
2805 static void typeCommand(redisClient
*c
) {
2809 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2814 case REDIS_STRING
: type
= "+string"; break;
2815 case REDIS_LIST
: type
= "+list"; break;
2816 case REDIS_SET
: type
= "+set"; break;
2817 default: type
= "unknown"; break;
2820 addReplySds(c
,sdsnew(type
));
2821 addReply(c
,shared
.crlf
);
2824 static void saveCommand(redisClient
*c
) {
2825 if (server
.bgsaveinprogress
) {
2826 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
2829 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2830 addReply(c
,shared
.ok
);
2832 addReply(c
,shared
.err
);
2836 static void bgsaveCommand(redisClient
*c
) {
2837 if (server
.bgsaveinprogress
) {
2838 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
2841 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
2842 addReply(c
,shared
.ok
);
2844 addReply(c
,shared
.err
);
2848 static void shutdownCommand(redisClient
*c
) {
2849 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
2850 /* Kill the saving child if there is a background saving in progress.
2851 We want to avoid race conditions, for instance our saving child may
2852 overwrite the synchronous saving did by SHUTDOWN. */
2853 if (server
.bgsaveinprogress
) {
2854 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
2855 kill(server
.bgsavechildpid
,SIGKILL
);
2856 rdbRemoveTempFile(server
.bgsavechildpid
);
2859 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2860 if (server
.daemonize
)
2861 unlink(server
.pidfile
);
2862 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
2863 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
2866 /* Ooops.. error saving! The best we can do is to continue operating.
2867 * Note that if there was a background saving process, in the next
2868 * cron() Redis will be notified that the background saving aborted,
2869 * handling special stuff like slaves pending for synchronization... */
2870 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
2871 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2875 static void renameGenericCommand(redisClient
*c
, int nx
) {
2878 /* To use the same key as src and dst is probably an error */
2879 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
2880 addReply(c
,shared
.sameobjecterr
);
2884 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2886 addReply(c
,shared
.nokeyerr
);
2890 deleteIfVolatile(c
->db
,c
->argv
[2]);
2891 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
2894 addReply(c
,shared
.czero
);
2897 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
2899 incrRefCount(c
->argv
[2]);
2901 deleteKey(c
->db
,c
->argv
[1]);
2903 addReply(c
,nx
? shared
.cone
: shared
.ok
);
2906 static void renameCommand(redisClient
*c
) {
2907 renameGenericCommand(c
,0);
2910 static void renamenxCommand(redisClient
*c
) {
2911 renameGenericCommand(c
,1);
2914 static void moveCommand(redisClient
*c
) {
2919 /* Obtain source and target DB pointers */
2922 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
2923 addReply(c
,shared
.outofrangeerr
);
2927 selectDb(c
,srcid
); /* Back to the source DB */
2929 /* If the user is moving using as target the same
2930 * DB as the source DB it is probably an error. */
2932 addReply(c
,shared
.sameobjecterr
);
2936 /* Check if the element exists and get a reference */
2937 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2939 addReply(c
,shared
.czero
);
2943 /* Try to add the element to the target DB */
2944 deleteIfVolatile(dst
,c
->argv
[1]);
2945 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
2946 addReply(c
,shared
.czero
);
2949 incrRefCount(c
->argv
[1]);
2952 /* OK! key moved, free the entry in the source DB */
2953 deleteKey(src
,c
->argv
[1]);
2955 addReply(c
,shared
.cone
);
2958 /* =================================== Lists ================================ */
2959 static void pushGenericCommand(redisClient
*c
, int where
) {
2963 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2965 lobj
= createListObject();
2967 if (where
== REDIS_HEAD
) {
2968 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2970 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2972 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
2973 incrRefCount(c
->argv
[1]);
2974 incrRefCount(c
->argv
[2]);
2976 if (lobj
->type
!= REDIS_LIST
) {
2977 addReply(c
,shared
.wrongtypeerr
);
2981 if (where
== REDIS_HEAD
) {
2982 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2984 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2986 incrRefCount(c
->argv
[2]);
2989 addReply(c
,shared
.ok
);
2992 static void lpushCommand(redisClient
*c
) {
2993 pushGenericCommand(c
,REDIS_HEAD
);
2996 static void rpushCommand(redisClient
*c
) {
2997 pushGenericCommand(c
,REDIS_TAIL
);
3000 static void llenCommand(redisClient
*c
) {
3004 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3006 addReply(c
,shared
.czero
);
3009 if (o
->type
!= REDIS_LIST
) {
3010 addReply(c
,shared
.wrongtypeerr
);
3013 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3018 static void lindexCommand(redisClient
*c
) {
3020 int index
= atoi(c
->argv
[2]->ptr
);
3022 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3024 addReply(c
,shared
.nullbulk
);
3026 if (o
->type
!= REDIS_LIST
) {
3027 addReply(c
,shared
.wrongtypeerr
);
3029 list
*list
= o
->ptr
;
3032 ln
= listIndex(list
, index
);
3034 addReply(c
,shared
.nullbulk
);
3036 robj
*ele
= listNodeValue(ln
);
3037 addReplyBulkLen(c
,ele
);
3039 addReply(c
,shared
.crlf
);
3045 static void lsetCommand(redisClient
*c
) {
3047 int index
= atoi(c
->argv
[2]->ptr
);
3049 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3051 addReply(c
,shared
.nokeyerr
);
3053 if (o
->type
!= REDIS_LIST
) {
3054 addReply(c
,shared
.wrongtypeerr
);
3056 list
*list
= o
->ptr
;
3059 ln
= listIndex(list
, index
);
3061 addReply(c
,shared
.outofrangeerr
);
3063 robj
*ele
= listNodeValue(ln
);
3066 listNodeValue(ln
) = c
->argv
[3];
3067 incrRefCount(c
->argv
[3]);
3068 addReply(c
,shared
.ok
);
3075 static void popGenericCommand(redisClient
*c
, int where
) {
3078 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3080 addReply(c
,shared
.nullbulk
);
3082 if (o
->type
!= REDIS_LIST
) {
3083 addReply(c
,shared
.wrongtypeerr
);
3085 list
*list
= o
->ptr
;
3088 if (where
== REDIS_HEAD
)
3089 ln
= listFirst(list
);
3091 ln
= listLast(list
);
3094 addReply(c
,shared
.nullbulk
);
3096 robj
*ele
= listNodeValue(ln
);
3097 addReplyBulkLen(c
,ele
);
3099 addReply(c
,shared
.crlf
);
3100 listDelNode(list
,ln
);
3107 static void lpopCommand(redisClient
*c
) {
3108 popGenericCommand(c
,REDIS_HEAD
);
3111 static void rpopCommand(redisClient
*c
) {
3112 popGenericCommand(c
,REDIS_TAIL
);
3115 static void lrangeCommand(redisClient
*c
) {
3117 int start
= atoi(c
->argv
[2]->ptr
);
3118 int end
= atoi(c
->argv
[3]->ptr
);
3120 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3122 addReply(c
,shared
.nullmultibulk
);
3124 if (o
->type
!= REDIS_LIST
) {
3125 addReply(c
,shared
.wrongtypeerr
);
3127 list
*list
= o
->ptr
;
3129 int llen
= listLength(list
);
3133 /* convert negative indexes */
3134 if (start
< 0) start
= llen
+start
;
3135 if (end
< 0) end
= llen
+end
;
3136 if (start
< 0) start
= 0;
3137 if (end
< 0) end
= 0;
3139 /* indexes sanity checks */
3140 if (start
> end
|| start
>= llen
) {
3141 /* Out of range start or start > end result in empty list */
3142 addReply(c
,shared
.emptymultibulk
);
3145 if (end
>= llen
) end
= llen
-1;
3146 rangelen
= (end
-start
)+1;
3148 /* Return the result in form of a multi-bulk reply */
3149 ln
= listIndex(list
, start
);
3150 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3151 for (j
= 0; j
< rangelen
; j
++) {
3152 ele
= listNodeValue(ln
);
3153 addReplyBulkLen(c
,ele
);
3155 addReply(c
,shared
.crlf
);
3162 static void ltrimCommand(redisClient
*c
) {
3164 int start
= atoi(c
->argv
[2]->ptr
);
3165 int end
= atoi(c
->argv
[3]->ptr
);
3167 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3169 addReply(c
,shared
.nokeyerr
);
3171 if (o
->type
!= REDIS_LIST
) {
3172 addReply(c
,shared
.wrongtypeerr
);
3174 list
*list
= o
->ptr
;
3176 int llen
= listLength(list
);
3177 int j
, ltrim
, rtrim
;
3179 /* convert negative indexes */
3180 if (start
< 0) start
= llen
+start
;
3181 if (end
< 0) end
= llen
+end
;
3182 if (start
< 0) start
= 0;
3183 if (end
< 0) end
= 0;
3185 /* indexes sanity checks */
3186 if (start
> end
|| start
>= llen
) {
3187 /* Out of range start or start > end result in empty list */
3191 if (end
>= llen
) end
= llen
-1;
3196 /* Remove list elements to perform the trim */
3197 for (j
= 0; j
< ltrim
; j
++) {
3198 ln
= listFirst(list
);
3199 listDelNode(list
,ln
);
3201 for (j
= 0; j
< rtrim
; j
++) {
3202 ln
= listLast(list
);
3203 listDelNode(list
,ln
);
3206 addReply(c
,shared
.ok
);
3211 static void lremCommand(redisClient
*c
) {
3214 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3216 addReply(c
,shared
.czero
);
3218 if (o
->type
!= REDIS_LIST
) {
3219 addReply(c
,shared
.wrongtypeerr
);
3221 list
*list
= o
->ptr
;
3222 listNode
*ln
, *next
;
3223 int toremove
= atoi(c
->argv
[2]->ptr
);
3228 toremove
= -toremove
;
3231 ln
= fromtail
? list
->tail
: list
->head
;
3233 robj
*ele
= listNodeValue(ln
);
3235 next
= fromtail
? ln
->prev
: ln
->next
;
3236 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3237 listDelNode(list
,ln
);
3240 if (toremove
&& removed
== toremove
) break;
3244 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3249 /* ==================================== Sets ================================ */
3251 static void saddCommand(redisClient
*c
) {
3254 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3256 set
= createSetObject();
3257 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3258 incrRefCount(c
->argv
[1]);
3260 if (set
->type
!= REDIS_SET
) {
3261 addReply(c
,shared
.wrongtypeerr
);
3265 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3266 incrRefCount(c
->argv
[2]);
3268 addReply(c
,shared
.cone
);
3270 addReply(c
,shared
.czero
);
3274 static void sremCommand(redisClient
*c
) {
3277 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3279 addReply(c
,shared
.czero
);
3281 if (set
->type
!= REDIS_SET
) {
3282 addReply(c
,shared
.wrongtypeerr
);
3285 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
3287 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3288 addReply(c
,shared
.cone
);
3290 addReply(c
,shared
.czero
);
3295 static void smoveCommand(redisClient
*c
) {
3296 robj
*srcset
, *dstset
;
3298 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3299 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3301 /* If the source key does not exist return 0, if it's of the wrong type
3303 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
3304 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
3307 /* Error if the destination key is not a set as well */
3308 if (dstset
&& dstset
->type
!= REDIS_SET
) {
3309 addReply(c
,shared
.wrongtypeerr
);
3312 /* Remove the element from the source set */
3313 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
3314 /* Key not found in the src set! return zero */
3315 addReply(c
,shared
.czero
);
3319 /* Add the element to the destination set */
3321 dstset
= createSetObject();
3322 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
3323 incrRefCount(c
->argv
[2]);
3325 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
3326 incrRefCount(c
->argv
[3]);
3327 addReply(c
,shared
.cone
);
3330 static void sismemberCommand(redisClient
*c
) {
3333 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3335 addReply(c
,shared
.czero
);
3337 if (set
->type
!= REDIS_SET
) {
3338 addReply(c
,shared
.wrongtypeerr
);
3341 if (dictFind(set
->ptr
,c
->argv
[2]))
3342 addReply(c
,shared
.cone
);
3344 addReply(c
,shared
.czero
);
3348 static void scardCommand(redisClient
*c
) {
3352 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3354 addReply(c
,shared
.czero
);
3357 if (o
->type
!= REDIS_SET
) {
3358 addReply(c
,shared
.wrongtypeerr
);
3361 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3367 static void spopCommand(redisClient
*c
) {
3371 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3373 addReply(c
,shared
.nullbulk
);
3375 if (set
->type
!= REDIS_SET
) {
3376 addReply(c
,shared
.wrongtypeerr
);
3379 de
= dictGetRandomKey(set
->ptr
);
3381 addReply(c
,shared
.nullbulk
);
3383 robj
*ele
= dictGetEntryKey(de
);
3385 addReplyBulkLen(c
,ele
);
3387 addReply(c
,shared
.crlf
);
3388 dictDelete(set
->ptr
,ele
);
3389 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3395 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3396 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3398 return dictSize(*d1
)-dictSize(*d2
);
3401 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
) {
3402 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3405 robj
*lenobj
= NULL
, *dstset
= NULL
;
3406 int j
, cardinality
= 0;
3408 if (!dv
) oom("sinterGenericCommand");
3409 for (j
= 0; j
< setsnum
; j
++) {
3413 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3414 lookupKeyRead(c
->db
,setskeys
[j
]);
3418 deleteKey(c
->db
,dstkey
);
3419 addReply(c
,shared
.ok
);
3421 addReply(c
,shared
.nullmultibulk
);
3425 if (setobj
->type
!= REDIS_SET
) {
3427 addReply(c
,shared
.wrongtypeerr
);
3430 dv
[j
] = setobj
->ptr
;
3432 /* Sort sets from the smallest to largest, this will improve our
3433 * algorithm's performace */
3434 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3436 /* The first thing we should output is the total number of elements...
3437 * since this is a multi-bulk write, but at this stage we don't know
3438 * the intersection set size, so we use a trick, append an empty object
3439 * to the output list and save the pointer to later modify it with the
3442 lenobj
= createObject(REDIS_STRING
,NULL
);
3444 decrRefCount(lenobj
);
3446 /* If we have a target key where to store the resulting set
3447 * create this key with an empty set inside */
3448 dstset
= createSetObject();
3451 /* Iterate all the elements of the first (smallest) set, and test
3452 * the element against all the other sets, if at least one set does
3453 * not include the element it is discarded */
3454 di
= dictGetIterator(dv
[0]);
3455 if (!di
) oom("dictGetIterator");
3457 while((de
= dictNext(di
)) != NULL
) {
3460 for (j
= 1; j
< setsnum
; j
++)
3461 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
3463 continue; /* at least one set does not contain the member */
3464 ele
= dictGetEntryKey(de
);
3466 addReplyBulkLen(c
,ele
);
3468 addReply(c
,shared
.crlf
);
3471 dictAdd(dstset
->ptr
,ele
,NULL
);
3475 dictReleaseIterator(di
);
3478 /* Store the resulting set into the target */
3479 deleteKey(c
->db
,dstkey
);
3480 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3481 incrRefCount(dstkey
);
3485 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",cardinality
);
3487 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3488 dictSize((dict
*)dstset
->ptr
)));
3494 static void sinterCommand(redisClient
*c
) {
3495 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
3498 static void sinterstoreCommand(redisClient
*c
) {
3499 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
3502 #define REDIS_OP_UNION 0
3503 #define REDIS_OP_DIFF 1
3505 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
3506 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3509 robj
*dstset
= NULL
;
3510 int j
, cardinality
= 0;
3512 if (!dv
) oom("sunionDiffGenericCommand");
3513 for (j
= 0; j
< setsnum
; j
++) {
3517 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3518 lookupKeyRead(c
->db
,setskeys
[j
]);
3523 if (setobj
->type
!= REDIS_SET
) {
3525 addReply(c
,shared
.wrongtypeerr
);
3528 dv
[j
] = setobj
->ptr
;
3531 /* We need a temp set object to store our union. If the dstkey
3532 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3533 * this set object will be the resulting object to set into the target key*/
3534 dstset
= createSetObject();
3536 /* Iterate all the elements of all the sets, add every element a single
3537 * time to the result set */
3538 for (j
= 0; j
< setsnum
; j
++) {
3539 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
3540 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
3542 di
= dictGetIterator(dv
[j
]);
3543 if (!di
) oom("dictGetIterator");
3545 while((de
= dictNext(di
)) != NULL
) {
3548 /* dictAdd will not add the same element multiple times */
3549 ele
= dictGetEntryKey(de
);
3550 if (op
== REDIS_OP_UNION
|| j
== 0) {
3551 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
3555 } else if (op
== REDIS_OP_DIFF
) {
3556 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
3561 dictReleaseIterator(di
);
3563 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
3566 /* Output the content of the resulting set, if not in STORE mode */
3568 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
3569 di
= dictGetIterator(dstset
->ptr
);
3570 if (!di
) oom("dictGetIterator");
3571 while((de
= dictNext(di
)) != NULL
) {
3574 ele
= dictGetEntryKey(de
);
3575 addReplyBulkLen(c
,ele
);
3577 addReply(c
,shared
.crlf
);
3579 dictReleaseIterator(di
);
3581 /* If we have a target key where to store the resulting set
3582 * create this key with the result set inside */
3583 deleteKey(c
->db
,dstkey
);
3584 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3585 incrRefCount(dstkey
);
3590 decrRefCount(dstset
);
3592 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3593 dictSize((dict
*)dstset
->ptr
)));
3599 static void sunionCommand(redisClient
*c
) {
3600 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
3603 static void sunionstoreCommand(redisClient
*c
) {
3604 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
3607 static void sdiffCommand(redisClient
*c
) {
3608 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
3611 static void sdiffstoreCommand(redisClient
*c
) {
3612 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
3615 static void flushdbCommand(redisClient
*c
) {
3616 server
.dirty
+= dictSize(c
->db
->dict
);
3617 dictEmpty(c
->db
->dict
);
3618 dictEmpty(c
->db
->expires
);
3619 addReply(c
,shared
.ok
);
3622 static void flushallCommand(redisClient
*c
) {
3623 server
.dirty
+= emptyDb();
3624 addReply(c
,shared
.ok
);
3625 rdbSave(server
.dbfilename
);
3629 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
3630 redisSortOperation
*so
= zmalloc(sizeof(*so
));
3631 if (!so
) oom("createSortOperation");
3633 so
->pattern
= pattern
;
3637 /* Return the value associated to the key with a name obtained
3638 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3639 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
3643 int prefixlen
, sublen
, postfixlen
;
3644 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3648 char buf
[REDIS_SORTKEY_MAX
+1];
3651 if (subst
->encoding
== REDIS_ENCODING_RAW
)
3652 incrRefCount(subst
);
3654 subst
= getDecodedObject(subst
);
3657 spat
= pattern
->ptr
;
3659 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
3660 p
= strchr(spat
,'*');
3661 if (!p
) return NULL
;
3664 sublen
= sdslen(ssub
);
3665 postfixlen
= sdslen(spat
)-(prefixlen
+1);
3666 memcpy(keyname
.buf
,spat
,prefixlen
);
3667 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
3668 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
3669 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
3670 keyname
.len
= prefixlen
+sublen
+postfixlen
;
3672 keyobj
.refcount
= 1;
3673 keyobj
.type
= REDIS_STRING
;
3674 keyobj
.ptr
= ((char*)&keyname
)+(sizeof(long)*2);
3676 decrRefCount(subst
);
3678 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3679 return lookupKeyRead(db
,&keyobj
);
3682 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3683 * the additional parameter is not standard but a BSD-specific we have to
3684 * pass sorting parameters via the global 'server' structure */
3685 static int sortCompare(const void *s1
, const void *s2
) {
3686 const redisSortObject
*so1
= s1
, *so2
= s2
;
3689 if (!server
.sort_alpha
) {
3690 /* Numeric sorting. Here it's trivial as we precomputed scores */
3691 if (so1
->u
.score
> so2
->u
.score
) {
3693 } else if (so1
->u
.score
< so2
->u
.score
) {
3699 /* Alphanumeric sorting */
3700 if (server
.sort_bypattern
) {
3701 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
3702 /* At least one compare object is NULL */
3703 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
3705 else if (so1
->u
.cmpobj
== NULL
)
3710 /* We have both the objects, use strcoll */
3711 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
3714 /* Compare elements directly */
3715 if (so1
->obj
->encoding
== REDIS_ENCODING_RAW
&&
3716 so2
->obj
->encoding
== REDIS_ENCODING_RAW
) {
3717 cmp
= strcoll(so1
->obj
->ptr
,so2
->obj
->ptr
);
3721 dec1
= so1
->obj
->encoding
== REDIS_ENCODING_RAW
?
3722 so1
->obj
: getDecodedObject(so1
->obj
);
3723 dec2
= so2
->obj
->encoding
== REDIS_ENCODING_RAW
?
3724 so2
->obj
: getDecodedObject(so2
->obj
);
3725 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
3726 if (dec1
!= so1
->obj
) decrRefCount(dec1
);
3727 if (dec2
!= so2
->obj
) decrRefCount(dec2
);
3731 return server
.sort_desc
? -cmp
: cmp
;
3734 /* The SORT command is the most complex command in Redis. Warning: this code
3735 * is optimized for speed and a bit less for readability */
3736 static void sortCommand(redisClient
*c
) {
3739 int desc
= 0, alpha
= 0;
3740 int limit_start
= 0, limit_count
= -1, start
, end
;
3741 int j
, dontsort
= 0, vectorlen
;
3742 int getop
= 0; /* GET operation counter */
3743 robj
*sortval
, *sortby
= NULL
;
3744 redisSortObject
*vector
; /* Resulting vector to sort */
3746 /* Lookup the key to sort. It must be of the right types */
3747 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
3748 if (sortval
== NULL
) {
3749 addReply(c
,shared
.nokeyerr
);
3752 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
) {
3753 addReply(c
,shared
.wrongtypeerr
);
3757 /* Create a list of operations to perform for every sorted element.
3758 * Operations can be GET/DEL/INCR/DECR */
3759 operations
= listCreate();
3760 listSetFreeMethod(operations
,zfree
);
3763 /* Now we need to protect sortval incrementing its count, in the future
3764 * SORT may have options able to overwrite/delete keys during the sorting
3765 * and the sorted key itself may get destroied */
3766 incrRefCount(sortval
);
3768 /* The SORT command has an SQL-alike syntax, parse it */
3769 while(j
< c
->argc
) {
3770 int leftargs
= c
->argc
-j
-1;
3771 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
3773 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
3775 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
3777 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
3778 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
3779 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
3781 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
3782 sortby
= c
->argv
[j
+1];
3783 /* If the BY pattern does not contain '*', i.e. it is constant,
3784 * we don't need to sort nor to lookup the weight keys. */
3785 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
3787 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3788 listAddNodeTail(operations
,createSortOperation(
3789 REDIS_SORT_GET
,c
->argv
[j
+1]));
3792 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"del") && leftargs
>= 1) {
3793 listAddNodeTail(operations
,createSortOperation(
3794 REDIS_SORT_DEL
,c
->argv
[j
+1]));
3796 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"incr") && leftargs
>= 1) {
3797 listAddNodeTail(operations
,createSortOperation(
3798 REDIS_SORT_INCR
,c
->argv
[j
+1]));
3800 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3801 listAddNodeTail(operations
,createSortOperation(
3802 REDIS_SORT_DECR
,c
->argv
[j
+1]));
3805 decrRefCount(sortval
);
3806 listRelease(operations
);
3807 addReply(c
,shared
.syntaxerr
);
3813 /* Load the sorting vector with all the objects to sort */
3814 vectorlen
= (sortval
->type
== REDIS_LIST
) ?
3815 listLength((list
*)sortval
->ptr
) :
3816 dictSize((dict
*)sortval
->ptr
);
3817 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
3818 if (!vector
) oom("allocating objects vector for SORT");
3820 if (sortval
->type
== REDIS_LIST
) {
3821 list
*list
= sortval
->ptr
;
3825 while((ln
= listYield(list
))) {
3826 robj
*ele
= ln
->value
;
3827 vector
[j
].obj
= ele
;
3828 vector
[j
].u
.score
= 0;
3829 vector
[j
].u
.cmpobj
= NULL
;
3833 dict
*set
= sortval
->ptr
;
3837 di
= dictGetIterator(set
);
3838 if (!di
) oom("dictGetIterator");
3839 while((setele
= dictNext(di
)) != NULL
) {
3840 vector
[j
].obj
= dictGetEntryKey(setele
);
3841 vector
[j
].u
.score
= 0;
3842 vector
[j
].u
.cmpobj
= NULL
;
3845 dictReleaseIterator(di
);
3847 assert(j
== vectorlen
);
3849 /* Now it's time to load the right scores in the sorting vector */
3850 if (dontsort
== 0) {
3851 for (j
= 0; j
< vectorlen
; j
++) {
3855 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
3856 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
3858 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
3859 vector
[j
].u
.cmpobj
= byval
;
3860 incrRefCount(byval
);
3862 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
3865 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
3866 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
3868 if (byval
->encoding
== REDIS_ENCODING_INT
) {
3869 vector
[j
].u
.score
= (long)byval
->ptr
;
3876 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
3877 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
3879 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
3880 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
3889 /* We are ready to sort the vector... perform a bit of sanity check
3890 * on the LIMIT option too. We'll use a partial version of quicksort. */
3891 start
= (limit_start
< 0) ? 0 : limit_start
;
3892 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
3893 if (start
>= vectorlen
) {
3894 start
= vectorlen
-1;
3897 if (end
>= vectorlen
) end
= vectorlen
-1;
3899 if (dontsort
== 0) {
3900 server
.sort_desc
= desc
;
3901 server
.sort_alpha
= alpha
;
3902 server
.sort_bypattern
= sortby
? 1 : 0;
3903 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
3904 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
3906 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
3909 /* Send command output to the output buffer, performing the specified
3910 * GET/DEL/INCR/DECR operations if any. */
3911 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
3912 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
3913 for (j
= start
; j
<= end
; j
++) {
3916 addReplyBulkLen(c
,vector
[j
].obj
);
3917 addReply(c
,vector
[j
].obj
);
3918 addReply(c
,shared
.crlf
);
3920 listRewind(operations
);
3921 while((ln
= listYield(operations
))) {
3922 redisSortOperation
*sop
= ln
->value
;
3923 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
3926 if (sop
->type
== REDIS_SORT_GET
) {
3927 if (!val
|| val
->type
!= REDIS_STRING
) {
3928 addReply(c
,shared
.nullbulk
);
3930 addReplyBulkLen(c
,val
);
3932 addReply(c
,shared
.crlf
);
3934 } else if (sop
->type
== REDIS_SORT_DEL
) {
3941 decrRefCount(sortval
);
3942 listRelease(operations
);
3943 for (j
= 0; j
< vectorlen
; j
++) {
3944 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
3945 decrRefCount(vector
[j
].u
.cmpobj
);
3950 static void infoCommand(redisClient
*c
) {
3952 time_t uptime
= time(NULL
)-server
.stat_starttime
;
3955 info
= sdscatprintf(sdsempty(),
3956 "redis_version:%s\r\n"
3958 "uptime_in_seconds:%d\r\n"
3959 "uptime_in_days:%d\r\n"
3960 "connected_clients:%d\r\n"
3961 "connected_slaves:%d\r\n"
3962 "used_memory:%zu\r\n"
3963 "changes_since_last_save:%lld\r\n"
3964 "bgsave_in_progress:%d\r\n"
3965 "last_save_time:%d\r\n"
3966 "total_connections_received:%lld\r\n"
3967 "total_commands_processed:%lld\r\n"
3970 (sizeof(long) == 8) ? "64" : "32",
3973 listLength(server
.clients
)-listLength(server
.slaves
),
3974 listLength(server
.slaves
),
3977 server
.bgsaveinprogress
,
3979 server
.stat_numconnections
,
3980 server
.stat_numcommands
,
3981 server
.masterhost
== NULL
? "master" : "slave"
3983 if (server
.masterhost
) {
3984 info
= sdscatprintf(info
,
3985 "master_host:%s\r\n"
3986 "master_port:%d\r\n"
3987 "master_link_status:%s\r\n"
3988 "master_last_io_seconds_ago:%d\r\n"
3991 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
3993 (int)(time(NULL
)-server
.master
->lastinteraction
)
3996 for (j
= 0; j
< server
.dbnum
; j
++) {
3997 long long keys
, vkeys
;
3999 keys
= dictSize(server
.db
[j
].dict
);
4000 vkeys
= dictSize(server
.db
[j
].expires
);
4001 if (keys
|| vkeys
) {
4002 info
= sdscatprintf(info
, "db%d: keys=%lld,expires=%lld\r\n",
4006 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info
)));
4007 addReplySds(c
,info
);
4008 addReply(c
,shared
.crlf
);
4011 static void monitorCommand(redisClient
*c
) {
4012 /* ignore MONITOR if aleady slave or in monitor mode */
4013 if (c
->flags
& REDIS_SLAVE
) return;
4015 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
4017 if (!listAddNodeTail(server
.monitors
,c
)) oom("listAddNodeTail");
4018 addReply(c
,shared
.ok
);
4021 /* ================================= Expire ================================= */
4022 static int removeExpire(redisDb
*db
, robj
*key
) {
4023 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
4030 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
4031 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
4039 /* Return the expire time of the specified key, or -1 if no expire
4040 * is associated with this key (i.e. the key is non volatile) */
4041 static time_t getExpire(redisDb
*db
, robj
*key
) {
4044 /* No expire? return ASAP */
4045 if (dictSize(db
->expires
) == 0 ||
4046 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
4048 return (time_t) dictGetEntryVal(de
);
4051 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
4055 /* No expire? return ASAP */
4056 if (dictSize(db
->expires
) == 0 ||
4057 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
4059 /* Lookup the expire */
4060 when
= (time_t) dictGetEntryVal(de
);
4061 if (time(NULL
) <= when
) return 0;
4063 /* Delete the key */
4064 dictDelete(db
->expires
,key
);
4065 return dictDelete(db
->dict
,key
) == DICT_OK
;
4068 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
4071 /* No expire? return ASAP */
4072 if (dictSize(db
->expires
) == 0 ||
4073 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
4075 /* Delete the key */
4077 dictDelete(db
->expires
,key
);
4078 return dictDelete(db
->dict
,key
) == DICT_OK
;
4081 static void expireCommand(redisClient
*c
) {
4083 int seconds
= atoi(c
->argv
[2]->ptr
);
4085 de
= dictFind(c
->db
->dict
,c
->argv
[1]);
4087 addReply(c
,shared
.czero
);
4091 addReply(c
, shared
.czero
);
4094 time_t when
= time(NULL
)+seconds
;
4095 if (setExpire(c
->db
,c
->argv
[1],when
)) {
4096 addReply(c
,shared
.cone
);
4099 addReply(c
,shared
.czero
);
4105 static void ttlCommand(redisClient
*c
) {
4109 expire
= getExpire(c
->db
,c
->argv
[1]);
4111 ttl
= (int) (expire
-time(NULL
));
4112 if (ttl
< 0) ttl
= -1;
4114 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
4117 static void msetGenericCommand(redisClient
*c
, int nx
) {
4120 if ((c
->argc
% 2) == 0) {
4121 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
4124 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4125 * set nothing at all if at least one already key exists. */
4127 for (j
= 1; j
< c
->argc
; j
+= 2) {
4128 if (dictFind(c
->db
->dict
,c
->argv
[j
]) != NULL
) {
4129 addReply(c
, shared
.czero
);
4135 for (j
= 1; j
< c
->argc
; j
+= 2) {
4136 dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
4137 incrRefCount(c
->argv
[j
]);
4138 incrRefCount(c
->argv
[j
+1]);
4139 removeExpire(c
->db
,c
->argv
[j
]);
4141 server
.dirty
+= (c
->argc
-1)/2;
4142 addReply(c
, nx
? shared
.cone
: shared
.ok
);
4145 static void msetCommand(redisClient
*c
) {
4146 msetGenericCommand(c
,0);
4149 static void msetnxCommand(redisClient
*c
) {
4150 msetGenericCommand(c
,1);
4153 /* =============================== Replication ============================= */
4155 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
4156 ssize_t nwritten
, ret
= size
;
4157 time_t start
= time(NULL
);
4161 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
4162 nwritten
= write(fd
,ptr
,size
);
4163 if (nwritten
== -1) return -1;
4167 if ((time(NULL
)-start
) > timeout
) {
4175 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
4176 ssize_t nread
, totread
= 0;
4177 time_t start
= time(NULL
);
4181 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
4182 nread
= read(fd
,ptr
,size
);
4183 if (nread
== -1) return -1;
4188 if ((time(NULL
)-start
) > timeout
) {
4196 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
4203 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
4206 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
4217 static void syncCommand(redisClient
*c
) {
4218 /* ignore SYNC if aleady slave or in monitor mode */
4219 if (c
->flags
& REDIS_SLAVE
) return;
4221 /* SYNC can't be issued when the server has pending data to send to
4222 * the client about already issued commands. We need a fresh reply
4223 * buffer registering the differences between the BGSAVE and the current
4224 * dataset, so that we can copy to other slaves if needed. */
4225 if (listLength(c
->reply
) != 0) {
4226 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4230 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
4231 /* Here we need to check if there is a background saving operation
4232 * in progress, or if it is required to start one */
4233 if (server
.bgsaveinprogress
) {
4234 /* Ok a background save is in progress. Let's check if it is a good
4235 * one for replication, i.e. if there is another slave that is
4236 * registering differences since the server forked to save */
4240 listRewind(server
.slaves
);
4241 while((ln
= listYield(server
.slaves
))) {
4243 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
4246 /* Perfect, the server is already registering differences for
4247 * another slave. Set the right state, and copy the buffer. */
4248 listRelease(c
->reply
);
4249 c
->reply
= listDup(slave
->reply
);
4250 if (!c
->reply
) oom("listDup copying slave reply list");
4251 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4252 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
4254 /* No way, we need to wait for the next BGSAVE in order to
4255 * register differences */
4256 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
4257 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
4260 /* Ok we don't have a BGSAVE in progress, let's start one */
4261 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
4262 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
4263 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
4264 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
4267 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4270 c
->flags
|= REDIS_SLAVE
;
4272 if (!listAddNodeTail(server
.slaves
,c
)) oom("listAddNodeTail");
4276 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
4277 redisClient
*slave
= privdata
;
4279 REDIS_NOTUSED(mask
);
4280 char buf
[REDIS_IOBUF_LEN
];
4281 ssize_t nwritten
, buflen
;
4283 if (slave
->repldboff
== 0) {
4284 /* Write the bulk write count before to transfer the DB. In theory here
4285 * we don't know how much room there is in the output buffer of the
4286 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4287 * operations) will never be smaller than the few bytes we need. */
4290 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4292 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
4300 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
4301 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
4303 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
4304 (buflen
== 0) ? "premature EOF" : strerror(errno
));
4308 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
4309 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
4314 slave
->repldboff
+= nwritten
;
4315 if (slave
->repldboff
== slave
->repldbsize
) {
4316 close(slave
->repldbfd
);
4317 slave
->repldbfd
= -1;
4318 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
4319 slave
->replstate
= REDIS_REPL_ONLINE
;
4320 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
4321 sendReplyToClient
, slave
, NULL
) == AE_ERR
) {
4325 addReplySds(slave
,sdsempty());
4326 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
4330 /* This function is called at the end of every backgrond saving.
4331 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4332 * otherwise REDIS_ERR is passed to the function.
4334 * The goal of this function is to handle slaves waiting for a successful
4335 * background saving in order to perform non-blocking synchronization. */
4336 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
4338 int startbgsave
= 0;
4340 listRewind(server
.slaves
);
4341 while((ln
= listYield(server
.slaves
))) {
4342 redisClient
*slave
= ln
->value
;
4344 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
4346 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4347 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
4348 struct redis_stat buf
;
4350 if (bgsaveerr
!= REDIS_OK
) {
4352 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
4355 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
4356 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
4358 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
4361 slave
->repldboff
= 0;
4362 slave
->repldbsize
= buf
.st_size
;
4363 slave
->replstate
= REDIS_REPL_SEND_BULK
;
4364 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
4365 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
, NULL
) == AE_ERR
) {
4372 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
4373 listRewind(server
.slaves
);
4374 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
4375 while((ln
= listYield(server
.slaves
))) {
4376 redisClient
*slave
= ln
->value
;
4378 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
4385 static int syncWithMaster(void) {
4386 char buf
[1024], tmpfile
[256];
4388 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
4392 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
4396 /* Issue the SYNC command */
4397 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
4399 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
4403 /* Read the bulk write count */
4404 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
4406 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
4410 dumpsize
= atoi(buf
+1);
4411 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
4412 /* Read the bulk write data on a temp file */
4413 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
4414 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
4417 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
4421 int nread
, nwritten
;
4423 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
4425 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
4431 nwritten
= write(dfd
,buf
,nread
);
4432 if (nwritten
== -1) {
4433 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
4441 if (rename(tmpfile
,server
.dbfilename
) == -1) {
4442 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
4448 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
4449 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
4453 server
.master
= createClient(fd
);
4454 server
.master
->flags
|= REDIS_MASTER
;
4455 server
.replstate
= REDIS_REPL_CONNECTED
;
4459 static void slaveofCommand(redisClient
*c
) {
4460 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
4461 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
4462 if (server
.masterhost
) {
4463 sdsfree(server
.masterhost
);
4464 server
.masterhost
= NULL
;
4465 if (server
.master
) freeClient(server
.master
);
4466 server
.replstate
= REDIS_REPL_NONE
;
4467 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
4470 sdsfree(server
.masterhost
);
4471 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
4472 server
.masterport
= atoi(c
->argv
[2]->ptr
);
4473 if (server
.master
) freeClient(server
.master
);
4474 server
.replstate
= REDIS_REPL_CONNECT
;
4475 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
4476 server
.masterhost
, server
.masterport
);
4478 addReply(c
,shared
.ok
);
4481 /* ============================ Maxmemory directive ======================== */
4483 /* This function gets called when 'maxmemory' is set on the config file to limit
4484 * the max memory used by the server, and we are out of memory.
4485 * This function will try to, in order:
4487 * - Free objects from the free list
4488 * - Try to remove keys with an EXPIRE set
4490 * It is not possible to free enough memory to reach used-memory < maxmemory
4491 * the server will start refusing commands that will enlarge even more the
4494 static void freeMemoryIfNeeded(void) {
4495 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
4496 if (listLength(server
.objfreelist
)) {
4499 listNode
*head
= listFirst(server
.objfreelist
);
4500 o
= listNodeValue(head
);
4501 listDelNode(server
.objfreelist
,head
);
4504 int j
, k
, freed
= 0;
4506 for (j
= 0; j
< server
.dbnum
; j
++) {
4508 robj
*minkey
= NULL
;
4509 struct dictEntry
*de
;
4511 if (dictSize(server
.db
[j
].expires
)) {
4513 /* From a sample of three keys drop the one nearest to
4514 * the natural expire */
4515 for (k
= 0; k
< 3; k
++) {
4518 de
= dictGetRandomKey(server
.db
[j
].expires
);
4519 t
= (time_t) dictGetEntryVal(de
);
4520 if (minttl
== -1 || t
< minttl
) {
4521 minkey
= dictGetEntryKey(de
);
4525 deleteKey(server
.db
+j
,minkey
);
4528 if (!freed
) return; /* nothing to free... */
4533 /* ================================= Debugging ============================== */
4535 static void debugCommand(redisClient
*c
) {
4536 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
4538 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
4539 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
4543 addReply(c
,shared
.nokeyerr
);
4546 key
= dictGetEntryKey(de
);
4547 val
= dictGetEntryVal(de
);
4548 addReplySds(c
,sdscatprintf(sdsempty(),
4549 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4550 key
, key
->refcount
, val
, val
->refcount
, val
->encoding
));
4552 addReplySds(c
,sdsnew(
4553 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4557 #ifdef HAVE_BACKTRACE
4558 static struct redisFunctionSym symsTable
[] = {
4559 {"compareStringObjects", (unsigned long)compareStringObjects
},
4560 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong
},
4561 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare
},
4562 {"dictEncObjHash", (unsigned long)dictEncObjHash
},
4563 {"incrDecrCommand", (unsigned long)incrDecrCommand
},
4564 {"freeStringObject", (unsigned long)freeStringObject
},
4565 {"freeListObject", (unsigned long)freeListObject
},
4566 {"freeSetObject", (unsigned long)freeSetObject
},
4567 {"decrRefCount", (unsigned long)decrRefCount
},
4568 {"createObject", (unsigned long)createObject
},
4569 {"freeClient", (unsigned long)freeClient
},
4570 {"rdbLoad", (unsigned long)rdbLoad
},
4571 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject
},
4572 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw
},
4573 {"addReply", (unsigned long)addReply
},
4574 {"addReplySds", (unsigned long)addReplySds
},
4575 {"incrRefCount", (unsigned long)incrRefCount
},
4576 {"rdbSaveBackground", (unsigned long)rdbSaveBackground
},
4577 {"createStringObject", (unsigned long)createStringObject
},
4578 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves
},
4579 {"syncWithMaster", (unsigned long)syncWithMaster
},
4580 {"tryObjectSharing", (unsigned long)tryObjectSharing
},
4581 {"tryObjectEncoding", (unsigned long)tryObjectEncoding
},
4582 {"getDecodedObject", (unsigned long)getDecodedObject
},
4583 {"removeExpire", (unsigned long)removeExpire
},
4584 {"expireIfNeeded", (unsigned long)expireIfNeeded
},
4585 {"deleteIfVolatile", (unsigned long)deleteIfVolatile
},
4586 {"deleteKey", (unsigned long)deleteKey
},
4587 {"getExpire", (unsigned long)getExpire
},
4588 {"setExpire", (unsigned long)setExpire
},
4589 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave
},
4590 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded
},
4591 {"authCommand", (unsigned long)authCommand
},
4592 {"pingCommand", (unsigned long)pingCommand
},
4593 {"echoCommand", (unsigned long)echoCommand
},
4594 {"setCommand", (unsigned long)setCommand
},
4595 {"setnxCommand", (unsigned long)setnxCommand
},
4596 {"getCommand", (unsigned long)getCommand
},
4597 {"delCommand", (unsigned long)delCommand
},
4598 {"existsCommand", (unsigned long)existsCommand
},
4599 {"incrCommand", (unsigned long)incrCommand
},
4600 {"decrCommand", (unsigned long)decrCommand
},
4601 {"incrbyCommand", (unsigned long)incrbyCommand
},
4602 {"decrbyCommand", (unsigned long)decrbyCommand
},
4603 {"selectCommand", (unsigned long)selectCommand
},
4604 {"randomkeyCommand", (unsigned long)randomkeyCommand
},
4605 {"keysCommand", (unsigned long)keysCommand
},
4606 {"dbsizeCommand", (unsigned long)dbsizeCommand
},
4607 {"lastsaveCommand", (unsigned long)lastsaveCommand
},
4608 {"saveCommand", (unsigned long)saveCommand
},
4609 {"bgsaveCommand", (unsigned long)bgsaveCommand
},
4610 {"shutdownCommand", (unsigned long)shutdownCommand
},
4611 {"moveCommand", (unsigned long)moveCommand
},
4612 {"renameCommand", (unsigned long)renameCommand
},
4613 {"renamenxCommand", (unsigned long)renamenxCommand
},
4614 {"lpushCommand", (unsigned long)lpushCommand
},
4615 {"rpushCommand", (unsigned long)rpushCommand
},
4616 {"lpopCommand", (unsigned long)lpopCommand
},
4617 {"rpopCommand", (unsigned long)rpopCommand
},
4618 {"llenCommand", (unsigned long)llenCommand
},
4619 {"lindexCommand", (unsigned long)lindexCommand
},
4620 {"lrangeCommand", (unsigned long)lrangeCommand
},
4621 {"ltrimCommand", (unsigned long)ltrimCommand
},
4622 {"typeCommand", (unsigned long)typeCommand
},
4623 {"lsetCommand", (unsigned long)lsetCommand
},
4624 {"saddCommand", (unsigned long)saddCommand
},
4625 {"sremCommand", (unsigned long)sremCommand
},
4626 {"smoveCommand", (unsigned long)smoveCommand
},
4627 {"sismemberCommand", (unsigned long)sismemberCommand
},
4628 {"scardCommand", (unsigned long)scardCommand
},
4629 {"spopCommand", (unsigned long)spopCommand
},
4630 {"sinterCommand", (unsigned long)sinterCommand
},
4631 {"sinterstoreCommand", (unsigned long)sinterstoreCommand
},
4632 {"sunionCommand", (unsigned long)sunionCommand
},
4633 {"sunionstoreCommand", (unsigned long)sunionstoreCommand
},
4634 {"sdiffCommand", (unsigned long)sdiffCommand
},
4635 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand
},
4636 {"syncCommand", (unsigned long)syncCommand
},
4637 {"flushdbCommand", (unsigned long)flushdbCommand
},
4638 {"flushallCommand", (unsigned long)flushallCommand
},
4639 {"sortCommand", (unsigned long)sortCommand
},
4640 {"lremCommand", (unsigned long)lremCommand
},
4641 {"infoCommand", (unsigned long)infoCommand
},
4642 {"mgetCommand", (unsigned long)mgetCommand
},
4643 {"monitorCommand", (unsigned long)monitorCommand
},
4644 {"expireCommand", (unsigned long)expireCommand
},
4645 {"getsetCommand", (unsigned long)getsetCommand
},
4646 {"ttlCommand", (unsigned long)ttlCommand
},
4647 {"slaveofCommand", (unsigned long)slaveofCommand
},
4648 {"debugCommand", (unsigned long)debugCommand
},
4649 {"processCommand", (unsigned long)processCommand
},
4650 {"setupSigSegvAction", (unsigned long)setupSigSegvAction
},
4651 {"readQueryFromClient", (unsigned long)readQueryFromClient
},
4652 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile
},
4653 {"msetGenericCommand", (unsigned long)msetGenericCommand
},
4654 {"msetCommand", (unsigned long)msetCommand
},
4655 {"msetnxCommand", (unsigned long)msetnxCommand
},
4659 /* This function try to convert a pointer into a function name. It's used in
4660 * oreder to provide a backtrace under segmentation fault that's able to
4661 * display functions declared as static (otherwise the backtrace is useless). */
4662 static char *findFuncName(void *pointer
, unsigned long *offset
){
4664 unsigned long off
, minoff
= 0;
4666 /* Try to match against the Symbol with the smallest offset */
4667 for (i
=0; symsTable
[i
].pointer
; i
++) {
4668 unsigned long lp
= (unsigned long) pointer
;
4670 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
4671 off
=lp
-symsTable
[i
].pointer
;
4672 if (ret
< 0 || off
< minoff
) {
4678 if (ret
== -1) return NULL
;
4680 return symsTable
[ret
].name
;
4683 static void *getMcontextEip(ucontext_t
*uc
) {
4684 #if defined(__FreeBSD__)
4685 return (void*) uc
->uc_mcontext
.mc_eip
;
4686 #elif defined(__dietlibc__)
4687 return (void*) uc
->uc_mcontext
.eip
;
4688 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
4689 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
4690 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
4691 #ifdef _STRUCT_X86_THREAD_STATE64
4692 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
4694 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
4696 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
4697 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
];
4698 #elif defined(__ia64__) /* Linux IA64 */
4699 return (void*) uc
->uc_mcontext
.sc_ip
;
4705 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
4707 char **messages
= NULL
;
4708 int i
, trace_size
= 0;
4709 unsigned long offset
=0;
4710 time_t uptime
= time(NULL
)-server
.stat_starttime
;
4711 ucontext_t
*uc
= (ucontext_t
*) secret
;
4712 REDIS_NOTUSED(info
);
4714 redisLog(REDIS_WARNING
,
4715 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
4716 redisLog(REDIS_WARNING
, "%s", sdscatprintf(sdsempty(),
4717 "redis_version:%s; "
4718 "uptime_in_seconds:%d; "
4719 "connected_clients:%d; "
4720 "connected_slaves:%d; "
4722 "changes_since_last_save:%lld; "
4723 "bgsave_in_progress:%d; "
4724 "last_save_time:%d; "
4725 "total_connections_received:%lld; "
4726 "total_commands_processed:%lld; "
4730 listLength(server
.clients
)-listLength(server
.slaves
),
4731 listLength(server
.slaves
),
4734 server
.bgsaveinprogress
,
4736 server
.stat_numconnections
,
4737 server
.stat_numcommands
,
4738 server
.masterhost
== NULL
? "master" : "slave"
4741 trace_size
= backtrace(trace
, 100);
4742 /* overwrite sigaction with caller's address */
4743 if (getMcontextEip(uc
) != NULL
) {
4744 trace
[1] = getMcontextEip(uc
);
4746 messages
= backtrace_symbols(trace
, trace_size
);
4748 for (i
=1; i
<trace_size
; ++i
) {
4749 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
4751 p
= strchr(messages
[i
],'+');
4752 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
4753 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
4755 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
4762 static void setupSigSegvAction(void) {
4763 struct sigaction act
;
4765 sigemptyset (&act
.sa_mask
);
4766 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
4767 * is used. Otherwise, sa_handler is used */
4768 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
4769 act
.sa_sigaction
= segvHandler
;
4770 sigaction (SIGSEGV
, &act
, NULL
);
4771 sigaction (SIGBUS
, &act
, NULL
);
4772 sigaction (SIGFPE
, &act
, NULL
);
4773 sigaction (SIGILL
, &act
, NULL
);
4774 sigaction (SIGBUS
, &act
, NULL
);
4777 #else /* HAVE_BACKTRACE */
4778 static void setupSigSegvAction(void) {
4780 #endif /* HAVE_BACKTRACE */
4782 /* =================================== Main! ================================ */
4785 int linuxOvercommitMemoryValue(void) {
4786 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
4790 if (fgets(buf
,64,fp
) == NULL
) {
4799 void linuxOvercommitMemoryWarning(void) {
4800 if (linuxOvercommitMemoryValue() == 0) {
4801 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
4804 #endif /* __linux__ */
4806 static void daemonize(void) {
4810 if (fork() != 0) exit(0); /* parent exits */
4811 setsid(); /* create a new session */
4813 /* Every output goes to /dev/null. If Redis is daemonized but
4814 * the 'logfile' is set to 'stdout' in the configuration file
4815 * it will not log at all. */
4816 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
4817 dup2(fd
, STDIN_FILENO
);
4818 dup2(fd
, STDOUT_FILENO
);
4819 dup2(fd
, STDERR_FILENO
);
4820 if (fd
> STDERR_FILENO
) close(fd
);
4822 /* Try to write the pid file */
4823 fp
= fopen(server
.pidfile
,"w");
4825 fprintf(fp
,"%d\n",getpid());
4830 int main(int argc
, char **argv
) {
4833 ResetServerSaveParams();
4834 loadServerConfig(argv
[1]);
4835 } else if (argc
> 2) {
4836 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
4839 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4842 if (server
.daemonize
) daemonize();
4843 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
4845 linuxOvercommitMemoryWarning();
4847 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
4848 redisLog(REDIS_NOTICE
,"DB loaded from disk");
4849 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
4850 acceptHandler
, NULL
, NULL
) == AE_ERR
) oom("creating file event");
4851 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
4853 aeDeleteEventLoop(server
.el
);