2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.000"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
62 #include "ae.h" /* Event driven programming library */
63 #include "sds.h" /* Dynamic safe strings */
64 #include "anet.h" /* Networking the easy way */
65 #include "dict.h" /* Hash tables */
66 #include "adlist.h" /* Linked lists */
67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
68 #include "lzf.h" /* LZF compression library */
69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
75 /* Static server configuration */
76 #define REDIS_SERVERPORT 6379 /* TCP port */
77 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
78 #define REDIS_IOBUF_LEN 1024
79 #define REDIS_LOADBUF_LEN 1024
80 #define REDIS_STATIC_ARGS 4
81 #define REDIS_DEFAULT_DBNUM 16
82 #define REDIS_CONFIGLINE_MAX 1024
83 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
84 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
85 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
86 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
87 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
89 /* Hash table parameters */
90 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
93 #define REDIS_CMD_BULK 1 /* Bulk write command */
94 #define REDIS_CMD_INLINE 2 /* Inline command */
95 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
96 this flag will return an error when the 'maxmemory' option is set in the
97 config file and the server is using more than maxmemory bytes of memory.
98 In short these commands are denied on low memory conditions. */
99 #define REDIS_CMD_DENYOOM 4
102 #define REDIS_STRING 0
107 /* Objects encoding */
108 #define REDIS_ENCODING_RAW 0 /* Raw representation */
109 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
111 /* Object types only used for dumping to disk */
112 #define REDIS_EXPIRETIME 253
113 #define REDIS_SELECTDB 254
114 #define REDIS_EOF 255
116 /* Defines related to the dump file format. To store 32 bits lengths for short
117 * keys requires a lot of space, so we check the most significant 2 bits of
118 * the first byte to interpret the length:
120 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
121 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
122 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
123 * 11|000000 this means: specially encoded object will follow. The six bits
124 * number specify the kind of object that follows.
125 * See the REDIS_RDB_ENC_* defines.
127 * Lengths up to 63 are stored using a single byte, most DB keys, and many
128 * values, will fit inside. */
129 #define REDIS_RDB_6BITLEN 0
130 #define REDIS_RDB_14BITLEN 1
131 #define REDIS_RDB_32BITLEN 2
132 #define REDIS_RDB_ENCVAL 3
133 #define REDIS_RDB_LENERR UINT_MAX
135 /* When a length of a string object stored on disk has the first two bits
136 * set, the remaining six bits specify a special encoding for the object
137 * according to the following defines: */
138 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
139 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
140 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
141 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
144 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
145 #define REDIS_SLAVE 2 /* This client is a slave server */
146 #define REDIS_MASTER 4 /* This client is a master server */
147 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
149 /* Slave replication state - slave side */
150 #define REDIS_REPL_NONE 0 /* No active replication */
151 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
152 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
154 /* Slave replication state - from the point of view of master
155 * Note that in SEND_BULK and ONLINE state the slave receives new updates
156 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
157 * to start the next background saving in order to send updates to it. */
158 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
159 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
160 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
161 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
163 /* List related stuff */
167 /* Sort operations */
168 #define REDIS_SORT_GET 0
169 #define REDIS_SORT_DEL 1
170 #define REDIS_SORT_INCR 2
171 #define REDIS_SORT_DECR 3
172 #define REDIS_SORT_ASC 4
173 #define REDIS_SORT_DESC 5
174 #define REDIS_SORTKEY_MAX 1024
177 #define REDIS_DEBUG 0
178 #define REDIS_NOTICE 1
179 #define REDIS_WARNING 2
181 /* Anti-warning macro... */
182 #define REDIS_NOTUSED(V) ((void) V)
185 /*================================= Data types ============================== */
187 /* A redis object, that is a type able to hold a string / list / set */
188 typedef struct redisObject
{
191 unsigned char encoding
;
192 unsigned char notused
[2];
196 typedef struct redisDb
{
202 /* With multiplexing we need to keep per-client state.
203 * Clients are kept in a linked list. */
204 typedef struct redisClient
{
211 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
214 time_t lastinteraction
; /* time of the last interaction, used for timeout */
215 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
216 int slaveseldb
; /* slave selected db, if this client is a slave */
217 int authenticated
; /* when requirepass is non-NULL */
218 int replstate
; /* replication state if this is a slave */
219 int repldbfd
; /* replication DB file descriptor */
220 long repldboff
; /* replication DB file offset */
221 off_t repldbsize
; /* replication DB file size */
229 /* Global server state structure */
235 unsigned int sharingpoolsize
;
236 long long dirty
; /* changes to DB from the last save */
238 list
*slaves
, *monitors
;
239 char neterr
[ANET_ERR_LEN
];
241 int cronloops
; /* number of times the cron function run */
242 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
243 time_t lastsave
; /* Unix time of last successful save */
244 size_t usedmemory
; /* Used memory in bytes */
245 /* Fields used only for stats */
246 time_t stat_starttime
; /* server start time */
247 long long stat_numcommands
; /* number of processed commands */
248 long long stat_numconnections
; /* number of connections received */
256 int bgsaveinprogress
;
257 pid_t bgsavechildpid
;
258 struct saveparam
*saveparams
;
265 /* Replication related */
269 redisClient
*master
; /* client that is master for this slave */
271 unsigned int maxclients
;
272 unsigned int maxmemory
;
273 /* Sort parameters - qsort_r() is only available under BSD so we
274 * have to take this state global, in order to pass it to sortCompare() */
280 typedef void redisCommandProc(redisClient
*c
);
281 struct redisCommand
{
283 redisCommandProc
*proc
;
288 struct redisFunctionSym
{
290 unsigned long pointer
;
293 typedef struct _redisSortObject
{
301 typedef struct _redisSortOperation
{
304 } redisSortOperation
;
306 struct sharedObjectsStruct
{
307 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
308 *colon
, *nullbulk
, *nullmultibulk
,
309 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
310 *outofrangeerr
, *plus
,
311 *select0
, *select1
, *select2
, *select3
, *select4
,
312 *select5
, *select6
, *select7
, *select8
, *select9
;
315 /*================================ Prototypes =============================== */
317 static void freeStringObject(robj
*o
);
318 static void freeListObject(robj
*o
);
319 static void freeSetObject(robj
*o
);
320 static void decrRefCount(void *o
);
321 static robj
*createObject(int type
, void *ptr
);
322 static void freeClient(redisClient
*c
);
323 static int rdbLoad(char *filename
);
324 static void addReply(redisClient
*c
, robj
*obj
);
325 static void addReplySds(redisClient
*c
, sds s
);
326 static void incrRefCount(robj
*o
);
327 static int rdbSaveBackground(char *filename
);
328 static robj
*createStringObject(char *ptr
, size_t len
);
329 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
330 static int syncWithMaster(void);
331 static robj
*tryObjectSharing(robj
*o
);
332 static int tryObjectEncoding(robj
*o
);
333 static robj
*getDecodedObject(const robj
*o
);
334 static int removeExpire(redisDb
*db
, robj
*key
);
335 static int expireIfNeeded(redisDb
*db
, robj
*key
);
336 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
337 static int deleteKey(redisDb
*db
, robj
*key
);
338 static time_t getExpire(redisDb
*db
, robj
*key
);
339 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
340 static void updateSlavesWaitingBgsave(int bgsaveerr
);
341 static void freeMemoryIfNeeded(void);
342 static int processCommand(redisClient
*c
);
343 static void setupSigSegvAction(void);
344 static void rdbRemoveTempFile(pid_t childpid
);
346 static void authCommand(redisClient
*c
);
347 static void pingCommand(redisClient
*c
);
348 static void echoCommand(redisClient
*c
);
349 static void setCommand(redisClient
*c
);
350 static void setnxCommand(redisClient
*c
);
351 static void getCommand(redisClient
*c
);
352 static void delCommand(redisClient
*c
);
353 static void existsCommand(redisClient
*c
);
354 static void incrCommand(redisClient
*c
);
355 static void decrCommand(redisClient
*c
);
356 static void incrbyCommand(redisClient
*c
);
357 static void decrbyCommand(redisClient
*c
);
358 static void selectCommand(redisClient
*c
);
359 static void randomkeyCommand(redisClient
*c
);
360 static void keysCommand(redisClient
*c
);
361 static void dbsizeCommand(redisClient
*c
);
362 static void lastsaveCommand(redisClient
*c
);
363 static void saveCommand(redisClient
*c
);
364 static void bgsaveCommand(redisClient
*c
);
365 static void shutdownCommand(redisClient
*c
);
366 static void moveCommand(redisClient
*c
);
367 static void renameCommand(redisClient
*c
);
368 static void renamenxCommand(redisClient
*c
);
369 static void lpushCommand(redisClient
*c
);
370 static void rpushCommand(redisClient
*c
);
371 static void lpopCommand(redisClient
*c
);
372 static void rpopCommand(redisClient
*c
);
373 static void llenCommand(redisClient
*c
);
374 static void lindexCommand(redisClient
*c
);
375 static void lrangeCommand(redisClient
*c
);
376 static void ltrimCommand(redisClient
*c
);
377 static void typeCommand(redisClient
*c
);
378 static void lsetCommand(redisClient
*c
);
379 static void saddCommand(redisClient
*c
);
380 static void sremCommand(redisClient
*c
);
381 static void smoveCommand(redisClient
*c
);
382 static void sismemberCommand(redisClient
*c
);
383 static void scardCommand(redisClient
*c
);
384 static void spopCommand(redisClient
*c
);
385 static void sinterCommand(redisClient
*c
);
386 static void sinterstoreCommand(redisClient
*c
);
387 static void sunionCommand(redisClient
*c
);
388 static void sunionstoreCommand(redisClient
*c
);
389 static void sdiffCommand(redisClient
*c
);
390 static void sdiffstoreCommand(redisClient
*c
);
391 static void syncCommand(redisClient
*c
);
392 static void flushdbCommand(redisClient
*c
);
393 static void flushallCommand(redisClient
*c
);
394 static void sortCommand(redisClient
*c
);
395 static void lremCommand(redisClient
*c
);
396 static void infoCommand(redisClient
*c
);
397 static void mgetCommand(redisClient
*c
);
398 static void monitorCommand(redisClient
*c
);
399 static void expireCommand(redisClient
*c
);
400 static void getSetCommand(redisClient
*c
);
401 static void ttlCommand(redisClient
*c
);
402 static void slaveofCommand(redisClient
*c
);
403 static void debugCommand(redisClient
*c
);
404 /*================================= Globals ================================= */
407 static struct redisServer server
; /* server global state */
408 static struct redisCommand cmdTable
[] = {
409 {"get",getCommand
,2,REDIS_CMD_INLINE
},
410 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
411 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
412 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
413 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
414 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
415 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
416 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
417 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
418 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
419 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
420 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
421 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
422 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
423 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
424 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
425 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
426 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
427 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
428 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
429 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
430 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
431 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
432 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
433 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
434 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
435 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
436 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
437 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
438 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
439 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
440 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
441 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
442 {"getset",getSetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
443 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
444 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
445 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
446 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
447 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
448 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
449 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
450 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
451 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
452 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
453 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
454 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
455 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
456 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
457 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
458 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
459 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
460 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
461 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
462 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
463 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
464 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
465 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
466 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
467 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
470 /*============================ Utility functions ============================ */
472 /* Glob-style pattern matching. */
473 int stringmatchlen(const char *pattern
, int patternLen
,
474 const char *string
, int stringLen
, int nocase
)
479 while (pattern
[1] == '*') {
484 return 1; /* match */
486 if (stringmatchlen(pattern
+1, patternLen
-1,
487 string
, stringLen
, nocase
))
488 return 1; /* match */
492 return 0; /* no match */
496 return 0; /* no match */
506 not = pattern
[0] == '^';
513 if (pattern
[0] == '\\') {
516 if (pattern
[0] == string
[0])
518 } else if (pattern
[0] == ']') {
520 } else if (patternLen
== 0) {
524 } else if (pattern
[1] == '-' && patternLen
>= 3) {
525 int start
= pattern
[0];
526 int end
= pattern
[2];
534 start
= tolower(start
);
540 if (c
>= start
&& c
<= end
)
544 if (pattern
[0] == string
[0])
547 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
557 return 0; /* no match */
563 if (patternLen
>= 2) {
570 if (pattern
[0] != string
[0])
571 return 0; /* no match */
573 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
574 return 0; /* no match */
582 if (stringLen
== 0) {
583 while(*pattern
== '*') {
590 if (patternLen
== 0 && stringLen
== 0)
595 static void redisLog(int level
, const char *fmt
, ...) {
599 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
603 if (level
>= server
.verbosity
) {
609 strftime(buf
,64,"%d %b %H:%M:%S",gmtime(&now
));
610 fprintf(fp
,"%s %c ",buf
,c
[level
]);
611 vfprintf(fp
, fmt
, ap
);
617 if (server
.logfile
) fclose(fp
);
620 /*====================== Hash table type implementation ==================== */
622 /* This is a hash table type that uses the SDS dynamic strings library as
623 * keys and redis objects as values (objects can hold SDS strings,
626 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
630 DICT_NOTUSED(privdata
);
632 l1
= sdslen((sds
)key1
);
633 l2
= sdslen((sds
)key2
);
634 if (l1
!= l2
) return 0;
635 return memcmp(key1
, key2
, l1
) == 0;
638 static void dictRedisObjectDestructor(void *privdata
, void *val
)
640 DICT_NOTUSED(privdata
);
645 static int dictObjKeyCompare(void *privdata
, const void *key1
,
648 const robj
*o1
= key1
, *o2
= key2
;
649 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
652 static unsigned int dictObjHash(const void *key
) {
654 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
657 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
660 const robj
*o1
= key1
, *o2
= key2
;
662 if (o1
->encoding
== REDIS_ENCODING_RAW
&&
663 o2
->encoding
== REDIS_ENCODING_RAW
)
664 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
669 dec1
= o1
->encoding
!= REDIS_ENCODING_RAW
?
670 getDecodedObject(o1
) : (robj
*)o1
;
671 dec2
= o2
->encoding
!= REDIS_ENCODING_RAW
?
672 getDecodedObject(o2
) : (robj
*)o2
;
673 cmp
= sdsDictKeyCompare(privdata
,dec1
->ptr
,dec2
->ptr
);
674 if (dec1
!= o1
) decrRefCount(dec1
);
675 if (dec2
!= o2
) decrRefCount(dec2
);
680 static unsigned int dictEncObjHash(const void *key
) {
683 if (o
->encoding
== REDIS_ENCODING_RAW
)
684 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
686 robj
*dec
= getDecodedObject(o
);
687 unsigned int hash
= dictGenHashFunction(dec
->ptr
, sdslen((sds
)dec
->ptr
));
693 static dictType setDictType
= {
694 dictEncObjHash
, /* hash function */
697 dictEncObjKeyCompare
, /* key compare */
698 dictRedisObjectDestructor
, /* key destructor */
699 NULL
/* val destructor */
702 static dictType hashDictType
= {
703 dictObjHash
, /* hash function */
706 dictObjKeyCompare
, /* key compare */
707 dictRedisObjectDestructor
, /* key destructor */
708 dictRedisObjectDestructor
/* val destructor */
711 /* ========================= Random utility functions ======================= */
713 /* Redis generally does not try to recover from out of memory conditions
714 * when allocating objects or strings, it is not clear if it will be possible
715 * to report this condition to the client since the networking layer itself
716 * is based on heap allocation for send buffers, so we simply abort.
717 * At least the code will be simpler to read... */
718 static void oom(const char *msg
) {
719 fprintf(stderr
, "%s: Out of memory\n",msg
);
725 /* ====================== Redis server networking stuff ===================== */
726 static void closeTimedoutClients(void) {
729 time_t now
= time(NULL
);
731 listRewind(server
.clients
);
732 while ((ln
= listYield(server
.clients
)) != NULL
) {
733 c
= listNodeValue(ln
);
734 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
735 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
736 (now
- c
->lastinteraction
> server
.maxidletime
)) {
737 redisLog(REDIS_DEBUG
,"Closing idle client");
743 static int htNeedsResize(dict
*dict
) {
744 long long size
, used
;
746 size
= dictSlots(dict
);
747 used
= dictSize(dict
);
748 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
749 (used
*100/size
< REDIS_HT_MINFILL
));
752 /* If the percentage of used slots in the HT falls below REDIS_HT_MINFILL
753 * we resize the hash table to save memory */
754 static void tryResizeHashTables(void) {
757 for (j
= 0; j
< server
.dbnum
; j
++) {
758 if (htNeedsResize(server
.db
[j
].dict
)) {
759 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
760 dictResize(server
.db
[j
].dict
);
761 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
763 if (htNeedsResize(server
.db
[j
].expires
))
764 dictResize(server
.db
[j
].expires
);
768 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
769 int j
, loops
= server
.cronloops
++;
770 REDIS_NOTUSED(eventLoop
);
772 REDIS_NOTUSED(clientData
);
774 /* Update the global state with the amount of used memory */
775 server
.usedmemory
= zmalloc_used_memory();
777 /* Show some info about non-empty databases */
778 for (j
= 0; j
< server
.dbnum
; j
++) {
779 long long size
, used
, vkeys
;
781 size
= dictSlots(server
.db
[j
].dict
);
782 used
= dictSize(server
.db
[j
].dict
);
783 vkeys
= dictSize(server
.db
[j
].expires
);
784 if (!(loops
% 5) && (used
|| vkeys
)) {
785 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
786 /* dictPrintStats(server.dict); */
790 /* We don't want to resize the hash tables while a background saving
791 * is in progress: the saving child is created using fork() that is
792 * implemented with a copy-on-write semantic in most modern systems, so
793 * if we resize the HT while there is the saving child at work actually
794 * a lot of memory movements in the parent will cause a lot of pages
796 if (!server
.bgsaveinprogress
) tryResizeHashTables();
798 /* Show information about connected clients */
800 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
801 listLength(server
.clients
)-listLength(server
.slaves
),
802 listLength(server
.slaves
),
804 dictSize(server
.sharingpool
));
807 /* Close connections of timedout clients */
808 if (server
.maxidletime
&& !(loops
% 10))
809 closeTimedoutClients();
811 /* Check if a background saving in progress terminated */
812 if (server
.bgsaveinprogress
) {
814 if (wait4(-1,&statloc
,WNOHANG
,NULL
)) {
815 int exitcode
= WEXITSTATUS(statloc
);
816 int bysignal
= WIFSIGNALED(statloc
);
818 if (!bysignal
&& exitcode
== 0) {
819 redisLog(REDIS_NOTICE
,
820 "Background saving terminated with success");
822 server
.lastsave
= time(NULL
);
823 } else if (!bysignal
&& exitcode
!= 0) {
824 redisLog(REDIS_WARNING
, "Background saving error");
826 redisLog(REDIS_WARNING
,
827 "Background saving terminated by signal");
828 rdbRemoveTempFile(server
.bgsavechildpid
);
830 server
.bgsaveinprogress
= 0;
831 server
.bgsavechildpid
= -1;
832 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
835 /* If there is not a background saving in progress check if
836 * we have to save now */
837 time_t now
= time(NULL
);
838 for (j
= 0; j
< server
.saveparamslen
; j
++) {
839 struct saveparam
*sp
= server
.saveparams
+j
;
841 if (server
.dirty
>= sp
->changes
&&
842 now
-server
.lastsave
> sp
->seconds
) {
843 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
844 sp
->changes
, sp
->seconds
);
845 rdbSaveBackground(server
.dbfilename
);
851 /* Try to expire a few timed out keys */
852 for (j
= 0; j
< server
.dbnum
; j
++) {
853 redisDb
*db
= server
.db
+j
;
854 int num
= dictSize(db
->expires
);
857 time_t now
= time(NULL
);
859 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
860 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
865 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
866 t
= (time_t) dictGetEntryVal(de
);
868 deleteKey(db
,dictGetEntryKey(de
));
874 /* Check if we should connect to a MASTER */
875 if (server
.replstate
== REDIS_REPL_CONNECT
) {
876 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
877 if (syncWithMaster() == REDIS_OK
) {
878 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
884 static void createSharedObjects(void) {
885 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
886 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
887 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
888 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
889 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
890 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
891 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
892 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
893 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
895 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
896 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
897 "-ERR Operation against a key holding the wrong kind of value\r\n"));
898 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
899 "-ERR no such key\r\n"));
900 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
901 "-ERR syntax error\r\n"));
902 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
903 "-ERR source and destination objects are the same\r\n"));
904 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
905 "-ERR index out of range\r\n"));
906 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
907 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
908 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
909 shared
.select0
= createStringObject("select 0\r\n",10);
910 shared
.select1
= createStringObject("select 1\r\n",10);
911 shared
.select2
= createStringObject("select 2\r\n",10);
912 shared
.select3
= createStringObject("select 3\r\n",10);
913 shared
.select4
= createStringObject("select 4\r\n",10);
914 shared
.select5
= createStringObject("select 5\r\n",10);
915 shared
.select6
= createStringObject("select 6\r\n",10);
916 shared
.select7
= createStringObject("select 7\r\n",10);
917 shared
.select8
= createStringObject("select 8\r\n",10);
918 shared
.select9
= createStringObject("select 9\r\n",10);
921 static void appendServerSaveParams(time_t seconds
, int changes
) {
922 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
923 if (server
.saveparams
== NULL
) oom("appendServerSaveParams");
924 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
925 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
926 server
.saveparamslen
++;
929 static void ResetServerSaveParams() {
930 zfree(server
.saveparams
);
931 server
.saveparams
= NULL
;
932 server
.saveparamslen
= 0;
935 static void initServerConfig() {
936 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
937 server
.port
= REDIS_SERVERPORT
;
938 server
.verbosity
= REDIS_DEBUG
;
939 server
.maxidletime
= REDIS_MAXIDLETIME
;
940 server
.saveparams
= NULL
;
941 server
.logfile
= NULL
; /* NULL = log on standard output */
942 server
.bindaddr
= NULL
;
943 server
.glueoutputbuf
= 1;
944 server
.daemonize
= 0;
945 server
.pidfile
= "/var/run/redis.pid";
946 server
.dbfilename
= "dump.rdb";
947 server
.requirepass
= NULL
;
948 server
.shareobjects
= 0;
949 server
.sharingpoolsize
= 1024;
950 server
.maxclients
= 0;
951 server
.maxmemory
= 0;
952 ResetServerSaveParams();
954 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
955 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
956 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
957 /* Replication related */
959 server
.masterhost
= NULL
;
960 server
.masterport
= 6379;
961 server
.master
= NULL
;
962 server
.replstate
= REDIS_REPL_NONE
;
965 static void initServer() {
968 signal(SIGHUP
, SIG_IGN
);
969 signal(SIGPIPE
, SIG_IGN
);
970 setupSigSegvAction();
972 server
.clients
= listCreate();
973 server
.slaves
= listCreate();
974 server
.monitors
= listCreate();
975 server
.objfreelist
= listCreate();
976 createSharedObjects();
977 server
.el
= aeCreateEventLoop();
978 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
979 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
980 if (!server
.db
|| !server
.clients
|| !server
.slaves
|| !server
.monitors
|| !server
.el
|| !server
.objfreelist
)
981 oom("server initialization"); /* Fatal OOM */
982 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
983 if (server
.fd
== -1) {
984 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
987 for (j
= 0; j
< server
.dbnum
; j
++) {
988 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
989 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
992 server
.cronloops
= 0;
993 server
.bgsaveinprogress
= 0;
994 server
.bgsavechildpid
= -1;
995 server
.lastsave
= time(NULL
);
997 server
.usedmemory
= 0;
998 server
.stat_numcommands
= 0;
999 server
.stat_numconnections
= 0;
1000 server
.stat_starttime
= time(NULL
);
1001 aeCreateTimeEvent(server
.el
, 1000, serverCron
, NULL
, NULL
);
1004 /* Empty the whole database */
1005 static long long emptyDb() {
1007 long long removed
= 0;
1009 for (j
= 0; j
< server
.dbnum
; j
++) {
1010 removed
+= dictSize(server
.db
[j
].dict
);
1011 dictEmpty(server
.db
[j
].dict
);
1012 dictEmpty(server
.db
[j
].expires
);
1017 static int yesnotoi(char *s
) {
1018 if (!strcasecmp(s
,"yes")) return 1;
1019 else if (!strcasecmp(s
,"no")) return 0;
1023 /* I agree, this is a very rudimentary way to load a configuration...
1024 will improve later if the config gets more complex */
1025 static void loadServerConfig(char *filename
) {
1027 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1031 if (filename
[0] == '-' && filename
[1] == '\0')
1034 if ((fp
= fopen(filename
,"r")) == NULL
) {
1035 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1040 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1046 line
= sdstrim(line
," \t\r\n");
1048 /* Skip comments and blank lines*/
1049 if (line
[0] == '#' || line
[0] == '\0') {
1054 /* Split into arguments */
1055 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1056 sdstolower(argv
[0]);
1058 /* Execute config directives */
1059 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1060 server
.maxidletime
= atoi(argv
[1]);
1061 if (server
.maxidletime
< 0) {
1062 err
= "Invalid timeout value"; goto loaderr
;
1064 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1065 server
.port
= atoi(argv
[1]);
1066 if (server
.port
< 1 || server
.port
> 65535) {
1067 err
= "Invalid port"; goto loaderr
;
1069 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1070 server
.bindaddr
= zstrdup(argv
[1]);
1071 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1072 int seconds
= atoi(argv
[1]);
1073 int changes
= atoi(argv
[2]);
1074 if (seconds
< 1 || changes
< 0) {
1075 err
= "Invalid save parameters"; goto loaderr
;
1077 appendServerSaveParams(seconds
,changes
);
1078 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1079 if (chdir(argv
[1]) == -1) {
1080 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1081 argv
[1], strerror(errno
));
1084 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1085 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1086 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1087 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1089 err
= "Invalid log level. Must be one of debug, notice, warning";
1092 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1095 server
.logfile
= zstrdup(argv
[1]);
1096 if (!strcasecmp(server
.logfile
,"stdout")) {
1097 zfree(server
.logfile
);
1098 server
.logfile
= NULL
;
1100 if (server
.logfile
) {
1101 /* Test if we are able to open the file. The server will not
1102 * be able to abort just for this problem later... */
1103 logfp
= fopen(server
.logfile
,"a");
1104 if (logfp
== NULL
) {
1105 err
= sdscatprintf(sdsempty(),
1106 "Can't open the log file: %s", strerror(errno
));
1111 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1112 server
.dbnum
= atoi(argv
[1]);
1113 if (server
.dbnum
< 1) {
1114 err
= "Invalid number of databases"; goto loaderr
;
1116 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1117 server
.maxclients
= atoi(argv
[1]);
1118 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1119 server
.maxmemory
= atoi(argv
[1]);
1120 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1121 server
.masterhost
= sdsnew(argv
[1]);
1122 server
.masterport
= atoi(argv
[2]);
1123 server
.replstate
= REDIS_REPL_CONNECT
;
1124 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1125 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1126 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1128 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1129 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1130 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1132 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1133 server
.sharingpoolsize
= atoi(argv
[1]);
1134 if (server
.sharingpoolsize
< 1) {
1135 err
= "invalid object sharing pool size"; goto loaderr
;
1137 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1138 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1139 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1141 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1142 server
.requirepass
= zstrdup(argv
[1]);
1143 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1144 server
.pidfile
= zstrdup(argv
[1]);
1145 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1146 server
.dbfilename
= zstrdup(argv
[1]);
1148 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1150 for (j
= 0; j
< argc
; j
++)
1155 if (fp
!= stdin
) fclose(fp
);
1159 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1160 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1161 fprintf(stderr
, ">>> '%s'\n", line
);
1162 fprintf(stderr
, "%s\n", err
);
/* freeClientArgv: drop one reference (decrRefCount) on each of the c->argc
 * argument objects stored in c->argv. */
1166 static void freeClientArgv(redisClient
*c
) {
1169 for (j
= 0; j
< c
->argc
; j
++)
1170 decrRefCount(c
->argv
[j
]);
/* freeClient: tear down a client. The readable and writable file events
 * registered for c->fd are removed from server.el, the query buffer and the
 * reply list are released, and the client node is looked up in
 * server.clients and deleted from it. For REDIS_SLAVE clients the list to
 * search is server.monitors when REDIS_MONITOR is also set, server.slaves
 * otherwise. For a REDIS_MASTER client, server.master is cleared and
 * server.replstate goes back to REDIS_REPL_CONNECT. */
1174 static void freeClient(redisClient
*c
) {
1177 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1178 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1179 sdsfree(c
->querybuf
);
1180 listRelease(c
->reply
);
1183 ln
= listSearchKey(server
.clients
,c
);
1185 listDelNode(server
.clients
,ln
);
1186 if (c
->flags
& REDIS_SLAVE
) {
1187 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1189 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1190 ln
= listSearchKey(l
,c
);
1194 if (c
->flags
& REDIS_MASTER
) {
1195 server
.master
= NULL
;
1196 server
.replstate
= REDIS_REPL_CONNECT
;
/* glueReplyBuffersIfNeeded: merge the queued reply objects of 'c' into one.
 * A first pass over c->reply sums the sds length of every object and gives
 * up (returns) as soon as the total exceeds 1024 bytes. A second pass copies
 * each object's bytes into 'buf' at offset 'copylen' and deletes the node.
 * Finally one string object of 'totlen' bytes is created from 'buf' and
 * appended as the only element of the reply list (oom() if that fails). */
1202 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1207 listRewind(c
->reply
);
1208 while((ln
= listYield(c
->reply
))) {
1210 totlen
+= sdslen(o
->ptr
);
1211 /* This optimization makes more sense if we don't have to copy
1213 if (totlen
> 1024) return;
1219 listRewind(c
->reply
);
1220 while((ln
= listYield(c
->reply
))) {
1222 memcpy(buf
+copylen
,o
->ptr
,sdslen(o
->ptr
));
1223 copylen
+= sdslen(o
->ptr
);
1224 listDelNode(c
->reply
,ln
);
1226 /* Now the output buffer is empty, add the new single element */
1227 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,totlen
));
1228 if (!listAddNodeTail(c
->reply
,o
)) oom("listAddNodeTail");
/* sendReplyToClient: writable-event handler, 'privdata' is the redisClient.
 * When server.glueoutputbuf is set and more than one reply object is queued,
 * glueReplyBuffersIfNeeded() runs first. Then, while c->reply is not empty,
 * the head object is written to 'fd' starting at offset c->sentlen. For a
 * REDIS_MASTER client nothing is written: the remaining bytes are simply
 * accounted as sent. An object that has been sent completely is removed from
 * the list. The loop ends when write() returns <= 0 or when more than
 * REDIS_MAX_WRITE_PER_EVENT bytes were written in this call. If something
 * was written c->lastinteraction is refreshed, and once the reply list is
 * empty the AE_WRITABLE event for the client is deleted. */
1232 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1233 redisClient
*c
= privdata
;
1234 int nwritten
= 0, totwritten
= 0, objlen
;
1237 REDIS_NOTUSED(mask
);
1239 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1240 glueReplyBuffersIfNeeded(c
);
1241 while(listLength(c
->reply
)) {
1242 o
= listNodeValue(listFirst(c
->reply
));
1243 objlen
= sdslen(o
->ptr
);
1246 listDelNode(c
->reply
,listFirst(c
->reply
));
1250 if (c
->flags
& REDIS_MASTER
) {
1251 /* Don't reply to a master */
1252 nwritten
= objlen
- c
->sentlen
;
1254 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1255 if (nwritten
<= 0) break;
1257 c
->sentlen
+= nwritten
;
1258 totwritten
+= nwritten
;
1259 /* If we fully sent the object on head go to the next one */
1260 if (c
->sentlen
== objlen
) {
1261 listDelNode(c
->reply
,listFirst(c
->reply
));
1264 /* Note that we avoid sending more than REDIS_MAX_WRITE_PER_EVENT
1265 * bytes: in a single threaded server it's a good idea to serve
1266 * other clients as well, even if a very large request comes from a
1267 * super fast link that is always able to accept data (in real world
1268 * terms think of 'KEYS *' against the loopback interface) */
1269 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1271 if (nwritten
== -1) {
1272 if (errno
== EAGAIN
) {
1275 redisLog(REDIS_DEBUG
,
1276 "Error writing to client: %s", strerror(errno
));
1281 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1282 if (listLength(c
->reply
) == 0) {
1284 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1288 static struct redisCommand
*lookupCommand(char *name
) {
1290 while(cmdTable
[j
].name
!= NULL
) {
1291 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1297 /* resetClient prepares the client 'c' to process the next command */
1298 static void resetClient(redisClient
*c
) {
1303 /* When this function gets called a whole command has already been
1304 * read, and its arguments are in the client argv/argc fields.
1305 * processCommand() executes the command or prepares the
1306 * server for a bulk read from the client.
1308 * If 1 is returned the client is still alive and valid, and
1309 * other operations can be performed by the caller. Otherwise,
1310 * if 0 is returned, the client was destroyed (i.e. after QUIT). */
1311 static int processCommand(redisClient
*c
) {
1312 struct redisCommand
*cmd
;
1315 /* Free some memory if needed (maxmemory setting) */
1316 if (server
.maxmemory
) freeMemoryIfNeeded();
1318 /* The QUIT command is handled as a special case. Normal command
1319 * procs are unable to close the client connection safely */
1320 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1324 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1326 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1329 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1330 (c
->argc
< -cmd
->arity
)) {
1331 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
1334 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1335 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1338 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1339 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1341 decrRefCount(c
->argv
[c
->argc
-1]);
1342 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1344 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1349 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1350 /* It is possible that the bulk read is already in the
1351 * buffer. Check this condition and handle it accordingly */
1352 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1353 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1355 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1360 /* Let's try to share objects on the command arguments vector */
1361 if (server
.shareobjects
) {
1363 for(j
= 1; j
< c
->argc
; j
++)
1364 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1366 /* Let's try to encode the bulk object to save space. */
1367 if (cmd
->flags
& REDIS_CMD_BULK
)
1368 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1370 /* Check if the user is authenticated */
1371 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1372 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1377 /* Exec the command */
1378 dirty
= server
.dirty
;
1380 if (server
.dirty
-dirty
!= 0 && listLength(server
.slaves
))
1381 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1382 if (listLength(server
.monitors
))
1383 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1384 server
.stat_numcommands
++;
1386 /* Prepare the client for the next command */
1387 if (c
->flags
& REDIS_CLOSE
) {
/* replicationFeedSlaves: send the command 'argv'/'argc', executed against
 * database 'dictid', to every client in the 'slaves' list.
 *
 * The output vector 'outv' holds the arguments separated by shared.space
 * and terminated by shared.crlf; for REDIS_CMD_BULK commands the last
 * argument is preceded by an object carrying its length followed by CRLF.
 * A heap array of argc*2+1 pointers is allocated when argc exceeds
 * REDIS_STATIC_ARGS, and it is freed at the end when 'outv' is not
 * 'static_outv'. Slaves in state REDIS_REPL_WAIT_BGSAVE_START are skipped.
 * A slave whose 'slaveseldb' differs from 'dictid' first receives a select
 * command (one of shared.select0..9 or a newly built object), then every
 * element of 'outv' is queued with addReply(). */
1395 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1399 /* (args*2)+1 is enough room for args, spaces, newlines */
1400 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1402 if (argc
<= REDIS_STATIC_ARGS
) {
1405 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1406 if (!outv
) oom("replicationFeedSlaves");
1409 for (j
= 0; j
< argc
; j
++) {
1410 if (j
!= 0) outv
[outc
++] = shared
.space
;
1411 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
/* NOTE(review): the sdslen() result is passed to a %d conversion below --
 * confirm that its return type really is int. */
1414 lenobj
= createObject(REDIS_STRING
,
1415 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv
[j
]->ptr
)));
1416 lenobj
->refcount
= 0;
1417 outv
[outc
++] = lenobj
;
1419 outv
[outc
++] = argv
[j
];
1421 outv
[outc
++] = shared
.crlf
;
1423 /* Increment all the refcounts at start and decrement at end in order to
1424 * be sure to free objects if there is no slave in a replication state
1425 * able to be feed with commands */
1426 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1428 while((ln
= listYield(slaves
))) {
1429 redisClient
*slave
= ln
->value
;
1431 /* Don't feed slaves that are still waiting for BGSAVE to start */
1432 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1434 /* Feed all the other slaves, MONITORs and so on */
1435 if (slave
->slaveseldb
!= dictid
) {
1439 case 0: selectcmd
= shared
.select0
; break;
1440 case 1: selectcmd
= shared
.select1
; break;
1441 case 2: selectcmd
= shared
.select2
; break;
1442 case 3: selectcmd
= shared
.select3
; break;
1443 case 4: selectcmd
= shared
.select4
; break;
1444 case 5: selectcmd
= shared
.select5
; break;
1445 case 6: selectcmd
= shared
.select6
; break;
1446 case 7: selectcmd
= shared
.select7
; break;
1447 case 8: selectcmd
= shared
.select8
; break;
1448 case 9: selectcmd
= shared
.select9
; break;
1450 selectcmd
= createObject(REDIS_STRING
,
1451 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1452 selectcmd
->refcount
= 0;
1455 addReply(slave
,selectcmd
);
1456 slave
->slaveseldb
= dictid
;
1458 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1460 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1461 if (outv
!= static_outv
) zfree(outv
);
/* readQueryFromClient: readable-event handler, 'privdata' is the client.
 * Up to REDIS_IOBUF_LEN bytes are read from 'fd' and appended to
 * c->querybuf, and c->lastinteraction is refreshed.
 *
 * With c->bulklen == -1 (no bulk read pending) the buffer is searched for
 * '\n'. The first line is detached from the buffer (what follows stays in
 * c->querybuf), the trailing "\n" and an optional "\r" are removed, the
 * line is split on single spaces and every non-empty token becomes a
 * REDIS_STRING object in c->argv. If at least one argument was collected
 * processCommand() is called, and when it returns non-zero with data still
 * buffered control jumps back to 'again'. A buffer of REDIS_REQUEST_MAX_SIZE
 * bytes or more without a newline is logged as a protocol error.
 *
 * Otherwise a bulk payload is expected: once the buffer holds at least
 * c->bulklen bytes, the payload minus the final CRLF becomes the next
 * argument and the consumed bytes are removed from the buffer. */
1464 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1465 redisClient
*c
= (redisClient
*) privdata
;
1466 char buf
[REDIS_IOBUF_LEN
];
1469 REDIS_NOTUSED(mask
);
1471 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1473 if (errno
== EAGAIN
) {
1476 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1480 } else if (nread
== 0) {
1481 redisLog(REDIS_DEBUG
, "Client closed connection");
1486 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1487 c
->lastinteraction
= time(NULL
);
1493 if (c
->bulklen
== -1) {
1494 /* Read the first line of the query */
1495 char *p
= strchr(c
->querybuf
,'\n');
1502 query
= c
->querybuf
;
1503 c
->querybuf
= sdsempty();
1504 querylen
= 1+(p
-(query
));
1505 if (sdslen(query
) > querylen
) {
1506 /* leave data after the first line of the query in the buffer */
1507 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1509 *p
= '\0'; /* remove "\n" */
/* NOTE(review): when the newline is the very first byte of 'query', p-1
 * points before the start of the string -- confirm this access is safe. */
1510 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1511 sdsupdatelen(query
);
1513 /* Now we can split the query in arguments */
1514 if (sdslen(query
) == 0) {
1515 /* Ignore empty query */
1519 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1520 if (argv
== NULL
) oom("sdssplitlen");
1523 if (c
->argv
) zfree(c
->argv
);
1524 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1525 if (c
->argv
== NULL
) oom("allocating arguments list for client");
1527 for (j
= 0; j
< argc
; j
++) {
1528 if (sdslen(argv
[j
])) {
1529 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1536 /* Execute the command. If the client is still valid
1537 * after processCommand() return and there is something
1538 * on the query buffer try to process the next command. */
1539 if (c
->argc
&& processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1541 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
1542 redisLog(REDIS_DEBUG
, "Client protocol error");
1547 /* Bulk read handling. Note that if we are at this point
1548 the client already sent a command terminated with a newline,
1549 we are reading the bulk data that is actually the last
1550 argument of the command. */
1551 int qbl
= sdslen(c
->querybuf
);
1553 if (c
->bulklen
<= qbl
) {
1554 /* Copy everything but the final CRLF as final argument */
1555 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1557 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1564 static int selectDb(redisClient
*c
, int id
) {
1565 if (id
< 0 || id
>= server
.dbnum
)
1567 c
->db
= &server
.db
[id
];
1571 static void *dupClientReplyValue(void *o
) {
1572 incrRefCount((robj
*)o
);
/* createClient: allocate and set up the redisClient for socket 'fd'.
 * anetNonBlock() and anetTcpNoDelay() are applied to the descriptor, the
 * query buffer starts empty, the client is not authenticated and its
 * replication state is REDIS_REPL_NONE. The reply list is created with
 * decrRefCount as free method and dupClientReplyValue as dup method.
 * readQueryFromClient is registered as AE_READABLE handler, and the client
 * is appended to server.clients. Returns NULL if the structure cannot be
 * allocated; list failures go through oom(). */
1576 static redisClient
*createClient(int fd
) {
1577 redisClient
*c
= zmalloc(sizeof(*c
));
1579 anetNonBlock(NULL
,fd
);
1580 anetTcpNoDelay(NULL
,fd
);
1581 if (!c
) return NULL
;
1584 c
->querybuf
= sdsempty();
1590 c
->lastinteraction
= time(NULL
);
1591 c
->authenticated
= 0;
1592 c
->replstate
= REDIS_REPL_NONE
;
1593 if ((c
->reply
= listCreate()) == NULL
) oom("listCreate");
1594 listSetFreeMethod(c
->reply
,decrRefCount
);
1595 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1596 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1597 readQueryFromClient
, c
, NULL
) == AE_ERR
) {
1601 if (!listAddNodeTail(server
.clients
,c
)) oom("listAddNodeTail");
/* addReply: queue 'obj' on the reply list of client 'c'.
 * When the list is empty and the client replication state is
 * REDIS_REPL_NONE or REDIS_REPL_ONLINE, sendReplyToClient is first installed
 * as AE_WRITABLE handler; if that fails the function returns without
 * queueing anything. Objects that are not REDIS_ENCODING_RAW are replaced by
 * the result of getDecodedObject() before being appended (oom() if the list
 * insertion fails). */
1605 static void addReply(redisClient
*c
, robj
*obj
) {
1606 if (listLength(c
->reply
) == 0 &&
1607 (c
->replstate
== REDIS_REPL_NONE
||
1608 c
->replstate
== REDIS_REPL_ONLINE
) &&
1609 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
1610 sendReplyToClient
, c
, NULL
) == AE_ERR
) return;
1611 if (obj
->encoding
!= REDIS_ENCODING_RAW
) {
1612 obj
= getDecodedObject(obj
);
1616 if (!listAddNodeTail(c
->reply
,obj
)) oom("listAddNodeTail");
/* addReplySds: reply to client 'c' with the sds string 's', which is wrapped
 * in a REDIS_STRING object by createObject(). */
1619 static void addReplySds(redisClient
*c
, sds s
) {
1620 robj
*o
= createObject(REDIS_STRING
,s
);
/* addReplyBulkLen: send the bulk length header "$<len>\r\n" for 'obj'.
 * For REDIS_ENCODING_RAW objects 'len' is the sds length of obj->ptr. For
 * other encodings obj->ptr is read as a long and repeatedly divided by 10,
 * presumably to count the digits of its decimal form -- confirm. */
1625 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
1628 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
1629 len
= sdslen(obj
->ptr
);
1631 long n
= (long)obj
->ptr
;
1638 while((n
= n
/10) != 0) {
1642 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",len
));
/* acceptHandler: event handler for the listening socket 'fd'.
 * A connection is accepted with anetAccept(); failures are logged at debug
 * level with the text left in server.neterr. A client is created for the
 * new descriptor, and if that fails a warning is logged and the descriptor
 * is closed. When server.maxclients is set and the clients list is longer
 * than that limit, an error line is written directly to the socket.
 * server.stat_numconnections is incremented for accepted clients. */
1645 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1650 REDIS_NOTUSED(mask
);
1651 REDIS_NOTUSED(privdata
);
1653 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
1654 if (cfd
== AE_ERR
) {
1655 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
1658 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
1659 if ((c
= createClient(cfd
)) == NULL
) {
1660 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
1661 close(cfd
); /* May be already closed, just ignore errors */
1664 /* If maxclient directive is set and this is one client more... close the
1665 * connection. Note that we create the client instead to check before
1666 * for this condition, since now the socket is already set in nonblocking
1667 * mode and we can send an error for free using the Kernel I/O */
1668 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
1669 char *err
= "-ERR max number of clients reached\r\n";
1671 /* That's a best effort error message, don't check write errors */
1672 (void) write(c
->fd
,err
,strlen(err
));
1676 server
.stat_numconnections
++;
1679 /* ======================= Redis objects implementation ===================== */
/* createObject: obtain an robj for a value of the given 'type' and 'ptr'.
 * The structure is recycled from the head of server.objfreelist when that
 * list is not empty, otherwise it is allocated with zmalloc(); oom() is
 * called if no object could be obtained. The encoding is initialized to
 * REDIS_ENCODING_RAW. */
1681 static robj
*createObject(int type
, void *ptr
) {
1684 if (listLength(server
.objfreelist
)) {
1685 listNode
*head
= listFirst(server
.objfreelist
);
1686 o
= listNodeValue(head
);
1687 listDelNode(server
.objfreelist
,head
);
1689 o
= zmalloc(sizeof(*o
));
1691 if (!o
) oom("createObject");
1693 o
->encoding
= REDIS_ENCODING_RAW
;
1699 static robj
*createStringObject(char *ptr
, size_t len
) {
1700 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
1703 static robj
*createListObject(void) {
1704 list
*l
= listCreate();
1706 if (!l
) oom("listCreate");
1707 listSetFreeMethod(l
,decrRefCount
);
1708 return createObject(REDIS_LIST
,l
);
1711 static robj
*createSetObject(void) {
1712 dict
*d
= dictCreate(&setDictType
,NULL
);
1713 if (!d
) oom("dictCreate");
1714 return createObject(REDIS_SET
,d
);
/* freeStringObject: release the payload of a string object. Only objects
 * with REDIS_ENCODING_RAW encoding are acted upon. */
1717 static void freeStringObject(robj
*o
) {
1718 if (o
->encoding
== REDIS_ENCODING_RAW
) {
1723 static void freeListObject(robj
*o
) {
1724 listRelease((list
*) o
->ptr
);
1727 static void freeSetObject(robj
*o
) {
1728 dictRelease((dict
*) o
->ptr
);
1731 static void freeHashObject(robj
*o
) {
1732 dictRelease((dict
*) o
->ptr
);
/* incrRefCount: reference counting helper for 'o'. When DEBUG_REFCOUNT is
 * defined, the value, address and reference count of string objects are
 * traced on stdout. */
1735 static void incrRefCount(robj
*o
) {
1737 #ifdef DEBUG_REFCOUNT
1738 if (o
->type
== REDIS_STRING
)
1739 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
/* decrRefCount: drop one reference from the object 'obj'. It takes a void
 * pointer, and is installed as list free method elsewhere in this file.
 * When the count reaches zero the payload is released with the function
 * matching the object type (an unknown type trips an assert). The condition
 * that follows then tests whether server.objfreelist already holds more
 * than REDIS_OBJFREELIST_MAX entries, or whether adding the object at its
 * head fails. With DEBUG_REFCOUNT defined, string objects are traced. */
1743 static void decrRefCount(void *obj
) {
1746 #ifdef DEBUG_REFCOUNT
1747 if (o
->type
== REDIS_STRING
)
1748 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
1750 if (--(o
->refcount
) == 0) {
1752 case REDIS_STRING
: freeStringObject(o
); break;
1753 case REDIS_LIST
: freeListObject(o
); break;
1754 case REDIS_SET
: freeSetObject(o
); break;
1755 case REDIS_HASH
: freeHashObject(o
); break;
1756 default: assert(0 != 0); break;
1758 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
1759 !listAddNodeHead(server
.objfreelist
,o
))
1764 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
1765 dictEntry
*de
= dictFind(db
->dict
,key
);
1766 return de
? dictGetEntryVal(de
) : NULL
;
1769 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
1770 expireIfNeeded(db
,key
);
1771 return lookupKey(db
,key
);
1774 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
1775 deleteIfVolatile(db
,key
);
1776 return lookupKey(db
,key
);
1779 static int deleteKey(redisDb
*db
, robj
*key
) {
1782 /* We need to protect key from destruction: after the first dictDelete()
1783 * it may happen that 'key' is no longer valid if we don't increment
1784 * it's count. This may happen when we get the object reference directly
1785 * from the hash table with dictRandomKey() or dict iterators */
1787 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
1788 retval
= dictDelete(db
->dict
,key
);
1791 return retval
== DICT_OK
;
1794 /* Try to share an object against the shared objects pool.
 * Returns 'o' unchanged when it is NULL or when server.shareobjects is 0.
 * The object must be a string (asserted). On a pool hit the usage counter
 * kept as the entry value is incremented and the pooled object gains a
 * reference. On a miss with a full pool (dictSize >= server.sharingpoolsize)
 * the counter of a random entry is decremented; a new object is added to
 * the pool with counter 1. */
1795 static robj
*tryObjectSharing(robj
*o
) {
1796 struct dictEntry
*de
;
1799 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
1801 assert(o
->type
== REDIS_STRING
);
1802 de
= dictFind(server
.sharingpool
,o
);
1804 robj
*shared
= dictGetEntryKey(de
);
1806 c
= ((unsigned long) dictGetEntryVal(de
))+1;
1807 dictGetEntryVal(de
) = (void*) c
;
1808 incrRefCount(shared
);
1812 /* Here we are using a stream algorithm: every time an object is
1813 * shared we increment its count, every time there is a miss we
1814 * decrement the counter of a random object. If this object reaches
1815 * zero we remove the object and put the current object instead. */
1816 if (dictSize(server
.sharingpool
) >=
1817 server
.sharingpoolsize
) {
1818 de
= dictGetRandomKey(server
.sharingpool
);
1820 c
= ((unsigned long) dictGetEntryVal(de
))-1;
1821 dictGetEntryVal(de
) = (void*) c
;
1823 dictDelete(server
.sharingpool
,de
->key
);
1826 c
= 0; /* If the pool is empty we want to add this object */
1831 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
1832 assert(retval
== DICT_OK
);
1839 /* Try to encode a string object in order to save space.
 * Returns REDIS_ERR when the object is already encoded, when it has more
 * than one reference, or when its text is not the canonical decimal form of
 * a long (the strtol() result printed back with "%ld" must match the string
 * byte for byte). On success the encoding becomes REDIS_ENCODING_INT and
 * the long value itself is stored in o->ptr. */
1840 static int tryObjectEncoding(robj
*o
) {
1842 char *endptr
, buf
[32];
1845 if (o
->encoding
!= REDIS_ENCODING_RAW
)
1846 return REDIS_ERR
; /* Already encoded */
1848 /* It's not safe to encode shared objects: shared objects can be shared
1849 * everywhere in the "object space" of Redis. Encoded objects can only
1850 * appear as "values" (and not, for instance, as keys) */
1851 if (o
->refcount
> 1) return REDIS_ERR
;
1853 /* Currently we try to encode only strings */
1854 assert(o
->type
== REDIS_STRING
);
1856 /* Check if it's possible to encode this value as a long. We are assuming
1857 * that sizeof(long) = sizeof(void*) in all the supported archs. */
1858 value
= strtol(s
, &endptr
, 10);
1859 if (endptr
[0] != '\0') return REDIS_ERR
;
1860 snprintf(buf
,32,"%ld",value
);
1862 /* If the number converted back into a string is not identical
1863 * then it's not possible to encode the string as integer */
1864 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return REDIS_ERR
;
1866 /* Ok, this object can be encoded */
1867 o
->encoding
= REDIS_ENCODING_INT
;
1869 o
->ptr
= (void*) value
;
1873 /* Get a decoded version of an encoded object (returned as a new object).
 * The input must not be REDIS_ENCODING_RAW (asserted). For a string object
 * with REDIS_ENCODING_INT encoding, the long stored in o->ptr is printed
 * with "%ld" and a new string object is created from that text. */
1874 static robj
*getDecodedObject(const robj
*o
) {
1877 assert(o
->encoding
!= REDIS_ENCODING_RAW
);
1878 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
1881 snprintf(buf
,32,"%ld",(long)o
->ptr
);
1882 dec
= createStringObject(buf
,strlen(buf
));
1889 /*============================ DB saving/loading ============================ */
/* Write the one-byte 'type' marker to 'fp'.
 * Returns 0 on success, -1 if the byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,sizeof(type),1,fp);

    return (written == 0) ? -1 : 0;
}
/* Write the time 't' to 'fp' as a 32 bit signed integer (4 bytes, in the
 * byte order of the host).
 * Returns 0 on success, -1 if the value could not be written. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    if (fwrite(&stamp,sizeof(stamp),1,fp) != 1) return -1;
    return 0;
}
1902 /* check rdbLoadLen() comments for more info.
 * Writes 'len' to 'fp' in one of three forms: a single byte tagged with
 * REDIS_RDB_6BITLEN in its two most significant bits; two bytes tagged with
 * REDIS_RDB_14BITLEN, used when len < (1<<14); or a byte tagged with
 * REDIS_RDB_32BITLEN followed by a 4 byte length.
 * Returns -1 as soon as an fwrite() fails. */
1903 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
1904 unsigned char buf
[2];
1907 /* Save a 6 bit len */
1908 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
1909 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1910 } else if (len
< (1<<14)) {
1911 /* Save a 14 bit len */
1912 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
1914 if (fwrite(buf
,2,1,fp
) == 0) return -1;
1916 /* Save a 32 bit len */
1917 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
1918 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1920 if (fwrite(&len
,4,1,fp
) == 0) return -1;
1925 /* String objects in the form "2391" "-100" without any space and with a
1926 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1927 * encoded as integers to save space */
1928 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
1930 char *endptr
, buf
[32];
1932 /* Check if it's possible to encode this value as a number */
1933 value
= strtoll(s
, &endptr
, 10);
1934 if (endptr
[0] != '\0') return 0;
1935 snprintf(buf
,32,"%lld",value
);
1937 /* If the number converted back into a string is not identical
1938 * then it's not possible to encode the string as integer */
1939 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
1941 /* Finally check if it fits in our ranges */
1942 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
1943 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
1944 enc
[1] = value
&0xFF;
1946 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
1947 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
1948 enc
[1] = value
&0xFF;
1949 enc
[2] = (value
>>8)&0xFF;
1951 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
1952 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
1953 enc
[1] = value
&0xFF;
1954 enc
[2] = (value
>>8)&0xFF;
1955 enc
[3] = (value
>>16)&0xFF;
1956 enc
[4] = (value
>>24)&0xFF;
/* rdbSaveLzfStringObject: try to write the string 'obj' in LZF compressed
 * form. The output buffer is sized to the string length minus four bytes,
 * so that compression has to save at least that much. 0 is returned when
 * the output buffer cannot be allocated. On successful compression the
 * record is written as an encoding byte (REDIS_RDB_ENCVAL with
 * REDIS_RDB_ENC_LZF), the compressed length, the original length, and the
 * compressed bytes; a failed write jumps to 'writeerr'. */
1963 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
1964 unsigned int comprlen
, outlen
;
1968 /* We require at least four bytes compression for this to be worth it */
1969 outlen
= sdslen(obj
->ptr
)-4;
/* NOTE(review): 'outlen' is unsigned, so the test below is only true for a
 * string of exactly 4 bytes; shorter strings wrap to a huge value -- confirm
 * callers never pass strings shorter than that. */
1970 if (outlen
<= 0) return 0;
1971 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
1972 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
1973 if (comprlen
== 0) {
1977 /* Data compressed! Let's save it on disk */
1978 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
1979 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
1980 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
1981 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
1982 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
1991 /* Save a string object as [len][data] on disk. If the object is a string
1992 * representation of an integer value we try to save it in a special form.
 * Three forms are tried in order: the integer encoding produced by
 * rdbTryIntegerEncoding(), LZF compression via rdbSaveLzfStringObject(),
 * and finally the verbatim length-prefixed bytes. Returns -1 on write
 * errors. */
1993 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
1997 len
= sdslen(obj
->ptr
);
1999 /* Try integer encoding */
2001 unsigned char buf
[5];
2002 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2003 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2008 /* Try LZF compression - under 20 bytes it's unable to compress even
2009 * aaaaaaaaaaaaaaaaaa so skip it */
2013 retval
= rdbSaveLzfStringObject(fp
,obj
);
2014 if (retval
== -1) return -1;
2015 if (retval
> 0) return 0;
2016 /* retval == 0 means data can't be compressed, save the old way */
2019 /* Store verbatim */
2020 if (rdbSaveLen(fp
,len
) == -1) return -1;
2021 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2025 /* Like rdbSaveStringObjectRaw() but handle encoded objects: an object
 * that is not REDIS_ENCODING_RAW is first converted with getDecodedObject()
 * and the decoded copy is what gets saved; raw objects are saved directly. */
2026 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2030 if (obj
->encoding
!= REDIS_ENCODING_RAW
) {
2031 dec
= getDecodedObject(obj
);
2032 retval
= rdbSaveStringObjectRaw(fp
,dec
);
2036 return rdbSaveStringObjectRaw(fp
,obj
);
2040 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success.
 * The dump is written to a temporary file named "temp-<pid>.rdb" and then
 * renamed to 'filename'. Layout: the "REDIS0001" signature, then for every
 * non-empty database a REDIS_SELECTDB opcode with the database index,
 * followed by its entries, and a final REDIS_EOF opcode. Each entry is an
 * optional REDIS_EXPIRETIME opcode with the expire time, the value type,
 * the key, and the value (a string, or a length followed by the elements
 * for lists and sets). Keys whose expire time is already in the past are
 * skipped. Write failures jump to 'werr'. On success server.lastsave is
 * updated. */
2041 static int rdbSave(char *filename
) {
2042 dictIterator
*di
= NULL
;
2047 time_t now
= time(NULL
);
2049 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2050 fp
= fopen(tmpfile
,"w");
2052 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2055 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2056 for (j
= 0; j
< server
.dbnum
; j
++) {
2057 redisDb
*db
= server
.db
+j
;
2059 if (dictSize(d
) == 0) continue;
2060 di
= dictGetIterator(d
);
2066 /* Write the SELECT DB opcode */
2067 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2068 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2070 /* Iterate this DB writing every entry */
2071 while((de
= dictNext(di
)) != NULL
) {
2072 robj
*key
= dictGetEntryKey(de
);
2073 robj
*o
= dictGetEntryVal(de
);
2074 time_t expiretime
= getExpire(db
,key
);
2076 /* Save the expire time */
2077 if (expiretime
!= -1) {
2078 /* If this key is already expired skip it */
2079 if (expiretime
< now
) continue;
2080 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2081 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2083 /* Save the key and associated value */
2084 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2085 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2086 if (o
->type
== REDIS_STRING
) {
2087 /* Save a string value */
2088 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
2089 } else if (o
->type
== REDIS_LIST
) {
2090 /* Save a list value */
2091 list
*list
= o
->ptr
;
2095 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
2096 while((ln
= listYield(list
))) {
2097 robj
*eleobj
= listNodeValue(ln
);
2099 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2101 } else if (o
->type
== REDIS_SET
) {
2102 /* Save a set value */
2104 dictIterator
*di
= dictGetIterator(set
);
/* NOTE(review): the check below tests 'set', not the iterator 'di' that
 * dictGetIterator() just returned; this inner 'di' also shadows the one
 * declared at the top of the function -- confirm both are intended. */
2107 if (!set
) oom("dictGetIteraotr");
2108 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
2109 while((de
= dictNext(di
)) != NULL
) {
2110 robj
*eleobj
= dictGetEntryKey(de
);
2112 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2114 dictReleaseIterator(di
);
2119 dictReleaseIterator(di
);
2122 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2124 /* Make sure data will not remain on the OS's output buffers */
2129 /* Use RENAME to make sure the DB file is changed atomically only
2130 * if the generate DB file is ok. */
2131 if (rename(tmpfile
,filename
) == -1) {
2132 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destionation: %s", strerror(errno
));
2136 redisLog(REDIS_NOTICE
,"DB saved on disk");
2138 server
.lastsave
= time(NULL
);
2144 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2145 if (di
) dictReleaseIterator(di
);
/* rdbSaveBackground: save the DB to 'filename' from a child process.
 * Returns REDIS_ERR at once if a background save is already in progress.
 * Otherwise the process forks: the child (fork() returning 0) runs
 * rdbSave(filename); in the parent a fork() result of -1 is logged as a
 * warning, while on success the child pid is logged and recorded in
 * server.bgsavechildpid, and server.bgsaveinprogress is set to 1. */
2149 static int rdbSaveBackground(char *filename
) {
2152 if (server
.bgsaveinprogress
) return REDIS_ERR
;
2153 if ((childpid
= fork()) == 0) {
2156 if (rdbSave(filename
) == REDIS_OK
) {
2163 if (childpid
== -1) {
2164 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2168 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2169 server
.bgsaveinprogress
= 1;
2170 server
.bgsavechildpid
= childpid
;
2173 return REDIS_OK
; /* unreached */
/* rdbRemoveTempFile: build in 'tmpfile' the name "temp-<childpid>.rdb",
 * which follows the same pattern rdbSave() uses for its temporary file. */
2176 static void rdbRemoveTempFile(pid_t childpid
) {
2179 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
/* Read a one-byte type marker from 'fp'.
 * Returns the byte as a value in the range 0-255, or -1 if nothing could be
 * read. */
static int rdbLoadType(FILE *fp) {
    unsigned char marker;

    return (fread(&marker,sizeof(marker),1,fp) == 1) ? (int) marker : -1;
}
/* Read a time stored as a 32 bit signed integer (4 bytes, host byte order)
 * from 'fp'. Returns the value as a time_t, or -1 if it could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,sizeof(stamp),1,fp) != 1) return -1;
    return (time_t) stamp;
}
2195 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2196 * of this file for a description of how these are stored on disk.
2198 * isencoded is set to 1 if the value read is not actually a length but
2199 * an "encoding type", check the above comments for more info.
 * When 'isencoded' is not NULL it is reset to 0 on entry. The two most
 * significant bits of the first byte select the form: REDIS_RDB_6BITLEN,
 * REDIS_RDB_ENCVAL (sets *isencoded to 1), REDIS_RDB_14BITLEN (the low six
 * bits and one more byte form the length), otherwise a 4 byte length
 * follows. REDIS_RDB_LENERR is returned when a read fails. */
2200 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2201 unsigned char buf
[2];
2204 if (isencoded
) *isencoded
= 0;
2206 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2211 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2212 type
= (buf
[0]&0xC0)>>6;
2213 if (type
== REDIS_RDB_6BITLEN
) {
2214 /* Read a 6 bit len */
2216 } else if (type
== REDIS_RDB_ENCVAL
) {
2217 /* Read a 6 bit len encoding type */
2218 if (isencoded
) *isencoded
= 1;
2220 } else if (type
== REDIS_RDB_14BITLEN
) {
2221 /* Read a 14 bit len */
2222 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2223 return ((buf
[0]&0x3F)<<8)|buf
[1];
2225 /* Read a 32 bit len */
2226 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2232 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2233 unsigned char enc
[4];
2236 if (enctype
== REDIS_RDB_ENC_INT8
) {
2237 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2238 val
= (signed char)enc
[0];
2239 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2241 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2242 v
= enc
[0]|(enc
[1]<<8);
2244 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2246 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2247 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2250 val
= 0; /* anti-warning */
2253 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2256 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2257 unsigned int len
, clen
;
2258 unsigned char *c
= NULL
;
2261 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2262 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2263 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2264 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2265 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2266 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2268 return createObject(REDIS_STRING
,val
);
2275 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2280 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2283 case REDIS_RDB_ENC_INT8
:
2284 case REDIS_RDB_ENC_INT16
:
2285 case REDIS_RDB_ENC_INT32
:
2286 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2287 case REDIS_RDB_ENC_LZF
:
2288 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2294 if (len
== REDIS_RDB_LENERR
) return NULL
;
2295 val
= sdsnewlen(NULL
,len
);
2296 if (len
&& fread(val
,len
,1,fp
) == 0) {
2300 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2303 static int rdbLoad(char *filename
) {
2305 robj
*keyobj
= NULL
;
2307 int type
, retval
, rdbver
;
2308 dict
*d
= server
.db
[0].dict
;
2309 redisDb
*db
= server
.db
+0;
2311 time_t expiretime
= -1, now
= time(NULL
);
2313 fp
= fopen(filename
,"r");
2314 if (!fp
) return REDIS_ERR
;
2315 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2317 if (memcmp(buf
,"REDIS",5) != 0) {
2319 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2322 rdbver
= atoi(buf
+5);
2325 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2332 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2333 if (type
== REDIS_EXPIRETIME
) {
2334 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2335 /* We read the time so we need to read the object type again */
2336 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2338 if (type
== REDIS_EOF
) break;
2339 /* Handle SELECT DB opcode as a special case */
2340 if (type
== REDIS_SELECTDB
) {
2341 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2343 if (dbid
>= (unsigned)server
.dbnum
) {
2344 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2347 db
= server
.db
+dbid
;
2352 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2354 if (type
== REDIS_STRING
) {
2355 /* Read string value */
2356 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2357 tryObjectEncoding(o
);
2358 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2359 /* Read list/set value */
2362 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2364 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2365 /* Load every single element of the list/set */
2369 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2370 tryObjectEncoding(ele
);
2371 if (type
== REDIS_LIST
) {
2372 if (!listAddNodeTail((list
*)o
->ptr
,ele
))
2373 oom("listAddNodeTail");
2375 if (dictAdd((dict
*)o
->ptr
,ele
,NULL
) == DICT_ERR
)
2382 /* Add the new object in the hash table */
2383 retval
= dictAdd(d
,keyobj
,o
);
2384 if (retval
== DICT_ERR
) {
2385 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2388 /* Set the expire time if needed */
2389 if (expiretime
!= -1) {
2390 setExpire(db
,keyobj
,expiretime
);
2391 /* Delete this key if already expired */
2392 if (expiretime
< now
) deleteKey(db
,keyobj
);
2400 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2401 if (keyobj
) decrRefCount(keyobj
);
2402 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2404 return REDIS_ERR
; /* Just to avoid warning */
2407 /*================================== Commands =============================== */
2409 static void authCommand(redisClient
*c
) {
2410 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2411 c
->authenticated
= 1;
2412 addReply(c
,shared
.ok
);
2414 c
->authenticated
= 0;
2415 addReply(c
,shared
.err
);
2419 static void pingCommand(redisClient
*c
) {
2420 addReply(c
,shared
.pong
);
2423 static void echoCommand(redisClient
*c
) {
2424 addReplyBulkLen(c
,c
->argv
[1]);
2425 addReply(c
,c
->argv
[1]);
2426 addReply(c
,shared
.crlf
);
2429 /*=================================== Strings =============================== */
2431 static void setGenericCommand(redisClient
*c
, int nx
) {
2434 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2435 if (retval
== DICT_ERR
) {
2437 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2438 incrRefCount(c
->argv
[2]);
2440 addReply(c
,shared
.czero
);
2444 incrRefCount(c
->argv
[1]);
2445 incrRefCount(c
->argv
[2]);
2448 removeExpire(c
->db
,c
->argv
[1]);
2449 addReply(c
, nx
? shared
.cone
: shared
.ok
);
2452 static void setCommand(redisClient
*c
) {
2453 setGenericCommand(c
,0);
2456 static void setnxCommand(redisClient
*c
) {
2457 setGenericCommand(c
,1);
2460 static void getCommand(redisClient
*c
) {
2461 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2464 addReply(c
,shared
.nullbulk
);
2466 if (o
->type
!= REDIS_STRING
) {
2467 addReply(c
,shared
.wrongtypeerr
);
2469 addReplyBulkLen(c
,o
);
2471 addReply(c
,shared
.crlf
);
2476 static void getSetCommand(redisClient
*c
) {
2478 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
2479 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2481 incrRefCount(c
->argv
[1]);
2483 incrRefCount(c
->argv
[2]);
2485 removeExpire(c
->db
,c
->argv
[1]);
2488 static void mgetCommand(redisClient
*c
) {
2491 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
2492 for (j
= 1; j
< c
->argc
; j
++) {
2493 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
2495 addReply(c
,shared
.nullbulk
);
2497 if (o
->type
!= REDIS_STRING
) {
2498 addReply(c
,shared
.nullbulk
);
2500 addReplyBulkLen(c
,o
);
2502 addReply(c
,shared
.crlf
);
2508 static void incrDecrCommand(redisClient
*c
, long long incr
) {
2513 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2517 if (o
->type
!= REDIS_STRING
) {
2522 if (o
->encoding
== REDIS_ENCODING_RAW
)
2523 value
= strtoll(o
->ptr
, &eptr
, 10);
2524 else if (o
->encoding
== REDIS_ENCODING_INT
)
2525 value
= (long)o
->ptr
;
2532 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
2533 tryObjectEncoding(o
);
2534 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
2535 if (retval
== DICT_ERR
) {
2536 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
2537 removeExpire(c
->db
,c
->argv
[1]);
2539 incrRefCount(c
->argv
[1]);
2542 addReply(c
,shared
.colon
);
2544 addReply(c
,shared
.crlf
);
2547 static void incrCommand(redisClient
*c
) {
2548 incrDecrCommand(c
,1);
2551 static void decrCommand(redisClient
*c
) {
2552 incrDecrCommand(c
,-1);
2555 static void incrbyCommand(redisClient
*c
) {
2556 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2557 incrDecrCommand(c
,incr
);
2560 static void decrbyCommand(redisClient
*c
) {
2561 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2562 incrDecrCommand(c
,-incr
);
2565 /* ========================= Type agnostic commands ========================= */
2567 static void delCommand(redisClient
*c
) {
2570 for (j
= 1; j
< c
->argc
; j
++) {
2571 if (deleteKey(c
->db
,c
->argv
[j
])) {
2578 addReply(c
,shared
.czero
);
2581 addReply(c
,shared
.cone
);
2584 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
2589 static void existsCommand(redisClient
*c
) {
2590 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
2593 static void selectCommand(redisClient
*c
) {
2594 int id
= atoi(c
->argv
[1]->ptr
);
2596 if (selectDb(c
,id
) == REDIS_ERR
) {
2597 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
2599 addReply(c
,shared
.ok
);
2603 static void randomkeyCommand(redisClient
*c
) {
2607 de
= dictGetRandomKey(c
->db
->dict
);
2608 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
2611 addReply(c
,shared
.plus
);
2612 addReply(c
,shared
.crlf
);
2614 addReply(c
,shared
.plus
);
2615 addReply(c
,dictGetEntryKey(de
));
2616 addReply(c
,shared
.crlf
);
2620 static void keysCommand(redisClient
*c
) {
2623 sds pattern
= c
->argv
[1]->ptr
;
2624 int plen
= sdslen(pattern
);
2625 int numkeys
= 0, keyslen
= 0;
2626 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
2628 di
= dictGetIterator(c
->db
->dict
);
2629 if (!di
) oom("dictGetIterator");
2631 decrRefCount(lenobj
);
2632 while((de
= dictNext(di
)) != NULL
) {
2633 robj
*keyobj
= dictGetEntryKey(de
);
2635 sds key
= keyobj
->ptr
;
2636 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
2637 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
2638 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
2640 addReply(c
,shared
.space
);
2643 keyslen
+= sdslen(key
);
2647 dictReleaseIterator(di
);
2648 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
2649 addReply(c
,shared
.crlf
);
2652 static void dbsizeCommand(redisClient
*c
) {
2654 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
2657 static void lastsaveCommand(redisClient
*c
) {
2659 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
2662 static void typeCommand(redisClient
*c
) {
2666 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2671 case REDIS_STRING
: type
= "+string"; break;
2672 case REDIS_LIST
: type
= "+list"; break;
2673 case REDIS_SET
: type
= "+set"; break;
2674 default: type
= "unknown"; break;
2677 addReplySds(c
,sdsnew(type
));
2678 addReply(c
,shared
.crlf
);
2681 static void saveCommand(redisClient
*c
) {
2682 if (server
.bgsaveinprogress
) {
2683 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
2686 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2687 addReply(c
,shared
.ok
);
2689 addReply(c
,shared
.err
);
2693 static void bgsaveCommand(redisClient
*c
) {
2694 if (server
.bgsaveinprogress
) {
2695 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
2698 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
2699 addReply(c
,shared
.ok
);
2701 addReply(c
,shared
.err
);
2705 static void shutdownCommand(redisClient
*c
) {
2706 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
2707 /* Kill the saving child if there is a background saving in progress.
2708 We want to avoid race conditions, for instance our saving child may
2709 overwrite the synchronous saving done by SHUTDOWN. */
2710 if (server
.bgsaveinprogress
) {
2711 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
2712 kill(server
.bgsavechildpid
,SIGKILL
);
2713 rdbRemoveTempFile(server
.bgsavechildpid
);
2716 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2717 if (server
.daemonize
)
2718 unlink(server
.pidfile
);
2719 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
2720 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
2723 /* Ooops.. error saving! The best we can do is to continue operating.
2724 * Note that if there was a background saving process, in the next
2725 * cron() Redis will be notified that the background saving aborted,
2726 * handling special stuff like slaves pending for synchronization... */
2727 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
2728 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2732 static void renameGenericCommand(redisClient
*c
, int nx
) {
2735 /* To use the same key as src and dst is probably an error */
2736 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
2737 addReply(c
,shared
.sameobjecterr
);
2741 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2743 addReply(c
,shared
.nokeyerr
);
2747 deleteIfVolatile(c
->db
,c
->argv
[2]);
2748 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
2751 addReply(c
,shared
.czero
);
2754 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
2756 incrRefCount(c
->argv
[2]);
2758 deleteKey(c
->db
,c
->argv
[1]);
2760 addReply(c
,nx
? shared
.cone
: shared
.ok
);
2763 static void renameCommand(redisClient
*c
) {
2764 renameGenericCommand(c
,0);
2767 static void renamenxCommand(redisClient
*c
) {
2768 renameGenericCommand(c
,1);
2771 static void moveCommand(redisClient
*c
) {
2776 /* Obtain source and target DB pointers */
2779 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
2780 addReply(c
,shared
.outofrangeerr
);
2784 selectDb(c
,srcid
); /* Back to the source DB */
2786 /* If the user is moving using as target the same
2787 * DB as the source DB it is probably an error. */
2789 addReply(c
,shared
.sameobjecterr
);
2793 /* Check if the element exists and get a reference */
2794 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2796 addReply(c
,shared
.czero
);
2800 /* Try to add the element to the target DB */
2801 deleteIfVolatile(dst
,c
->argv
[1]);
2802 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
2803 addReply(c
,shared
.czero
);
2806 incrRefCount(c
->argv
[1]);
2809 /* OK! key moved, free the entry in the source DB */
2810 deleteKey(src
,c
->argv
[1]);
2812 addReply(c
,shared
.cone
);
2815 /* =================================== Lists ================================ */
2816 static void pushGenericCommand(redisClient
*c
, int where
) {
2820 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2822 lobj
= createListObject();
2824 if (where
== REDIS_HEAD
) {
2825 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2827 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2829 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
2830 incrRefCount(c
->argv
[1]);
2831 incrRefCount(c
->argv
[2]);
2833 if (lobj
->type
!= REDIS_LIST
) {
2834 addReply(c
,shared
.wrongtypeerr
);
2838 if (where
== REDIS_HEAD
) {
2839 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2841 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2843 incrRefCount(c
->argv
[2]);
2846 addReply(c
,shared
.ok
);
2849 static void lpushCommand(redisClient
*c
) {
2850 pushGenericCommand(c
,REDIS_HEAD
);
2853 static void rpushCommand(redisClient
*c
) {
2854 pushGenericCommand(c
,REDIS_TAIL
);
2857 static void llenCommand(redisClient
*c
) {
2861 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2863 addReply(c
,shared
.czero
);
2866 if (o
->type
!= REDIS_LIST
) {
2867 addReply(c
,shared
.wrongtypeerr
);
2870 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
2875 static void lindexCommand(redisClient
*c
) {
2877 int index
= atoi(c
->argv
[2]->ptr
);
2879 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2881 addReply(c
,shared
.nullbulk
);
2883 if (o
->type
!= REDIS_LIST
) {
2884 addReply(c
,shared
.wrongtypeerr
);
2886 list
*list
= o
->ptr
;
2889 ln
= listIndex(list
, index
);
2891 addReply(c
,shared
.nullbulk
);
2893 robj
*ele
= listNodeValue(ln
);
2894 addReplyBulkLen(c
,ele
);
2896 addReply(c
,shared
.crlf
);
2902 static void lsetCommand(redisClient
*c
) {
2904 int index
= atoi(c
->argv
[2]->ptr
);
2906 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2908 addReply(c
,shared
.nokeyerr
);
2910 if (o
->type
!= REDIS_LIST
) {
2911 addReply(c
,shared
.wrongtypeerr
);
2913 list
*list
= o
->ptr
;
2916 ln
= listIndex(list
, index
);
2918 addReply(c
,shared
.outofrangeerr
);
2920 robj
*ele
= listNodeValue(ln
);
2923 listNodeValue(ln
) = c
->argv
[3];
2924 incrRefCount(c
->argv
[3]);
2925 addReply(c
,shared
.ok
);
2932 static void popGenericCommand(redisClient
*c
, int where
) {
2935 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2937 addReply(c
,shared
.nullbulk
);
2939 if (o
->type
!= REDIS_LIST
) {
2940 addReply(c
,shared
.wrongtypeerr
);
2942 list
*list
= o
->ptr
;
2945 if (where
== REDIS_HEAD
)
2946 ln
= listFirst(list
);
2948 ln
= listLast(list
);
2951 addReply(c
,shared
.nullbulk
);
2953 robj
*ele
= listNodeValue(ln
);
2954 addReplyBulkLen(c
,ele
);
2956 addReply(c
,shared
.crlf
);
2957 listDelNode(list
,ln
);
2964 static void lpopCommand(redisClient
*c
) {
2965 popGenericCommand(c
,REDIS_HEAD
);
2968 static void rpopCommand(redisClient
*c
) {
2969 popGenericCommand(c
,REDIS_TAIL
);
2972 static void lrangeCommand(redisClient
*c
) {
2974 int start
= atoi(c
->argv
[2]->ptr
);
2975 int end
= atoi(c
->argv
[3]->ptr
);
2977 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2979 addReply(c
,shared
.nullmultibulk
);
2981 if (o
->type
!= REDIS_LIST
) {
2982 addReply(c
,shared
.wrongtypeerr
);
2984 list
*list
= o
->ptr
;
2986 int llen
= listLength(list
);
2990 /* convert negative indexes */
2991 if (start
< 0) start
= llen
+start
;
2992 if (end
< 0) end
= llen
+end
;
2993 if (start
< 0) start
= 0;
2994 if (end
< 0) end
= 0;
2996 /* indexes sanity checks */
2997 if (start
> end
|| start
>= llen
) {
2998 /* Out of range start or start > end result in empty list */
2999 addReply(c
,shared
.emptymultibulk
);
3002 if (end
>= llen
) end
= llen
-1;
3003 rangelen
= (end
-start
)+1;
3005 /* Return the result in form of a multi-bulk reply */
3006 ln
= listIndex(list
, start
);
3007 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3008 for (j
= 0; j
< rangelen
; j
++) {
3009 ele
= listNodeValue(ln
);
3010 addReplyBulkLen(c
,ele
);
3012 addReply(c
,shared
.crlf
);
3019 static void ltrimCommand(redisClient
*c
) {
3021 int start
= atoi(c
->argv
[2]->ptr
);
3022 int end
= atoi(c
->argv
[3]->ptr
);
3024 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3026 addReply(c
,shared
.nokeyerr
);
3028 if (o
->type
!= REDIS_LIST
) {
3029 addReply(c
,shared
.wrongtypeerr
);
3031 list
*list
= o
->ptr
;
3033 int llen
= listLength(list
);
3034 int j
, ltrim
, rtrim
;
3036 /* convert negative indexes */
3037 if (start
< 0) start
= llen
+start
;
3038 if (end
< 0) end
= llen
+end
;
3039 if (start
< 0) start
= 0;
3040 if (end
< 0) end
= 0;
3042 /* indexes sanity checks */
3043 if (start
> end
|| start
>= llen
) {
3044 /* Out of range start or start > end result in empty list */
3048 if (end
>= llen
) end
= llen
-1;
3053 /* Remove list elements to perform the trim */
3054 for (j
= 0; j
< ltrim
; j
++) {
3055 ln
= listFirst(list
);
3056 listDelNode(list
,ln
);
3058 for (j
= 0; j
< rtrim
; j
++) {
3059 ln
= listLast(list
);
3060 listDelNode(list
,ln
);
3063 addReply(c
,shared
.ok
);
3068 static void lremCommand(redisClient
*c
) {
3071 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3073 addReply(c
,shared
.czero
);
3075 if (o
->type
!= REDIS_LIST
) {
3076 addReply(c
,shared
.wrongtypeerr
);
3078 list
*list
= o
->ptr
;
3079 listNode
*ln
, *next
;
3080 int toremove
= atoi(c
->argv
[2]->ptr
);
3085 toremove
= -toremove
;
3088 ln
= fromtail
? list
->tail
: list
->head
;
3090 robj
*ele
= listNodeValue(ln
);
3092 next
= fromtail
? ln
->prev
: ln
->next
;
3093 if (sdscmp(ele
->ptr
,c
->argv
[3]->ptr
) == 0) {
3094 listDelNode(list
,ln
);
3097 if (toremove
&& removed
== toremove
) break;
3101 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3106 /* ==================================== Sets ================================ */
3108 static void saddCommand(redisClient
*c
) {
3111 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3113 set
= createSetObject();
3114 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3115 incrRefCount(c
->argv
[1]);
3117 if (set
->type
!= REDIS_SET
) {
3118 addReply(c
,shared
.wrongtypeerr
);
3122 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3123 incrRefCount(c
->argv
[2]);
3125 addReply(c
,shared
.cone
);
3127 addReply(c
,shared
.czero
);
3131 static void sremCommand(redisClient
*c
) {
3134 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3136 addReply(c
,shared
.czero
);
3138 if (set
->type
!= REDIS_SET
) {
3139 addReply(c
,shared
.wrongtypeerr
);
3142 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
3144 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3145 addReply(c
,shared
.cone
);
3147 addReply(c
,shared
.czero
);
3152 static void smoveCommand(redisClient
*c
) {
3153 robj
*srcset
, *dstset
;
3155 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3156 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3158 /* If the source key does not exist return 0, if it's of the wrong type
3160 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
3161 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
3164 /* Error if the destination key is not a set as well */
3165 if (dstset
&& dstset
->type
!= REDIS_SET
) {
3166 addReply(c
,shared
.wrongtypeerr
);
3169 /* Remove the element from the source set */
3170 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
3171 /* Key not found in the src set! return zero */
3172 addReply(c
,shared
.czero
);
3176 /* Add the element to the destination set */
3178 dstset
= createSetObject();
3179 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
3180 incrRefCount(c
->argv
[2]);
3182 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
3183 incrRefCount(c
->argv
[3]);
3184 addReply(c
,shared
.cone
);
3187 static void sismemberCommand(redisClient
*c
) {
3190 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3192 addReply(c
,shared
.czero
);
3194 if (set
->type
!= REDIS_SET
) {
3195 addReply(c
,shared
.wrongtypeerr
);
3198 if (dictFind(set
->ptr
,c
->argv
[2]))
3199 addReply(c
,shared
.cone
);
3201 addReply(c
,shared
.czero
);
3205 static void scardCommand(redisClient
*c
) {
3209 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3211 addReply(c
,shared
.czero
);
3214 if (o
->type
!= REDIS_SET
) {
3215 addReply(c
,shared
.wrongtypeerr
);
3218 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3224 static void spopCommand(redisClient
*c
) {
3228 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3230 addReply(c
,shared
.nullbulk
);
3232 if (set
->type
!= REDIS_SET
) {
3233 addReply(c
,shared
.wrongtypeerr
);
3236 de
= dictGetRandomKey(set
->ptr
);
3238 addReply(c
,shared
.nullbulk
);
3240 robj
*ele
= dictGetEntryKey(de
);
3242 addReplyBulkLen(c
,ele
);
3244 addReply(c
,shared
.crlf
);
3245 dictDelete(set
->ptr
,ele
);
3246 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3252 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3253 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3255 return dictSize(*d1
)-dictSize(*d2
);
3258 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
) {
3259 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3262 robj
*lenobj
= NULL
, *dstset
= NULL
;
3263 int j
, cardinality
= 0;
3265 if (!dv
) oom("sinterGenericCommand");
3266 for (j
= 0; j
< setsnum
; j
++) {
3270 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3271 lookupKeyRead(c
->db
,setskeys
[j
]);
3275 deleteKey(c
->db
,dstkey
);
3276 addReply(c
,shared
.ok
);
3278 addReply(c
,shared
.nullmultibulk
);
3282 if (setobj
->type
!= REDIS_SET
) {
3284 addReply(c
,shared
.wrongtypeerr
);
3287 dv
[j
] = setobj
->ptr
;
3289 /* Sort sets from the smallest to largest, this will improve our
3290 * algorithm's performance */
3291 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3293 /* The first thing we should output is the total number of elements...
3294 * since this is a multi-bulk write, but at this stage we don't know
3295 * the intersection set size, so we use a trick, append an empty object
3296 * to the output list and save the pointer to later modify it with the
3299 lenobj
= createObject(REDIS_STRING
,NULL
);
3301 decrRefCount(lenobj
);
3303 /* If we have a target key where to store the resulting set
3304 * create this key with an empty set inside */
3305 dstset
= createSetObject();
3308 /* Iterate all the elements of the first (smallest) set, and test
3309 * the element against all the other sets, if at least one set does
3310 * not include the element it is discarded */
3311 di
= dictGetIterator(dv
[0]);
3312 if (!di
) oom("dictGetIterator");
3314 while((de
= dictNext(di
)) != NULL
) {
3317 for (j
= 1; j
< setsnum
; j
++)
3318 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
3320 continue; /* at least one set does not contain the member */
3321 ele
= dictGetEntryKey(de
);
3323 addReplyBulkLen(c
,ele
);
3325 addReply(c
,shared
.crlf
);
3328 dictAdd(dstset
->ptr
,ele
,NULL
);
3332 dictReleaseIterator(di
);
3335 /* Store the resulting set into the target */
3336 deleteKey(c
->db
,dstkey
);
3337 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3338 incrRefCount(dstkey
);
3342 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",cardinality
);
3344 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3345 dictSize((dict
*)dstset
->ptr
)));
3351 static void sinterCommand(redisClient
*c
) {
3352 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
3355 static void sinterstoreCommand(redisClient
*c
) {
3356 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
3359 #define REDIS_OP_UNION 0
3360 #define REDIS_OP_DIFF 1
3362 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
3363 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3366 robj
*dstset
= NULL
;
3367 int j
, cardinality
= 0;
3369 if (!dv
) oom("sunionDiffGenericCommand");
3370 for (j
= 0; j
< setsnum
; j
++) {
3374 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3375 lookupKeyRead(c
->db
,setskeys
[j
]);
3380 if (setobj
->type
!= REDIS_SET
) {
3382 addReply(c
,shared
.wrongtypeerr
);
3385 dv
[j
] = setobj
->ptr
;
3388 /* We need a temp set object to store our union. If the dstkey
3389 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3390 * this set object will be the resulting object to set into the target key*/
3391 dstset
= createSetObject();
3393 /* Iterate all the elements of all the sets, add every element a single
3394 * time to the result set */
3395 for (j
= 0; j
< setsnum
; j
++) {
3396 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
3397 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
3399 di
= dictGetIterator(dv
[j
]);
3400 if (!di
) oom("dictGetIterator");
3402 while((de
= dictNext(di
)) != NULL
) {
3405 /* dictAdd will not add the same element multiple times */
3406 ele
= dictGetEntryKey(de
);
3407 if (op
== REDIS_OP_UNION
|| j
== 0) {
3408 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
3412 } else if (op
== REDIS_OP_DIFF
) {
3413 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
3418 dictReleaseIterator(di
);
3420 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
3423 /* Output the content of the resulting set, if not in STORE mode */
3425 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
3426 di
= dictGetIterator(dstset
->ptr
);
3427 if (!di
) oom("dictGetIterator");
3428 while((de
= dictNext(di
)) != NULL
) {
3431 ele
= dictGetEntryKey(de
);
3432 addReplyBulkLen(c
,ele
);
3434 addReply(c
,shared
.crlf
);
3436 dictReleaseIterator(di
);
3438 /* If we have a target key where to store the resulting set
3439 * create this key with the result set inside */
3440 deleteKey(c
->db
,dstkey
);
3441 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3442 incrRefCount(dstkey
);
3447 decrRefCount(dstset
);
3449 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3450 dictSize((dict
*)dstset
->ptr
)));
3456 static void sunionCommand(redisClient
*c
) {
3457 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
3460 static void sunionstoreCommand(redisClient
*c
) {
3461 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
3464 static void sdiffCommand(redisClient
*c
) {
3465 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
3468 static void sdiffstoreCommand(redisClient
*c
) {
3469 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
3472 static void flushdbCommand(redisClient
*c
) {
3473 server
.dirty
+= dictSize(c
->db
->dict
);
3474 dictEmpty(c
->db
->dict
);
3475 dictEmpty(c
->db
->expires
);
3476 addReply(c
,shared
.ok
);
3479 static void flushallCommand(redisClient
*c
) {
3480 server
.dirty
+= emptyDb();
3481 addReply(c
,shared
.ok
);
3482 rdbSave(server
.dbfilename
);
3486 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
3487 redisSortOperation
*so
= zmalloc(sizeof(*so
));
3488 if (!so
) oom("createSortOperation");
3490 so
->pattern
= pattern
;
3494 /* Return the value associated to the key with a name obtained
3495 * substituting the first occurrence of '*' in 'pattern' with 'subst' */
3496 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
3500 int prefixlen
, sublen
, postfixlen
;
3501 /* Exploit the internal sds representation to create an sds string allocated on the stack in order to make this function faster */
3505 char buf
[REDIS_SORTKEY_MAX
+1];
3508 if (subst
->encoding
== REDIS_ENCODING_RAW
)
3509 incrRefCount(subst
);
3511 subst
= getDecodedObject(subst
);
3514 spat
= pattern
->ptr
;
3516 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
3517 p
= strchr(spat
,'*');
3518 if (!p
) return NULL
;
3521 sublen
= sdslen(ssub
);
3522 postfixlen
= sdslen(spat
)-(prefixlen
+1);
3523 memcpy(keyname
.buf
,spat
,prefixlen
);
3524 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
3525 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
3526 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
3527 keyname
.len
= prefixlen
+sublen
+postfixlen
;
3529 keyobj
.refcount
= 1;
3530 keyobj
.type
= REDIS_STRING
;
3531 keyobj
.ptr
= ((char*)&keyname
)+(sizeof(long)*2);
3533 decrRefCount(subst
);
3535 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3536 return lookupKeyRead(db
,&keyobj
);
3539 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3540 * the additional parameter is not standard but BSD-specific, we have to
3541 * pass sorting parameters via the global 'server' structure */
3542 static int sortCompare(const void *s1
, const void *s2
) {
3543 const redisSortObject
*so1
= s1
, *so2
= s2
;
3546 if (!server
.sort_alpha
) {
3547 /* Numeric sorting. Here it's trivial as we precomputed scores */
3548 if (so1
->u
.score
> so2
->u
.score
) {
3550 } else if (so1
->u
.score
< so2
->u
.score
) {
3556 /* Alphanumeric sorting */
3557 if (server
.sort_bypattern
) {
3558 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
3559 /* At least one compare object is NULL */
3560 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
3562 else if (so1
->u
.cmpobj
== NULL
)
3567 /* We have both the objects, use strcoll */
3568 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
3571 /* Compare elements directly */
3572 if (so1
->obj
->encoding
== REDIS_ENCODING_RAW
&&
3573 so2
->obj
->encoding
== REDIS_ENCODING_RAW
) {
3574 cmp
= strcoll(so1
->obj
->ptr
,so2
->obj
->ptr
);
3578 dec1
= so1
->obj
->encoding
== REDIS_ENCODING_RAW
?
3579 so1
->obj
: getDecodedObject(so1
->obj
);
3580 dec2
= so2
->obj
->encoding
== REDIS_ENCODING_RAW
?
3581 so2
->obj
: getDecodedObject(so2
->obj
);
3582 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
3583 if (dec1
!= so1
->obj
) decrRefCount(dec1
);
3584 if (dec2
!= so2
->obj
) decrRefCount(dec2
);
3588 return server
.sort_desc
? -cmp
: cmp
;
3591 /* The SORT command is the most complex command in Redis. Warning: this code
3592 * is optimized for speed and a bit less for readability */
3593 static void sortCommand(redisClient
*c
) {
3596 int desc
= 0, alpha
= 0;
3597 int limit_start
= 0, limit_count
= -1, start
, end
;
3598 int j
, dontsort
= 0, vectorlen
;
3599 int getop
= 0; /* GET operation counter */
3600 robj
*sortval
, *sortby
= NULL
;
3601 redisSortObject
*vector
; /* Resulting vector to sort */
3603 /* Lookup the key to sort. It must be of the right types */
3604 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
3605 if (sortval
== NULL
) {
3606 addReply(c
,shared
.nokeyerr
);
3609 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
) {
3610 addReply(c
,shared
.wrongtypeerr
);
3614 /* Create a list of operations to perform for every sorted element.
3615 * Operations can be GET/DEL/INCR/DECR */
3616 operations
= listCreate();
3617 listSetFreeMethod(operations
,zfree
);
3620 /* Now we need to protect sortval incrementing its count, in the future
3621 * SORT may have options able to overwrite/delete keys during the sorting
3622 * and the sorted key itself may get destroied */
3623 incrRefCount(sortval
);
3625 /* The SORT command has an SQL-alike syntax, parse it */
3626 while(j
< c
->argc
) {
3627 int leftargs
= c
->argc
-j
-1;
3628 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
3630 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
3632 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
3634 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
3635 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
3636 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
3638 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
3639 sortby
= c
->argv
[j
+1];
3640 /* If the BY pattern does not contain '*', i.e. it is constant,
3641 * we don't need to sort nor to lookup the weight keys. */
3642 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
3644 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3645 listAddNodeTail(operations
,createSortOperation(
3646 REDIS_SORT_GET
,c
->argv
[j
+1]));
3649 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"del") && leftargs
>= 1) {
3650 listAddNodeTail(operations
,createSortOperation(
3651 REDIS_SORT_DEL
,c
->argv
[j
+1]));
3653 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"incr") && leftargs
>= 1) {
3654 listAddNodeTail(operations
,createSortOperation(
3655 REDIS_SORT_INCR
,c
->argv
[j
+1]));
3657 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3658 listAddNodeTail(operations
,createSortOperation(
3659 REDIS_SORT_DECR
,c
->argv
[j
+1]));
3662 decrRefCount(sortval
);
3663 listRelease(operations
);
3664 addReply(c
,shared
.syntaxerr
);
3670 /* Load the sorting vector with all the objects to sort */
3671 vectorlen
= (sortval
->type
== REDIS_LIST
) ?
3672 listLength((list
*)sortval
->ptr
) :
3673 dictSize((dict
*)sortval
->ptr
);
3674 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
3675 if (!vector
) oom("allocating objects vector for SORT");
3677 if (sortval
->type
== REDIS_LIST
) {
3678 list
*list
= sortval
->ptr
;
3682 while((ln
= listYield(list
))) {
3683 robj
*ele
= ln
->value
;
3684 vector
[j
].obj
= ele
;
3685 vector
[j
].u
.score
= 0;
3686 vector
[j
].u
.cmpobj
= NULL
;
3690 dict
*set
= sortval
->ptr
;
3694 di
= dictGetIterator(set
);
3695 if (!di
) oom("dictGetIterator");
3696 while((setele
= dictNext(di
)) != NULL
) {
3697 vector
[j
].obj
= dictGetEntryKey(setele
);
3698 vector
[j
].u
.score
= 0;
3699 vector
[j
].u
.cmpobj
= NULL
;
3702 dictReleaseIterator(di
);
3704 assert(j
== vectorlen
);
3706 /* Now it's time to load the right scores in the sorting vector */
3707 if (dontsort
== 0) {
3708 for (j
= 0; j
< vectorlen
; j
++) {
3712 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
3713 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
3715 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
3716 vector
[j
].u
.cmpobj
= byval
;
3717 incrRefCount(byval
);
3719 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
3722 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
3723 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
3725 if (byval
->encoding
== REDIS_ENCODING_INT
)
3726 vector
[j
].u
.score
= (long)byval
->ptr
;
3733 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
3734 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
3736 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
3737 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
3746 /* We are ready to sort the vector... perform a bit of sanity check
3747 * on the LIMIT option too. We'll use a partial version of quicksort. */
3748 start
= (limit_start
< 0) ? 0 : limit_start
;
3749 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
3750 if (start
>= vectorlen
) {
3751 start
= vectorlen
-1;
3754 if (end
>= vectorlen
) end
= vectorlen
-1;
3756 if (dontsort
== 0) {
3757 server
.sort_desc
= desc
;
3758 server
.sort_alpha
= alpha
;
3759 server
.sort_bypattern
= sortby
? 1 : 0;
3760 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
3761 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
3763 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
3766 /* Send command output to the output buffer, performing the specified
3767 * GET/DEL/INCR/DECR operations if any. */
3768 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
3769 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
3770 for (j
= start
; j
<= end
; j
++) {
3773 addReplyBulkLen(c
,vector
[j
].obj
);
3774 addReply(c
,vector
[j
].obj
);
3775 addReply(c
,shared
.crlf
);
3777 listRewind(operations
);
3778 while((ln
= listYield(operations
))) {
3779 redisSortOperation
*sop
= ln
->value
;
3780 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
3783 if (sop
->type
== REDIS_SORT_GET
) {
3784 if (!val
|| val
->type
!= REDIS_STRING
) {
3785 addReply(c
,shared
.nullbulk
);
3787 addReplyBulkLen(c
,val
);
3789 addReply(c
,shared
.crlf
);
3791 } else if (sop
->type
== REDIS_SORT_DEL
) {
3798 decrRefCount(sortval
);
3799 listRelease(operations
);
3800 for (j
= 0; j
< vectorlen
; j
++) {
3801 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
3802 decrRefCount(vector
[j
].u
.cmpobj
);
3807 static void infoCommand(redisClient
*c
) {
3809 time_t uptime
= time(NULL
)-server
.stat_starttime
;
3812 info
= sdscatprintf(sdsempty(),
3813 "redis_version:%s\r\n"
3814 "uptime_in_seconds:%d\r\n"
3815 "uptime_in_days:%d\r\n"
3816 "connected_clients:%d\r\n"
3817 "connected_slaves:%d\r\n"
3818 "used_memory:%zu\r\n"
3819 "changes_since_last_save:%lld\r\n"
3820 "bgsave_in_progress:%d\r\n"
3821 "last_save_time:%d\r\n"
3822 "total_connections_received:%lld\r\n"
3823 "total_commands_processed:%lld\r\n"
3828 listLength(server
.clients
)-listLength(server
.slaves
),
3829 listLength(server
.slaves
),
3832 server
.bgsaveinprogress
,
3834 server
.stat_numconnections
,
3835 server
.stat_numcommands
,
3836 server
.masterhost
== NULL
? "master" : "slave"
3838 if (server
.masterhost
) {
3839 info
= sdscatprintf(info
,
3840 "master_host:%s\r\n"
3841 "master_port:%d\r\n"
3842 "master_link_status:%s\r\n"
3843 "master_last_io_seconds_ago:%d\r\n"
3846 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
3848 (int)(time(NULL
)-server
.master
->lastinteraction
)
3851 for (j
= 0; j
< server
.dbnum
; j
++) {
3852 long long keys
, vkeys
;
3854 keys
= dictSize(server
.db
[j
].dict
);
3855 vkeys
= dictSize(server
.db
[j
].expires
);
3856 if (keys
|| vkeys
) {
3857 info
= sdscatprintf(info
, "db%d: keys=%lld,expires=%lld\r\n",
3861 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info
)));
3862 addReplySds(c
,info
);
3863 addReply(c
,shared
.crlf
);
3866 static void monitorCommand(redisClient
*c
) {
3867 /* ignore MONITOR if aleady slave or in monitor mode */
3868 if (c
->flags
& REDIS_SLAVE
) return;
3870 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
3872 if (!listAddNodeTail(server
.monitors
,c
)) oom("listAddNodeTail");
3873 addReply(c
,shared
.ok
);
3876 /* ================================= Expire ================================= */
3877 static int removeExpire(redisDb
*db
, robj
*key
) {
3878 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
3885 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
3886 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
3894 /* Return the expire time of the specified key, or -1 if no expire
3895 * is associated with this key (i.e. the key is non volatile) */
3896 static time_t getExpire(redisDb
*db
, robj
*key
) {
3899 /* No expire? return ASAP */
3900 if (dictSize(db
->expires
) == 0 ||
3901 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
3903 return (time_t) dictGetEntryVal(de
);
3906 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
3910 /* No expire? return ASAP */
3911 if (dictSize(db
->expires
) == 0 ||
3912 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3914 /* Lookup the expire */
3915 when
= (time_t) dictGetEntryVal(de
);
3916 if (time(NULL
) <= when
) return 0;
3918 /* Delete the key */
3919 dictDelete(db
->expires
,key
);
3920 return dictDelete(db
->dict
,key
) == DICT_OK
;
3923 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
3926 /* No expire? return ASAP */
3927 if (dictSize(db
->expires
) == 0 ||
3928 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3930 /* Delete the key */
3932 dictDelete(db
->expires
,key
);
3933 return dictDelete(db
->dict
,key
) == DICT_OK
;
3936 static void expireCommand(redisClient
*c
) {
3938 int seconds
= atoi(c
->argv
[2]->ptr
);
3940 de
= dictFind(c
->db
->dict
,c
->argv
[1]);
3942 addReply(c
,shared
.czero
);
3946 addReply(c
, shared
.czero
);
3949 time_t when
= time(NULL
)+seconds
;
3950 if (setExpire(c
->db
,c
->argv
[1],when
)) {
3951 addReply(c
,shared
.cone
);
3954 addReply(c
,shared
.czero
);
3960 static void ttlCommand(redisClient
*c
) {
3964 expire
= getExpire(c
->db
,c
->argv
[1]);
3966 ttl
= (int) (expire
-time(NULL
));
3967 if (ttl
< 0) ttl
= -1;
3969 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
3972 /* =============================== Replication ============================= */
3974 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3975 ssize_t nwritten
, ret
= size
;
3976 time_t start
= time(NULL
);
3980 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
3981 nwritten
= write(fd
,ptr
,size
);
3982 if (nwritten
== -1) return -1;
3986 if ((time(NULL
)-start
) > timeout
) {
3994 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3995 ssize_t nread
, totread
= 0;
3996 time_t start
= time(NULL
);
4000 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
4001 nread
= read(fd
,ptr
,size
);
4002 if (nread
== -1) return -1;
4007 if ((time(NULL
)-start
) > timeout
) {
/* Read a line terminated by "\n" (or "\r\n") from 'fd' into 'ptr', one byte
 * at a time through syncRead(). The terminator is stripped and the result
 * is always nul terminated.
 *
 * 'size' is the total capacity of the buffer: at most size-1 characters are
 * stored, and reading stops there even if no newline was seen yet.
 *
 * Returns the number of characters stored, or -1 on error (see syncRead())
 * or when 'size' is not positive (errno is set to EINVAL). */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) {
        errno = EINVAL;
        return -1;
    }
    *ptr = '\0';
    size--; /* reserve room for the nul terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* The remaining capacity must shrink with every stored byte,
         * otherwise a long line overflows the caller's buffer. */
        size--;
    }
    return nread;
}
4036 static void syncCommand(redisClient
*c
) {
4037 /* ignore SYNC if aleady slave or in monitor mode */
4038 if (c
->flags
& REDIS_SLAVE
) return;
4040 /* SYNC can't be issued when the server has pending data to send to
4041 * the client about already issued commands. We need a fresh reply
4042 * buffer registering the differences between the BGSAVE and the current
4043 * dataset, so that we can copy to other slaves if needed. */
4044 if (listLength(c
->reply
) != 0) {
4045 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4049 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
4050 /* Here we need to check if there is a background saving operation
4051 * in progress, or if it is required to start one */
4052 if (server
.bgsaveinprogress
) {
4053 /* Ok a background save is in progress. Let's check if it is a good
4054 * one for replication, i.e. if there is another slave that is
4055 * registering differences since the server forked to save */
4059 listRewind(server
.slaves
);
4060 while((ln
= listYield(server
.slaves
))) {
4062 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
4065 /* Perfect, the server is already registering differences for
4066 * another slave. Set the right state, and copy the buffer. */
4067 listRelease(c
->reply
);
4068 c
->reply
= listDup(slave
->reply
);
4069 if (!c
->reply
) oom("listDup copying slave reply list");
4070 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4071 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
4073 /* No way, we need to wait for the next BGSAVE in order to
4074 * register differences */
4075 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
4076 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
4079 /* Ok we don't have a BGSAVE in progress, let's start one */
4080 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
4081 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
4082 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
4083 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
4086 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4089 c
->flags
|= REDIS_SLAVE
;
4091 if (!listAddNodeTail(server
.slaves
,c
)) oom("listAddNodeTail");
4095 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
4096 redisClient
*slave
= privdata
;
4098 REDIS_NOTUSED(mask
);
4099 char buf
[REDIS_IOBUF_LEN
];
4100 ssize_t nwritten
, buflen
;
4102 if (slave
->repldboff
== 0) {
4103 /* Write the bulk write count before to transfer the DB. In theory here
4104 * we don't know how much room there is in the output buffer of the
4105 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4106 * operations) will never be smaller than the few bytes we need. */
4109 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4111 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
4119 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
4120 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
4122 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
4123 (buflen
== 0) ? "premature EOF" : strerror(errno
));
4127 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
4128 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
4133 slave
->repldboff
+= nwritten
;
4134 if (slave
->repldboff
== slave
->repldbsize
) {
4135 close(slave
->repldbfd
);
4136 slave
->repldbfd
= -1;
4137 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
4138 slave
->replstate
= REDIS_REPL_ONLINE
;
4139 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
4140 sendReplyToClient
, slave
, NULL
) == AE_ERR
) {
4144 addReplySds(slave
,sdsempty());
4145 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
4149 /* This function is called at the end of every backgrond saving.
4150 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4151 * otherwise REDIS_ERR is passed to the function.
4153 * The goal of this function is to handle slaves waiting for a successful
4154 * background saving in order to perform non-blocking synchronization. */
4155 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
4157 int startbgsave
= 0;
4159 listRewind(server
.slaves
);
4160 while((ln
= listYield(server
.slaves
))) {
4161 redisClient
*slave
= ln
->value
;
4163 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
4165 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4166 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
4167 struct redis_stat buf
;
4169 if (bgsaveerr
!= REDIS_OK
) {
4171 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
4174 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
4175 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
4177 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
4180 slave
->repldboff
= 0;
4181 slave
->repldbsize
= buf
.st_size
;
4182 slave
->replstate
= REDIS_REPL_SEND_BULK
;
4183 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
4184 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
, NULL
) == AE_ERR
) {
4191 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
4192 listRewind(server
.slaves
);
4193 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
4194 while((ln
= listYield(server
.slaves
))) {
4195 redisClient
*slave
= ln
->value
;
4197 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
4204 static int syncWithMaster(void) {
4205 char buf
[1024], tmpfile
[256];
4207 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
4211 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
4215 /* Issue the SYNC command */
4216 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
4218 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
4222 /* Read the bulk write count */
4223 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
4225 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
4229 dumpsize
= atoi(buf
+1);
4230 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
4231 /* Read the bulk write data on a temp file */
4232 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
4233 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
4236 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
4240 int nread
, nwritten
;
4242 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
4244 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
4250 nwritten
= write(dfd
,buf
,nread
);
4251 if (nwritten
== -1) {
4252 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
4260 if (rename(tmpfile
,server
.dbfilename
) == -1) {
4261 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
4267 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
4268 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
4272 server
.master
= createClient(fd
);
4273 server
.master
->flags
|= REDIS_MASTER
;
4274 server
.replstate
= REDIS_REPL_CONNECTED
;
4278 static void slaveofCommand(redisClient
*c
) {
4279 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
4280 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
4281 if (server
.masterhost
) {
4282 sdsfree(server
.masterhost
);
4283 server
.masterhost
= NULL
;
4284 if (server
.master
) freeClient(server
.master
);
4285 server
.replstate
= REDIS_REPL_NONE
;
4286 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
4289 sdsfree(server
.masterhost
);
4290 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
4291 server
.masterport
= atoi(c
->argv
[2]->ptr
);
4292 if (server
.master
) freeClient(server
.master
);
4293 server
.replstate
= REDIS_REPL_CONNECT
;
4294 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
4295 server
.masterhost
, server
.masterport
);
4297 addReply(c
,shared
.ok
);
4300 /* ============================ Maxmemory directive ======================== */
4302 /* This function gets called when 'maxmemory' is set on the config file to limit
4303 * the max memory used by the server, and we are out of memory.
4304 * This function will try to, in order:
4306 * - Free objects from the free list
4307 * - Try to remove keys with an EXPIRE set
4309 * It is not possible to free enough memory to reach used-memory < maxmemory
4310 * the server will start refusing commands that will enlarge even more the
4313 static void freeMemoryIfNeeded(void) {
4314 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
4315 if (listLength(server
.objfreelist
)) {
4318 listNode
*head
= listFirst(server
.objfreelist
);
4319 o
= listNodeValue(head
);
4320 listDelNode(server
.objfreelist
,head
);
4323 int j
, k
, freed
= 0;
4325 for (j
= 0; j
< server
.dbnum
; j
++) {
4327 robj
*minkey
= NULL
;
4328 struct dictEntry
*de
;
4330 if (dictSize(server
.db
[j
].expires
)) {
4332 /* From a sample of three keys drop the one nearest to
4333 * the natural expire */
4334 for (k
= 0; k
< 3; k
++) {
4337 de
= dictGetRandomKey(server
.db
[j
].expires
);
4338 t
= (time_t) dictGetEntryVal(de
);
4339 if (minttl
== -1 || t
< minttl
) {
4340 minkey
= dictGetEntryKey(de
);
4344 deleteKey(server
.db
+j
,minkey
);
4347 if (!freed
) return; /* nothing to free... */
4352 /* ================================= Debugging ============================== */
4354 static void debugCommand(redisClient
*c
) {
4355 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
4357 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
4358 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
4362 addReply(c
,shared
.nokeyerr
);
4365 key
= dictGetEntryKey(de
);
4366 val
= dictGetEntryVal(de
);
4367 addReplySds(c
,sdscatprintf(sdsempty(),
4368 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4369 key
, key
->refcount
, val
, val
->refcount
, val
->encoding
));
4371 addReplySds(c
,sdsnew(
4372 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4376 #ifdef HAVE_BACKTRACE
4377 static struct redisFunctionSym symsTable
[] = {
4378 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare
},
4379 {"dictEncObjHash", (unsigned long)dictEncObjHash
},
4380 {"incrDecrCommand", (unsigned long)incrDecrCommand
},
4381 {"freeStringObject", (unsigned long)freeStringObject
},
4382 {"freeListObject", (unsigned long)freeListObject
},
4383 {"freeSetObject", (unsigned long)freeSetObject
},
4384 {"decrRefCount", (unsigned long)decrRefCount
},
4385 {"createObject", (unsigned long)createObject
},
4386 {"freeClient", (unsigned long)freeClient
},
4387 {"rdbLoad", (unsigned long)rdbLoad
},
4388 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject
},
4389 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw
},
4390 {"addReply", (unsigned long)addReply
},
4391 {"addReplySds", (unsigned long)addReplySds
},
4392 {"incrRefCount", (unsigned long)incrRefCount
},
4393 {"rdbSaveBackground", (unsigned long)rdbSaveBackground
},
4394 {"createStringObject", (unsigned long)createStringObject
},
4395 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves
},
4396 {"syncWithMaster", (unsigned long)syncWithMaster
},
4397 {"tryObjectSharing", (unsigned long)tryObjectSharing
},
4398 {"tryObjectEncoding", (unsigned long)tryObjectEncoding
},
4399 {"getDecodedObject", (unsigned long)getDecodedObject
},
4400 {"removeExpire", (unsigned long)removeExpire
},
4401 {"expireIfNeeded", (unsigned long)expireIfNeeded
},
4402 {"deleteIfVolatile", (unsigned long)deleteIfVolatile
},
4403 {"deleteKey", (unsigned long)deleteKey
},
4404 {"getExpire", (unsigned long)getExpire
},
4405 {"setExpire", (unsigned long)setExpire
},
4406 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave
},
4407 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded
},
4408 {"authCommand", (unsigned long)authCommand
},
4409 {"pingCommand", (unsigned long)pingCommand
},
4410 {"echoCommand", (unsigned long)echoCommand
},
4411 {"setCommand", (unsigned long)setCommand
},
4412 {"setnxCommand", (unsigned long)setnxCommand
},
4413 {"getCommand", (unsigned long)getCommand
},
4414 {"delCommand", (unsigned long)delCommand
},
4415 {"existsCommand", (unsigned long)existsCommand
},
4416 {"incrCommand", (unsigned long)incrCommand
},
4417 {"decrCommand", (unsigned long)decrCommand
},
4418 {"incrbyCommand", (unsigned long)incrbyCommand
},
4419 {"decrbyCommand", (unsigned long)decrbyCommand
},
4420 {"selectCommand", (unsigned long)selectCommand
},
4421 {"randomkeyCommand", (unsigned long)randomkeyCommand
},
4422 {"keysCommand", (unsigned long)keysCommand
},
4423 {"dbsizeCommand", (unsigned long)dbsizeCommand
},
4424 {"lastsaveCommand", (unsigned long)lastsaveCommand
},
4425 {"saveCommand", (unsigned long)saveCommand
},
4426 {"bgsaveCommand", (unsigned long)bgsaveCommand
},
4427 {"shutdownCommand", (unsigned long)shutdownCommand
},
4428 {"moveCommand", (unsigned long)moveCommand
},
4429 {"renameCommand", (unsigned long)renameCommand
},
4430 {"renamenxCommand", (unsigned long)renamenxCommand
},
4431 {"lpushCommand", (unsigned long)lpushCommand
},
4432 {"rpushCommand", (unsigned long)rpushCommand
},
4433 {"lpopCommand", (unsigned long)lpopCommand
},
4434 {"rpopCommand", (unsigned long)rpopCommand
},
4435 {"llenCommand", (unsigned long)llenCommand
},
4436 {"lindexCommand", (unsigned long)lindexCommand
},
4437 {"lrangeCommand", (unsigned long)lrangeCommand
},
4438 {"ltrimCommand", (unsigned long)ltrimCommand
},
4439 {"typeCommand", (unsigned long)typeCommand
},
4440 {"lsetCommand", (unsigned long)lsetCommand
},
4441 {"saddCommand", (unsigned long)saddCommand
},
4442 {"sremCommand", (unsigned long)sremCommand
},
4443 {"smoveCommand", (unsigned long)smoveCommand
},
4444 {"sismemberCommand", (unsigned long)sismemberCommand
},
4445 {"scardCommand", (unsigned long)scardCommand
},
4446 {"spopCommand", (unsigned long)spopCommand
},
4447 {"sinterCommand", (unsigned long)sinterCommand
},
4448 {"sinterstoreCommand", (unsigned long)sinterstoreCommand
},
4449 {"sunionCommand", (unsigned long)sunionCommand
},
4450 {"sunionstoreCommand", (unsigned long)sunionstoreCommand
},
4451 {"sdiffCommand", (unsigned long)sdiffCommand
},
4452 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand
},
4453 {"syncCommand", (unsigned long)syncCommand
},
4454 {"flushdbCommand", (unsigned long)flushdbCommand
},
4455 {"flushallCommand", (unsigned long)flushallCommand
},
4456 {"sortCommand", (unsigned long)sortCommand
},
4457 {"lremCommand", (unsigned long)lremCommand
},
4458 {"infoCommand", (unsigned long)infoCommand
},
4459 {"mgetCommand", (unsigned long)mgetCommand
},
4460 {"monitorCommand", (unsigned long)monitorCommand
},
4461 {"expireCommand", (unsigned long)expireCommand
},
4462 {"getSetCommand", (unsigned long)getSetCommand
},
4463 {"ttlCommand", (unsigned long)ttlCommand
},
4464 {"slaveofCommand", (unsigned long)slaveofCommand
},
4465 {"debugCommand", (unsigned long)debugCommand
},
4466 {"processCommand", (unsigned long)processCommand
},
4467 {"setupSigSegvAction", (unsigned long)setupSigSegvAction
},
4468 {"readQueryFromClient", (unsigned long)readQueryFromClient
},
4469 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile
},
4473 /* This function try to convert a pointer into a function name. It's used in
4474 * oreder to provide a backtrace under segmentation fault that's able to
4475 * display functions declared as static (otherwise the backtrace is useless). */
4476 static char *findFuncName(void *pointer
, unsigned long *offset
){
4478 unsigned long off
, minoff
= 0;
4480 /* Try to match against the Symbol with the smallest offset */
4481 for (i
=0; symsTable
[i
].pointer
; i
++) {
4482 unsigned long lp
= (unsigned long) pointer
;
4484 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
4485 off
=lp
-symsTable
[i
].pointer
;
4486 if (ret
< 0 || off
< minoff
) {
4492 if (ret
== -1) return NULL
;
4494 return symsTable
[ret
].name
;
4497 static void *getMcontextEip(ucontext_t
*uc
) {
4498 #if defined(__FreeBSD__)
4499 return (void*) uc
->uc_mcontext
.mc_eip
;
4500 #elif defined(__dietlibc__)
4501 return (void*) uc
->uc_mcontext
.eip
;
4502 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
4503 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
4504 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
4505 #ifdef _STRUCT_X86_THREAD_STATE64
4506 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
4508 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
4510 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
4511 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
];
4512 #elif defined(__ia64__) /* Linux IA64 */
4513 return (void*) uc
->uc_mcontext
.sc_ip
;
4519 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
4521 char **messages
= NULL
;
4522 int i
, trace_size
= 0;
4523 unsigned long offset
=0;
4524 time_t uptime
= time(NULL
)-server
.stat_starttime
;
4525 ucontext_t
*uc
= (ucontext_t
*) secret
;
4526 REDIS_NOTUSED(info
);
4528 redisLog(REDIS_WARNING
,
4529 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
4530 redisLog(REDIS_WARNING
, "%s", sdscatprintf(sdsempty(),
4531 "redis_version:%s; "
4532 "uptime_in_seconds:%d; "
4533 "connected_clients:%d; "
4534 "connected_slaves:%d; "
4536 "changes_since_last_save:%lld; "
4537 "bgsave_in_progress:%d; "
4538 "last_save_time:%d; "
4539 "total_connections_received:%lld; "
4540 "total_commands_processed:%lld; "
4544 listLength(server
.clients
)-listLength(server
.slaves
),
4545 listLength(server
.slaves
),
4548 server
.bgsaveinprogress
,
4550 server
.stat_numconnections
,
4551 server
.stat_numcommands
,
4552 server
.masterhost
== NULL
? "master" : "slave"
4555 trace_size
= backtrace(trace
, 100);
4556 /* overwrite sigaction with caller's address */
4557 if (getMcontextEip(uc
) != NULL
) {
4558 trace
[1] = getMcontextEip(uc
);
4560 messages
= backtrace_symbols(trace
, trace_size
);
4562 for (i
=1; i
<trace_size
; ++i
) {
4563 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
4565 p
= strchr(messages
[i
],'+');
4566 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
4567 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
4569 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
4576 static void setupSigSegvAction(void) {
4577 struct sigaction act
;
4579 sigemptyset (&act
.sa_mask
);
4580 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
4581 * is used. Otherwise, sa_handler is used */
4582 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
4583 act
.sa_sigaction
= segvHandler
;
4584 sigaction (SIGSEGV
, &act
, NULL
);
4585 sigaction (SIGBUS
, &act
, NULL
);
4586 sigaction (SIGFPE
, &act
, NULL
);
4587 sigaction (SIGILL
, &act
, NULL
);
4588 sigaction (SIGBUS
, &act
, NULL
);
4591 #else /* HAVE_BACKTRACE */
4592 static void setupSigSegvAction(void) {
4594 #endif /* HAVE_BACKTRACE */
4596 /* =================================== Main! ================================ */
/* Return the integer stored in /proc/sys/vm/overcommit_memory as parsed by
 * atoi(), or -1 when the file cannot be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp == NULL) return -1;
    if (fgets(line,64,fp) != NULL) value = atoi(line);
    fclose(fp);

    return value;
}
4613 void linuxOvercommitMemoryWarning(void) {
4614 if (linuxOvercommitMemoryValue() == 0) {
4615 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
4618 #endif /* __linux__ */
4620 static void daemonize(void) {
4624 if (fork() != 0) exit(0); /* parent exits */
4625 setsid(); /* create a new session */
4627 /* Every output goes to /dev/null. If Redis is daemonized but
4628 * the 'logfile' is set to 'stdout' in the configuration file
4629 * it will not log at all. */
4630 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
4631 dup2(fd
, STDIN_FILENO
);
4632 dup2(fd
, STDOUT_FILENO
);
4633 dup2(fd
, STDERR_FILENO
);
4634 if (fd
> STDERR_FILENO
) close(fd
);
4636 /* Try to write the pid file */
4637 fp
= fopen(server
.pidfile
,"w");
4639 fprintf(fp
,"%d\n",getpid());
4644 int main(int argc
, char **argv
) {
4647 ResetServerSaveParams();
4648 loadServerConfig(argv
[1]);
4649 } else if (argc
> 2) {
4650 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
4653 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4656 if (server
.daemonize
) daemonize();
4657 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
4659 linuxOvercommitMemoryWarning();
4661 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
4662 redisLog(REDIS_NOTICE
,"DB loaded from disk");
4663 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
4664 acceptHandler
, NULL
, NULL
) == AE_ERR
) oom("creating file event");
4665 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
4667 aeDeleteEventLoop(server
.el
);