2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.001"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
62 #include "ae.h" /* Event driven programming library */
63 #include "sds.h" /* Dynamic safe strings */
64 #include "anet.h" /* Networking the easy way */
65 #include "dict.h" /* Hash tables */
66 #include "adlist.h" /* Linked lists */
67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
68 #include "lzf.h" /* LZF compression library */
69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
75 /* Static server configuration */
76 #define REDIS_SERVERPORT 6379 /* TCP port */
77 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
78 #define REDIS_IOBUF_LEN 1024
79 #define REDIS_LOADBUF_LEN 1024
80 #define REDIS_STATIC_ARGS 4
81 #define REDIS_DEFAULT_DBNUM 16
82 #define REDIS_CONFIGLINE_MAX 1024
83 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
84 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
85 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
86 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
87 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
89 /* Hash table parameters */
90 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
93 #define REDIS_CMD_BULK 1 /* Bulk write command */
94 #define REDIS_CMD_INLINE 2 /* Inline command */
95 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
96 this flag will return an error when the 'maxmemory' option is set in the
97 config file and the server is using more than maxmemory bytes of memory.
98 In short these commands are denied on low memory conditions. */
99 #define REDIS_CMD_DENYOOM 4
102 #define REDIS_STRING 0
107 /* Objects encoding */
108 #define REDIS_ENCODING_RAW 0 /* Raw representation */
109 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
111 /* Object types only used for dumping to disk */
112 #define REDIS_EXPIRETIME 253
113 #define REDIS_SELECTDB 254
114 #define REDIS_EOF 255
116 /* Defines related to the dump file format. To store 32 bits lengths for short
117 * keys requires a lot of space, so we check the most significant 2 bits of
118 * the first byte to interpret the length:
120 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
121 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
122 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
123 * 11|000000 this means: specially encoded object will follow. The six bits
124 * number specifies the kind of object that follows.
125 * See the REDIS_RDB_ENC_* defines.
127 * Lengths up to 63 are stored using a single byte, most DB keys, and many
128 * values, will fit inside. */
129 #define REDIS_RDB_6BITLEN 0
130 #define REDIS_RDB_14BITLEN 1
131 #define REDIS_RDB_32BITLEN 2
132 #define REDIS_RDB_ENCVAL 3
133 #define REDIS_RDB_LENERR UINT_MAX
135 /* When a length of a string object stored on disk has the first two bits
136 * set, the remaining six bits specify a special encoding for the object
137 * according to the following defines: */
138 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
139 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
140 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
141 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
144 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
145 #define REDIS_SLAVE 2 /* This client is a slave server */
146 #define REDIS_MASTER 4 /* This client is a master server */
147 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
149 /* Slave replication state - slave side */
150 #define REDIS_REPL_NONE 0 /* No active replication */
151 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
152 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
154 /* Slave replication state - from the point of view of master
155 * Note that in SEND_BULK and ONLINE state the slave receives new updates
156 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
157 * to start the next background saving in order to send updates to it. */
158 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
159 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
160 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
161 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
163 /* List related stuff */
167 /* Sort operations */
168 #define REDIS_SORT_GET 0
169 #define REDIS_SORT_DEL 1
170 #define REDIS_SORT_INCR 2
171 #define REDIS_SORT_DECR 3
172 #define REDIS_SORT_ASC 4
173 #define REDIS_SORT_DESC 5
174 #define REDIS_SORTKEY_MAX 1024
177 #define REDIS_DEBUG 0
178 #define REDIS_NOTICE 1
179 #define REDIS_WARNING 2
181 /* Anti-warning macro... */
182 #define REDIS_NOTUSED(V) ((void) V)
185 /*================================= Data types ============================== */
187 /* A redis object, that is a type able to hold a string / list / set */
188 typedef struct redisObject
{
191 unsigned char encoding
;
192 unsigned char notused
[2];
196 typedef struct redisDb
{
202 /* With multiplexing we need to keep per-client state.
203 * Clients are kept in a linked list. */
204 typedef struct redisClient
{
209 robj
**argv
, **mbargv
;
211 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
212 int multibulk
; /* multi bulk command format active */
215 time_t lastinteraction
; /* time of the last interaction, used for timeout */
216 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
217 int slaveseldb
; /* slave selected db, if this client is a slave */
218 int authenticated
; /* when requirepass is non-NULL */
219 int replstate
; /* replication state if this is a slave */
220 int repldbfd
; /* replication DB file descriptor */
221 long repldboff
; /* replication DB file offset */
222 off_t repldbsize
; /* replication DB file size */
230 /* Global server state structure */
236 unsigned int sharingpoolsize
;
237 long long dirty
; /* changes to DB from the last save */
239 list
*slaves
, *monitors
;
240 char neterr
[ANET_ERR_LEN
];
242 int cronloops
; /* number of times the cron function run */
243 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
244 time_t lastsave
; /* Unix time of last successful save */
245 size_t usedmemory
; /* Used memory in bytes */
246 /* Fields used only for stats */
247 time_t stat_starttime
; /* server start time */
248 long long stat_numcommands
; /* number of processed commands */
249 long long stat_numconnections
; /* number of connections received */
257 int bgsaveinprogress
;
258 pid_t bgsavechildpid
;
259 struct saveparam
*saveparams
;
266 /* Replication related */
270 redisClient
*master
; /* client that is master for this slave */
272 unsigned int maxclients
;
273 unsigned long maxmemory
;
274 /* Sort parameters - qsort_r() is only available under BSD so we
275 * have to take this state global, in order to pass it to sortCompare() */
281 typedef void redisCommandProc(redisClient
*c
);
282 struct redisCommand
{
284 redisCommandProc
*proc
;
289 struct redisFunctionSym
{
291 unsigned long pointer
;
294 typedef struct _redisSortObject
{
302 typedef struct _redisSortOperation
{
305 } redisSortOperation
;
307 struct sharedObjectsStruct
{
308 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
309 *colon
, *nullbulk
, *nullmultibulk
,
310 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
311 *outofrangeerr
, *plus
,
312 *select0
, *select1
, *select2
, *select3
, *select4
,
313 *select5
, *select6
, *select7
, *select8
, *select9
;
316 /*================================ Prototypes =============================== */
318 static void freeStringObject(robj
*o
);
319 static void freeListObject(robj
*o
);
320 static void freeSetObject(robj
*o
);
321 static void decrRefCount(void *o
);
322 static robj
*createObject(int type
, void *ptr
);
323 static void freeClient(redisClient
*c
);
324 static int rdbLoad(char *filename
);
325 static void addReply(redisClient
*c
, robj
*obj
);
326 static void addReplySds(redisClient
*c
, sds s
);
327 static void incrRefCount(robj
*o
);
328 static int rdbSaveBackground(char *filename
);
329 static robj
*createStringObject(char *ptr
, size_t len
);
330 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
331 static int syncWithMaster(void);
332 static robj
*tryObjectSharing(robj
*o
);
333 static int tryObjectEncoding(robj
*o
);
334 static robj
*getDecodedObject(const robj
*o
);
335 static int removeExpire(redisDb
*db
, robj
*key
);
336 static int expireIfNeeded(redisDb
*db
, robj
*key
);
337 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
338 static int deleteKey(redisDb
*db
, robj
*key
);
339 static time_t getExpire(redisDb
*db
, robj
*key
);
340 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
341 static void updateSlavesWaitingBgsave(int bgsaveerr
);
342 static void freeMemoryIfNeeded(void);
343 static int processCommand(redisClient
*c
);
344 static void setupSigSegvAction(void);
345 static void rdbRemoveTempFile(pid_t childpid
);
346 static size_t stringObjectLen(robj
*o
);
348 static void authCommand(redisClient
*c
);
349 static void pingCommand(redisClient
*c
);
350 static void echoCommand(redisClient
*c
);
351 static void setCommand(redisClient
*c
);
352 static void setnxCommand(redisClient
*c
);
353 static void getCommand(redisClient
*c
);
354 static void delCommand(redisClient
*c
);
355 static void existsCommand(redisClient
*c
);
356 static void incrCommand(redisClient
*c
);
357 static void decrCommand(redisClient
*c
);
358 static void incrbyCommand(redisClient
*c
);
359 static void decrbyCommand(redisClient
*c
);
360 static void selectCommand(redisClient
*c
);
361 static void randomkeyCommand(redisClient
*c
);
362 static void keysCommand(redisClient
*c
);
363 static void dbsizeCommand(redisClient
*c
);
364 static void lastsaveCommand(redisClient
*c
);
365 static void saveCommand(redisClient
*c
);
366 static void bgsaveCommand(redisClient
*c
);
367 static void shutdownCommand(redisClient
*c
);
368 static void moveCommand(redisClient
*c
);
369 static void renameCommand(redisClient
*c
);
370 static void renamenxCommand(redisClient
*c
);
371 static void lpushCommand(redisClient
*c
);
372 static void rpushCommand(redisClient
*c
);
373 static void lpopCommand(redisClient
*c
);
374 static void rpopCommand(redisClient
*c
);
375 static void llenCommand(redisClient
*c
);
376 static void lindexCommand(redisClient
*c
);
377 static void lrangeCommand(redisClient
*c
);
378 static void ltrimCommand(redisClient
*c
);
379 static void typeCommand(redisClient
*c
);
380 static void lsetCommand(redisClient
*c
);
381 static void saddCommand(redisClient
*c
);
382 static void sremCommand(redisClient
*c
);
383 static void smoveCommand(redisClient
*c
);
384 static void sismemberCommand(redisClient
*c
);
385 static void scardCommand(redisClient
*c
);
386 static void spopCommand(redisClient
*c
);
387 static void sinterCommand(redisClient
*c
);
388 static void sinterstoreCommand(redisClient
*c
);
389 static void sunionCommand(redisClient
*c
);
390 static void sunionstoreCommand(redisClient
*c
);
391 static void sdiffCommand(redisClient
*c
);
392 static void sdiffstoreCommand(redisClient
*c
);
393 static void syncCommand(redisClient
*c
);
394 static void flushdbCommand(redisClient
*c
);
395 static void flushallCommand(redisClient
*c
);
396 static void sortCommand(redisClient
*c
);
397 static void lremCommand(redisClient
*c
);
398 static void infoCommand(redisClient
*c
);
399 static void mgetCommand(redisClient
*c
);
400 static void monitorCommand(redisClient
*c
);
401 static void expireCommand(redisClient
*c
);
402 static void getsetCommand(redisClient
*c
);
403 static void ttlCommand(redisClient
*c
);
404 static void slaveofCommand(redisClient
*c
);
405 static void debugCommand(redisClient
*c
);
406 static void msetCommand(redisClient
*c
);
407 static void msetnxCommand(redisClient
*c
);
409 /*================================= Globals ================================= */
412 static struct redisServer server
; /* server global state */
413 static struct redisCommand cmdTable
[] = {
414 {"get",getCommand
,2,REDIS_CMD_INLINE
},
415 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
416 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
417 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
418 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
419 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
420 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
421 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
422 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
423 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
424 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
425 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
426 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
427 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
428 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
429 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
430 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
431 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
432 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
433 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
434 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
435 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
436 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
437 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
438 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
439 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
440 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
441 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
442 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
443 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
444 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
445 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
446 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
447 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
448 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
449 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
450 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
451 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
452 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
453 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
454 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
455 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
456 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
457 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
458 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
459 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
460 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
461 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
462 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
463 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
464 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
465 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
466 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
467 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
468 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
469 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
470 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
471 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
472 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
473 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
474 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
477 /*============================ Utility functions ============================ */
479 /* Glob-style pattern matching. */
480 int stringmatchlen(const char *pattern
, int patternLen
,
481 const char *string
, int stringLen
, int nocase
)
486 while (pattern
[1] == '*') {
491 return 1; /* match */
493 if (stringmatchlen(pattern
+1, patternLen
-1,
494 string
, stringLen
, nocase
))
495 return 1; /* match */
499 return 0; /* no match */
503 return 0; /* no match */
513 not = pattern
[0] == '^';
520 if (pattern
[0] == '\\') {
523 if (pattern
[0] == string
[0])
525 } else if (pattern
[0] == ']') {
527 } else if (patternLen
== 0) {
531 } else if (pattern
[1] == '-' && patternLen
>= 3) {
532 int start
= pattern
[0];
533 int end
= pattern
[2];
541 start
= tolower(start
);
547 if (c
>= start
&& c
<= end
)
551 if (pattern
[0] == string
[0])
554 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
564 return 0; /* no match */
570 if (patternLen
>= 2) {
577 if (pattern
[0] != string
[0])
578 return 0; /* no match */
580 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
581 return 0; /* no match */
589 if (stringLen
== 0) {
590 while(*pattern
== '*') {
597 if (patternLen
== 0 && stringLen
== 0)
602 static void redisLog(int level
, const char *fmt
, ...) {
606 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
610 if (level
>= server
.verbosity
) {
616 strftime(buf
,64,"%d %b %H:%M:%S",gmtime(&now
));
617 fprintf(fp
,"%s %c ",buf
,c
[level
]);
618 vfprintf(fp
, fmt
, ap
);
624 if (server
.logfile
) fclose(fp
);
627 /*====================== Hash table type implementation ==================== */
629 /* This is a hash table type that uses the SDS dynamic strings library as
630 * keys and redis objects as values (objects can hold SDS strings,
633 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
637 DICT_NOTUSED(privdata
);
639 l1
= sdslen((sds
)key1
);
640 l2
= sdslen((sds
)key2
);
641 if (l1
!= l2
) return 0;
642 return memcmp(key1
, key2
, l1
) == 0;
645 static void dictRedisObjectDestructor(void *privdata
, void *val
)
647 DICT_NOTUSED(privdata
);
652 static int dictObjKeyCompare(void *privdata
, const void *key1
,
655 const robj
*o1
= key1
, *o2
= key2
;
656 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
659 static unsigned int dictObjHash(const void *key
) {
661 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
664 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
667 const robj
*o1
= key1
, *o2
= key2
;
669 if (o1
->encoding
== REDIS_ENCODING_RAW
&&
670 o2
->encoding
== REDIS_ENCODING_RAW
)
671 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
676 dec1
= o1
->encoding
!= REDIS_ENCODING_RAW
?
677 getDecodedObject(o1
) : (robj
*)o1
;
678 dec2
= o2
->encoding
!= REDIS_ENCODING_RAW
?
679 getDecodedObject(o2
) : (robj
*)o2
;
680 cmp
= sdsDictKeyCompare(privdata
,dec1
->ptr
,dec2
->ptr
);
681 if (dec1
!= o1
) decrRefCount(dec1
);
682 if (dec2
!= o2
) decrRefCount(dec2
);
687 static unsigned int dictEncObjHash(const void *key
) {
690 if (o
->encoding
== REDIS_ENCODING_RAW
)
691 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
693 robj
*dec
= getDecodedObject(o
);
694 unsigned int hash
= dictGenHashFunction(dec
->ptr
, sdslen((sds
)dec
->ptr
));
700 static dictType setDictType
= {
701 dictEncObjHash
, /* hash function */
704 dictEncObjKeyCompare
, /* key compare */
705 dictRedisObjectDestructor
, /* key destructor */
706 NULL
/* val destructor */
709 static dictType hashDictType
= {
710 dictObjHash
, /* hash function */
713 dictObjKeyCompare
, /* key compare */
714 dictRedisObjectDestructor
, /* key destructor */
715 dictRedisObjectDestructor
/* val destructor */
718 /* ========================= Random utility functions ======================= */
720 /* Redis generally does not try to recover from out of memory conditions
721 * when allocating objects or strings, it is not clear if it will be possible
722 * to report this condition to the client since the networking layer itself
723 * is based on heap allocation for send buffers, so we simply abort.
724 * At least the code will be simpler to read... */
725 static void oom(const char *msg
) {
726 fprintf(stderr
, "%s: Out of memory\n",msg
);
732 /* ====================== Redis server networking stuff ===================== */
733 static void closeTimedoutClients(void) {
736 time_t now
= time(NULL
);
738 listRewind(server
.clients
);
739 while ((ln
= listYield(server
.clients
)) != NULL
) {
740 c
= listNodeValue(ln
);
741 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
742 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
743 (now
- c
->lastinteraction
> server
.maxidletime
)) {
744 redisLog(REDIS_DEBUG
,"Closing idle client");
750 static int htNeedsResize(dict
*dict
) {
751 long long size
, used
;
753 size
= dictSlots(dict
);
754 used
= dictSize(dict
);
755 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
756 (used
*100/size
< REDIS_HT_MINFILL
));
759 /* If the percentage of used slots in the HT falls below REDIS_HT_MINFILL
760 * we resize the hash table to save memory */
761 static void tryResizeHashTables(void) {
764 for (j
= 0; j
< server
.dbnum
; j
++) {
765 if (htNeedsResize(server
.db
[j
].dict
)) {
766 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
767 dictResize(server
.db
[j
].dict
);
768 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
770 if (htNeedsResize(server
.db
[j
].expires
))
771 dictResize(server
.db
[j
].expires
);
775 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
776 int j
, loops
= server
.cronloops
++;
777 REDIS_NOTUSED(eventLoop
);
779 REDIS_NOTUSED(clientData
);
781 /* Update the global state with the amount of used memory */
782 server
.usedmemory
= zmalloc_used_memory();
784 /* Show some info about non-empty databases */
785 for (j
= 0; j
< server
.dbnum
; j
++) {
786 long long size
, used
, vkeys
;
788 size
= dictSlots(server
.db
[j
].dict
);
789 used
= dictSize(server
.db
[j
].dict
);
790 vkeys
= dictSize(server
.db
[j
].expires
);
791 if (!(loops
% 5) && (used
|| vkeys
)) {
792 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
793 /* dictPrintStats(server.dict); */
797 /* We don't want to resize the hash tables while a background saving
798 * is in progress: the saving child is created using fork() that is
799 * implemented with a copy-on-write semantic in most modern systems, so
800 * if we resize the HT while there is the saving child at work actually
801 * a lot of memory movements in the parent will cause a lot of pages
803 if (!server
.bgsaveinprogress
) tryResizeHashTables();
805 /* Show information about connected clients */
807 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
808 listLength(server
.clients
)-listLength(server
.slaves
),
809 listLength(server
.slaves
),
811 dictSize(server
.sharingpool
));
814 /* Close connections of timedout clients */
815 if (server
.maxidletime
&& !(loops
% 10))
816 closeTimedoutClients();
818 /* Check if a background saving in progress terminated */
819 if (server
.bgsaveinprogress
) {
821 if (wait4(-1,&statloc
,WNOHANG
,NULL
)) {
822 int exitcode
= WEXITSTATUS(statloc
);
823 int bysignal
= WIFSIGNALED(statloc
);
825 if (!bysignal
&& exitcode
== 0) {
826 redisLog(REDIS_NOTICE
,
827 "Background saving terminated with success");
829 server
.lastsave
= time(NULL
);
830 } else if (!bysignal
&& exitcode
!= 0) {
831 redisLog(REDIS_WARNING
, "Background saving error");
833 redisLog(REDIS_WARNING
,
834 "Background saving terminated by signal");
835 rdbRemoveTempFile(server
.bgsavechildpid
);
837 server
.bgsaveinprogress
= 0;
838 server
.bgsavechildpid
= -1;
839 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
842 /* If there is not a background saving in progress check if
843 * we have to save now */
844 time_t now
= time(NULL
);
845 for (j
= 0; j
< server
.saveparamslen
; j
++) {
846 struct saveparam
*sp
= server
.saveparams
+j
;
848 if (server
.dirty
>= sp
->changes
&&
849 now
-server
.lastsave
> sp
->seconds
) {
850 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
851 sp
->changes
, sp
->seconds
);
852 rdbSaveBackground(server
.dbfilename
);
858 /* Try to expire a few timed out keys */
859 for (j
= 0; j
< server
.dbnum
; j
++) {
860 redisDb
*db
= server
.db
+j
;
861 int num
= dictSize(db
->expires
);
864 time_t now
= time(NULL
);
866 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
867 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
872 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
873 t
= (time_t) dictGetEntryVal(de
);
875 deleteKey(db
,dictGetEntryKey(de
));
881 /* Check if we should connect to a MASTER */
882 if (server
.replstate
== REDIS_REPL_CONNECT
) {
883 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
884 if (syncWithMaster() == REDIS_OK
) {
885 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
891 static void createSharedObjects(void) {
892 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
893 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
894 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
895 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
896 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
897 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
898 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
899 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
900 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
902 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
903 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
904 "-ERR Operation against a key holding the wrong kind of value\r\n"));
905 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
906 "-ERR no such key\r\n"));
907 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
908 "-ERR syntax error\r\n"));
909 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
910 "-ERR source and destination objects are the same\r\n"));
911 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
912 "-ERR index out of range\r\n"));
913 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
914 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
915 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
916 shared
.select0
= createStringObject("select 0\r\n",10);
917 shared
.select1
= createStringObject("select 1\r\n",10);
918 shared
.select2
= createStringObject("select 2\r\n",10);
919 shared
.select3
= createStringObject("select 3\r\n",10);
920 shared
.select4
= createStringObject("select 4\r\n",10);
921 shared
.select5
= createStringObject("select 5\r\n",10);
922 shared
.select6
= createStringObject("select 6\r\n",10);
923 shared
.select7
= createStringObject("select 7\r\n",10);
924 shared
.select8
= createStringObject("select 8\r\n",10);
925 shared
.select9
= createStringObject("select 9\r\n",10);
928 static void appendServerSaveParams(time_t seconds
, int changes
) {
929 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
930 if (server
.saveparams
== NULL
) oom("appendServerSaveParams");
931 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
932 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
933 server
.saveparamslen
++;
936 static void ResetServerSaveParams() {
937 zfree(server
.saveparams
);
938 server
.saveparams
= NULL
;
939 server
.saveparamslen
= 0;
942 static void initServerConfig() {
943 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
944 server
.port
= REDIS_SERVERPORT
;
945 server
.verbosity
= REDIS_DEBUG
;
946 server
.maxidletime
= REDIS_MAXIDLETIME
;
947 server
.saveparams
= NULL
;
948 server
.logfile
= NULL
; /* NULL = log on standard output */
949 server
.bindaddr
= NULL
;
950 server
.glueoutputbuf
= 1;
951 server
.daemonize
= 0;
952 server
.pidfile
= "/var/run/redis.pid";
953 server
.dbfilename
= "dump.rdb";
954 server
.requirepass
= NULL
;
955 server
.shareobjects
= 0;
956 server
.sharingpoolsize
= 1024;
957 server
.maxclients
= 0;
958 server
.maxmemory
= 0;
959 ResetServerSaveParams();
961 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
962 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
963 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
964 /* Replication related */
966 server
.masterhost
= NULL
;
967 server
.masterport
= 6379;
968 server
.master
= NULL
;
969 server
.replstate
= REDIS_REPL_NONE
;
972 static void initServer() {
975 signal(SIGHUP
, SIG_IGN
);
976 signal(SIGPIPE
, SIG_IGN
);
977 setupSigSegvAction();
979 server
.clients
= listCreate();
980 server
.slaves
= listCreate();
981 server
.monitors
= listCreate();
982 server
.objfreelist
= listCreate();
983 createSharedObjects();
984 server
.el
= aeCreateEventLoop();
985 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
986 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
987 if (!server
.db
|| !server
.clients
|| !server
.slaves
|| !server
.monitors
|| !server
.el
|| !server
.objfreelist
)
988 oom("server initialization"); /* Fatal OOM */
989 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
990 if (server
.fd
== -1) {
991 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
994 for (j
= 0; j
< server
.dbnum
; j
++) {
995 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
996 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
999 server
.cronloops
= 0;
1000 server
.bgsaveinprogress
= 0;
1001 server
.bgsavechildpid
= -1;
1002 server
.lastsave
= time(NULL
);
1004 server
.usedmemory
= 0;
1005 server
.stat_numcommands
= 0;
1006 server
.stat_numconnections
= 0;
1007 server
.stat_starttime
= time(NULL
);
1008 aeCreateTimeEvent(server
.el
, 1000, serverCron
, NULL
, NULL
);
1011 /* Empty the whole database */
1012 static long long emptyDb() {
1014 long long removed
= 0;
1016 for (j
= 0; j
< server
.dbnum
; j
++) {
1017 removed
+= dictSize(server
.db
[j
].dict
);
1018 dictEmpty(server
.db
[j
].dict
);
1019 dictEmpty(server
.db
[j
].expires
);
1024 static int yesnotoi(char *s
) {
1025 if (!strcasecmp(s
,"yes")) return 1;
1026 else if (!strcasecmp(s
,"no")) return 0;
1030 /* I agree, this is a very rudimentary way to load a configuration...
1031 will improve later if the config gets more complex */
/* Parse the configuration file `filename` and apply each directive to the
 * global `server` state.
 *
 * Lines are read with fgets(), trimmed of spaces/tabs/CR/LF and split on
 * single spaces; lines that are empty or start with '#' are skipped. The
 * directive name (argv[0]) is matched case-insensitively together with the
 * expected argument count. An unknown directive, a wrong argument count or
 * an out-of-range value jumps to `loaderr`, which prints the line number,
 * the offending line and the error text to stderr.
 *
 * NOTE(review): a filename of exactly "-" is special-cased; together with
 * the later `fp != stdin` test this presumably selects standard input --
 * confirm.
 * NOTE(review): numeric arguments are parsed with atoi(), so non-numeric
 * text becomes 0 and is only rejected where a range check follows
 * (e.g. "maxclients" has none). */
1032 static void loadServerConfig(char *filename
) {
1034 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1038 if (filename
[0] == '-' && filename
[1] == '\0')
1041 if ((fp
= fopen(filename
,"r")) == NULL
) {
1042 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1047 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1053 line
= sdstrim(line
," \t\r\n");
1055 /* Skip comments and blank lines*/
1056 if (line
[0] == '#' || line
[0] == '\0') {
1061 /* Split into arguments */
1062 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1063 sdstolower(argv
[0]);
1065 /* Execute config directives */
1066 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1067 server
.maxidletime
= atoi(argv
[1]);
1068 if (server
.maxidletime
< 0) {
1069 err
= "Invalid timeout value"; goto loaderr
;
1071 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1072 server
.port
= atoi(argv
[1]);
1073 if (server
.port
< 1 || server
.port
> 65535) {
1074 err
= "Invalid port"; goto loaderr
;
1076 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1077 server
.bindaddr
= zstrdup(argv
[1]);
1078 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1079 int seconds
= atoi(argv
[1]);
1080 int changes
= atoi(argv
[2]);
1081 if (seconds
< 1 || changes
< 0) {
1082 err
= "Invalid save parameters"; goto loaderr
;
1084 appendServerSaveParams(seconds
,changes
);
1085 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1086 if (chdir(argv
[1]) == -1) {
1087 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1088 argv
[1], strerror(errno
));
1091 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1092 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1093 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1094 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1096 err
= "Invalid log level. Must be one of debug, notice, warning";
1099 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
/* The literal name "stdout" means "no log file": the copy is freed and
 * server.logfile is left NULL. */
1102 server
.logfile
= zstrdup(argv
[1]);
1103 if (!strcasecmp(server
.logfile
,"stdout")) {
1104 zfree(server
.logfile
);
1105 server
.logfile
= NULL
;
1107 if (server
.logfile
) {
1108 /* Test if we are able to open the file. The server will not
1109 * be able to abort just for this problem later... */
1110 logfp
= fopen(server
.logfile
,"a");
1111 if (logfp
== NULL
) {
1112 err
= sdscatprintf(sdsempty(),
1113 "Can't open the log file: %s", strerror(errno
));
1118 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1119 server
.dbnum
= atoi(argv
[1]);
1120 if (server
.dbnum
< 1) {
1121 err
= "Invalid number of databases"; goto loaderr
;
1123 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1124 server
.maxclients
= atoi(argv
[1]);
1125 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1126 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1127 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
/* Configuring a master also moves the replication state to
 * REDIS_REPL_CONNECT. */
1128 server
.masterhost
= sdsnew(argv
[1]);
1129 server
.masterport
= atoi(argv
[2]);
1130 server
.replstate
= REDIS_REPL_CONNECT
;
1131 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1132 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1133 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1135 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1136 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1137 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1139 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1140 server
.sharingpoolsize
= atoi(argv
[1]);
1141 if (server
.sharingpoolsize
< 1) {
1142 err
= "invalid object sharing pool size"; goto loaderr
;
1144 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1145 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1146 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1148 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1149 server
.requirepass
= zstrdup(argv
[1]);
1150 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1151 server
.pidfile
= zstrdup(argv
[1]);
1152 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1153 server
.dbfilename
= zstrdup(argv
[1]);
1155 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1157 for (j
= 0; j
< argc
; j
++)
1162 if (fp
!= stdin
) fclose(fp
);
/* Fatal error report: line number, offending line, error text. */
1166 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1167 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1168 fprintf(stderr
, ">>> '%s'\n", line
);
1169 fprintf(stderr
, "%s\n", err
);
/* Drop one reference on every object in c->argv[0..argc) and in
 * c->mbargv[0..mbargc). */
1173 static void freeClientArgv(redisClient
*c
) {
1176 for (j
= 0; j
< c
->argc
; j
++)
1177 decrRefCount(c
->argv
[j
]);
1178 for (j
= 0; j
< c
->mbargc
; j
++)
1179 decrRefCount(c
->mbargv
[j
]);
/* Tear down a client: unregister its readable and writable file events,
 * free the query buffer and the reply list, and unlink it from
 * server.clients. A client flagged REDIS_SLAVE is also looked up in
 * server.monitors (when REDIS_MONITOR is set) or server.slaves (otherwise).
 * When the client was our master, server.master is cleared and the
 * replication state goes back to REDIS_REPL_CONNECT. */
1184 static void freeClient(redisClient
*c
) {
1187 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1188 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1189 sdsfree(c
->querybuf
);
1190 listRelease(c
->reply
);
1193 ln
= listSearchKey(server
.clients
,c
);
1195 listDelNode(server
.clients
,ln
);
1196 if (c
->flags
& REDIS_SLAVE
) {
1197 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1199 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1200 ln
= listSearchKey(l
,c
);
1204 if (c
->flags
& REDIS_MASTER
) {
1205 server
.master
= NULL
;
1206 server
.replstate
= REDIS_REPL_CONNECT
;
/* Coalesce the queued reply objects of a client into one string object.
 * A first pass over c->reply sums the sds length of every node into
 * `totlen` and gives up as soon as the total exceeds 1024 bytes. A second
 * pass copies each payload into `buf` at offset `copylen` and deletes the
 * node. Finally a single REDIS_STRING object holding `totlen` bytes of
 * `buf` is appended to the (now empty) reply list. */
1213 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1218 listRewind(c
->reply
);
1219 while((ln
= listYield(c
->reply
))) {
1221 totlen
+= sdslen(o
->ptr
);
1222 /* This optimization makes more sense if we don't have to copy
1224 if (totlen
> 1024) return;
1230 listRewind(c
->reply
);
1231 while((ln
= listYield(c
->reply
))) {
1233 memcpy(buf
+copylen
,o
->ptr
,sdslen(o
->ptr
));
1234 copylen
+= sdslen(o
->ptr
);
1235 listDelNode(c
->reply
,ln
);
1237 /* Now the output buffer is empty, add the new single element */
1238 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,totlen
));
1239 if (!listAddNodeTail(c
->reply
,o
)) oom("listAddNodeTail");
/* Writable-event handler: send queued replies of client `privdata` to `fd`.
 *
 * When server.glueoutputbuf is set and more than one reply is queued, the
 * buffers are first glued together. Then, for the object at the head of
 * c->reply, the bytes after c->sentlen are written; for a client flagged
 * REDIS_MASTER nothing is written and the remaining bytes are simply
 * counted as sent. A fully sent object is removed from the list. The loop
 * stops when write() returns <= 0 or when more than
 * REDIS_MAX_WRITE_PER_EVENT bytes were sent in this call. On exit,
 * lastinteraction is refreshed if anything was written, and the writable
 * event is removed once the reply list is empty. */
1243 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1244 redisClient
*c
= privdata
;
1245 int nwritten
= 0, totwritten
= 0, objlen
;
1248 REDIS_NOTUSED(mask
);
1250 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1251 glueReplyBuffersIfNeeded(c
);
1252 while(listLength(c
->reply
)) {
1253 o
= listNodeValue(listFirst(c
->reply
));
1254 objlen
= sdslen(o
->ptr
);
1257 listDelNode(c
->reply
,listFirst(c
->reply
));
1261 if (c
->flags
& REDIS_MASTER
) {
1262 /* Don't reply to a master */
1263 nwritten
= objlen
- c
->sentlen
;
1265 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1266 if (nwritten
<= 0) break;
1268 c
->sentlen
+= nwritten
;
1269 totwritten
+= nwritten
;
1270 /* If we fully sent the object on head go to the next one */
1271 if (c
->sentlen
== objlen
) {
1272 listDelNode(c
->reply
,listFirst(c
->reply
));
1275 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1276 * bytes, in a single threaded server it's a good idea to server
1277 * other clients as well, even if a very large request comes from
1278 * super fast link that is always able to accept data (in real world
1279 * terms think to 'KEYS *' against the loopback interfae) */
1280 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
/* A failed write with errno == EAGAIN is handled separately from other
 * errors, which are logged at debug level. */
1282 if (nwritten
== -1) {
1283 if (errno
== EAGAIN
) {
1286 redisLog(REDIS_DEBUG
,
1287 "Error writing to client: %s", strerror(errno
));
1292 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1293 if (listLength(c
->reply
) == 0) {
1295 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
/* Linear search of cmdTable (terminated by an entry whose name is NULL) for
 * a command called `name`, compared case-insensitively. Returns a pointer
 * to the matching table entry. */
1299 static struct redisCommand
*lookupCommand(char *name
) {
1301 while(cmdTable
[j
].name
!= NULL
) {
1302 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1308 /* resetClient prepares the client to process the next command */
1309 static void resetClient(redisClient
*c
) {
1315 /* If this function gets called we already read a whole
1316 * command, arguments are in the client argv/argc fields.
1317 * processCommand() executes the command or prepares the
1318 * server for a bulk read from the client.
1320 * If 1 is returned the client is still alive and valid and
1321 * other operations can be performed by the caller. Otherwise
1322 * if 0 is returned the client was destroyed (i.e. after QUIT). */
/* Process the command currently held in c->argv / c->argc.
 *
 * Steps, in order:
 *  - reclaim memory first when server.maxmemory is set;
 *  - multi-bulk protocol: a lone "*<count>" argument starts a multi-bulk
 *    request; each following "$<len>" header announces one payload, which
 *    is collected in c->mbargv; when c->multibulk reaches 0 the collected
 *    vector is swapped into c->argv / c->argc;
 *  - QUIT is special-cased before the command table lookup;
 *  - arity check: a positive arity must equal argc exactly, a negative
 *    arity means "at least -arity arguments";
 *  - commands flagged REDIS_CMD_DENYOOM are refused while used memory is
 *    above server.maxmemory;
 *  - for REDIS_CMD_BULK commands the last argument is the payload length
 *    (0..1GB); it is stored in c->bulklen with 2 extra bytes for the CRLF,
 *    and the payload is consumed immediately if already buffered;
 *  - arguments are optionally shared (server.shareobjects) and the bulk
 *    argument is integer-encoded when possible;
 *  - unauthenticated clients may only run AUTH when requirepass is set;
 *  - server.dirty is snapshotted before the command runs; if it changed the
 *    command is fed to the slaves, and it is always fed to monitors when
 *    there are any. */
1323 static int processCommand(redisClient
*c
) {
1324 struct redisCommand
*cmd
;
1327 /* Free some memory if needed (maxmemory setting) */
1328 if (server
.maxmemory
) freeMemoryIfNeeded();
1330 /* Handle the multi bulk command type. This is an alternative protocol
1331 * supported by Redis in order to receive commands that are composed of
1332 * multiple binary-safe "bulk" arguments. The latency of processing is
1333 * a bit higher but this allows things like multi-sets, so if this
1334 * protocol is used only for MSET and similar commands this is a big win. */
1335 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1336 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1337 if (c
->multibulk
<= 0) {
1341 decrRefCount(c
->argv
[c
->argc
-1]);
1345 } else if (c
->multibulk
) {
1346 if (c
->bulklen
== -1) {
1347 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1348 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1352 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1353 decrRefCount(c
->argv
[0]);
1354 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1356 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1361 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1365 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1366 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1370 if (c
->multibulk
== 0) {
1374 /* Here we need to swap the multi-bulk argc/argv with the
1375 * normal argc/argv of the client structure. */
1377 c
->argv
= c
->mbargv
;
1378 c
->mbargv
= auxargv
;
1381 c
->argc
= c
->mbargc
;
1382 c
->mbargc
= auxargc
;
1384 /* We need to set bulklen to something different than -1
1385 * in order for the code below to process the command without
1386 * to try to read the last argument of a bulk command as
1387 * a special argument. */
1389 /* continue below and process the command */
1396 /* -- end of multi bulk commands processing -- */
1398 /* The QUIT command is handled as a special case. Normal command
1399 * procs are unable to close the client connection safely */
1400 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1404 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1406 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1409 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1410 (c
->argc
< -cmd
->arity
)) {
1411 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
1414 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1415 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1418 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1419 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1421 decrRefCount(c
->argv
[c
->argc
-1]);
1422 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1424 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1429 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1430 /* It is possible that the bulk read is already in the
1431 * buffer. Check this condition and handle it accordingly */
1432 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1433 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1435 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1440 /* Let's try to share objects on the command arguments vector */
1441 if (server
.shareobjects
) {
1443 for(j
= 1; j
< c
->argc
; j
++)
1444 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1446 /* Let's try to encode the bulk object to save space. */
1447 if (cmd
->flags
& REDIS_CMD_BULK
)
1448 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1450 /* Check if the user is authenticated */
1451 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1452 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1457 /* Exec the command */
1458 dirty
= server
.dirty
;
1460 if (server
.dirty
-dirty
!= 0 && listLength(server
.slaves
))
1461 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1462 if (listLength(server
.monitors
))
1463 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1464 server
.stat_numcommands
++;
1466 /* Prepare the client for the next command */
1467 if (c
->flags
& REDIS_CLOSE
) {
/* Queue a command on every client of the list `slaves` (used both for real
 * slaves and for monitors).
 *
 * An output vector is built first: the arguments separated by shared.space,
 * with an extra "<len>\r\n" object inserted before the last argument of
 * REDIS_CMD_BULK commands, terminated by shared.crlf. The vector lives in
 * static_outv when argc <= REDIS_STATIC_ARGS, otherwise it is allocated.
 * Clients still in REDIS_REPL_WAIT_BGSAVE_START are skipped. When a
 * client's selected db differs from `dictid` a SELECT is queued first
 * (shared objects exist for db 0..9, others are created on the fly) and
 * slaveseldb is updated. */
1475 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1479 /* (args*2)+1 is enough room for args, spaces, newlines */
1480 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1482 if (argc
<= REDIS_STATIC_ARGS
) {
1485 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1486 if (!outv
) oom("replicationFeedSlaves");
1489 for (j
= 0; j
< argc
; j
++) {
1490 if (j
!= 0) outv
[outc
++] = shared
.space
;
1491 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
/* The length object starts with refcount 0 so that the increment /
 * decrement pair around the feeding loop is what frees it. */
1494 lenobj
= createObject(REDIS_STRING
,
1495 sdscatprintf(sdsempty(),"%d\r\n",
1496 stringObjectLen(argv
[j
])));
1497 lenobj
->refcount
= 0;
1498 outv
[outc
++] = lenobj
;
1500 outv
[outc
++] = argv
[j
];
1502 outv
[outc
++] = shared
.crlf
;
1504 /* Increment all the refcounts at start and decrement at end in order to
1505 * be sure to free objects if there is no slave in a replication state
1506 * able to be feed with commands */
1507 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1509 while((ln
= listYield(slaves
))) {
1510 redisClient
*slave
= ln
->value
;
1512 /* Don't feed slaves that are still waiting for BGSAVE to start */
1513 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1515 /* Feed all the other slaves, MONITORs and so on */
1516 if (slave
->slaveseldb
!= dictid
) {
1520 case 0: selectcmd
= shared
.select0
; break;
1521 case 1: selectcmd
= shared
.select1
; break;
1522 case 2: selectcmd
= shared
.select2
; break;
1523 case 3: selectcmd
= shared
.select3
; break;
1524 case 4: selectcmd
= shared
.select4
; break;
1525 case 5: selectcmd
= shared
.select5
; break;
1526 case 6: selectcmd
= shared
.select6
; break;
1527 case 7: selectcmd
= shared
.select7
; break;
1528 case 8: selectcmd
= shared
.select8
; break;
1529 case 9: selectcmd
= shared
.select9
; break;
1531 selectcmd
= createObject(REDIS_STRING
,
1532 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1533 selectcmd
->refcount
= 0;
1536 addReply(slave
,selectcmd
);
1537 slave
->slaveseldb
= dictid
;
1539 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1541 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1542 if (outv
!= static_outv
) zfree(outv
);
/* Readable-event handler for a client socket.
 *
 * Reads up to REDIS_IOBUF_LEN bytes from `fd`, appends them to c->querybuf
 * and refreshes c->lastinteraction. A read error other than EAGAIN and a
 * zero-length read (peer closed) are logged at debug level.
 *
 * When no bulk payload is pending (c->bulklen == -1) the buffer is searched
 * for a '\n'; the first line is cut out (data after it stays in the query
 * buffer), its "\n" and optional "\r" are removed, and the line is split on
 * single spaces. Every non-empty token becomes a REDIS_STRING object in
 * c->argv. If there is at least one argument, processCommand() is called;
 * when it reports the client is still valid and data is left in the buffer
 * the function jumps back to `again`. A buffer that reaches
 * REDIS_REQUEST_MAX_SIZE without a newline is logged as a protocol error.
 *
 * When a bulk payload is pending, nothing happens until at least
 * c->bulklen bytes are buffered; then everything but the trailing CRLF is
 * stored as the final argument and those bytes are dropped from the buffer. */
1545 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1546 redisClient
*c
= (redisClient
*) privdata
;
1547 char buf
[REDIS_IOBUF_LEN
];
1550 REDIS_NOTUSED(mask
);
1552 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1554 if (errno
== EAGAIN
) {
1557 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1561 } else if (nread
== 0) {
1562 redisLog(REDIS_DEBUG
, "Client closed connection");
1567 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1568 c
->lastinteraction
= time(NULL
);
1574 if (c
->bulklen
== -1) {
1575 /* Read the first line of the query */
1576 char *p
= strchr(c
->querybuf
,'\n');
1583 query
= c
->querybuf
;
1584 c
->querybuf
= sdsempty();
1585 querylen
= 1+(p
-(query
));
1586 if (sdslen(query
) > querylen
) {
1587 /* leave data after the first line of the query in the buffer */
1588 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1590 *p
= '\0'; /* remove "\n" */
/* NOTE(review): when the line is just "\n", p equals query and *(p-1)
 * reads the byte before the string data -- confirm this is harmless. */
1591 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1592 sdsupdatelen(query
);
1594 /* Now we can split the query in arguments */
1595 if (sdslen(query
) == 0) {
1596 /* Ignore empty query */
1600 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1601 if (argv
== NULL
) oom("sdssplitlen");
1604 if (c
->argv
) zfree(c
->argv
);
1605 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1606 if (c
->argv
== NULL
) oom("allocating arguments list for client");
1608 for (j
= 0; j
< argc
; j
++) {
1609 if (sdslen(argv
[j
])) {
1610 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1617 /* Execute the command. If the client is still valid
1618 * after processCommand() return and there is something
1619 * on the query buffer try to process the next command. */
1620 if (c
->argc
&& processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1622 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
1623 redisLog(REDIS_DEBUG
, "Client protocol error");
1628 /* Bulk read handling. Note that if we are at this point
1629 the client already sent a command terminated with a newline,
1630 we are reading the bulk data that is actually the last
1631 argument of the command. */
1632 int qbl
= sdslen(c
->querybuf
);
1634 if (c
->bulklen
<= qbl
) {
1635 /* Copy everything but the final CRLF as final argument */
1636 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1638 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
/* Make database number `id` the current database of client `c`.
 * Ids outside [0, server.dbnum) are rejected by the range check.
 * NOTE(review): presumably REDIS_ERR / REDIS_OK are the return values --
 * confirm. */
1645 static int selectDb(redisClient
*c
, int id
) {
1646 if (id
< 0 || id
>= server
.dbnum
)
1648 c
->db
= &server
.db
[id
];
/* Dup callback installed on client reply lists: "duplicating" a reply
 * object just takes one more reference on it. */
1652 static void *dupClientReplyValue(void *o
) {
1653 incrRefCount((robj
*)o
);
/* Allocate and initialise the state of a new client connected on `fd`.
 * anetNonBlock() and anetTcpNoDelay() are applied to the socket (before the
 * allocation result is checked); NULL is returned when the allocation
 * fails. The client starts with an empty query buffer, not authenticated,
 * in REDIS_REPL_NONE state, with a reply list whose free method is
 * decrRefCount and whose dup method is dupClientReplyValue.
 * readQueryFromClient is registered as readable handler and the client is
 * appended to server.clients. */
1657 static redisClient
*createClient(int fd
) {
1658 redisClient
*c
= zmalloc(sizeof(*c
));
1660 anetNonBlock(NULL
,fd
);
1661 anetTcpNoDelay(NULL
,fd
);
1662 if (!c
) return NULL
;
1665 c
->querybuf
= sdsempty();
1674 c
->lastinteraction
= time(NULL
);
1675 c
->authenticated
= 0;
1676 c
->replstate
= REDIS_REPL_NONE
;
1677 if ((c
->reply
= listCreate()) == NULL
) oom("listCreate");
1678 listSetFreeMethod(c
->reply
,decrRefCount
);
1679 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1680 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1681 readQueryFromClient
, c
, NULL
) == AE_ERR
) {
1685 if (!listAddNodeTail(server
.clients
,c
)) oom("listAddNodeTail");
/* Append `obj` to the reply list of client `c`.
 * When the list is currently empty and the client is in REDIS_REPL_NONE or
 * REDIS_REPL_ONLINE state, the writable handler sendReplyToClient is
 * installed first; if that fails the function returns without queueing
 * anything. An object that is not raw-encoded is replaced by its decoded
 * copy before being queued. */
1689 static void addReply(redisClient
*c
, robj
*obj
) {
1690 if (listLength(c
->reply
) == 0 &&
1691 (c
->replstate
== REDIS_REPL_NONE
||
1692 c
->replstate
== REDIS_REPL_ONLINE
) &&
1693 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
1694 sendReplyToClient
, c
, NULL
) == AE_ERR
) return;
1695 if (obj
->encoding
!= REDIS_ENCODING_RAW
) {
1696 obj
= getDecodedObject(obj
);
1700 if (!listAddNodeTail(c
->reply
,obj
)) oom("listAddNodeTail");
/* Wrap the sds string `s` into a REDIS_STRING object so it can be queued
 * as a reply. The object is created around `s` itself (no copy).
 * NOTE(review): presumably the object is then passed to addReply() --
 * confirm. */
1703 static void addReplySds(redisClient
*c
, sds s
) {
1704 robj
*o
= createObject(REDIS_STRING
,s
);
/* Queue the "$<len>\r\n" header that precedes a bulk reply for `obj`.
 * For a raw-encoded object the length is the sds length; otherwise
 * obj->ptr is read as a long and the length of its decimal representation
 * is obtained by repeatedly dividing by 10. */
1709 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
1712 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
1713 len
= sdslen(obj
->ptr
);
1715 long n
= (long)obj
->ptr
;
1722 while((n
= n
/10) != 0) {
1726 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",len
));
/* Readable-event handler of the listening socket: accept one connection
 * and create its client. An accept failure is logged at debug level; a
 * client allocation failure is logged as a warning and the new descriptor
 * is closed. When server.maxclients is set and the client list has grown
 * beyond it, an error line is written to the new client on a best-effort
 * basis. stat_numconnections counts the connections that get through. */
1729 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1734 REDIS_NOTUSED(mask
);
1735 REDIS_NOTUSED(privdata
);
1737 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
1738 if (cfd
== AE_ERR
) {
1739 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
1742 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
1743 if ((c
= createClient(cfd
)) == NULL
) {
1744 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
1745 close(cfd
); /* May be already closed, just ingore errors */
1748 /* If maxclient directive is set and this is one client more... close the
1749 * connection. Note that we create the client instead to check before
1750 * for this condition, since now the socket is already set in nonblocking
1751 * mode and we can send an error for free using the Kernel I/O */
1752 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
1753 char *err
= "-ERR max number of clients reached\r\n";
1755 /* That's a best effort error message, don't check write errors */
1756 (void) write(c
->fd
,err
,strlen(err
));
1760 server
.stat_numconnections
++;
1763 /* ======================= Redis objects implementation ===================== */
/* Obtain a robj for a value of the given `type` pointing at `ptr`.
 * An object is recycled from the head of server.objfreelist when that list
 * is not empty; otherwise a new one is allocated with zmalloc (fatal on
 * out of memory). The encoding is initialised to REDIS_ENCODING_RAW. */
1765 static robj
*createObject(int type
, void *ptr
) {
1768 if (listLength(server
.objfreelist
)) {
1769 listNode
*head
= listFirst(server
.objfreelist
);
1770 o
= listNodeValue(head
);
1771 listDelNode(server
.objfreelist
,head
);
1773 o
= zmalloc(sizeof(*o
));
1775 if (!o
) oom("createObject");
1777 o
->encoding
= REDIS_ENCODING_RAW
;
1783 static robj
*createStringObject(char *ptr
, size_t len
) {
1784 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
1787 static robj
*createListObject(void) {
1788 list
*l
= listCreate();
1790 if (!l
) oom("listCreate");
1791 listSetFreeMethod(l
,decrRefCount
);
1792 return createObject(REDIS_LIST
,l
);
1795 static robj
*createSetObject(void) {
1796 dict
*d
= dictCreate(&setDictType
,NULL
);
1797 if (!d
) oom("dictCreate");
1798 return createObject(REDIS_SET
,d
);
/* Release the payload of a string object. Only raw-encoded strings take
 * the branch below.
 * NOTE(review): presumably other encodings keep the value inside the
 * pointer itself and own no separate allocation -- confirm. */
1801 static void freeStringObject(robj
*o
) {
1802 if (o
->encoding
== REDIS_ENCODING_RAW
) {
1807 static void freeListObject(robj
*o
) {
1808 listRelease((list
*) o
->ptr
);
1811 static void freeSetObject(robj
*o
) {
1812 dictRelease((dict
*) o
->ptr
);
1815 static void freeHashObject(robj
*o
) {
1816 dictRelease((dict
*) o
->ptr
);
/* Take one more reference on object `o`. When DEBUG_REFCOUNT is defined,
 * the new count of string objects is printed to stdout. */
1819 static void incrRefCount(robj
*o
) {
1821 #ifdef DEBUG_REFCOUNT
1822 if (o
->type
== REDIS_STRING
)
1823 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
/* Drop one reference on the object `obj` (typed void* so the function can
 * be used as a list free method). When the count reaches zero the payload
 * is released by the type-specific free function (an unknown type trips an
 * assertion), and the object itself is offered to server.objfreelist: it
 * is not recycled when that list already holds more than
 * REDIS_OBJFREELIST_MAX entries or when adding it fails. */
1827 static void decrRefCount(void *obj
) {
1830 #ifdef DEBUG_REFCOUNT
1831 if (o
->type
== REDIS_STRING
)
1832 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
1834 if (--(o
->refcount
) == 0) {
1836 case REDIS_STRING
: freeStringObject(o
); break;
1837 case REDIS_LIST
: freeListObject(o
); break;
1838 case REDIS_SET
: freeSetObject(o
); break;
1839 case REDIS_HASH
: freeHashObject(o
); break;
1840 default: assert(0 != 0); break;
1842 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
1843 !listAddNodeHead(server
.objfreelist
,o
))
1848 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
1849 dictEntry
*de
= dictFind(db
->dict
,key
);
1850 return de
? dictGetEntryVal(de
) : NULL
;
1853 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
1854 expireIfNeeded(db
,key
);
1855 return lookupKey(db
,key
);
1858 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
1859 deleteIfVolatile(db
,key
);
1860 return lookupKey(db
,key
);
/* Remove `key` from database `db`: first from the expires dictionary (only
 * consulted when it is not empty), then from the main dictionary.
 * Returns non-zero when the key was present in the main dictionary. */
1863 static int deleteKey(redisDb
*db
, robj
*key
) {
1866 /* We need to protect key from destruction: after the first dictDelete()
1867 * it may happen that 'key' is no longer valid if we don't increment
1868 * it's count. This may happen when we get the object reference directly
1869 * from the hash table with dictRandomKey() or dict iterators */
1871 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
1872 retval
= dictDelete(db
->dict
,key
);
1875 return retval
== DICT_OK
;
1878 /* Try to share an object against the shared objects pool */
/* Try to replace the string object `o` with an equal object from
 * server.sharingpool. NULL objects, or a disabled shareobjects setting,
 * return `o` unchanged. On a pool hit the entry's usage counter (kept in
 * the dictionary value) is incremented and a reference is taken on the
 * shared object. On a miss, when the pool is full, the counter of a random
 * entry is decremented; the new object is added to the pool with counter 1. */
1879 static robj
*tryObjectSharing(robj
*o
) {
1880 struct dictEntry
*de
;
1883 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
1885 assert(o
->type
== REDIS_STRING
);
1886 de
= dictFind(server
.sharingpool
,o
);
1888 robj
*shared
= dictGetEntryKey(de
);
1890 c
= ((unsigned long) dictGetEntryVal(de
))+1;
1891 dictGetEntryVal(de
) = (void*) c
;
1892 incrRefCount(shared
);
1896 /* Here we are using a stream algorithm: Every time an object is
1897 * shared we increment its count, every time there is a miss we
1898 * decrement the counter of a random object. If this object reaches
1899 * zero we remove the object and put the current object instead. */
1900 if (dictSize(server
.sharingpool
) >=
1901 server
.sharingpoolsize
) {
1902 de
= dictGetRandomKey(server
.sharingpool
);
1904 c
= ((unsigned long) dictGetEntryVal(de
))-1;
1905 dictGetEntryVal(de
) = (void*) c
;
1907 dictDelete(server
.sharingpool
,de
->key
);
1910 c
= 0; /* If the pool is empty we want to add this object */
1915 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
1916 assert(retval
== DICT_OK
);
1923 /* Check if the nul-terminated string 's' can be represented by a long
1924 * (that is, is a number that fits into long without any other space or
1925 * character before or after the digits).
1927 * If so, the function returns REDIS_OK and *longval is set to the value
1928 * of the number. Otherwise REDIS_ERR is returned */
/* Implementation note: the check is a round trip. strtol() must consume
 * the whole string, and printing the parsed value back with "%ld" must
 * reproduce `s` exactly. `longval` may be NULL when the caller only wants
 * the yes/no answer. */
1929 static int isStringRepresentableAsLong(char *s
, long *longval
) {
1930 char buf
[32], *endptr
;
1934 value
= strtol(s
, &endptr
, 10);
1935 if (endptr
[0] != '\0') return REDIS_ERR
;
1936 slen
= snprintf(buf
,32,"%ld",value
);
1938 /* If the number converted back into a string is not identical
1939 * then it's not possible to encode the string as integer */
1940 if (strlen(buf
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
1941 if (longval
) *longval
= value
;
1945 /* Try to encode a string object in order to save space */
/* Switch a raw string object to REDIS_ENCODING_INT when its text is the
 * canonical decimal form of a long; the long is then stored directly in
 * o->ptr. Returns REDIS_ERR when the object is already encoded, is
 * referenced more than once, or is not representable as a long. */
1946 static int tryObjectEncoding(robj
*o
) {
1950 if (o
->encoding
!= REDIS_ENCODING_RAW
)
1951 return REDIS_ERR
; /* Already encoded */
1953 /* It's not safe to encode shared objects: shared objects can be shared
1954 * everywhere in the "object space" of Redis. Encoded objects can only
1955 * appear as "values" (and not, for instance, as keys) */
1956 if (o
->refcount
> 1) return REDIS_ERR
;
1958 /* Currently we try to encode only strings */
1959 assert(o
->type
== REDIS_STRING
);
1961 /* Check if we can represent this string as a long integer */
1962 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
1964 /* Ok, this object can be encoded */
1965 o
->encoding
= REDIS_ENCODING_INT
;
1967 o
->ptr
= (void*) value
;
1971 /* Get a decoded version of an encoded object (returned as a new object) */
/* Must only be called on objects that are not raw-encoded (asserted).
 * For an integer-encoded string the long kept in o->ptr is printed with
 * "%ld" and a new string object is created from that text. */
1972 static robj
*getDecodedObject(const robj
*o
) {
1975 assert(o
->encoding
!= REDIS_ENCODING_RAW
);
1976 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
1979 snprintf(buf
,32,"%ld",(long)o
->ptr
);
1980 dec
= createStringObject(buf
,strlen(buf
));
1987 static int compareStringObjects(robj
*a
, robj
*b
) {
1988 assert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
1990 if (a
->encoding
== REDIS_ENCODING_INT
&& b
->encoding
== REDIS_ENCODING_INT
){
1991 return (long)a
->ptr
- (long)b
->ptr
;
1997 if (a
->encoding
!= REDIS_ENCODING_RAW
) a
= getDecodedObject(a
);
1998 if (b
->encoding
!= REDIS_ENCODING_RAW
) b
= getDecodedObject(a
);
1999 retval
= sdscmp(a
->ptr
,b
->ptr
);
/* Length in bytes of the string representation of `o`: the sds length for
 * raw strings, otherwise the number of characters snprintf() produces when
 * printing the long kept in o->ptr with "%ld". */
2006 static size_t stringObjectLen(robj
*o
) {
2007 assert(o
->type
== REDIS_STRING
);
2008 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2009 return sdslen(o
->ptr
);
2013 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2017 /*============================ DB saving/loading ============================ */
/* Write the one-byte type/opcode `type` to `fp`. Returns -1 when the write
 * fails. */
2019 static int rdbSaveType(FILE *fp
, unsigned char type
) {
2020 if (fwrite(&type
,1,1,fp
) == 0) return -1;
/* Write the time `t` to `fp` as a 4-byte value: `t` is narrowed to int32_t
 * and its in-memory bytes are written as they are. Returns -1 when the
 * write fails. */
2024 static int rdbSaveTime(FILE *fp
, time_t t
) {
2025 int32_t t32
= (int32_t) t
;
2026 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
2030 /* check rdbLoadLen() comments for more info */
/* Write the length `len` to `fp` using the shortest of three encodings;
 * the two most significant bits of the first byte carry the encoding tag:
 *  - REDIS_RDB_6BITLEN:  one byte, length in the remaining 6 bits;
 *  - REDIS_RDB_14BITLEN: two bytes, used when len < (1<<14);
 *  - REDIS_RDB_32BITLEN: tag byte followed by a 4-byte length.
 * Returns -1 when a write fails. */
2031 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2032 unsigned char buf
[2];
2035 /* Save a 6 bit len */
2036 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2037 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2038 } else if (len
< (1<<14)) {
2039 /* Save a 14 bit len */
2040 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2042 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2044 /* Save a 32 bit len */
2045 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2046 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2048 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2053 /* String objects in the form "2391" "-100" without any space and with a
2054 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2055 * encoded as integers to save space */
/* Fill `enc` with the integer encoding of the string `s`, if possible.
 * Returns 0 when `s` is not the canonical decimal form of a number (the
 * value printed back with "%lld" must match `s` exactly) or does not fit
 * the supported ranges. On success enc[0] holds the REDIS_RDB_ENCVAL tag
 * combined with the INT8/INT16/INT32 selector and the value follows, least
 * significant byte first, in enc[1..4].
 * NOTE(review): the caller writes the return value as a byte count, so a
 * successful call presumably returns the number of bytes stored in `enc`
 * -- confirm. */
2056 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2058 char *endptr
, buf
[32];
2060 /* Check if it's possible to encode this value as a number */
2061 value
= strtoll(s
, &endptr
, 10);
2062 if (endptr
[0] != '\0') return 0;
2063 snprintf(buf
,32,"%lld",value
);
2065 /* If the number converted back into a string is not identical
2066 * then it's not possible to encode the string as integer */
2067 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2069 /* Finally check if it fits in our ranges */
2070 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2071 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2072 enc
[1] = value
&0xFF;
2074 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2075 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2076 enc
[1] = value
&0xFF;
2077 enc
[2] = (value
>>8)&0xFF;
2079 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2080 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2081 enc
[1] = value
&0xFF;
2082 enc
[2] = (value
>>8)&0xFF;
2083 enc
[3] = (value
>>16)&0xFF;
2084 enc
[4] = (value
>>24)&0xFF;
2091 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2092 unsigned int comprlen
, outlen
;
2096 /* We require at least four bytes compression for this to be worth it */
2097 outlen
= sdslen(obj
->ptr
)-4;
2098 if (outlen
<= 0) return 0;
2099 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2100 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2101 if (comprlen
== 0) {
2105 /* Data compressed! Let's save it on disk */
2106 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2107 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2108 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2109 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2110 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2119 /* Save a string object as [len][data] on disk. If the object is a string
2120 * representation of an integer value we try to save it in a special form */
/* Write the raw string object `obj` to `fp`, trying the compact forms
 * first: the integer encoding produced by rdbTryIntegerEncoding(), then
 * LZF compression, and finally the verbatim form, i.e. the length followed
 * by the bytes (no bytes are written for an empty string).
 * Returns -1 on write error. */
2121 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2125 len
= sdslen(obj
->ptr
);
2127 /* Try integer encoding */
2129 unsigned char buf
[5];
2130 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2131 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2136 /* Try LZF compression - under 20 bytes it's unable to compress even
2137 * aaaaaaaaaaaaaaaaaa so skip it */
2141 retval
= rdbSaveLzfStringObject(fp
,obj
);
2142 if (retval
== -1) return -1;
2143 if (retval
> 0) return 0;
2144 /* retval == 0 means data can't be compressed, save the old way */
2147 /* Store verbatim */
2148 if (rdbSaveLen(fp
,len
) == -1) return -1;
2149 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2153 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
/* Write a string object to `fp` whatever its encoding: an object that is
 * not raw-encoded is first turned into a decoded copy, which is then saved
 * through rdbSaveStringObjectRaw(); raw objects are saved directly. */
2154 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2158 if (obj
->encoding
!= REDIS_ENCODING_RAW
) {
2159 dec
= getDecodedObject(obj
);
2160 retval
= rdbSaveStringObjectRaw(fp
,dec
);
2164 return rdbSaveStringObjectRaw(fp
,obj
);
2168 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2169 static int rdbSave(char *filename
) {
2170 dictIterator
*di
= NULL
;
2175 time_t now
= time(NULL
);
2177 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2178 fp
= fopen(tmpfile
,"w");
2180 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2183 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2184 for (j
= 0; j
< server
.dbnum
; j
++) {
2185 redisDb
*db
= server
.db
+j
;
2187 if (dictSize(d
) == 0) continue;
2188 di
= dictGetIterator(d
);
2194 /* Write the SELECT DB opcode */
2195 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2196 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2198 /* Iterate this DB writing every entry */
2199 while((de
= dictNext(di
)) != NULL
) {
2200 robj
*key
= dictGetEntryKey(de
);
2201 robj
*o
= dictGetEntryVal(de
);
2202 time_t expiretime
= getExpire(db
,key
);
2204 /* Save the expire time */
2205 if (expiretime
!= -1) {
2206 /* If this key is already expired skip it */
2207 if (expiretime
< now
) continue;
2208 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2209 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2211 /* Save the key and associated value */
2212 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2213 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2214 if (o
->type
== REDIS_STRING
) {
2215 /* Save a string value */
2216 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
2217 } else if (o
->type
== REDIS_LIST
) {
2218 /* Save a list value */
2219 list
*list
= o
->ptr
;
2223 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
2224 while((ln
= listYield(list
))) {
2225 robj
*eleobj
= listNodeValue(ln
);
2227 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2229 } else if (o
->type
== REDIS_SET
) {
2230 /* Save a set value */
2232 dictIterator
*di
= dictGetIterator(set
);
2235 if (!set
) oom("dictGetIteraotr");
2236 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
2237 while((de
= dictNext(di
)) != NULL
) {
2238 robj
*eleobj
= dictGetEntryKey(de
);
2240 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2242 dictReleaseIterator(di
);
2247 dictReleaseIterator(di
);
2250 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2252 /* Make sure data will not remain on the OS's output buffers */
2257 /* Use RENAME to make sure the DB file is changed atomically only
2258 * if the generated DB file is ok. */
2259 if (rename(tmpfile
,filename
) == -1) {
2260 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destionation: %s", strerror(errno
));
2264 redisLog(REDIS_NOTICE
,"DB saved on disk");
2266 server
.lastsave
= time(NULL
);
2272 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2273 if (di
) dictReleaseIterator(di
);
2277 static int rdbSaveBackground(char *filename
) {
2280 if (server
.bgsaveinprogress
) return REDIS_ERR
;
2281 if ((childpid
= fork()) == 0) {
2284 if (rdbSave(filename
) == REDIS_OK
) {
2291 if (childpid
== -1) {
2292 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2296 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2297 server
.bgsaveinprogress
= 1;
2298 server
.bgsavechildpid
= childpid
;
2301 return REDIS_OK
; /* unreached */
2304 static void rdbRemoveTempFile(pid_t childpid
) {
2307 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
2311 static int rdbLoadType(FILE *fp
) {
2313 if (fread(&type
,1,1,fp
) == 0) return -1;
2317 static time_t rdbLoadTime(FILE *fp
) {
2319 if (fread(&t32
,4,1,fp
) == 0) return -1;
2320 return (time_t) t32
;
2323 /* Load an encoded length from the DB; see the REDIS_RDB_* defines at the top
2324 * of this file for a description of how lengths are stored on disk.
2326 * isencoded is set to 1 if the value read is not actually a length but
2327 * an "encoding type"; check the above comments for more info */
2328 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2329 unsigned char buf
[2];
2332 if (isencoded
) *isencoded
= 0;
2334 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2339 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2340 type
= (buf
[0]&0xC0)>>6;
2341 if (type
== REDIS_RDB_6BITLEN
) {
2342 /* Read a 6 bit len */
2344 } else if (type
== REDIS_RDB_ENCVAL
) {
2345 /* Read a 6 bit len encoding type */
2346 if (isencoded
) *isencoded
= 1;
2348 } else if (type
== REDIS_RDB_14BITLEN
) {
2349 /* Read a 14 bit len */
2350 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2351 return ((buf
[0]&0x3F)<<8)|buf
[1];
2353 /* Read a 32 bit len */
2354 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2360 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2361 unsigned char enc
[4];
2364 if (enctype
== REDIS_RDB_ENC_INT8
) {
2365 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2366 val
= (signed char)enc
[0];
2367 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2369 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2370 v
= enc
[0]|(enc
[1]<<8);
2372 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2374 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2375 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2378 val
= 0; /* anti-warning */
2381 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2384 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2385 unsigned int len
, clen
;
2386 unsigned char *c
= NULL
;
2389 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2390 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2391 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2392 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2393 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2394 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2396 return createObject(REDIS_STRING
,val
);
2403 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2408 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2411 case REDIS_RDB_ENC_INT8
:
2412 case REDIS_RDB_ENC_INT16
:
2413 case REDIS_RDB_ENC_INT32
:
2414 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2415 case REDIS_RDB_ENC_LZF
:
2416 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2422 if (len
== REDIS_RDB_LENERR
) return NULL
;
2423 val
= sdsnewlen(NULL
,len
);
2424 if (len
&& fread(val
,len
,1,fp
) == 0) {
2428 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2431 static int rdbLoad(char *filename
) {
2433 robj
*keyobj
= NULL
;
2435 int type
, retval
, rdbver
;
2436 dict
*d
= server
.db
[0].dict
;
2437 redisDb
*db
= server
.db
+0;
2439 time_t expiretime
= -1, now
= time(NULL
);
2441 fp
= fopen(filename
,"r");
2442 if (!fp
) return REDIS_ERR
;
2443 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2445 if (memcmp(buf
,"REDIS",5) != 0) {
2447 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2450 rdbver
= atoi(buf
+5);
2453 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2460 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2461 if (type
== REDIS_EXPIRETIME
) {
2462 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2463 /* We read the time so we need to read the object type again */
2464 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2466 if (type
== REDIS_EOF
) break;
2467 /* Handle SELECT DB opcode as a special case */
2468 if (type
== REDIS_SELECTDB
) {
2469 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2471 if (dbid
>= (unsigned)server
.dbnum
) {
2472 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2475 db
= server
.db
+dbid
;
2480 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2482 if (type
== REDIS_STRING
) {
2483 /* Read string value */
2484 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2485 tryObjectEncoding(o
);
2486 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2487 /* Read list/set value */
2490 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2492 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2493 /* Load every single element of the list/set */
2497 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2498 tryObjectEncoding(ele
);
2499 if (type
== REDIS_LIST
) {
2500 if (!listAddNodeTail((list
*)o
->ptr
,ele
))
2501 oom("listAddNodeTail");
2503 if (dictAdd((dict
*)o
->ptr
,ele
,NULL
) == DICT_ERR
)
2510 /* Add the new object in the hash table */
2511 retval
= dictAdd(d
,keyobj
,o
);
2512 if (retval
== DICT_ERR
) {
2513 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2516 /* Set the expire time if needed */
2517 if (expiretime
!= -1) {
2518 setExpire(db
,keyobj
,expiretime
);
2519 /* Delete this key if already expired */
2520 if (expiretime
< now
) deleteKey(db
,keyobj
);
2528 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2529 if (keyobj
) decrRefCount(keyobj
);
2530 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2532 return REDIS_ERR
; /* Just to avoid warning */
2535 /*================================== Commands =============================== */
2537 static void authCommand(redisClient
*c
) {
2538 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2539 c
->authenticated
= 1;
2540 addReply(c
,shared
.ok
);
2542 c
->authenticated
= 0;
2543 addReply(c
,shared
.err
);
2547 static void pingCommand(redisClient
*c
) {
2548 addReply(c
,shared
.pong
);
2551 static void echoCommand(redisClient
*c
) {
2552 addReplyBulkLen(c
,c
->argv
[1]);
2553 addReply(c
,c
->argv
[1]);
2554 addReply(c
,shared
.crlf
);
2557 /*=================================== Strings =============================== */
2559 static void setGenericCommand(redisClient
*c
, int nx
) {
2562 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2563 if (retval
== DICT_ERR
) {
2565 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2566 incrRefCount(c
->argv
[2]);
2568 addReply(c
,shared
.czero
);
2572 incrRefCount(c
->argv
[1]);
2573 incrRefCount(c
->argv
[2]);
2576 removeExpire(c
->db
,c
->argv
[1]);
2577 addReply(c
, nx
? shared
.cone
: shared
.ok
);
2580 static void setCommand(redisClient
*c
) {
2581 setGenericCommand(c
,0);
2584 static void setnxCommand(redisClient
*c
) {
2585 setGenericCommand(c
,1);
2588 static void getCommand(redisClient
*c
) {
2589 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2592 addReply(c
,shared
.nullbulk
);
2594 if (o
->type
!= REDIS_STRING
) {
2595 addReply(c
,shared
.wrongtypeerr
);
2597 addReplyBulkLen(c
,o
);
2599 addReply(c
,shared
.crlf
);
2604 static void getsetCommand(redisClient
*c
) {
2606 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
2607 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2609 incrRefCount(c
->argv
[1]);
2611 incrRefCount(c
->argv
[2]);
2613 removeExpire(c
->db
,c
->argv
[1]);
2616 static void mgetCommand(redisClient
*c
) {
2619 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
2620 for (j
= 1; j
< c
->argc
; j
++) {
2621 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
2623 addReply(c
,shared
.nullbulk
);
2625 if (o
->type
!= REDIS_STRING
) {
2626 addReply(c
,shared
.nullbulk
);
2628 addReplyBulkLen(c
,o
);
2630 addReply(c
,shared
.crlf
);
2636 static void incrDecrCommand(redisClient
*c
, long long incr
) {
2641 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2645 if (o
->type
!= REDIS_STRING
) {
2650 if (o
->encoding
== REDIS_ENCODING_RAW
)
2651 value
= strtoll(o
->ptr
, &eptr
, 10);
2652 else if (o
->encoding
== REDIS_ENCODING_INT
)
2653 value
= (long)o
->ptr
;
2660 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
2661 tryObjectEncoding(o
);
2662 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
2663 if (retval
== DICT_ERR
) {
2664 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
2665 removeExpire(c
->db
,c
->argv
[1]);
2667 incrRefCount(c
->argv
[1]);
2670 addReply(c
,shared
.colon
);
2672 addReply(c
,shared
.crlf
);
2675 static void incrCommand(redisClient
*c
) {
2676 incrDecrCommand(c
,1);
2679 static void decrCommand(redisClient
*c
) {
2680 incrDecrCommand(c
,-1);
2683 static void incrbyCommand(redisClient
*c
) {
2684 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2685 incrDecrCommand(c
,incr
);
2688 static void decrbyCommand(redisClient
*c
) {
2689 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2690 incrDecrCommand(c
,-incr
);
2693 /* ========================= Type agnostic commands ========================= */
2695 static void delCommand(redisClient
*c
) {
2698 for (j
= 1; j
< c
->argc
; j
++) {
2699 if (deleteKey(c
->db
,c
->argv
[j
])) {
2706 addReply(c
,shared
.czero
);
2709 addReply(c
,shared
.cone
);
2712 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
2717 static void existsCommand(redisClient
*c
) {
2718 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
2721 static void selectCommand(redisClient
*c
) {
2722 int id
= atoi(c
->argv
[1]->ptr
);
2724 if (selectDb(c
,id
) == REDIS_ERR
) {
2725 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
2727 addReply(c
,shared
.ok
);
2731 static void randomkeyCommand(redisClient
*c
) {
2735 de
= dictGetRandomKey(c
->db
->dict
);
2736 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
2739 addReply(c
,shared
.plus
);
2740 addReply(c
,shared
.crlf
);
2742 addReply(c
,shared
.plus
);
2743 addReply(c
,dictGetEntryKey(de
));
2744 addReply(c
,shared
.crlf
);
2748 static void keysCommand(redisClient
*c
) {
2751 sds pattern
= c
->argv
[1]->ptr
;
2752 int plen
= sdslen(pattern
);
2753 int numkeys
= 0, keyslen
= 0;
2754 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
2756 di
= dictGetIterator(c
->db
->dict
);
2757 if (!di
) oom("dictGetIterator");
2759 decrRefCount(lenobj
);
2760 while((de
= dictNext(di
)) != NULL
) {
2761 robj
*keyobj
= dictGetEntryKey(de
);
2763 sds key
= keyobj
->ptr
;
2764 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
2765 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
2766 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
2768 addReply(c
,shared
.space
);
2771 keyslen
+= sdslen(key
);
2775 dictReleaseIterator(di
);
2776 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
2777 addReply(c
,shared
.crlf
);
2780 static void dbsizeCommand(redisClient
*c
) {
2782 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
2785 static void lastsaveCommand(redisClient
*c
) {
2787 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
2790 static void typeCommand(redisClient
*c
) {
2794 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2799 case REDIS_STRING
: type
= "+string"; break;
2800 case REDIS_LIST
: type
= "+list"; break;
2801 case REDIS_SET
: type
= "+set"; break;
2802 default: type
= "unknown"; break;
2805 addReplySds(c
,sdsnew(type
));
2806 addReply(c
,shared
.crlf
);
2809 static void saveCommand(redisClient
*c
) {
2810 if (server
.bgsaveinprogress
) {
2811 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
2814 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2815 addReply(c
,shared
.ok
);
2817 addReply(c
,shared
.err
);
2821 static void bgsaveCommand(redisClient
*c
) {
2822 if (server
.bgsaveinprogress
) {
2823 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
2826 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
2827 addReply(c
,shared
.ok
);
2829 addReply(c
,shared
.err
);
2833 static void shutdownCommand(redisClient
*c
) {
2834 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
2835 /* Kill the saving child if there is a background saving in progress.
2836 We want to avoid race conditions, for instance our saving child may
2837 overwrite the synchronous saving done by SHUTDOWN. */
2838 if (server
.bgsaveinprogress
) {
2839 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
2840 kill(server
.bgsavechildpid
,SIGKILL
);
2841 rdbRemoveTempFile(server
.bgsavechildpid
);
2844 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2845 if (server
.daemonize
)
2846 unlink(server
.pidfile
);
2847 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
2848 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
2851 /* Ooops.. error saving! The best we can do is to continue operating.
2852 * Note that if there was a background saving process, in the next
2853 * cron() Redis will be notified that the background saving aborted,
2854 * handling special stuff like slaves pending for synchronization... */
2855 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
2856 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2860 static void renameGenericCommand(redisClient
*c
, int nx
) {
2863 /* To use the same key as src and dst is probably an error */
2864 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
2865 addReply(c
,shared
.sameobjecterr
);
2869 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2871 addReply(c
,shared
.nokeyerr
);
2875 deleteIfVolatile(c
->db
,c
->argv
[2]);
2876 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
2879 addReply(c
,shared
.czero
);
2882 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
2884 incrRefCount(c
->argv
[2]);
2886 deleteKey(c
->db
,c
->argv
[1]);
2888 addReply(c
,nx
? shared
.cone
: shared
.ok
);
2891 static void renameCommand(redisClient
*c
) {
2892 renameGenericCommand(c
,0);
2895 static void renamenxCommand(redisClient
*c
) {
2896 renameGenericCommand(c
,1);
2899 static void moveCommand(redisClient
*c
) {
2904 /* Obtain source and target DB pointers */
2907 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
2908 addReply(c
,shared
.outofrangeerr
);
2912 selectDb(c
,srcid
); /* Back to the source DB */
2914 /* If the user is moving using as target the same
2915 * DB as the source DB it is probably an error. */
2917 addReply(c
,shared
.sameobjecterr
);
2921 /* Check if the element exists and get a reference */
2922 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2924 addReply(c
,shared
.czero
);
2928 /* Try to add the element to the target DB */
2929 deleteIfVolatile(dst
,c
->argv
[1]);
2930 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
2931 addReply(c
,shared
.czero
);
2934 incrRefCount(c
->argv
[1]);
2937 /* OK! key moved, free the entry in the source DB */
2938 deleteKey(src
,c
->argv
[1]);
2940 addReply(c
,shared
.cone
);
2943 /* =================================== Lists ================================ */
2944 static void pushGenericCommand(redisClient
*c
, int where
) {
2948 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2950 lobj
= createListObject();
2952 if (where
== REDIS_HEAD
) {
2953 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2955 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2957 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
2958 incrRefCount(c
->argv
[1]);
2959 incrRefCount(c
->argv
[2]);
2961 if (lobj
->type
!= REDIS_LIST
) {
2962 addReply(c
,shared
.wrongtypeerr
);
2966 if (where
== REDIS_HEAD
) {
2967 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2969 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2971 incrRefCount(c
->argv
[2]);
2974 addReply(c
,shared
.ok
);
2977 static void lpushCommand(redisClient
*c
) {
2978 pushGenericCommand(c
,REDIS_HEAD
);
2981 static void rpushCommand(redisClient
*c
) {
2982 pushGenericCommand(c
,REDIS_TAIL
);
2985 static void llenCommand(redisClient
*c
) {
2989 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2991 addReply(c
,shared
.czero
);
2994 if (o
->type
!= REDIS_LIST
) {
2995 addReply(c
,shared
.wrongtypeerr
);
2998 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3003 static void lindexCommand(redisClient
*c
) {
3005 int index
= atoi(c
->argv
[2]->ptr
);
3007 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3009 addReply(c
,shared
.nullbulk
);
3011 if (o
->type
!= REDIS_LIST
) {
3012 addReply(c
,shared
.wrongtypeerr
);
3014 list
*list
= o
->ptr
;
3017 ln
= listIndex(list
, index
);
3019 addReply(c
,shared
.nullbulk
);
3021 robj
*ele
= listNodeValue(ln
);
3022 addReplyBulkLen(c
,ele
);
3024 addReply(c
,shared
.crlf
);
3030 static void lsetCommand(redisClient
*c
) {
3032 int index
= atoi(c
->argv
[2]->ptr
);
3034 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3036 addReply(c
,shared
.nokeyerr
);
3038 if (o
->type
!= REDIS_LIST
) {
3039 addReply(c
,shared
.wrongtypeerr
);
3041 list
*list
= o
->ptr
;
3044 ln
= listIndex(list
, index
);
3046 addReply(c
,shared
.outofrangeerr
);
3048 robj
*ele
= listNodeValue(ln
);
3051 listNodeValue(ln
) = c
->argv
[3];
3052 incrRefCount(c
->argv
[3]);
3053 addReply(c
,shared
.ok
);
3060 static void popGenericCommand(redisClient
*c
, int where
) {
3063 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3065 addReply(c
,shared
.nullbulk
);
3067 if (o
->type
!= REDIS_LIST
) {
3068 addReply(c
,shared
.wrongtypeerr
);
3070 list
*list
= o
->ptr
;
3073 if (where
== REDIS_HEAD
)
3074 ln
= listFirst(list
);
3076 ln
= listLast(list
);
3079 addReply(c
,shared
.nullbulk
);
3081 robj
*ele
= listNodeValue(ln
);
3082 addReplyBulkLen(c
,ele
);
3084 addReply(c
,shared
.crlf
);
3085 listDelNode(list
,ln
);
3092 static void lpopCommand(redisClient
*c
) {
3093 popGenericCommand(c
,REDIS_HEAD
);
3096 static void rpopCommand(redisClient
*c
) {
3097 popGenericCommand(c
,REDIS_TAIL
);
3100 static void lrangeCommand(redisClient
*c
) {
3102 int start
= atoi(c
->argv
[2]->ptr
);
3103 int end
= atoi(c
->argv
[3]->ptr
);
3105 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3107 addReply(c
,shared
.nullmultibulk
);
3109 if (o
->type
!= REDIS_LIST
) {
3110 addReply(c
,shared
.wrongtypeerr
);
3112 list
*list
= o
->ptr
;
3114 int llen
= listLength(list
);
3118 /* convert negative indexes */
3119 if (start
< 0) start
= llen
+start
;
3120 if (end
< 0) end
= llen
+end
;
3121 if (start
< 0) start
= 0;
3122 if (end
< 0) end
= 0;
3124 /* indexes sanity checks */
3125 if (start
> end
|| start
>= llen
) {
3126 /* Out of range start or start > end result in empty list */
3127 addReply(c
,shared
.emptymultibulk
);
3130 if (end
>= llen
) end
= llen
-1;
3131 rangelen
= (end
-start
)+1;
3133 /* Return the result in form of a multi-bulk reply */
3134 ln
= listIndex(list
, start
);
3135 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3136 for (j
= 0; j
< rangelen
; j
++) {
3137 ele
= listNodeValue(ln
);
3138 addReplyBulkLen(c
,ele
);
3140 addReply(c
,shared
.crlf
);
3147 static void ltrimCommand(redisClient
*c
) {
3149 int start
= atoi(c
->argv
[2]->ptr
);
3150 int end
= atoi(c
->argv
[3]->ptr
);
3152 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3154 addReply(c
,shared
.nokeyerr
);
3156 if (o
->type
!= REDIS_LIST
) {
3157 addReply(c
,shared
.wrongtypeerr
);
3159 list
*list
= o
->ptr
;
3161 int llen
= listLength(list
);
3162 int j
, ltrim
, rtrim
;
3164 /* convert negative indexes */
3165 if (start
< 0) start
= llen
+start
;
3166 if (end
< 0) end
= llen
+end
;
3167 if (start
< 0) start
= 0;
3168 if (end
< 0) end
= 0;
3170 /* indexes sanity checks */
3171 if (start
> end
|| start
>= llen
) {
3172 /* Out of range start or start > end result in empty list */
3176 if (end
>= llen
) end
= llen
-1;
3181 /* Remove list elements to perform the trim */
3182 for (j
= 0; j
< ltrim
; j
++) {
3183 ln
= listFirst(list
);
3184 listDelNode(list
,ln
);
3186 for (j
= 0; j
< rtrim
; j
++) {
3187 ln
= listLast(list
);
3188 listDelNode(list
,ln
);
3191 addReply(c
,shared
.ok
);
3196 static void lremCommand(redisClient
*c
) {
3199 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3201 addReply(c
,shared
.czero
);
3203 if (o
->type
!= REDIS_LIST
) {
3204 addReply(c
,shared
.wrongtypeerr
);
3206 list
*list
= o
->ptr
;
3207 listNode
*ln
, *next
;
3208 int toremove
= atoi(c
->argv
[2]->ptr
);
3213 toremove
= -toremove
;
3216 ln
= fromtail
? list
->tail
: list
->head
;
3218 robj
*ele
= listNodeValue(ln
);
3220 next
= fromtail
? ln
->prev
: ln
->next
;
3221 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3222 listDelNode(list
,ln
);
3225 if (toremove
&& removed
== toremove
) break;
3229 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3234 /* ==================================== Sets ================================ */
3236 static void saddCommand(redisClient
*c
) {
3239 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3241 set
= createSetObject();
3242 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3243 incrRefCount(c
->argv
[1]);
3245 if (set
->type
!= REDIS_SET
) {
3246 addReply(c
,shared
.wrongtypeerr
);
3250 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3251 incrRefCount(c
->argv
[2]);
3253 addReply(c
,shared
.cone
);
3255 addReply(c
,shared
.czero
);
3259 static void sremCommand(redisClient
*c
) {
3262 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3264 addReply(c
,shared
.czero
);
3266 if (set
->type
!= REDIS_SET
) {
3267 addReply(c
,shared
.wrongtypeerr
);
3270 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
3272 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3273 addReply(c
,shared
.cone
);
3275 addReply(c
,shared
.czero
);
3280 static void smoveCommand(redisClient
*c
) {
3281 robj
*srcset
, *dstset
;
3283 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3284 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3286 /* If the source key does not exist return 0, if it's of the wrong type
3288 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
3289 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
3292 /* Error if the destination key is not a set as well */
3293 if (dstset
&& dstset
->type
!= REDIS_SET
) {
3294 addReply(c
,shared
.wrongtypeerr
);
3297 /* Remove the element from the source set */
3298 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
3299 /* Key not found in the src set! return zero */
3300 addReply(c
,shared
.czero
);
3304 /* Add the element to the destination set */
3306 dstset
= createSetObject();
3307 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
3308 incrRefCount(c
->argv
[2]);
3310 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
3311 incrRefCount(c
->argv
[3]);
3312 addReply(c
,shared
.cone
);
3315 static void sismemberCommand(redisClient
*c
) {
3318 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3320 addReply(c
,shared
.czero
);
3322 if (set
->type
!= REDIS_SET
) {
3323 addReply(c
,shared
.wrongtypeerr
);
3326 if (dictFind(set
->ptr
,c
->argv
[2]))
3327 addReply(c
,shared
.cone
);
3329 addReply(c
,shared
.czero
);
3333 static void scardCommand(redisClient
*c
) {
3337 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3339 addReply(c
,shared
.czero
);
3342 if (o
->type
!= REDIS_SET
) {
3343 addReply(c
,shared
.wrongtypeerr
);
3346 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3352 static void spopCommand(redisClient
*c
) {
3356 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3358 addReply(c
,shared
.nullbulk
);
3360 if (set
->type
!= REDIS_SET
) {
3361 addReply(c
,shared
.wrongtypeerr
);
3364 de
= dictGetRandomKey(set
->ptr
);
3366 addReply(c
,shared
.nullbulk
);
3368 robj
*ele
= dictGetEntryKey(de
);
3370 addReplyBulkLen(c
,ele
);
3372 addReply(c
,shared
.crlf
);
3373 dictDelete(set
->ptr
,ele
);
3374 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3380 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3381 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3383 return dictSize(*d1
)-dictSize(*d2
);
3386 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
) {
3387 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3390 robj
*lenobj
= NULL
, *dstset
= NULL
;
3391 int j
, cardinality
= 0;
3393 if (!dv
) oom("sinterGenericCommand");
3394 for (j
= 0; j
< setsnum
; j
++) {
3398 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3399 lookupKeyRead(c
->db
,setskeys
[j
]);
3403 deleteKey(c
->db
,dstkey
);
3404 addReply(c
,shared
.ok
);
3406 addReply(c
,shared
.nullmultibulk
);
3410 if (setobj
->type
!= REDIS_SET
) {
3412 addReply(c
,shared
.wrongtypeerr
);
3415 dv
[j
] = setobj
->ptr
;
3417 /* Sort sets from the smallest to largest, this will improve our
3418 * algorithm's performance */
3419 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3421 /* The first thing we should output is the total number of elements...
3422 * since this is a multi-bulk write, but at this stage we don't know
3423 * the intersection set size, so we use a trick, append an empty object
3424 * to the output list and save the pointer to later modify it with the
3427 lenobj
= createObject(REDIS_STRING
,NULL
);
3429 decrRefCount(lenobj
);
3431 /* If we have a target key where to store the resulting set
3432 * create this key with an empty set inside */
3433 dstset
= createSetObject();
3436 /* Iterate all the elements of the first (smallest) set, and test
3437 * the element against all the other sets, if at least one set does
3438 * not include the element it is discarded */
3439 di
= dictGetIterator(dv
[0]);
3440 if (!di
) oom("dictGetIterator");
3442 while((de
= dictNext(di
)) != NULL
) {
3445 for (j
= 1; j
< setsnum
; j
++)
3446 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
3448 continue; /* at least one set does not contain the member */
3449 ele
= dictGetEntryKey(de
);
3451 addReplyBulkLen(c
,ele
);
3453 addReply(c
,shared
.crlf
);
3456 dictAdd(dstset
->ptr
,ele
,NULL
);
3460 dictReleaseIterator(di
);
3463 /* Store the resulting set into the target */
3464 deleteKey(c
->db
,dstkey
);
3465 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3466 incrRefCount(dstkey
);
3470 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",cardinality
);
3472 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3473 dictSize((dict
*)dstset
->ptr
)));
3479 static void sinterCommand(redisClient
*c
) {
3480 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
3483 static void sinterstoreCommand(redisClient
*c
) {
3484 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
3487 #define REDIS_OP_UNION 0
3488 #define REDIS_OP_DIFF 1
3490 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
3491 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3494 robj
*dstset
= NULL
;
3495 int j
, cardinality
= 0;
3497 if (!dv
) oom("sunionDiffGenericCommand");
3498 for (j
= 0; j
< setsnum
; j
++) {
3502 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3503 lookupKeyRead(c
->db
,setskeys
[j
]);
3508 if (setobj
->type
!= REDIS_SET
) {
3510 addReply(c
,shared
.wrongtypeerr
);
3513 dv
[j
] = setobj
->ptr
;
3516 /* We need a temp set object to store our union. If the dstkey
3517 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3518 * this set object will be the resulting object to set into the target key*/
3519 dstset
= createSetObject();
3521 /* Iterate all the elements of all the sets, add every element a single
3522 * time to the result set */
3523 for (j
= 0; j
< setsnum
; j
++) {
3524 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
3525 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
3527 di
= dictGetIterator(dv
[j
]);
3528 if (!di
) oom("dictGetIterator");
3530 while((de
= dictNext(di
)) != NULL
) {
3533 /* dictAdd will not add the same element multiple times */
3534 ele
= dictGetEntryKey(de
);
3535 if (op
== REDIS_OP_UNION
|| j
== 0) {
3536 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
3540 } else if (op
== REDIS_OP_DIFF
) {
3541 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
3546 dictReleaseIterator(di
);
3548 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
3551 /* Output the content of the resulting set, if not in STORE mode */
3553 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
3554 di
= dictGetIterator(dstset
->ptr
);
3555 if (!di
) oom("dictGetIterator");
3556 while((de
= dictNext(di
)) != NULL
) {
3559 ele
= dictGetEntryKey(de
);
3560 addReplyBulkLen(c
,ele
);
3562 addReply(c
,shared
.crlf
);
3564 dictReleaseIterator(di
);
3566 /* If we have a target key where to store the resulting set
3567 * create this key with the result set inside */
3568 deleteKey(c
->db
,dstkey
);
3569 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3570 incrRefCount(dstkey
);
3575 decrRefCount(dstset
);
3577 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3578 dictSize((dict
*)dstset
->ptr
)));
3584 static void sunionCommand(redisClient
*c
) {
3585 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
3588 static void sunionstoreCommand(redisClient
*c
) {
3589 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
3592 static void sdiffCommand(redisClient
*c
) {
3593 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
3596 static void sdiffstoreCommand(redisClient
*c
) {
3597 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
3600 static void flushdbCommand(redisClient
*c
) {
3601 server
.dirty
+= dictSize(c
->db
->dict
);
3602 dictEmpty(c
->db
->dict
);
3603 dictEmpty(c
->db
->expires
);
3604 addReply(c
,shared
.ok
);
3607 static void flushallCommand(redisClient
*c
) {
3608 server
.dirty
+= emptyDb();
3609 addReply(c
,shared
.ok
);
3610 rdbSave(server
.dbfilename
);
3614 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
3615 redisSortOperation
*so
= zmalloc(sizeof(*so
));
3616 if (!so
) oom("createSortOperation");
3618 so
->pattern
= pattern
;
3622 /* Return the value associated to the key with a name obtained
3623 * substituting the first occurrence of '*' in 'pattern' with 'subst' */
3624 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
3628 int prefixlen
, sublen
, postfixlen
;
3629 /* Exploit the internal sds representation to create an sds string allocated on the stack in order to make this function faster */
3633 char buf
[REDIS_SORTKEY_MAX
+1];
3636 if (subst
->encoding
== REDIS_ENCODING_RAW
)
3637 incrRefCount(subst
);
3639 subst
= getDecodedObject(subst
);
3642 spat
= pattern
->ptr
;
3644 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
3645 p
= strchr(spat
,'*');
3646 if (!p
) return NULL
;
3649 sublen
= sdslen(ssub
);
3650 postfixlen
= sdslen(spat
)-(prefixlen
+1);
3651 memcpy(keyname
.buf
,spat
,prefixlen
);
3652 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
3653 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
3654 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
3655 keyname
.len
= prefixlen
+sublen
+postfixlen
;
3657 keyobj
.refcount
= 1;
3658 keyobj
.type
= REDIS_STRING
;
3659 keyobj
.ptr
= ((char*)&keyname
)+(sizeof(long)*2);
3661 decrRefCount(subst
);
3663 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3664 return lookupKeyRead(db
,&keyobj
);
3667 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3668 * the additional parameter is not standard but a BSD-specific we have to
3669 * pass sorting parameters via the global 'server' structure */
3670 static int sortCompare(const void *s1
, const void *s2
) {
3671 const redisSortObject
*so1
= s1
, *so2
= s2
;
3674 if (!server
.sort_alpha
) {
3675 /* Numeric sorting. Here it's trivial as we precomputed scores */
3676 if (so1
->u
.score
> so2
->u
.score
) {
3678 } else if (so1
->u
.score
< so2
->u
.score
) {
3684 /* Alphanumeric sorting */
3685 if (server
.sort_bypattern
) {
3686 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
3687 /* At least one compare object is NULL */
3688 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
3690 else if (so1
->u
.cmpobj
== NULL
)
3695 /* We have both the objects, use strcoll */
3696 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
3699 /* Compare elements directly */
3700 if (so1
->obj
->encoding
== REDIS_ENCODING_RAW
&&
3701 so2
->obj
->encoding
== REDIS_ENCODING_RAW
) {
3702 cmp
= strcoll(so1
->obj
->ptr
,so2
->obj
->ptr
);
3706 dec1
= so1
->obj
->encoding
== REDIS_ENCODING_RAW
?
3707 so1
->obj
: getDecodedObject(so1
->obj
);
3708 dec2
= so2
->obj
->encoding
== REDIS_ENCODING_RAW
?
3709 so2
->obj
: getDecodedObject(so2
->obj
);
3710 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
3711 if (dec1
!= so1
->obj
) decrRefCount(dec1
);
3712 if (dec2
!= so2
->obj
) decrRefCount(dec2
);
3716 return server
.sort_desc
? -cmp
: cmp
;
3719 /* The SORT command is the most complex command in Redis. Warning: this code
3720 * is optimized for speed and a bit less for readability.
 *
 * Outline, as far as this function shows it:
 *  - argv[1] is looked up and must hold a list or a set, otherwise an error
 *    reply is sent;
 *  - the remaining arguments are parsed case-insensitively (ASC, DESC, ALPHA,
 *    LIMIT <start> <count>, BY <pattern>, GET/DEL/INCR <pattern>);
 *  - every element is copied into a redisSortObject vector, the sort weights
 *    are loaded, the vector is sorted and the selected range is emitted as a
 *    "*<count>" header followed by the output items. */
3721 static void sortCommand(redisClient
*c
) {
3724 int desc
= 0, alpha
= 0;
3725 int limit_start
= 0, limit_count
= -1, start
, end
;
3726 int j
, dontsort
= 0, vectorlen
;
3727 int getop
= 0; /* GET operation counter */
3728 robj
*sortval
, *sortby
= NULL
;
3729 redisSortObject
*vector
; /* Resulting vector to sort */
3731 /* Lookup the key to sort. It must be of the right types */
3732 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
3733 if (sortval
== NULL
) {
3734 addReply(c
,shared
.nokeyerr
);
3737 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
) {
3738 addReply(c
,shared
.wrongtypeerr
);
3742 /* Create a list of operations to perform for every sorted element.
3743 * Operations can be GET/DEL/INCR/DECR */
3744 operations
= listCreate();
3745 listSetFreeMethod(operations
,zfree
);
3748 /* Now we need to protect sortval incrementing its count, in the future
3749 * SORT may have options able to overwrite/delete keys during the sorting
3750 * and the sorted key itself may get destroyed */
3751 incrRefCount(sortval
);
3753 /* The SORT command has an SQL-alike syntax, parse it */
3754 while(j
< c
->argc
) {
/* leftargs is the number of arguments that follow the current one; options
 * taking parameters are only accepted when enough of them remain. */
3755 int leftargs
= c
->argc
-j
-1;
3756 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
3758 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
3760 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
3762 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
/* Both LIMIT parameters are converted with atoi(), so non-numeric text
 * is not reported as an error here. */
3763 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
3764 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
3766 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
3767 sortby
= c
->argv
[j
+1];
3768 /* If the BY pattern does not contain '*', i.e. it is constant,
3769 * we don't need to sort nor to lookup the weight keys. */
3770 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
3772 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3773 listAddNodeTail(operations
,createSortOperation(
3774 REDIS_SORT_GET
,c
->argv
[j
+1]));
3777 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"del") && leftargs
>= 1) {
3778 listAddNodeTail(operations
,createSortOperation(
3779 REDIS_SORT_DEL
,c
->argv
[j
+1]));
3781 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"incr") && leftargs
>= 1) {
3782 listAddNodeTail(operations
,createSortOperation(
3783 REDIS_SORT_INCR
,c
->argv
[j
+1]));
/* NOTE(review): the test below repeats "get", which an earlier branch already
 * matches, so this REDIS_SORT_DECR branch looks unreachable and "decr" is
 * never recognized. Presumably "decr" was intended; confirm and fix. */
3785 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3786 listAddNodeTail(operations
,createSortOperation(
3787 REDIS_SORT_DECR
,c
->argv
[j
+1]));
/* Syntax error path: drop the extra reference taken on sortval and free the
 * operations list before replying. */
3790 decrRefCount(sortval
);
3791 listRelease(operations
);
3792 addReply(c
,shared
.syntaxerr
);
3798 /* Load the sorting vector with all the objects to sort */
3799 vectorlen
= (sortval
->type
== REDIS_LIST
) ?
3800 listLength((list
*)sortval
->ptr
) :
3801 dictSize((dict
*)sortval
->ptr
);
3802 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
3803 if (!vector
) oom("allocating objects vector for SORT");
3805 if (sortval
->type
== REDIS_LIST
) {
3806 list
*list
= sortval
->ptr
;
3810 while((ln
= listYield(list
))) {
3811 robj
*ele
= ln
->value
;
3812 vector
[j
].obj
= ele
;
3813 vector
[j
].u
.score
= 0;
3814 vector
[j
].u
.cmpobj
= NULL
;
3818 dict
*set
= sortval
->ptr
;
3822 di
= dictGetIterator(set
);
3823 if (!di
) oom("dictGetIterator");
3824 while((setele
= dictNext(di
)) != NULL
) {
3825 vector
[j
].obj
= dictGetEntryKey(setele
);
3826 vector
[j
].u
.score
= 0;
3827 vector
[j
].u
.cmpobj
= NULL
;
3830 dictReleaseIterator(di
);
3832 assert(j
== vectorlen
);
3834 /* Now it's time to load the right scores in the sorting vector */
3835 if (dontsort
== 0) {
3836 for (j
= 0; j
< vectorlen
; j
++) {
/* Weight taken from the key named by the BY pattern; elements whose
 * weight key is missing or is not a string keep the defaults set above. */
3840 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
3841 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
3843 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
3844 vector
[j
].u
.cmpobj
= byval
;
3845 incrRefCount(byval
);
3847 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
3850 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
3851 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
3853 if (byval
->encoding
== REDIS_ENCODING_INT
) {
3854 vector
[j
].u
.score
= (long)byval
->ptr
;
3861 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
3862 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
3864 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
3865 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
3874 /* We are ready to sort the vector... perform a bit of sanity check
3875 * on the LIMIT option too. We'll use a partial version of quicksort. */
3876 start
= (limit_start
< 0) ? 0 : limit_start
;
3877 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
3878 if (start
>= vectorlen
) {
3879 start
= vectorlen
-1;
3882 if (end
>= vectorlen
) end
= vectorlen
-1;
3884 if (dontsort
== 0) {
/* sortCompare() reads its parameters from these fields of the global
 * 'server' structure, so they are set right before sorting. */
3885 server
.sort_desc
= desc
;
3886 server
.sort_alpha
= alpha
;
3887 server
.sort_bypattern
= sortby
? 1 : 0;
/* pqsort() is given the start/end range and is chosen only when sorting
 * BY a pattern with a range narrower than the whole vector; the qsort()
 * call that follows is presumably the alternative branch. */
3888 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
3889 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
3891 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
3894 /* Send command output to the output buffer, performing the specified
3895 * GET/DEL/INCR/DECR operations if any. */
3896 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
3897 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
3898 for (j
= start
; j
<= end
; j
++) {
3901 addReplyBulkLen(c
,vector
[j
].obj
);
3902 addReply(c
,vector
[j
].obj
);
3903 addReply(c
,shared
.crlf
);
3905 listRewind(operations
);
3906 while((ln
= listYield(operations
))) {
3907 redisSortOperation
*sop
= ln
->value
;
3908 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
3911 if (sop
->type
== REDIS_SORT_GET
) {
3912 if (!val
|| val
->type
!= REDIS_STRING
) {
3913 addReply(c
,shared
.nullbulk
);
3915 addReplyBulkLen(c
,val
);
3917 addReply(c
,shared
.crlf
);
3919 } else if (sop
->type
== REDIS_SORT_DEL
) {
/* Cleanup: release the reference taken on sortval and the operations list.
 * The per-element cmpobj references are released only when sorting BY a
 * pattern in ALPHA mode. */
3926 decrRefCount(sortval
);
3927 listRelease(operations
);
3928 for (j
= 0; j
< vectorlen
; j
++) {
3929 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
3930 decrRefCount(vector
[j
].u
.cmpobj
);
/* INFO: build one sds string made of "name:value" lines terminated by CRLF
 * (version, uptime, client and slave counts, used memory, save state, traffic
 * counters and role). Master link details are appended when
 * server.masterhost is set, followed by one "dbN" line for every database
 * that has keys or expires. The string is sent as a "$<len>" header, the
 * payload and a trailing CRLF. */
3935 static void infoCommand(redisClient
*c
) {
3937 time_t uptime
= time(NULL
)-server
.stat_starttime
;
3940 info
= sdscatprintf(sdsempty(),
3941 "redis_version:%s\r\n"
3943 "uptime_in_seconds:%d\r\n"
3944 "uptime_in_days:%d\r\n"
3945 "connected_clients:%d\r\n"
3946 "connected_slaves:%d\r\n"
3947 "used_memory:%zu\r\n"
3948 "changes_since_last_save:%lld\r\n"
3949 "bgsave_in_progress:%d\r\n"
3950 "last_save_time:%d\r\n"
3951 "total_connections_received:%lld\r\n"
3952 "total_commands_processed:%lld\r\n"
3955 (sizeof(long) == 8) ? "64" : "32",
/* Connected clients are reported as the client list length minus the
 * slave list length. */
3958 listLength(server
.clients
)-listLength(server
.slaves
),
3959 listLength(server
.slaves
),
3962 server
.bgsaveinprogress
,
3964 server
.stat_numconnections
,
3965 server
.stat_numcommands
,
3966 server
.masterhost
== NULL
? "master" : "slave"
3968 if (server
.masterhost
) {
3969 info
= sdscatprintf(info
,
3970 "master_host:%s\r\n"
3971 "master_port:%d\r\n"
3972 "master_link_status:%s\r\n"
3973 "master_last_io_seconds_ago:%d\r\n"
3976 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
3978 (int)(time(NULL
)-server
.master
->lastinteraction
)
3981 for (j
= 0; j
< server
.dbnum
; j
++) {
3982 long long keys
, vkeys
;
3984 keys
= dictSize(server
.db
[j
].dict
);
3985 vkeys
= dictSize(server
.db
[j
].expires
);
/* Databases with neither keys nor expires are left out of the report. */
3986 if (keys
|| vkeys
) {
3987 info
= sdscatprintf(info
, "db%d: keys=%lld,expires=%lld\r\n",
/* NOTE(review): sdslen() appears to return an unsigned size (it is cast to
 * signed elsewhere in this file) while the format below is %d. Confirm that
 * the argument type matches the conversion, or cast it. */
3991 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info
)));
3992 addReplySds(c
,info
);
3993 addReply(c
,shared
.crlf
);
3996 static void monitorCommand(redisClient
*c
) {
3997 /* ignore MONITOR if already slave or in monitor mode */
3998 if (c
->flags
& REDIS_SLAVE
) return;
4000 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
4002 if (!listAddNodeTail(server
.monitors
,c
)) oom("listAddNodeTail");
4003 addReply(c
,shared
.ok
);
4006 /* ================================= Expire ================================= */
4007 static int removeExpire(redisDb
*db
, robj
*key
) {
4008 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
4015 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
4016 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
4024 /* Return the expire time of the specified key, or -1 if no expire
4025 * is associated with this key (i.e. the key is non volatile) */
4026 static time_t getExpire(redisDb
*db
, robj
*key
) {
4029 /* No expire? return ASAP */
4030 if (dictSize(db
->expires
) == 0 ||
4031 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
4033 return (time_t) dictGetEntryVal(de
);
4036 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
4040 /* No expire? return ASAP */
4041 if (dictSize(db
->expires
) == 0 ||
4042 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
4044 /* Lookup the expire */
4045 when
= (time_t) dictGetEntryVal(de
);
4046 if (time(NULL
) <= when
) return 0;
4048 /* Delete the key */
4049 dictDelete(db
->expires
,key
);
4050 return dictDelete(db
->dict
,key
) == DICT_OK
;
4053 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
4056 /* No expire? return ASAP */
4057 if (dictSize(db
->expires
) == 0 ||
4058 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
4060 /* Delete the key */
4062 dictDelete(db
->expires
,key
);
4063 return dictDelete(db
->dict
,key
) == DICT_OK
;
4066 static void expireCommand(redisClient
*c
) {
4068 int seconds
= atoi(c
->argv
[2]->ptr
);
4070 de
= dictFind(c
->db
->dict
,c
->argv
[1]);
4072 addReply(c
,shared
.czero
);
4076 addReply(c
, shared
.czero
);
4079 time_t when
= time(NULL
)+seconds
;
4080 if (setExpire(c
->db
,c
->argv
[1],when
)) {
4081 addReply(c
,shared
.cone
);
4084 addReply(c
,shared
.czero
);
4090 static void ttlCommand(redisClient
*c
) {
4094 expire
= getExpire(c
->db
,c
->argv
[1]);
4096 ttl
= (int) (expire
-time(NULL
));
4097 if (ttl
< 0) ttl
= -1;
4099 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
4102 static void msetGenericCommand(redisClient
*c
, int nx
) {
4105 if ((c
->argc
% 2) == 0) {
4106 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
4109 /* Handle the NX flag. The MSETNX semantic is to return zero and set
4110 * nothing at all if at least one key already exists. */
4112 for (j
= 1; j
< c
->argc
; j
+= 2) {
4113 if (dictFind(c
->db
->dict
,c
->argv
[j
]) != NULL
) {
4114 addReply(c
, shared
.czero
);
4120 for (j
= 1; j
< c
->argc
; j
+= 2) {
4121 dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
4122 incrRefCount(c
->argv
[j
]);
4123 incrRefCount(c
->argv
[j
+1]);
4124 removeExpire(c
->db
,c
->argv
[j
]);
4126 server
.dirty
+= (c
->argc
-1)/2;
4127 addReply(c
, nx
? shared
.cone
: shared
.ok
);
4130 static void msetCommand(redisClient
*c
) {
4131 msetGenericCommand(c
,0);
4134 static void msetnxCommand(redisClient
*c
) {
4135 msetGenericCommand(c
,1);
4138 /* =============================== Replication ============================= */
4140 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
4141 ssize_t nwritten
, ret
= size
;
4142 time_t start
= time(NULL
);
4146 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
4147 nwritten
= write(fd
,ptr
,size
);
4148 if (nwritten
== -1) return -1;
4152 if ((time(NULL
)-start
) > timeout
) {
4160 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
4161 ssize_t nread
, totread
= 0;
4162 time_t start
= time(NULL
);
4166 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
4167 nread
= read(fd
,ptr
,size
);
4168 if (nread
== -1) return -1;
4173 if ((time(NULL
)-start
) > timeout
) {
4181 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
4188 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
4191 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
4202 static void syncCommand(redisClient
*c
) {
4203 /* ignore SYNC if already slave or in monitor mode */
4204 if (c
->flags
& REDIS_SLAVE
) return;
4206 /* SYNC can't be issued when the server has pending data to send to
4207 * the client about already issued commands. We need a fresh reply
4208 * buffer registering the differences between the BGSAVE and the current
4209 * dataset, so that we can copy to other slaves if needed. */
4210 if (listLength(c
->reply
) != 0) {
4211 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4215 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
4216 /* Here we need to check if there is a background saving operation
4217 * in progress, or if it is required to start one */
4218 if (server
.bgsaveinprogress
) {
4219 /* Ok a background save is in progress. Let's check if it is a good
4220 * one for replication, i.e. if there is another slave that is
4221 * registering differences since the server forked to save */
4225 listRewind(server
.slaves
);
4226 while((ln
= listYield(server
.slaves
))) {
4228 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
4231 /* Perfect, the server is already registering differences for
4232 * another slave. Set the right state, and copy the buffer. */
4233 listRelease(c
->reply
);
4234 c
->reply
= listDup(slave
->reply
);
4235 if (!c
->reply
) oom("listDup copying slave reply list");
4236 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4237 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
4239 /* No way, we need to wait for the next BGSAVE in order to
4240 * register differences */
4241 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
4242 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
4245 /* Ok we don't have a BGSAVE in progress, let's start one */
4246 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
4247 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
4248 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
4249 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
4252 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4255 c
->flags
|= REDIS_SLAVE
;
4257 if (!listAddNodeTail(server
.slaves
,c
)) oom("listAddNodeTail");
4261 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
4262 redisClient
*slave
= privdata
;
4264 REDIS_NOTUSED(mask
);
4265 char buf
[REDIS_IOBUF_LEN
];
4266 ssize_t nwritten
, buflen
;
4268 if (slave
->repldboff
== 0) {
4269 /* Write the bulk write count before to transfer the DB. In theory here
4270 * we don't know how much room there is in the output buffer of the
4271 * socket, but in practice SO_SNDLOWAT (the minimum count for output
4272 * operations) will never be smaller than the few bytes we need. */
4275 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4277 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
4285 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
4286 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
4288 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
4289 (buflen
== 0) ? "premature EOF" : strerror(errno
));
4293 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
4294 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
4299 slave
->repldboff
+= nwritten
;
4300 if (slave
->repldboff
== slave
->repldbsize
) {
4301 close(slave
->repldbfd
);
4302 slave
->repldbfd
= -1;
4303 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
4304 slave
->replstate
= REDIS_REPL_ONLINE
;
4305 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
4306 sendReplyToClient
, slave
, NULL
) == AE_ERR
) {
4310 addReplySds(slave
,sdsempty());
4311 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
4315 /* This function is called at the end of every background saving.
4316 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4317 * otherwise REDIS_ERR is passed to the function.
4319 * The goal of this function is to handle slaves waiting for a successful
4320 * background saving in order to perform non-blocking synchronization. */
4321 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
4323 int startbgsave
= 0;
4325 listRewind(server
.slaves
);
4326 while((ln
= listYield(server
.slaves
))) {
4327 redisClient
*slave
= ln
->value
;
4329 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
4331 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4332 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
4333 struct redis_stat buf
;
4335 if (bgsaveerr
!= REDIS_OK
) {
4337 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
4340 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
4341 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
4343 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
4346 slave
->repldboff
= 0;
4347 slave
->repldbsize
= buf
.st_size
;
4348 slave
->replstate
= REDIS_REPL_SEND_BULK
;
4349 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
4350 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
, NULL
) == AE_ERR
) {
4357 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
4358 listRewind(server
.slaves
);
4359 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
4360 while((ln
= listYield(server
.slaves
))) {
4361 redisClient
*slave
= ln
->value
;
4363 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
4370 static int syncWithMaster(void) {
4371 char buf
[1024], tmpfile
[256];
4373 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
4377 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
4381 /* Issue the SYNC command */
4382 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
4384 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
4388 /* Read the bulk write count */
4389 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
4391 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
4395 dumpsize
= atoi(buf
+1);
4396 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
4397 /* Read the bulk write data on a temp file */
4398 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
4399 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
4402 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
4406 int nread
, nwritten
;
4408 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
4410 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
4416 nwritten
= write(dfd
,buf
,nread
);
4417 if (nwritten
== -1) {
4418 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
4426 if (rename(tmpfile
,server
.dbfilename
) == -1) {
4427 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
4433 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
4434 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
4438 server
.master
= createClient(fd
);
4439 server
.master
->flags
|= REDIS_MASTER
;
4440 server
.replstate
= REDIS_REPL_CONNECTED
;
4444 static void slaveofCommand(redisClient
*c
) {
4445 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
4446 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
4447 if (server
.masterhost
) {
4448 sdsfree(server
.masterhost
);
4449 server
.masterhost
= NULL
;
4450 if (server
.master
) freeClient(server
.master
);
4451 server
.replstate
= REDIS_REPL_NONE
;
4452 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
4455 sdsfree(server
.masterhost
);
4456 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
4457 server
.masterport
= atoi(c
->argv
[2]->ptr
);
4458 if (server
.master
) freeClient(server
.master
);
4459 server
.replstate
= REDIS_REPL_CONNECT
;
4460 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
4461 server
.masterhost
, server
.masterport
);
4463 addReply(c
,shared
.ok
);
4466 /* ============================ Maxmemory directive ======================== */
4468 /* This function gets called when 'maxmemory' is set on the config file to limit
4469 * the max memory used by the server, and we are out of memory.
4470 * This function will try to, in order:
4472 * - Free objects from the free list
4473 * - Try to remove keys with an EXPIRE set
4475 * If it is not possible to free enough memory to reach used-memory < maxmemory
4476 * the server will start refusing commands that will enlarge even more the
4479 static void freeMemoryIfNeeded(void) {
4480 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
4481 if (listLength(server
.objfreelist
)) {
4484 listNode
*head
= listFirst(server
.objfreelist
);
4485 o
= listNodeValue(head
);
4486 listDelNode(server
.objfreelist
,head
);
4489 int j
, k
, freed
= 0;
4491 for (j
= 0; j
< server
.dbnum
; j
++) {
4493 robj
*minkey
= NULL
;
4494 struct dictEntry
*de
;
4496 if (dictSize(server
.db
[j
].expires
)) {
4498 /* From a sample of three keys drop the one nearest to
4499 * the natural expire */
4500 for (k
= 0; k
< 3; k
++) {
4503 de
= dictGetRandomKey(server
.db
[j
].expires
);
4504 t
= (time_t) dictGetEntryVal(de
);
4505 if (minttl
== -1 || t
< minttl
) {
4506 minkey
= dictGetEntryKey(de
);
4510 deleteKey(server
.db
+j
,minkey
);
4513 if (!freed
) return; /* nothing to free... */
4518 /* ================================= Debugging ============================== */
4520 static void debugCommand(redisClient
*c
) {
4521 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
4523 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
4524 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
4528 addReply(c
,shared
.nokeyerr
);
4531 key
= dictGetEntryKey(de
);
4532 val
= dictGetEntryVal(de
);
4533 addReplySds(c
,sdscatprintf(sdsempty(),
4534 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4535 key
, key
->refcount
, val
, val
->refcount
, val
->encoding
));
4537 addReplySds(c
,sdsnew(
4538 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4542 #ifdef HAVE_BACKTRACE
4543 static struct redisFunctionSym symsTable
[] = {
4544 {"compareStringObjects", (unsigned long)compareStringObjects
},
4545 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong
},
4546 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare
},
4547 {"dictEncObjHash", (unsigned long)dictEncObjHash
},
4548 {"incrDecrCommand", (unsigned long)incrDecrCommand
},
4549 {"freeStringObject", (unsigned long)freeStringObject
},
4550 {"freeListObject", (unsigned long)freeListObject
},
4551 {"freeSetObject", (unsigned long)freeSetObject
},
4552 {"decrRefCount", (unsigned long)decrRefCount
},
4553 {"createObject", (unsigned long)createObject
},
4554 {"freeClient", (unsigned long)freeClient
},
4555 {"rdbLoad", (unsigned long)rdbLoad
},
4556 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject
},
4557 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw
},
4558 {"addReply", (unsigned long)addReply
},
4559 {"addReplySds", (unsigned long)addReplySds
},
4560 {"incrRefCount", (unsigned long)incrRefCount
},
4561 {"rdbSaveBackground", (unsigned long)rdbSaveBackground
},
4562 {"createStringObject", (unsigned long)createStringObject
},
4563 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves
},
4564 {"syncWithMaster", (unsigned long)syncWithMaster
},
4565 {"tryObjectSharing", (unsigned long)tryObjectSharing
},
4566 {"tryObjectEncoding", (unsigned long)tryObjectEncoding
},
4567 {"getDecodedObject", (unsigned long)getDecodedObject
},
4568 {"removeExpire", (unsigned long)removeExpire
},
4569 {"expireIfNeeded", (unsigned long)expireIfNeeded
},
4570 {"deleteIfVolatile", (unsigned long)deleteIfVolatile
},
4571 {"deleteKey", (unsigned long)deleteKey
},
4572 {"getExpire", (unsigned long)getExpire
},
4573 {"setExpire", (unsigned long)setExpire
},
4574 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave
},
4575 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded
},
4576 {"authCommand", (unsigned long)authCommand
},
4577 {"pingCommand", (unsigned long)pingCommand
},
4578 {"echoCommand", (unsigned long)echoCommand
},
4579 {"setCommand", (unsigned long)setCommand
},
4580 {"setnxCommand", (unsigned long)setnxCommand
},
4581 {"getCommand", (unsigned long)getCommand
},
4582 {"delCommand", (unsigned long)delCommand
},
4583 {"existsCommand", (unsigned long)existsCommand
},
4584 {"incrCommand", (unsigned long)incrCommand
},
4585 {"decrCommand", (unsigned long)decrCommand
},
4586 {"incrbyCommand", (unsigned long)incrbyCommand
},
4587 {"decrbyCommand", (unsigned long)decrbyCommand
},
4588 {"selectCommand", (unsigned long)selectCommand
},
4589 {"randomkeyCommand", (unsigned long)randomkeyCommand
},
4590 {"keysCommand", (unsigned long)keysCommand
},
4591 {"dbsizeCommand", (unsigned long)dbsizeCommand
},
4592 {"lastsaveCommand", (unsigned long)lastsaveCommand
},
4593 {"saveCommand", (unsigned long)saveCommand
},
4594 {"bgsaveCommand", (unsigned long)bgsaveCommand
},
4595 {"shutdownCommand", (unsigned long)shutdownCommand
},
4596 {"moveCommand", (unsigned long)moveCommand
},
4597 {"renameCommand", (unsigned long)renameCommand
},
4598 {"renamenxCommand", (unsigned long)renamenxCommand
},
4599 {"lpushCommand", (unsigned long)lpushCommand
},
4600 {"rpushCommand", (unsigned long)rpushCommand
},
4601 {"lpopCommand", (unsigned long)lpopCommand
},
4602 {"rpopCommand", (unsigned long)rpopCommand
},
4603 {"llenCommand", (unsigned long)llenCommand
},
4604 {"lindexCommand", (unsigned long)lindexCommand
},
4605 {"lrangeCommand", (unsigned long)lrangeCommand
},
4606 {"ltrimCommand", (unsigned long)ltrimCommand
},
4607 {"typeCommand", (unsigned long)typeCommand
},
4608 {"lsetCommand", (unsigned long)lsetCommand
},
4609 {"saddCommand", (unsigned long)saddCommand
},
4610 {"sremCommand", (unsigned long)sremCommand
},
4611 {"smoveCommand", (unsigned long)smoveCommand
},
4612 {"sismemberCommand", (unsigned long)sismemberCommand
},
4613 {"scardCommand", (unsigned long)scardCommand
},
4614 {"spopCommand", (unsigned long)spopCommand
},
4615 {"sinterCommand", (unsigned long)sinterCommand
},
4616 {"sinterstoreCommand", (unsigned long)sinterstoreCommand
},
4617 {"sunionCommand", (unsigned long)sunionCommand
},
4618 {"sunionstoreCommand", (unsigned long)sunionstoreCommand
},
4619 {"sdiffCommand", (unsigned long)sdiffCommand
},
4620 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand
},
4621 {"syncCommand", (unsigned long)syncCommand
},
4622 {"flushdbCommand", (unsigned long)flushdbCommand
},
4623 {"flushallCommand", (unsigned long)flushallCommand
},
4624 {"sortCommand", (unsigned long)sortCommand
},
4625 {"lremCommand", (unsigned long)lremCommand
},
4626 {"infoCommand", (unsigned long)infoCommand
},
4627 {"mgetCommand", (unsigned long)mgetCommand
},
4628 {"monitorCommand", (unsigned long)monitorCommand
},
4629 {"expireCommand", (unsigned long)expireCommand
},
4630 {"getsetCommand", (unsigned long)getsetCommand
},
4631 {"ttlCommand", (unsigned long)ttlCommand
},
4632 {"slaveofCommand", (unsigned long)slaveofCommand
},
4633 {"debugCommand", (unsigned long)debugCommand
},
4634 {"processCommand", (unsigned long)processCommand
},
4635 {"setupSigSegvAction", (unsigned long)setupSigSegvAction
},
4636 {"readQueryFromClient", (unsigned long)readQueryFromClient
},
4637 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile
},
4638 {"msetGenericCommand", (unsigned long)msetGenericCommand
},
4639 {"msetCommand", (unsigned long)msetCommand
},
4640 {"msetnxCommand", (unsigned long)msetnxCommand
},
4644 /* This function tries to convert a pointer into a function name. It's used in
4645 * order to provide a backtrace under segmentation fault that's able to
4646 * display functions declared as static (otherwise the backtrace is useless). */
4647 static char *findFuncName(void *pointer
, unsigned long *offset
){
4649 unsigned long off
, minoff
= 0;
4651 /* Try to match against the Symbol with the smallest offset */
4652 for (i
=0; symsTable
[i
].pointer
; i
++) {
4653 unsigned long lp
= (unsigned long) pointer
;
4655 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
4656 off
=lp
-symsTable
[i
].pointer
;
4657 if (ret
< 0 || off
< minoff
) {
4663 if (ret
== -1) return NULL
;
4665 return symsTable
[ret
].name
;
/* Return, as a void pointer, the instruction-pointer register saved in the
 * machine context of 'uc'. The field that is read depends on the platform
 * and is selected at compile time by the preprocessor tests below. */
4668 static void *getMcontextEip(ucontext_t
*uc
) {
4669 #if defined(__FreeBSD__)
4670 return (void*) uc
->uc_mcontext
.mc_eip
;
4671 #elif defined(__dietlibc__)
4672 return (void*) uc
->uc_mcontext
.eip
;
4673 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
4674 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
4675 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
/* On this platform the 64-bit thread state, when declared, provides __rip;
 * the __eip read that follows is presumably the 32-bit alternative. */
4676 #ifdef _STRUCT_X86_THREAD_STATE64
4677 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
4679 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
/* NOTE(review): GCC and Clang predefine __x86_64__ in lowercase, so the
 * __X86_64__ test below looks like a misspelling that never matches. REG_EIP
 * is also the 32-bit register index, where 64-bit glibc uses REG_RIP.
 * Confirm how 64-bit Linux builds are meant to be handled. */
4681 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
4682 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
];
4683 #elif defined(__ia64__) /* Linux IA64 */
4684 return (void*) uc
->uc_mcontext
.sc_ip
;
/* Crash handler: this is the function setupSigSegvAction() installs as
 * sa_sigaction for SIGSEGV, SIGBUS, SIGFPE and SIGILL.
 *
 * 'sig' is the signal number (it is printed in the banner), 'info' is not
 * used, 'secret' is treated as a pointer to the ucontext_t describing the
 * interrupted context.
 *
 * It logs, at REDIS_WARNING level, a banner, a one-line summary of the
 * server state, and then one line per stack frame of the backtrace. */
4690 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
4692 char **messages
= NULL
;
4693 int i
, trace_size
= 0;
4694 unsigned long offset
=0;
/* Seconds elapsed since the time recorded in server.stat_starttime. */
4695 time_t uptime
= time(NULL
)-server
.stat_starttime
;
4696 ucontext_t
*uc
= (ucontext_t
*) secret
;
4697 REDIS_NOTUSED(info
);
4699 redisLog(REDIS_WARNING
,
4700 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
/* NOTE(review): the string built by sdscatprintf() is handed straight to
 * redisLog() and no pointer to it is kept in the visible code, so it does
 * not appear to be released here -- confirm this is intentional. */
4701 redisLog(REDIS_WARNING
, "%s", sdscatprintf(sdsempty(),
4702 "redis_version:%s; "
4703 "uptime_in_seconds:%d; "
4704 "connected_clients:%d; "
4705 "connected_slaves:%d; "
4707 "changes_since_last_save:%lld; "
4708 "bgsave_in_progress:%d; "
4709 "last_save_time:%d; "
4710 "total_connections_received:%lld; "
4711 "total_commands_processed:%lld; "
/* Clients are counted as the length of server.clients minus the length of
 * server.slaves. */
4715 listLength(server
.clients
)-listLength(server
.slaves
),
4716 listLength(server
.slaves
),
4719 server
.bgsaveinprogress
,
4721 server
.stat_numconnections
,
4722 server
.stat_numcommands
,
/* The role is "master" when no master host is configured. */
4723 server
.masterhost
== NULL
? "master" : "slave"
/* Collect at most 100 frames of the current call stack into 'trace'. */
4726 trace_size
= backtrace(trace
, 100);
4727 /* overwrite sigaction with caller's address */
4728 if (getMcontextEip(uc
) != NULL
) {
4729 trace
[1] = getMcontextEip(uc
);
/* NOTE(review): backtrace_symbols() is documented to return NULL on
 * failure, while messages[i] is dereferenced below without a check. */
4731 messages
= backtrace_symbols(trace
, trace_size
);
/* Frame 0 is skipped: the loop starts from index 1. */
4733 for (i
=1; i
<trace_size
; ++i
) {
4734 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
4736 p
= strchr(messages
[i
],'+');
/* Use the line produced by backtrace_symbols() when our own table has no
 * name for this address, or when the decimal number following '+' in that
 * line is smaller than the offset we computed. */
4737 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
4738 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
/* Presumably the alternative branch: log the frame using the name and
 * offset found by findFuncName(). */
4740 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
4747 static void setupSigSegvAction(void) {
4748 struct sigaction act
;
4750 sigemptyset (&act
.sa_mask
);
4751 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
4752 * is used. Otherwise, sa_handler is used */
4753 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
4754 act
.sa_sigaction
= segvHandler
;
4755 sigaction (SIGSEGV
, &act
, NULL
);
4756 sigaction (SIGBUS
, &act
, NULL
);
4757 sigaction (SIGFPE
, &act
, NULL
);
4758 sigaction (SIGILL
, &act
, NULL
);
4759 sigaction (SIGBUS
, &act
, NULL
);
4762 #else /* HAVE_BACKTRACE */
/* Variant used when HAVE_BACKTRACE is not defined: no crash handler is
 * installed, so this is intentionally a no-op. */
static void setupSigSegvAction(void) {
}
4765 #endif /* HAVE_BACKTRACE */
4767 /* =================================== Main! ================================ */
/* Read the current overcommit policy from /proc/sys/vm/overcommit_memory.
 *
 * Returns the number found at the start of the file, or -1 when the file
 * cannot be opened or read, or when its content does not start with a
 * number (so that unparsable content is never mistaken for the value 0). */
int linuxOvercommitMemoryValue(void) {
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
    char buf[64];
    char *end;
    long value;

    if (!fp) return -1;
    if (fgets(buf,sizeof(buf),fp) == NULL) {
        fclose(fp);
        return -1;
    }
    fclose(fp);

    value = strtol(buf,&end,10);
    if (end == buf) return -1; /* no digits were consumed */
    return (int)value;
}
4784 void linuxOvercommitMemoryWarning(void) {
4785 if (linuxOvercommitMemoryValue() == 0) {
4786 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
4789 #endif /* __linux__ */
4791 static void daemonize(void) {
4795 if (fork() != 0) exit(0); /* parent exits */
4796 setsid(); /* create a new session */
4798 /* Every output goes to /dev/null. If Redis is daemonized but
4799 * the 'logfile' is set to 'stdout' in the configuration file
4800 * it will not log at all. */
4801 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
4802 dup2(fd
, STDIN_FILENO
);
4803 dup2(fd
, STDOUT_FILENO
);
4804 dup2(fd
, STDERR_FILENO
);
4805 if (fd
> STDERR_FILENO
) close(fd
);
4807 /* Try to write the pid file */
4808 fp
= fopen(server
.pidfile
,"w");
4810 fprintf(fp
,"%d\n",getpid());
4815 int main(int argc
, char **argv
) {
4818 ResetServerSaveParams();
4819 loadServerConfig(argv
[1]);
4820 } else if (argc
> 2) {
4821 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
4824 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4827 if (server
.daemonize
) daemonize();
4828 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
4830 linuxOvercommitMemoryWarning();
4832 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
4833 redisLog(REDIS_NOTICE
,"DB loaded from disk");
4834 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
4835 acceptHandler
, NULL
, NULL
) == AE_ERR
) oom("creating file event");
4836 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
4838 aeDeleteEventLoop(server
.el
);