2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.001"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
62 #include "ae.h" /* Event driven programming library */
63 #include "sds.h" /* Dynamic safe strings */
64 #include "anet.h" /* Networking the easy way */
65 #include "dict.h" /* Hash tables */
66 #include "adlist.h" /* Linked lists */
67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
68 #include "lzf.h" /* LZF compression library */
69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
75 /* Static server configuration */
76 #define REDIS_SERVERPORT 6379 /* TCP port */
77 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
78 #define REDIS_IOBUF_LEN 1024
79 #define REDIS_LOADBUF_LEN 1024
80 #define REDIS_STATIC_ARGS 4
81 #define REDIS_DEFAULT_DBNUM 16
82 #define REDIS_CONFIGLINE_MAX 1024
83 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
84 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
85 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
86 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
87 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
89 /* Hash table parameters */
90 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
93 #define REDIS_CMD_BULK 1 /* Bulk write command */
94 #define REDIS_CMD_INLINE 2 /* Inline command */
95 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
96 this flag will return an error when the 'maxmemory' option is set in the
97 config file and the server is using more than maxmemory bytes of memory.
98 In short these commands are denied on low memory conditions. */
99 #define REDIS_CMD_DENYOOM 4
102 #define REDIS_STRING 0
107 /* Objects encoding */
108 #define REDIS_ENCODING_RAW 0 /* Raw representation */
109 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
111 /* Object types only used for dumping to disk */
112 #define REDIS_EXPIRETIME 253
113 #define REDIS_SELECTDB 254
114 #define REDIS_EOF 255
116 /* Defines related to the dump file format. To store 32 bits lengths for short
117 * keys requires a lot of space, so we check the most significant 2 bits of
118 * the first byte to interpret the length:
120 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
121 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
122 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
123 * 11|000000 this means: specially encoded object will follow. The six bit
124 * number specifies the kind of object that follows.
125 * See the REDIS_RDB_ENC_* defines.
127 * Lengths up to 63 are stored using a single byte, most DB keys, and many
128 * values, will fit inside. */
129 #define REDIS_RDB_6BITLEN 0
130 #define REDIS_RDB_14BITLEN 1
131 #define REDIS_RDB_32BITLEN 2
132 #define REDIS_RDB_ENCVAL 3
133 #define REDIS_RDB_LENERR UINT_MAX
135 /* When a length of a string object stored on disk has the first two bits
136 * set, the remaining six bits specify a special encoding for the object
137 * according to the following defines: */
138 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
139 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
140 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
141 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
144 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
145 #define REDIS_SLAVE 2 /* This client is a slave server */
146 #define REDIS_MASTER 4 /* This client is a master server */
147 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
149 /* Slave replication state - slave side */
150 #define REDIS_REPL_NONE 0 /* No active replication */
151 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
152 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
154 /* Slave replication state - from the point of view of master
155 * Note that in SEND_BULK and ONLINE state the slave receives new updates
156 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
157 * to start the next background saving in order to send updates to it. */
158 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
159 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
160 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
161 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
163 /* List related stuff */
167 /* Sort operations */
168 #define REDIS_SORT_GET 0
169 #define REDIS_SORT_DEL 1
170 #define REDIS_SORT_INCR 2
171 #define REDIS_SORT_DECR 3
172 #define REDIS_SORT_ASC 4
173 #define REDIS_SORT_DESC 5
174 #define REDIS_SORTKEY_MAX 1024
177 #define REDIS_DEBUG 0
178 #define REDIS_NOTICE 1
179 #define REDIS_WARNING 2
181 /* Anti-warning macro... */
182 #define REDIS_NOTUSED(V) ((void) V)
185 /*================================= Data types ============================== */
187 /* A redis object, that is a type able to hold a string / list / set */
188 typedef struct redisObject
{
191 unsigned char encoding
;
192 unsigned char notused
[2];
196 typedef struct redisDb
{
202 /* With multiplexing we need to take per-client state.
203 * Clients are taken in a linked list. */
204 typedef struct redisClient
{
209 robj
**argv
, **mbargv
;
211 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
212 int multibulk
; /* multi bulk command format active */
215 time_t lastinteraction
; /* time of the last interaction, used for timeout */
216 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
217 int slaveseldb
; /* slave selected db, if this client is a slave */
218 int authenticated
; /* when requirepass is non-NULL */
219 int replstate
; /* replication state if this is a slave */
220 int repldbfd
; /* replication DB file descriptor */
221 long repldboff
; /* replication DB file offset */
222 off_t repldbsize
; /* replication DB file size */
230 /* Global server state structure */
236 unsigned int sharingpoolsize
;
237 long long dirty
; /* changes to DB from the last save */
239 list
*slaves
, *monitors
;
240 char neterr
[ANET_ERR_LEN
];
242 int cronloops
; /* number of times the cron function run */
243 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
244 time_t lastsave
; /* Unix time of last successful save */
245 size_t usedmemory
; /* Used memory in bytes */
246 /* Fields used only for stats */
247 time_t stat_starttime
; /* server start time */
248 long long stat_numcommands
; /* number of processed commands */
249 long long stat_numconnections
; /* number of connections received */
257 int bgsaveinprogress
;
258 pid_t bgsavechildpid
;
259 struct saveparam
*saveparams
;
266 /* Replication related */
270 redisClient
*master
; /* client that is master for this slave */
272 unsigned int maxclients
;
273 unsigned long maxmemory
;
274 /* Sort parameters - qsort_r() is only available under BSD so we
275 * have to take this state global, in order to pass it to sortCompare() */
281 typedef void redisCommandProc(redisClient
*c
);
282 struct redisCommand
{
284 redisCommandProc
*proc
;
289 struct redisFunctionSym
{
291 unsigned long pointer
;
294 typedef struct _redisSortObject
{
302 typedef struct _redisSortOperation
{
305 } redisSortOperation
;
307 struct sharedObjectsStruct
{
308 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
309 *colon
, *nullbulk
, *nullmultibulk
,
310 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
311 *outofrangeerr
, *plus
,
312 *select0
, *select1
, *select2
, *select3
, *select4
,
313 *select5
, *select6
, *select7
, *select8
, *select9
;
316 /*================================ Prototypes =============================== */
318 static void freeStringObject(robj
*o
);
319 static void freeListObject(robj
*o
);
320 static void freeSetObject(robj
*o
);
321 static void decrRefCount(void *o
);
322 static robj
*createObject(int type
, void *ptr
);
323 static void freeClient(redisClient
*c
);
324 static int rdbLoad(char *filename
);
325 static void addReply(redisClient
*c
, robj
*obj
);
326 static void addReplySds(redisClient
*c
, sds s
);
327 static void incrRefCount(robj
*o
);
328 static int rdbSaveBackground(char *filename
);
329 static robj
*createStringObject(char *ptr
, size_t len
);
330 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
331 static int syncWithMaster(void);
332 static robj
*tryObjectSharing(robj
*o
);
333 static int tryObjectEncoding(robj
*o
);
334 static robj
*getDecodedObject(const robj
*o
);
335 static int removeExpire(redisDb
*db
, robj
*key
);
336 static int expireIfNeeded(redisDb
*db
, robj
*key
);
337 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
338 static int deleteKey(redisDb
*db
, robj
*key
);
339 static time_t getExpire(redisDb
*db
, robj
*key
);
340 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
341 static void updateSlavesWaitingBgsave(int bgsaveerr
);
342 static void freeMemoryIfNeeded(void);
343 static int processCommand(redisClient
*c
);
344 static void setupSigSegvAction(void);
345 static void rdbRemoveTempFile(pid_t childpid
);
346 static size_t stringObjectLen(robj
*o
);
348 static void authCommand(redisClient
*c
);
349 static void pingCommand(redisClient
*c
);
350 static void echoCommand(redisClient
*c
);
351 static void setCommand(redisClient
*c
);
352 static void setnxCommand(redisClient
*c
);
353 static void getCommand(redisClient
*c
);
354 static void delCommand(redisClient
*c
);
355 static void existsCommand(redisClient
*c
);
356 static void incrCommand(redisClient
*c
);
357 static void decrCommand(redisClient
*c
);
358 static void incrbyCommand(redisClient
*c
);
359 static void decrbyCommand(redisClient
*c
);
360 static void selectCommand(redisClient
*c
);
361 static void randomkeyCommand(redisClient
*c
);
362 static void keysCommand(redisClient
*c
);
363 static void dbsizeCommand(redisClient
*c
);
364 static void lastsaveCommand(redisClient
*c
);
365 static void saveCommand(redisClient
*c
);
366 static void bgsaveCommand(redisClient
*c
);
367 static void shutdownCommand(redisClient
*c
);
368 static void moveCommand(redisClient
*c
);
369 static void renameCommand(redisClient
*c
);
370 static void renamenxCommand(redisClient
*c
);
371 static void lpushCommand(redisClient
*c
);
372 static void rpushCommand(redisClient
*c
);
373 static void lpopCommand(redisClient
*c
);
374 static void rpopCommand(redisClient
*c
);
375 static void llenCommand(redisClient
*c
);
376 static void lindexCommand(redisClient
*c
);
377 static void lrangeCommand(redisClient
*c
);
378 static void ltrimCommand(redisClient
*c
);
379 static void typeCommand(redisClient
*c
);
380 static void lsetCommand(redisClient
*c
);
381 static void saddCommand(redisClient
*c
);
382 static void sremCommand(redisClient
*c
);
383 static void smoveCommand(redisClient
*c
);
384 static void sismemberCommand(redisClient
*c
);
385 static void scardCommand(redisClient
*c
);
386 static void spopCommand(redisClient
*c
);
387 static void sinterCommand(redisClient
*c
);
388 static void sinterstoreCommand(redisClient
*c
);
389 static void sunionCommand(redisClient
*c
);
390 static void sunionstoreCommand(redisClient
*c
);
391 static void sdiffCommand(redisClient
*c
);
392 static void sdiffstoreCommand(redisClient
*c
);
393 static void syncCommand(redisClient
*c
);
394 static void flushdbCommand(redisClient
*c
);
395 static void flushallCommand(redisClient
*c
);
396 static void sortCommand(redisClient
*c
);
397 static void lremCommand(redisClient
*c
);
398 static void infoCommand(redisClient
*c
);
399 static void mgetCommand(redisClient
*c
);
400 static void monitorCommand(redisClient
*c
);
401 static void expireCommand(redisClient
*c
);
402 static void getSetCommand(redisClient
*c
);
403 static void ttlCommand(redisClient
*c
);
404 static void slaveofCommand(redisClient
*c
);
405 static void debugCommand(redisClient
*c
);
406 /*================================= Globals ================================= */
409 static struct redisServer server
; /* server global state */
410 static struct redisCommand cmdTable
[] = {
411 {"get",getCommand
,2,REDIS_CMD_INLINE
},
412 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
413 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
414 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
415 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
416 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
417 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
418 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
419 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
420 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
421 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
422 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
423 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
424 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
425 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
426 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
427 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
428 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
429 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
430 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
431 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
432 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
433 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
434 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
435 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
436 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
437 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
438 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
439 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
440 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
441 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
442 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
443 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
444 {"getset",getSetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
445 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
446 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
447 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
448 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
449 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
450 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
451 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
452 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
453 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
454 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
455 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
456 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
457 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
458 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
459 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
460 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
461 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
462 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
463 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
464 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
465 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
466 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
467 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
468 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
469 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
472 /*============================ Utility functions ============================ */
474 /* Glob-style pattern matching. */
475 int stringmatchlen(const char *pattern
, int patternLen
,
476 const char *string
, int stringLen
, int nocase
)
481 while (pattern
[1] == '*') {
486 return 1; /* match */
488 if (stringmatchlen(pattern
+1, patternLen
-1,
489 string
, stringLen
, nocase
))
490 return 1; /* match */
494 return 0; /* no match */
498 return 0; /* no match */
508 not = pattern
[0] == '^';
515 if (pattern
[0] == '\\') {
518 if (pattern
[0] == string
[0])
520 } else if (pattern
[0] == ']') {
522 } else if (patternLen
== 0) {
526 } else if (pattern
[1] == '-' && patternLen
>= 3) {
527 int start
= pattern
[0];
528 int end
= pattern
[2];
536 start
= tolower(start
);
542 if (c
>= start
&& c
<= end
)
546 if (pattern
[0] == string
[0])
549 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
559 return 0; /* no match */
565 if (patternLen
>= 2) {
572 if (pattern
[0] != string
[0])
573 return 0; /* no match */
575 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
576 return 0; /* no match */
584 if (stringLen
== 0) {
585 while(*pattern
== '*') {
592 if (patternLen
== 0 && stringLen
== 0)
597 static void redisLog(int level
, const char *fmt
, ...) {
601 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
605 if (level
>= server
.verbosity
) {
611 strftime(buf
,64,"%d %b %H:%M:%S",gmtime(&now
));
612 fprintf(fp
,"%s %c ",buf
,c
[level
]);
613 vfprintf(fp
, fmt
, ap
);
619 if (server
.logfile
) fclose(fp
);
622 /*====================== Hash table type implementation ==================== */
624 /* This is a hash table type that uses the SDS dynamic strings library as
625 * keys and redis objects as values (objects can hold SDS strings,
628 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
632 DICT_NOTUSED(privdata
);
634 l1
= sdslen((sds
)key1
);
635 l2
= sdslen((sds
)key2
);
636 if (l1
!= l2
) return 0;
637 return memcmp(key1
, key2
, l1
) == 0;
640 static void dictRedisObjectDestructor(void *privdata
, void *val
)
642 DICT_NOTUSED(privdata
);
647 static int dictObjKeyCompare(void *privdata
, const void *key1
,
650 const robj
*o1
= key1
, *o2
= key2
;
651 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
654 static unsigned int dictObjHash(const void *key
) {
656 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
659 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
662 const robj
*o1
= key1
, *o2
= key2
;
664 if (o1
->encoding
== REDIS_ENCODING_RAW
&&
665 o2
->encoding
== REDIS_ENCODING_RAW
)
666 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
671 dec1
= o1
->encoding
!= REDIS_ENCODING_RAW
?
672 getDecodedObject(o1
) : (robj
*)o1
;
673 dec2
= o2
->encoding
!= REDIS_ENCODING_RAW
?
674 getDecodedObject(o2
) : (robj
*)o2
;
675 cmp
= sdsDictKeyCompare(privdata
,dec1
->ptr
,dec2
->ptr
);
676 if (dec1
!= o1
) decrRefCount(dec1
);
677 if (dec2
!= o2
) decrRefCount(dec2
);
682 static unsigned int dictEncObjHash(const void *key
) {
685 if (o
->encoding
== REDIS_ENCODING_RAW
)
686 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
688 robj
*dec
= getDecodedObject(o
);
689 unsigned int hash
= dictGenHashFunction(dec
->ptr
, sdslen((sds
)dec
->ptr
));
695 static dictType setDictType
= {
696 dictEncObjHash
, /* hash function */
699 dictEncObjKeyCompare
, /* key compare */
700 dictRedisObjectDestructor
, /* key destructor */
701 NULL
/* val destructor */
704 static dictType hashDictType
= {
705 dictObjHash
, /* hash function */
708 dictObjKeyCompare
, /* key compare */
709 dictRedisObjectDestructor
, /* key destructor */
710 dictRedisObjectDestructor
/* val destructor */
713 /* ========================= Random utility functions ======================= */
715 /* Redis generally does not try to recover from out of memory conditions
716 * when allocating objects or strings, it is not clear if it will be possible
717 * to report this condition to the client since the networking layer itself
718 * is based on heap allocation for send buffers, so we simply abort.
719 * At least the code will be simpler to read... */
720 static void oom(const char *msg
) {
721 fprintf(stderr
, "%s: Out of memory\n",msg
);
727 /* ====================== Redis server networking stuff ===================== */
728 static void closeTimedoutClients(void) {
731 time_t now
= time(NULL
);
733 listRewind(server
.clients
);
734 while ((ln
= listYield(server
.clients
)) != NULL
) {
735 c
= listNodeValue(ln
);
736 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
737 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
738 (now
- c
->lastinteraction
> server
.maxidletime
)) {
739 redisLog(REDIS_DEBUG
,"Closing idle client");
745 static int htNeedsResize(dict
*dict
) {
746 long long size
, used
;
748 size
= dictSlots(dict
);
749 used
= dictSize(dict
);
750 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
751 (used
*100/size
< REDIS_HT_MINFILL
));
754 /* If the percentage of used slots in the HT drops below REDIS_HT_MINFILL
755 * we resize the hash table to save memory */
756 static void tryResizeHashTables(void) {
759 for (j
= 0; j
< server
.dbnum
; j
++) {
760 if (htNeedsResize(server
.db
[j
].dict
)) {
761 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
762 dictResize(server
.db
[j
].dict
);
763 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
765 if (htNeedsResize(server
.db
[j
].expires
))
766 dictResize(server
.db
[j
].expires
);
770 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
771 int j
, loops
= server
.cronloops
++;
772 REDIS_NOTUSED(eventLoop
);
774 REDIS_NOTUSED(clientData
);
776 /* Update the global state with the amount of used memory */
777 server
.usedmemory
= zmalloc_used_memory();
779 /* Show some info about non-empty databases */
780 for (j
= 0; j
< server
.dbnum
; j
++) {
781 long long size
, used
, vkeys
;
783 size
= dictSlots(server
.db
[j
].dict
);
784 used
= dictSize(server
.db
[j
].dict
);
785 vkeys
= dictSize(server
.db
[j
].expires
);
786 if (!(loops
% 5) && (used
|| vkeys
)) {
787 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
788 /* dictPrintStats(server.dict); */
792 /* We don't want to resize the hash tables while a background saving
793 * is in progress: the saving child is created using fork() that is
794 * implemented with a copy-on-write semantic in most modern systems, so
795 * if we resize the HT while there is the saving child at work actually
796 * a lot of memory movements in the parent will cause a lot of pages
798 if (!server
.bgsaveinprogress
) tryResizeHashTables();
800 /* Show information about connected clients */
802 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
803 listLength(server
.clients
)-listLength(server
.slaves
),
804 listLength(server
.slaves
),
806 dictSize(server
.sharingpool
));
809 /* Close connections of timedout clients */
810 if (server
.maxidletime
&& !(loops
% 10))
811 closeTimedoutClients();
813 /* Check if a background saving in progress terminated */
814 if (server
.bgsaveinprogress
) {
816 if (wait4(-1,&statloc
,WNOHANG
,NULL
)) {
817 int exitcode
= WEXITSTATUS(statloc
);
818 int bysignal
= WIFSIGNALED(statloc
);
820 if (!bysignal
&& exitcode
== 0) {
821 redisLog(REDIS_NOTICE
,
822 "Background saving terminated with success");
824 server
.lastsave
= time(NULL
);
825 } else if (!bysignal
&& exitcode
!= 0) {
826 redisLog(REDIS_WARNING
, "Background saving error");
828 redisLog(REDIS_WARNING
,
829 "Background saving terminated by signal");
830 rdbRemoveTempFile(server
.bgsavechildpid
);
832 server
.bgsaveinprogress
= 0;
833 server
.bgsavechildpid
= -1;
834 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
837 /* If there is not a background saving in progress check if
838 * we have to save now */
839 time_t now
= time(NULL
);
840 for (j
= 0; j
< server
.saveparamslen
; j
++) {
841 struct saveparam
*sp
= server
.saveparams
+j
;
843 if (server
.dirty
>= sp
->changes
&&
844 now
-server
.lastsave
> sp
->seconds
) {
845 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
846 sp
->changes
, sp
->seconds
);
847 rdbSaveBackground(server
.dbfilename
);
853 /* Try to expire a few timed out keys */
854 for (j
= 0; j
< server
.dbnum
; j
++) {
855 redisDb
*db
= server
.db
+j
;
856 int num
= dictSize(db
->expires
);
859 time_t now
= time(NULL
);
861 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
862 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
867 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
868 t
= (time_t) dictGetEntryVal(de
);
870 deleteKey(db
,dictGetEntryKey(de
));
876 /* Check if we should connect to a MASTER */
877 if (server
.replstate
== REDIS_REPL_CONNECT
) {
878 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
879 if (syncWithMaster() == REDIS_OK
) {
880 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
886 static void createSharedObjects(void) {
887 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
888 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
889 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
890 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
891 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
892 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
893 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
894 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
895 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
897 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
898 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
899 "-ERR Operation against a key holding the wrong kind of value\r\n"));
900 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
901 "-ERR no such key\r\n"));
902 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
903 "-ERR syntax error\r\n"));
904 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
905 "-ERR source and destination objects are the same\r\n"));
906 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
907 "-ERR index out of range\r\n"));
908 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
909 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
910 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
911 shared
.select0
= createStringObject("select 0\r\n",10);
912 shared
.select1
= createStringObject("select 1\r\n",10);
913 shared
.select2
= createStringObject("select 2\r\n",10);
914 shared
.select3
= createStringObject("select 3\r\n",10);
915 shared
.select4
= createStringObject("select 4\r\n",10);
916 shared
.select5
= createStringObject("select 5\r\n",10);
917 shared
.select6
= createStringObject("select 6\r\n",10);
918 shared
.select7
= createStringObject("select 7\r\n",10);
919 shared
.select8
= createStringObject("select 8\r\n",10);
920 shared
.select9
= createStringObject("select 9\r\n",10);
923 static void appendServerSaveParams(time_t seconds
, int changes
) {
924 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
925 if (server
.saveparams
== NULL
) oom("appendServerSaveParams");
926 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
927 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
928 server
.saveparamslen
++;
931 static void ResetServerSaveParams() {
932 zfree(server
.saveparams
);
933 server
.saveparams
= NULL
;
934 server
.saveparamslen
= 0;
937 static void initServerConfig() {
938 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
939 server
.port
= REDIS_SERVERPORT
;
940 server
.verbosity
= REDIS_DEBUG
;
941 server
.maxidletime
= REDIS_MAXIDLETIME
;
942 server
.saveparams
= NULL
;
943 server
.logfile
= NULL
; /* NULL = log on standard output */
944 server
.bindaddr
= NULL
;
945 server
.glueoutputbuf
= 1;
946 server
.daemonize
= 0;
947 server
.pidfile
= "/var/run/redis.pid";
948 server
.dbfilename
= "dump.rdb";
949 server
.requirepass
= NULL
;
950 server
.shareobjects
= 0;
951 server
.sharingpoolsize
= 1024;
952 server
.maxclients
= 0;
953 server
.maxmemory
= 0;
954 ResetServerSaveParams();
956 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
957 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
958 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
959 /* Replication related */
961 server
.masterhost
= NULL
;
962 server
.masterport
= 6379;
963 server
.master
= NULL
;
964 server
.replstate
= REDIS_REPL_NONE
;
967 static void initServer() {
970 signal(SIGHUP
, SIG_IGN
);
971 signal(SIGPIPE
, SIG_IGN
);
972 setupSigSegvAction();
974 server
.clients
= listCreate();
975 server
.slaves
= listCreate();
976 server
.monitors
= listCreate();
977 server
.objfreelist
= listCreate();
978 createSharedObjects();
979 server
.el
= aeCreateEventLoop();
980 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
981 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
982 if (!server
.db
|| !server
.clients
|| !server
.slaves
|| !server
.monitors
|| !server
.el
|| !server
.objfreelist
)
983 oom("server initialization"); /* Fatal OOM */
984 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
985 if (server
.fd
== -1) {
986 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
989 for (j
= 0; j
< server
.dbnum
; j
++) {
990 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
991 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
994 server
.cronloops
= 0;
995 server
.bgsaveinprogress
= 0;
996 server
.bgsavechildpid
= -1;
997 server
.lastsave
= time(NULL
);
999 server
.usedmemory
= 0;
1000 server
.stat_numcommands
= 0;
1001 server
.stat_numconnections
= 0;
1002 server
.stat_starttime
= time(NULL
);
1003 aeCreateTimeEvent(server
.el
, 1000, serverCron
, NULL
, NULL
);
1006 /* Empty the whole database */
1007 static long long emptyDb() {
1009 long long removed
= 0;
1011 for (j
= 0; j
< server
.dbnum
; j
++) {
1012 removed
+= dictSize(server
.db
[j
].dict
);
1013 dictEmpty(server
.db
[j
].dict
);
1014 dictEmpty(server
.db
[j
].expires
);
/* Translates a yes/no token into an integer flag.
 * The comparison is case-insensitive (strcasecmp): "yes" yields 1 and
 * "no" yields 0. */
1019 static int yesnotoi(char *s
) {
1020 if (!strcasecmp(s
,"yes")) return 1;
1021 else if (!strcasecmp(s
,"no")) return 0;
1025 /* I agree, this is a very rudimentary way to load a configuration...
1026 will improve later if the config gets more complex */
1027 static void loadServerConfig(char *filename
) {
1029 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1033 if (filename
[0] == '-' && filename
[1] == '\0')
1036 if ((fp
= fopen(filename
,"r")) == NULL
) {
1037 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1042 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1048 line
= sdstrim(line
," \t\r\n");
1050 /* Skip comments and blank lines*/
1051 if (line
[0] == '#' || line
[0] == '\0') {
1056 /* Split into arguments */
1057 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1058 sdstolower(argv
[0]);
1060 /* Execute config directives */
1061 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1062 server
.maxidletime
= atoi(argv
[1]);
1063 if (server
.maxidletime
< 0) {
1064 err
= "Invalid timeout value"; goto loaderr
;
1066 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1067 server
.port
= atoi(argv
[1]);
1068 if (server
.port
< 1 || server
.port
> 65535) {
1069 err
= "Invalid port"; goto loaderr
;
1071 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1072 server
.bindaddr
= zstrdup(argv
[1]);
1073 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1074 int seconds
= atoi(argv
[1]);
1075 int changes
= atoi(argv
[2]);
1076 if (seconds
< 1 || changes
< 0) {
1077 err
= "Invalid save parameters"; goto loaderr
;
1079 appendServerSaveParams(seconds
,changes
);
1080 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1081 if (chdir(argv
[1]) == -1) {
1082 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1083 argv
[1], strerror(errno
));
1086 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1087 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1088 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1089 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1091 err
= "Invalid log level. Must be one of debug, notice, warning";
1094 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1097 server
.logfile
= zstrdup(argv
[1]);
1098 if (!strcasecmp(server
.logfile
,"stdout")) {
1099 zfree(server
.logfile
);
1100 server
.logfile
= NULL
;
1102 if (server
.logfile
) {
1103 /* Test if we are able to open the file. The server will not
1104 * be able to abort just for this problem later... */
1105 logfp
= fopen(server
.logfile
,"a");
1106 if (logfp
== NULL
) {
1107 err
= sdscatprintf(sdsempty(),
1108 "Can't open the log file: %s", strerror(errno
));
1113 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1114 server
.dbnum
= atoi(argv
[1]);
1115 if (server
.dbnum
< 1) {
1116 err
= "Invalid number of databases"; goto loaderr
;
1118 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1119 server
.maxclients
= atoi(argv
[1]);
1120 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1121 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1122 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1123 server
.masterhost
= sdsnew(argv
[1]);
1124 server
.masterport
= atoi(argv
[2]);
1125 server
.replstate
= REDIS_REPL_CONNECT
;
1126 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1127 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1128 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1130 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1131 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1132 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1134 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1135 server
.sharingpoolsize
= atoi(argv
[1]);
1136 if (server
.sharingpoolsize
< 1) {
1137 err
= "invalid object sharing pool size"; goto loaderr
;
1139 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1140 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1141 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1143 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1144 server
.requirepass
= zstrdup(argv
[1]);
1145 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1146 server
.pidfile
= zstrdup(argv
[1]);
1147 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1148 server
.dbfilename
= zstrdup(argv
[1]);
1150 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1152 for (j
= 0; j
< argc
; j
++)
1157 if (fp
!= stdin
) fclose(fp
);
1161 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1162 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1163 fprintf(stderr
, ">>> '%s'\n", line
);
1164 fprintf(stderr
, "%s\n", err
);
1168 static void freeClientArgv(redisClient
*c
) {
1171 for (j
= 0; j
< c
->argc
; j
++)
1172 decrRefCount(c
->argv
[j
]);
1173 for (j
= 0; j
< c
->mbargc
; j
++)
1174 decrRefCount(c
->mbargv
[j
]);
1179 static void freeClient(redisClient
*c
) {
1182 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1183 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1184 sdsfree(c
->querybuf
);
1185 listRelease(c
->reply
);
1188 ln
= listSearchKey(server
.clients
,c
);
1190 listDelNode(server
.clients
,ln
);
1191 if (c
->flags
& REDIS_SLAVE
) {
1192 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1194 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1195 ln
= listSearchKey(l
,c
);
1199 if (c
->flags
& REDIS_MASTER
) {
1200 server
.master
= NULL
;
1201 server
.replstate
= REDIS_REPL_CONNECT
;
1208 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1213 listRewind(c
->reply
);
1214 while((ln
= listYield(c
->reply
))) {
1216 totlen
+= sdslen(o
->ptr
);
1217 /* This optimization makes more sense if we don't have to copy
1219 if (totlen
> 1024) return;
1225 listRewind(c
->reply
);
1226 while((ln
= listYield(c
->reply
))) {
1228 memcpy(buf
+copylen
,o
->ptr
,sdslen(o
->ptr
));
1229 copylen
+= sdslen(o
->ptr
);
1230 listDelNode(c
->reply
,ln
);
1232 /* Now the output buffer is empty, add the new single element */
1233 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,totlen
));
1234 if (!listAddNodeTail(c
->reply
,o
)) oom("listAddNodeTail");
1238 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1239 redisClient
*c
= privdata
;
1240 int nwritten
= 0, totwritten
= 0, objlen
;
1243 REDIS_NOTUSED(mask
);
1245 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1246 glueReplyBuffersIfNeeded(c
);
1247 while(listLength(c
->reply
)) {
1248 o
= listNodeValue(listFirst(c
->reply
));
1249 objlen
= sdslen(o
->ptr
);
1252 listDelNode(c
->reply
,listFirst(c
->reply
));
1256 if (c
->flags
& REDIS_MASTER
) {
1257 /* Don't reply to a master */
1258 nwritten
= objlen
- c
->sentlen
;
1260 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1261 if (nwritten
<= 0) break;
1263 c
->sentlen
+= nwritten
;
1264 totwritten
+= nwritten
;
1265 /* If we fully sent the object on head go to the next one */
1266 if (c
->sentlen
== objlen
) {
1267 listDelNode(c
->reply
,listFirst(c
->reply
));
/* Note that we avoid sending more than REDIS_MAX_WRITE_PER_EVENT
 * bytes: in a single threaded server it's a good idea to serve
 * other clients as well, even if a very large request comes from a
 * super fast link that is always able to accept data (in real world
 * terms think of 'KEYS *' against the loopback interface) */
1275 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1277 if (nwritten
== -1) {
1278 if (errno
== EAGAIN
) {
1281 redisLog(REDIS_DEBUG
,
1282 "Error writing to client: %s", strerror(errno
));
1287 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1288 if (listLength(c
->reply
) == 0) {
1290 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1294 static struct redisCommand
*lookupCommand(char *name
) {
1296 while(cmdTable
[j
].name
!= NULL
) {
1297 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1303 /* resetClient prepare the client to process the next command */
1304 static void resetClient(redisClient
*c
) {
/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() executes the command or prepares the
 * server for a bulk read from the client.
 *
 * If 1 is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if 0 is returned the client was destroyed (i.e. after QUIT). */
1318 static int processCommand(redisClient
*c
) {
1319 struct redisCommand
*cmd
;
1322 /* Free some memory if needed (maxmemory setting) */
1323 if (server
.maxmemory
) freeMemoryIfNeeded();
1325 /* Handle the multi bulk command type. This is an alternative protocol
1326 * supported by Redis in order to receive commands that are composed of
1327 * multiple binary-safe "bulk" arguments. The latency of processing is
1328 * a bit higher but this allows things like multi-sets, so if this
1329 * protocol is used only for MSET and similar commands this is a big win. */
1330 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1331 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1332 if (c
->multibulk
<= 0) {
1336 decrRefCount(c
->argv
[c
->argc
-1]);
1340 } else if (c
->multibulk
) {
1341 if (c
->bulklen
== -1) {
1342 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1343 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1347 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1348 decrRefCount(c
->argv
[0]);
1349 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1351 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1356 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1360 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1361 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1365 if (c
->multibulk
== 0) {
1369 /* Here we need to swap the multi-bulk argc/argv with the
1370 * normal argc/argv of the client structure. */
1372 c
->argv
= c
->mbargv
;
1373 c
->mbargv
= auxargv
;
1376 c
->argc
= c
->mbargc
;
1377 c
->mbargc
= auxargc
;
1379 /* We need to set bulklen to something different than -1
1380 * in order for the code below to process the command without
1381 * to try to read the last argument of a bulk command as
1382 * a special argument. */
1384 /* continue below and process the command */
1391 /* -- end of multi bulk commands processing -- */
1393 /* The QUIT command is handled as a special case. Normal command
1394 * procs are unable to close the client connection safely */
1395 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1399 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1401 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1404 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1405 (c
->argc
< -cmd
->arity
)) {
1406 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
1409 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1410 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1413 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1414 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1416 decrRefCount(c
->argv
[c
->argc
-1]);
1417 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1419 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1424 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1425 /* It is possible that the bulk read is already in the
1426 * buffer. Check this condition and handle it accordingly */
1427 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1428 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1430 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1435 /* Let's try to share objects on the command arguments vector */
1436 if (server
.shareobjects
) {
1438 for(j
= 1; j
< c
->argc
; j
++)
1439 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1441 /* Let's try to encode the bulk object to save space. */
1442 if (cmd
->flags
& REDIS_CMD_BULK
)
1443 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1445 /* Check if the user is authenticated */
1446 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1447 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1452 /* Exec the command */
1453 dirty
= server
.dirty
;
1455 if (server
.dirty
-dirty
!= 0 && listLength(server
.slaves
))
1456 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1457 if (listLength(server
.monitors
))
1458 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1459 server
.stat_numcommands
++;
1461 /* Prepare the client for the next command */
1462 if (c
->flags
& REDIS_CLOSE
) {
1470 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1474 /* (args*2)+1 is enough room for args, spaces, newlines */
1475 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1477 if (argc
<= REDIS_STATIC_ARGS
) {
1480 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1481 if (!outv
) oom("replicationFeedSlaves");
1484 for (j
= 0; j
< argc
; j
++) {
1485 if (j
!= 0) outv
[outc
++] = shared
.space
;
1486 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1489 lenobj
= createObject(REDIS_STRING
,
1490 sdscatprintf(sdsempty(),"%d\r\n",
1491 stringObjectLen(argv
[j
])));
1492 lenobj
->refcount
= 0;
1493 outv
[outc
++] = lenobj
;
1495 outv
[outc
++] = argv
[j
];
1497 outv
[outc
++] = shared
.crlf
;
1499 /* Increment all the refcounts at start and decrement at end in order to
1500 * be sure to free objects if there is no slave in a replication state
1501 * able to be feed with commands */
1502 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1504 while((ln
= listYield(slaves
))) {
1505 redisClient
*slave
= ln
->value
;
1507 /* Don't feed slaves that are still waiting for BGSAVE to start */
1508 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1510 /* Feed all the other slaves, MONITORs and so on */
1511 if (slave
->slaveseldb
!= dictid
) {
1515 case 0: selectcmd
= shared
.select0
; break;
1516 case 1: selectcmd
= shared
.select1
; break;
1517 case 2: selectcmd
= shared
.select2
; break;
1518 case 3: selectcmd
= shared
.select3
; break;
1519 case 4: selectcmd
= shared
.select4
; break;
1520 case 5: selectcmd
= shared
.select5
; break;
1521 case 6: selectcmd
= shared
.select6
; break;
1522 case 7: selectcmd
= shared
.select7
; break;
1523 case 8: selectcmd
= shared
.select8
; break;
1524 case 9: selectcmd
= shared
.select9
; break;
1526 selectcmd
= createObject(REDIS_STRING
,
1527 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1528 selectcmd
->refcount
= 0;
1531 addReply(slave
,selectcmd
);
1532 slave
->slaveseldb
= dictid
;
1534 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1536 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1537 if (outv
!= static_outv
) zfree(outv
);
1540 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1541 redisClient
*c
= (redisClient
*) privdata
;
1542 char buf
[REDIS_IOBUF_LEN
];
1545 REDIS_NOTUSED(mask
);
1547 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1549 if (errno
== EAGAIN
) {
1552 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1556 } else if (nread
== 0) {
1557 redisLog(REDIS_DEBUG
, "Client closed connection");
1562 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1563 c
->lastinteraction
= time(NULL
);
1569 if (c
->bulklen
== -1) {
1570 /* Read the first line of the query */
1571 char *p
= strchr(c
->querybuf
,'\n');
1578 query
= c
->querybuf
;
1579 c
->querybuf
= sdsempty();
1580 querylen
= 1+(p
-(query
));
1581 if (sdslen(query
) > querylen
) {
1582 /* leave data after the first line of the query in the buffer */
1583 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1585 *p
= '\0'; /* remove "\n" */
1586 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1587 sdsupdatelen(query
);
1589 /* Now we can split the query in arguments */
1590 if (sdslen(query
) == 0) {
1591 /* Ignore empty query */
1595 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1596 if (argv
== NULL
) oom("sdssplitlen");
1599 if (c
->argv
) zfree(c
->argv
);
1600 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1601 if (c
->argv
== NULL
) oom("allocating arguments list for client");
1603 for (j
= 0; j
< argc
; j
++) {
1604 if (sdslen(argv
[j
])) {
1605 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1612 /* Execute the command. If the client is still valid
1613 * after processCommand() return and there is something
1614 * on the query buffer try to process the next command. */
1615 if (c
->argc
&& processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1617 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
1618 redisLog(REDIS_DEBUG
, "Client protocol error");
1623 /* Bulk read handling. Note that if we are at this point
1624 the client already sent a command terminated with a newline,
1625 we are reading the bulk data that is actually the last
1626 argument of the command. */
1627 int qbl
= sdslen(c
->querybuf
);
1629 if (c
->bulklen
<= qbl
) {
1630 /* Copy everything but the final CRLF as final argument */
1631 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1633 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1640 static int selectDb(redisClient
*c
, int id
) {
1641 if (id
< 0 || id
>= server
.dbnum
)
1643 c
->db
= &server
.db
[id
];
1647 static void *dupClientReplyValue(void *o
) {
1648 incrRefCount((robj
*)o
);
1652 static redisClient
*createClient(int fd
) {
1653 redisClient
*c
= zmalloc(sizeof(*c
));
1655 anetNonBlock(NULL
,fd
);
1656 anetTcpNoDelay(NULL
,fd
);
1657 if (!c
) return NULL
;
1660 c
->querybuf
= sdsempty();
1669 c
->lastinteraction
= time(NULL
);
1670 c
->authenticated
= 0;
1671 c
->replstate
= REDIS_REPL_NONE
;
1672 if ((c
->reply
= listCreate()) == NULL
) oom("listCreate");
1673 listSetFreeMethod(c
->reply
,decrRefCount
);
1674 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1675 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1676 readQueryFromClient
, c
, NULL
) == AE_ERR
) {
1680 if (!listAddNodeTail(server
.clients
,c
)) oom("listAddNodeTail");
1684 static void addReply(redisClient
*c
, robj
*obj
) {
1685 if (listLength(c
->reply
) == 0 &&
1686 (c
->replstate
== REDIS_REPL_NONE
||
1687 c
->replstate
== REDIS_REPL_ONLINE
) &&
1688 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
1689 sendReplyToClient
, c
, NULL
) == AE_ERR
) return;
1690 if (obj
->encoding
!= REDIS_ENCODING_RAW
) {
1691 obj
= getDecodedObject(obj
);
1695 if (!listAddNodeTail(c
->reply
,obj
)) oom("listAddNodeTail");
1698 static void addReplySds(redisClient
*c
, sds s
) {
1699 robj
*o
= createObject(REDIS_STRING
,s
);
1704 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
1707 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
1708 len
= sdslen(obj
->ptr
);
1710 long n
= (long)obj
->ptr
;
1717 while((n
= n
/10) != 0) {
1721 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",len
));
1724 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1729 REDIS_NOTUSED(mask
);
1730 REDIS_NOTUSED(privdata
);
1732 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
1733 if (cfd
== AE_ERR
) {
1734 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
1737 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
1738 if ((c
= createClient(cfd
)) == NULL
) {
1739 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
1740 close(cfd
); /* May be already closed, just ingore errors */
1743 /* If maxclient directive is set and this is one client more... close the
1744 * connection. Note that we create the client instead to check before
1745 * for this condition, since now the socket is already set in nonblocking
1746 * mode and we can send an error for free using the Kernel I/O */
1747 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
1748 char *err
= "-ERR max number of clients reached\r\n";
1750 /* That's a best effort error message, don't check write errors */
1751 (void) write(c
->fd
,err
,strlen(err
));
1755 server
.stat_numconnections
++;
1758 /* ======================= Redis objects implementation ===================== */
1760 static robj
*createObject(int type
, void *ptr
) {
1763 if (listLength(server
.objfreelist
)) {
1764 listNode
*head
= listFirst(server
.objfreelist
);
1765 o
= listNodeValue(head
);
1766 listDelNode(server
.objfreelist
,head
);
1768 o
= zmalloc(sizeof(*o
));
1770 if (!o
) oom("createObject");
1772 o
->encoding
= REDIS_ENCODING_RAW
;
1778 static robj
*createStringObject(char *ptr
, size_t len
) {
1779 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
1782 static robj
*createListObject(void) {
1783 list
*l
= listCreate();
1785 if (!l
) oom("listCreate");
1786 listSetFreeMethod(l
,decrRefCount
);
1787 return createObject(REDIS_LIST
,l
);
1790 static robj
*createSetObject(void) {
1791 dict
*d
= dictCreate(&setDictType
,NULL
);
1792 if (!d
) oom("dictCreate");
1793 return createObject(REDIS_SET
,d
);
1796 static void freeStringObject(robj
*o
) {
1797 if (o
->encoding
== REDIS_ENCODING_RAW
) {
1802 static void freeListObject(robj
*o
) {
1803 listRelease((list
*) o
->ptr
);
1806 static void freeSetObject(robj
*o
) {
1807 dictRelease((dict
*) o
->ptr
);
1810 static void freeHashObject(robj
*o
) {
1811 dictRelease((dict
*) o
->ptr
);
1814 static void incrRefCount(robj
*o
) {
1816 #ifdef DEBUG_REFCOUNT
1817 if (o
->type
== REDIS_STRING
)
1818 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
1822 static void decrRefCount(void *obj
) {
1825 #ifdef DEBUG_REFCOUNT
1826 if (o
->type
== REDIS_STRING
)
1827 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
1829 if (--(o
->refcount
) == 0) {
1831 case REDIS_STRING
: freeStringObject(o
); break;
1832 case REDIS_LIST
: freeListObject(o
); break;
1833 case REDIS_SET
: freeSetObject(o
); break;
1834 case REDIS_HASH
: freeHashObject(o
); break;
1835 default: assert(0 != 0); break;
1837 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
1838 !listAddNodeHead(server
.objfreelist
,o
))
1843 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
1844 dictEntry
*de
= dictFind(db
->dict
,key
);
1845 return de
? dictGetEntryVal(de
) : NULL
;
1848 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
1849 expireIfNeeded(db
,key
);
1850 return lookupKey(db
,key
);
1853 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
1854 deleteIfVolatile(db
,key
);
1855 return lookupKey(db
,key
);
1858 static int deleteKey(redisDb
*db
, robj
*key
) {
/* We need to protect key from destruction: after the first dictDelete()
 * it may happen that 'key' is no longer valid if we don't increment
 * its count. This may happen when we get the object reference directly
 * from the hash table with dictRandomKey() or dict iterators */
1866 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
1867 retval
= dictDelete(db
->dict
,key
);
1870 return retval
== DICT_OK
;
1873 /* Try to share an object against the shared objects pool */
1874 static robj
*tryObjectSharing(robj
*o
) {
1875 struct dictEntry
*de
;
1878 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
1880 assert(o
->type
== REDIS_STRING
);
1881 de
= dictFind(server
.sharingpool
,o
);
1883 robj
*shared
= dictGetEntryKey(de
);
1885 c
= ((unsigned long) dictGetEntryVal(de
))+1;
1886 dictGetEntryVal(de
) = (void*) c
;
1887 incrRefCount(shared
);
/* Here we are using a stream algorithm: every time an object is
 * shared we increment its count, every time there is a miss we
 * decrement the counter of a random object. If this object reaches
 * zero we remove the object and put the current object instead. */
1895 if (dictSize(server
.sharingpool
) >=
1896 server
.sharingpoolsize
) {
1897 de
= dictGetRandomKey(server
.sharingpool
);
1899 c
= ((unsigned long) dictGetEntryVal(de
))-1;
1900 dictGetEntryVal(de
) = (void*) c
;
1902 dictDelete(server
.sharingpool
,de
->key
);
1905 c
= 0; /* If the pool is empty we want to add this object */
1910 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
1911 assert(retval
== DICT_OK
);
1918 /* Check if the nul-terminated string 's' can be represented by a long
1919 * (that is, is a number that fits into long without any other space or
1920 * character before or after the digits).
1922 * If so, the function returns REDIS_OK and *longval is set to the value
1923 * of the number. Otherwise REDIS_ERR is returned */
1924 static int isStringRepresentableAsLong(char *s
, long *longval
) {
1925 char buf
[32], *endptr
;
1929 value
= strtol(s
, &endptr
, 10);
1930 if (endptr
[0] != '\0') return REDIS_ERR
;
1931 slen
= snprintf(buf
,32,"%ld",value
);
1933 /* If the number converted back into a string is not identical
1934 * then it's not possible to encode the string as integer */
1935 if (strlen(buf
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
1936 if (longval
) *longval
= value
;
1940 /* Try to encode a string object in order to save space */
1941 static int tryObjectEncoding(robj
*o
) {
1945 if (o
->encoding
!= REDIS_ENCODING_RAW
)
1946 return REDIS_ERR
; /* Already encoded */
/* It's not safe to encode shared objects: shared objects can be shared
 * everywhere in the "object space" of Redis. Encoded objects can only
 * appear as "values" (and not, for instance, as keys) */
1951 if (o
->refcount
> 1) return REDIS_ERR
;
1953 /* Currently we try to encode only strings */
1954 assert(o
->type
== REDIS_STRING
);
1956 /* Check if we can represent this string as a long integer */
1957 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
1959 /* Ok, this object can be encoded */
1960 o
->encoding
= REDIS_ENCODING_INT
;
1962 o
->ptr
= (void*) value
;
1966 /* Get a decoded version of an encoded object (returned as a new object) */
1967 static robj
*getDecodedObject(const robj
*o
) {
1970 assert(o
->encoding
!= REDIS_ENCODING_RAW
);
1971 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
1974 snprintf(buf
,32,"%ld",(long)o
->ptr
);
1975 dec
= createStringObject(buf
,strlen(buf
));
1982 static int compareStringObjects(robj
*a
, robj
*b
) {
1983 assert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
1985 if (a
->encoding
== REDIS_ENCODING_INT
&& b
->encoding
== REDIS_ENCODING_INT
){
1986 return (long)a
->ptr
- (long)b
->ptr
;
1992 if (a
->encoding
!= REDIS_ENCODING_RAW
) a
= getDecodedObject(a
);
1993 if (b
->encoding
!= REDIS_ENCODING_RAW
) b
= getDecodedObject(a
);
1994 retval
= sdscmp(a
->ptr
,b
->ptr
);
2001 static size_t stringObjectLen(robj
*o
) {
2002 assert(o
->type
== REDIS_STRING
);
2003 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2004 return sdslen(o
->ptr
);
2008 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2012 /*============================ DB saving/loading ============================ */
2014 static int rdbSaveType(FILE *fp
, unsigned char type
) {
2015 if (fwrite(&type
,1,1,fp
) == 0) return -1;
2019 static int rdbSaveTime(FILE *fp
, time_t t
) {
2020 int32_t t32
= (int32_t) t
;
2021 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
2025 /* check rdbLoadLen() comments for more info */
2026 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2027 unsigned char buf
[2];
2030 /* Save a 6 bit len */
2031 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2032 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2033 } else if (len
< (1<<14)) {
2034 /* Save a 14 bit len */
2035 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2037 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2039 /* Save a 32 bit len */
2040 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2041 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2043 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2048 /* String objects in the form "2391" "-100" without any space and with a
2049 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2050 * encoded as integers to save space */
2051 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2053 char *endptr
, buf
[32];
2055 /* Check if it's possible to encode this value as a number */
2056 value
= strtoll(s
, &endptr
, 10);
2057 if (endptr
[0] != '\0') return 0;
2058 snprintf(buf
,32,"%lld",value
);
2060 /* If the number converted back into a string is not identical
2061 * then it's not possible to encode the string as integer */
2062 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2064 /* Finally check if it fits in our ranges */
2065 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2066 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2067 enc
[1] = value
&0xFF;
2069 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2070 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2071 enc
[1] = value
&0xFF;
2072 enc
[2] = (value
>>8)&0xFF;
2074 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2075 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2076 enc
[1] = value
&0xFF;
2077 enc
[2] = (value
>>8)&0xFF;
2078 enc
[3] = (value
>>16)&0xFF;
2079 enc
[4] = (value
>>24)&0xFF;
2086 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2087 unsigned int comprlen
, outlen
;
2091 /* We require at least four bytes compression for this to be worth it */
2092 outlen
= sdslen(obj
->ptr
)-4;
2093 if (outlen
<= 0) return 0;
2094 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2095 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2096 if (comprlen
== 0) {
2100 /* Data compressed! Let's save it on disk */
2101 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2102 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2103 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2104 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2105 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
/* Save a string object as [len][data] on disk. If the object is a string
 * representation of an integer value we try to save it in a special form */
2116 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2120 len
= sdslen(obj
->ptr
);
2122 /* Try integer encoding */
2124 unsigned char buf
[5];
2125 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2126 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2131 /* Try LZF compression - under 20 bytes it's unable to compress even
2132 * aaaaaaaaaaaaaaaaaa so skip it */
2136 retval
= rdbSaveLzfStringObject(fp
,obj
);
2137 if (retval
== -1) return -1;
2138 if (retval
> 0) return 0;
2139 /* retval == 0 means data can't be compressed, save the old way */
2142 /* Store verbatim */
2143 if (rdbSaveLen(fp
,len
) == -1) return -1;
2144 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2148 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2149 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2153 if (obj
->encoding
!= REDIS_ENCODING_RAW
) {
2154 dec
= getDecodedObject(obj
);
2155 retval
= rdbSaveStringObjectRaw(fp
,dec
);
2159 return rdbSaveStringObjectRaw(fp
,obj
);
2163 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2164 static int rdbSave(char *filename
) {
2165 dictIterator
*di
= NULL
;
2170 time_t now
= time(NULL
);
2172 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2173 fp
= fopen(tmpfile
,"w");
2175 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2178 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2179 for (j
= 0; j
< server
.dbnum
; j
++) {
2180 redisDb
*db
= server
.db
+j
;
2182 if (dictSize(d
) == 0) continue;
2183 di
= dictGetIterator(d
);
2189 /* Write the SELECT DB opcode */
2190 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2191 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2193 /* Iterate this DB writing every entry */
2194 while((de
= dictNext(di
)) != NULL
) {
2195 robj
*key
= dictGetEntryKey(de
);
2196 robj
*o
= dictGetEntryVal(de
);
2197 time_t expiretime
= getExpire(db
,key
);
2199 /* Save the expire time */
2200 if (expiretime
!= -1) {
2201 /* If this key is already expired skip it */
2202 if (expiretime
< now
) continue;
2203 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2204 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2206 /* Save the key and associated value */
2207 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2208 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2209 if (o
->type
== REDIS_STRING
) {
2210 /* Save a string value */
2211 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
2212 } else if (o
->type
== REDIS_LIST
) {
2213 /* Save a list value */
2214 list
*list
= o
->ptr
;
2218 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
2219 while((ln
= listYield(list
))) {
2220 robj
*eleobj
= listNodeValue(ln
);
2222 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2224 } else if (o
->type
== REDIS_SET
) {
2225 /* Save a set value */
2227 dictIterator
*di
= dictGetIterator(set
);
2230 if (!set
) oom("dictGetIteraotr");
2231 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
2232 while((de
= dictNext(di
)) != NULL
) {
2233 robj
*eleobj
= dictGetEntryKey(de
);
2235 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2237 dictReleaseIterator(di
);
2242 dictReleaseIterator(di
);
2245 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2247 /* Make sure data will not remain on the OS's output buffers */
2252 /* Use RENAME to make sure the DB file is changed atomically only
2253 * if the generated DB file is ok. */
2254 if (rename(tmpfile
,filename
) == -1) {
2255 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destionation: %s", strerror(errno
));
2259 redisLog(REDIS_NOTICE
,"DB saved on disk");
2261 server
.lastsave
= time(NULL
);
2267 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2268 if (di
) dictReleaseIterator(di
);
2272 static int rdbSaveBackground(char *filename
) {
2275 if (server
.bgsaveinprogress
) return REDIS_ERR
;
2276 if ((childpid
= fork()) == 0) {
2279 if (rdbSave(filename
) == REDIS_OK
) {
2286 if (childpid
== -1) {
2287 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2291 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2292 server
.bgsaveinprogress
= 1;
2293 server
.bgsavechildpid
= childpid
;
2296 return REDIS_OK
; /* unreached */
2299 static void rdbRemoveTempFile(pid_t childpid
) {
2302 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
2306 static int rdbLoadType(FILE *fp
) {
2308 if (fread(&type
,1,1,fp
) == 0) return -1;
2312 static time_t rdbLoadTime(FILE *fp
) {
2314 if (fread(&t32
,4,1,fp
) == 0) return -1;
2315 return (time_t) t32
;
2318 /* Load an encoded length from the DB, see the REDIS_RDB_* defines at the top
2319 * of this file for a description of how these are stored on disk.
2321 * isencoded is set to 1 if the value read is not actually a length but
2322 * an "encoding type", check the above comments for more info */
2323 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2324 unsigned char buf
[2];
2327 if (isencoded
) *isencoded
= 0;
2329 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2334 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2335 type
= (buf
[0]&0xC0)>>6;
2336 if (type
== REDIS_RDB_6BITLEN
) {
2337 /* Read a 6 bit len */
2339 } else if (type
== REDIS_RDB_ENCVAL
) {
2340 /* Read a 6 bit len encoding type */
2341 if (isencoded
) *isencoded
= 1;
2343 } else if (type
== REDIS_RDB_14BITLEN
) {
2344 /* Read a 14 bit len */
2345 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2346 return ((buf
[0]&0x3F)<<8)|buf
[1];
2348 /* Read a 32 bit len */
2349 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2355 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2356 unsigned char enc
[4];
2359 if (enctype
== REDIS_RDB_ENC_INT8
) {
2360 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2361 val
= (signed char)enc
[0];
2362 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2364 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2365 v
= enc
[0]|(enc
[1]<<8);
2367 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2369 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2370 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2373 val
= 0; /* anti-warning */
2376 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2379 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2380 unsigned int len
, clen
;
2381 unsigned char *c
= NULL
;
2384 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2385 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2386 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2387 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2388 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2389 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2391 return createObject(REDIS_STRING
,val
);
2398 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2403 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2406 case REDIS_RDB_ENC_INT8
:
2407 case REDIS_RDB_ENC_INT16
:
2408 case REDIS_RDB_ENC_INT32
:
2409 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2410 case REDIS_RDB_ENC_LZF
:
2411 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2417 if (len
== REDIS_RDB_LENERR
) return NULL
;
2418 val
= sdsnewlen(NULL
,len
);
2419 if (len
&& fread(val
,len
,1,fp
) == 0) {
2423 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2426 static int rdbLoad(char *filename
) {
2428 robj
*keyobj
= NULL
;
2430 int type
, retval
, rdbver
;
2431 dict
*d
= server
.db
[0].dict
;
2432 redisDb
*db
= server
.db
+0;
2434 time_t expiretime
= -1, now
= time(NULL
);
2436 fp
= fopen(filename
,"r");
2437 if (!fp
) return REDIS_ERR
;
2438 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2440 if (memcmp(buf
,"REDIS",5) != 0) {
2442 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2445 rdbver
= atoi(buf
+5);
2448 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2455 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2456 if (type
== REDIS_EXPIRETIME
) {
2457 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2458 /* We read the time so we need to read the object type again */
2459 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2461 if (type
== REDIS_EOF
) break;
2462 /* Handle SELECT DB opcode as a special case */
2463 if (type
== REDIS_SELECTDB
) {
2464 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2466 if (dbid
>= (unsigned)server
.dbnum
) {
2467 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2470 db
= server
.db
+dbid
;
2475 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2477 if (type
== REDIS_STRING
) {
2478 /* Read string value */
2479 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2480 tryObjectEncoding(o
);
2481 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2482 /* Read list/set value */
2485 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2487 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2488 /* Load every single element of the list/set */
2492 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2493 tryObjectEncoding(ele
);
2494 if (type
== REDIS_LIST
) {
2495 if (!listAddNodeTail((list
*)o
->ptr
,ele
))
2496 oom("listAddNodeTail");
2498 if (dictAdd((dict
*)o
->ptr
,ele
,NULL
) == DICT_ERR
)
2505 /* Add the new object in the hash table */
2506 retval
= dictAdd(d
,keyobj
,o
);
2507 if (retval
== DICT_ERR
) {
2508 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2511 /* Set the expire time if needed */
2512 if (expiretime
!= -1) {
2513 setExpire(db
,keyobj
,expiretime
);
2514 /* Delete this key if already expired */
2515 if (expiretime
< now
) deleteKey(db
,keyobj
);
2523 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2524 if (keyobj
) decrRefCount(keyobj
);
2525 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2527 return REDIS_ERR
; /* Just to avoid warning */
2530 /*================================== Commands =============================== */
2532 static void authCommand(redisClient
*c
) {
2533 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2534 c
->authenticated
= 1;
2535 addReply(c
,shared
.ok
);
2537 c
->authenticated
= 0;
2538 addReply(c
,shared
.err
);
2542 static void pingCommand(redisClient
*c
) {
2543 addReply(c
,shared
.pong
);
2546 static void echoCommand(redisClient
*c
) {
2547 addReplyBulkLen(c
,c
->argv
[1]);
2548 addReply(c
,c
->argv
[1]);
2549 addReply(c
,shared
.crlf
);
2552 /*=================================== Strings =============================== */
2554 static void setGenericCommand(redisClient
*c
, int nx
) {
2557 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2558 if (retval
== DICT_ERR
) {
2560 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2561 incrRefCount(c
->argv
[2]);
2563 addReply(c
,shared
.czero
);
2567 incrRefCount(c
->argv
[1]);
2568 incrRefCount(c
->argv
[2]);
2571 removeExpire(c
->db
,c
->argv
[1]);
2572 addReply(c
, nx
? shared
.cone
: shared
.ok
);
2575 static void setCommand(redisClient
*c
) {
2576 setGenericCommand(c
,0);
2579 static void setnxCommand(redisClient
*c
) {
2580 setGenericCommand(c
,1);
2583 static void getCommand(redisClient
*c
) {
2584 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2587 addReply(c
,shared
.nullbulk
);
2589 if (o
->type
!= REDIS_STRING
) {
2590 addReply(c
,shared
.wrongtypeerr
);
2592 addReplyBulkLen(c
,o
);
2594 addReply(c
,shared
.crlf
);
2599 static void getSetCommand(redisClient
*c
) {
2601 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
2602 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2604 incrRefCount(c
->argv
[1]);
2606 incrRefCount(c
->argv
[2]);
2608 removeExpire(c
->db
,c
->argv
[1]);
2611 static void mgetCommand(redisClient
*c
) {
2614 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
2615 for (j
= 1; j
< c
->argc
; j
++) {
2616 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
2618 addReply(c
,shared
.nullbulk
);
2620 if (o
->type
!= REDIS_STRING
) {
2621 addReply(c
,shared
.nullbulk
);
2623 addReplyBulkLen(c
,o
);
2625 addReply(c
,shared
.crlf
);
2631 static void incrDecrCommand(redisClient
*c
, long long incr
) {
2636 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2640 if (o
->type
!= REDIS_STRING
) {
2645 if (o
->encoding
== REDIS_ENCODING_RAW
)
2646 value
= strtoll(o
->ptr
, &eptr
, 10);
2647 else if (o
->encoding
== REDIS_ENCODING_INT
)
2648 value
= (long)o
->ptr
;
2655 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
2656 tryObjectEncoding(o
);
2657 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
2658 if (retval
== DICT_ERR
) {
2659 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
2660 removeExpire(c
->db
,c
->argv
[1]);
2662 incrRefCount(c
->argv
[1]);
2665 addReply(c
,shared
.colon
);
2667 addReply(c
,shared
.crlf
);
2670 static void incrCommand(redisClient
*c
) {
2671 incrDecrCommand(c
,1);
2674 static void decrCommand(redisClient
*c
) {
2675 incrDecrCommand(c
,-1);
2678 static void incrbyCommand(redisClient
*c
) {
2679 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2680 incrDecrCommand(c
,incr
);
2683 static void decrbyCommand(redisClient
*c
) {
2684 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2685 incrDecrCommand(c
,-incr
);
2688 /* ========================= Type agnostic commands ========================= */
2690 static void delCommand(redisClient
*c
) {
2693 for (j
= 1; j
< c
->argc
; j
++) {
2694 if (deleteKey(c
->db
,c
->argv
[j
])) {
2701 addReply(c
,shared
.czero
);
2704 addReply(c
,shared
.cone
);
2707 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
2712 static void existsCommand(redisClient
*c
) {
2713 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
2716 static void selectCommand(redisClient
*c
) {
2717 int id
= atoi(c
->argv
[1]->ptr
);
2719 if (selectDb(c
,id
) == REDIS_ERR
) {
2720 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
2722 addReply(c
,shared
.ok
);
2726 static void randomkeyCommand(redisClient
*c
) {
2730 de
= dictGetRandomKey(c
->db
->dict
);
2731 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
2734 addReply(c
,shared
.plus
);
2735 addReply(c
,shared
.crlf
);
2737 addReply(c
,shared
.plus
);
2738 addReply(c
,dictGetEntryKey(de
));
2739 addReply(c
,shared
.crlf
);
2743 static void keysCommand(redisClient
*c
) {
2746 sds pattern
= c
->argv
[1]->ptr
;
2747 int plen
= sdslen(pattern
);
2748 int numkeys
= 0, keyslen
= 0;
2749 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
2751 di
= dictGetIterator(c
->db
->dict
);
2752 if (!di
) oom("dictGetIterator");
2754 decrRefCount(lenobj
);
2755 while((de
= dictNext(di
)) != NULL
) {
2756 robj
*keyobj
= dictGetEntryKey(de
);
2758 sds key
= keyobj
->ptr
;
2759 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
2760 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
2761 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
2763 addReply(c
,shared
.space
);
2766 keyslen
+= sdslen(key
);
2770 dictReleaseIterator(di
);
2771 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
2772 addReply(c
,shared
.crlf
);
2775 static void dbsizeCommand(redisClient
*c
) {
2777 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
2780 static void lastsaveCommand(redisClient
*c
) {
2782 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
2785 static void typeCommand(redisClient
*c
) {
2789 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2794 case REDIS_STRING
: type
= "+string"; break;
2795 case REDIS_LIST
: type
= "+list"; break;
2796 case REDIS_SET
: type
= "+set"; break;
2797 default: type
= "unknown"; break;
2800 addReplySds(c
,sdsnew(type
));
2801 addReply(c
,shared
.crlf
);
2804 static void saveCommand(redisClient
*c
) {
2805 if (server
.bgsaveinprogress
) {
2806 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
2809 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2810 addReply(c
,shared
.ok
);
2812 addReply(c
,shared
.err
);
2816 static void bgsaveCommand(redisClient
*c
) {
2817 if (server
.bgsaveinprogress
) {
2818 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
2821 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
2822 addReply(c
,shared
.ok
);
2824 addReply(c
,shared
.err
);
2828 static void shutdownCommand(redisClient
*c
) {
2829 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
2830 /* Kill the saving child if there is a background saving in progress.
2831 We want to avoid race conditions, for instance our saving child may
2832 overwrite the synchronous saving done by SHUTDOWN. */
2833 if (server
.bgsaveinprogress
) {
2834 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
2835 kill(server
.bgsavechildpid
,SIGKILL
);
2836 rdbRemoveTempFile(server
.bgsavechildpid
);
2839 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2840 if (server
.daemonize
)
2841 unlink(server
.pidfile
);
2842 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
2843 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
2846 /* Ooops.. error saving! The best we can do is to continue operating.
2847 * Note that if there was a background saving process, in the next
2848 * cron() Redis will be notified that the background saving aborted,
2849 * handling special stuff like slaves pending for synchronization... */
2850 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
2851 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2855 static void renameGenericCommand(redisClient
*c
, int nx
) {
2858 /* To use the same key as src and dst is probably an error */
2859 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
2860 addReply(c
,shared
.sameobjecterr
);
2864 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2866 addReply(c
,shared
.nokeyerr
);
2870 deleteIfVolatile(c
->db
,c
->argv
[2]);
2871 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
2874 addReply(c
,shared
.czero
);
2877 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
2879 incrRefCount(c
->argv
[2]);
2881 deleteKey(c
->db
,c
->argv
[1]);
2883 addReply(c
,nx
? shared
.cone
: shared
.ok
);
2886 static void renameCommand(redisClient
*c
) {
2887 renameGenericCommand(c
,0);
2890 static void renamenxCommand(redisClient
*c
) {
2891 renameGenericCommand(c
,1);
2894 static void moveCommand(redisClient
*c
) {
2899 /* Obtain source and target DB pointers */
2902 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
2903 addReply(c
,shared
.outofrangeerr
);
2907 selectDb(c
,srcid
); /* Back to the source DB */
2909 /* If the user is moving using as target the same
2910 * DB as the source DB it is probably an error. */
2912 addReply(c
,shared
.sameobjecterr
);
2916 /* Check if the element exists and get a reference */
2917 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2919 addReply(c
,shared
.czero
);
2923 /* Try to add the element to the target DB */
2924 deleteIfVolatile(dst
,c
->argv
[1]);
2925 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
2926 addReply(c
,shared
.czero
);
2929 incrRefCount(c
->argv
[1]);
2932 /* OK! key moved, free the entry in the source DB */
2933 deleteKey(src
,c
->argv
[1]);
2935 addReply(c
,shared
.cone
);
2938 /* =================================== Lists ================================ */
2939 static void pushGenericCommand(redisClient
*c
, int where
) {
2943 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2945 lobj
= createListObject();
2947 if (where
== REDIS_HEAD
) {
2948 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2950 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2952 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
2953 incrRefCount(c
->argv
[1]);
2954 incrRefCount(c
->argv
[2]);
2956 if (lobj
->type
!= REDIS_LIST
) {
2957 addReply(c
,shared
.wrongtypeerr
);
2961 if (where
== REDIS_HEAD
) {
2962 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2964 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2966 incrRefCount(c
->argv
[2]);
2969 addReply(c
,shared
.ok
);
2972 static void lpushCommand(redisClient
*c
) {
2973 pushGenericCommand(c
,REDIS_HEAD
);
2976 static void rpushCommand(redisClient
*c
) {
2977 pushGenericCommand(c
,REDIS_TAIL
);
2980 static void llenCommand(redisClient
*c
) {
2984 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2986 addReply(c
,shared
.czero
);
2989 if (o
->type
!= REDIS_LIST
) {
2990 addReply(c
,shared
.wrongtypeerr
);
2993 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
2998 static void lindexCommand(redisClient
*c
) {
3000 int index
= atoi(c
->argv
[2]->ptr
);
3002 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3004 addReply(c
,shared
.nullbulk
);
3006 if (o
->type
!= REDIS_LIST
) {
3007 addReply(c
,shared
.wrongtypeerr
);
3009 list
*list
= o
->ptr
;
3012 ln
= listIndex(list
, index
);
3014 addReply(c
,shared
.nullbulk
);
3016 robj
*ele
= listNodeValue(ln
);
3017 addReplyBulkLen(c
,ele
);
3019 addReply(c
,shared
.crlf
);
3025 static void lsetCommand(redisClient
*c
) {
3027 int index
= atoi(c
->argv
[2]->ptr
);
3029 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3031 addReply(c
,shared
.nokeyerr
);
3033 if (o
->type
!= REDIS_LIST
) {
3034 addReply(c
,shared
.wrongtypeerr
);
3036 list
*list
= o
->ptr
;
3039 ln
= listIndex(list
, index
);
3041 addReply(c
,shared
.outofrangeerr
);
3043 robj
*ele
= listNodeValue(ln
);
3046 listNodeValue(ln
) = c
->argv
[3];
3047 incrRefCount(c
->argv
[3]);
3048 addReply(c
,shared
.ok
);
3055 static void popGenericCommand(redisClient
*c
, int where
) {
3058 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3060 addReply(c
,shared
.nullbulk
);
3062 if (o
->type
!= REDIS_LIST
) {
3063 addReply(c
,shared
.wrongtypeerr
);
3065 list
*list
= o
->ptr
;
3068 if (where
== REDIS_HEAD
)
3069 ln
= listFirst(list
);
3071 ln
= listLast(list
);
3074 addReply(c
,shared
.nullbulk
);
3076 robj
*ele
= listNodeValue(ln
);
3077 addReplyBulkLen(c
,ele
);
3079 addReply(c
,shared
.crlf
);
3080 listDelNode(list
,ln
);
3087 static void lpopCommand(redisClient
*c
) {
3088 popGenericCommand(c
,REDIS_HEAD
);
3091 static void rpopCommand(redisClient
*c
) {
3092 popGenericCommand(c
,REDIS_TAIL
);
3095 static void lrangeCommand(redisClient
*c
) {
3097 int start
= atoi(c
->argv
[2]->ptr
);
3098 int end
= atoi(c
->argv
[3]->ptr
);
3100 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3102 addReply(c
,shared
.nullmultibulk
);
3104 if (o
->type
!= REDIS_LIST
) {
3105 addReply(c
,shared
.wrongtypeerr
);
3107 list
*list
= o
->ptr
;
3109 int llen
= listLength(list
);
3113 /* convert negative indexes */
3114 if (start
< 0) start
= llen
+start
;
3115 if (end
< 0) end
= llen
+end
;
3116 if (start
< 0) start
= 0;
3117 if (end
< 0) end
= 0;
3119 /* indexes sanity checks */
3120 if (start
> end
|| start
>= llen
) {
3121 /* Out of range start or start > end result in empty list */
3122 addReply(c
,shared
.emptymultibulk
);
3125 if (end
>= llen
) end
= llen
-1;
3126 rangelen
= (end
-start
)+1;
3128 /* Return the result in form of a multi-bulk reply */
3129 ln
= listIndex(list
, start
);
3130 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3131 for (j
= 0; j
< rangelen
; j
++) {
3132 ele
= listNodeValue(ln
);
3133 addReplyBulkLen(c
,ele
);
3135 addReply(c
,shared
.crlf
);
3142 static void ltrimCommand(redisClient
*c
) {
3144 int start
= atoi(c
->argv
[2]->ptr
);
3145 int end
= atoi(c
->argv
[3]->ptr
);
3147 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3149 addReply(c
,shared
.nokeyerr
);
3151 if (o
->type
!= REDIS_LIST
) {
3152 addReply(c
,shared
.wrongtypeerr
);
3154 list
*list
= o
->ptr
;
3156 int llen
= listLength(list
);
3157 int j
, ltrim
, rtrim
;
3159 /* convert negative indexes */
3160 if (start
< 0) start
= llen
+start
;
3161 if (end
< 0) end
= llen
+end
;
3162 if (start
< 0) start
= 0;
3163 if (end
< 0) end
= 0;
3165 /* indexes sanity checks */
3166 if (start
> end
|| start
>= llen
) {
3167 /* Out of range start or start > end result in empty list */
3171 if (end
>= llen
) end
= llen
-1;
3176 /* Remove list elements to perform the trim */
3177 for (j
= 0; j
< ltrim
; j
++) {
3178 ln
= listFirst(list
);
3179 listDelNode(list
,ln
);
3181 for (j
= 0; j
< rtrim
; j
++) {
3182 ln
= listLast(list
);
3183 listDelNode(list
,ln
);
3186 addReply(c
,shared
.ok
);
3191 static void lremCommand(redisClient
*c
) {
3194 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3196 addReply(c
,shared
.czero
);
3198 if (o
->type
!= REDIS_LIST
) {
3199 addReply(c
,shared
.wrongtypeerr
);
3201 list
*list
= o
->ptr
;
3202 listNode
*ln
, *next
;
3203 int toremove
= atoi(c
->argv
[2]->ptr
);
3208 toremove
= -toremove
;
3211 ln
= fromtail
? list
->tail
: list
->head
;
3213 robj
*ele
= listNodeValue(ln
);
3215 next
= fromtail
? ln
->prev
: ln
->next
;
3216 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3217 listDelNode(list
,ln
);
3220 if (toremove
&& removed
== toremove
) break;
3224 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3229 /* ==================================== Sets ================================ */
3231 static void saddCommand(redisClient
*c
) {
3234 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3236 set
= createSetObject();
3237 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3238 incrRefCount(c
->argv
[1]);
3240 if (set
->type
!= REDIS_SET
) {
3241 addReply(c
,shared
.wrongtypeerr
);
3245 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3246 incrRefCount(c
->argv
[2]);
3248 addReply(c
,shared
.cone
);
3250 addReply(c
,shared
.czero
);
3254 static void sremCommand(redisClient
*c
) {
3257 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3259 addReply(c
,shared
.czero
);
3261 if (set
->type
!= REDIS_SET
) {
3262 addReply(c
,shared
.wrongtypeerr
);
3265 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
3267 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3268 addReply(c
,shared
.cone
);
3270 addReply(c
,shared
.czero
);
3275 static void smoveCommand(redisClient
*c
) {
3276 robj
*srcset
, *dstset
;
3278 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3279 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3281 /* If the source key does not exist return 0, if it's of the wrong type
 * reply with a wrong type error */
3283 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
3284 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
3287 /* Error if the destination key is not a set as well */
3288 if (dstset
&& dstset
->type
!= REDIS_SET
) {
3289 addReply(c
,shared
.wrongtypeerr
);
3292 /* Remove the element from the source set */
3293 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
3294 /* Key not found in the src set! return zero */
3295 addReply(c
,shared
.czero
);
3299 /* Add the element to the destination set */
3301 dstset
= createSetObject();
3302 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
3303 incrRefCount(c
->argv
[2]);
3305 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
3306 incrRefCount(c
->argv
[3]);
3307 addReply(c
,shared
.cone
);
3310 static void sismemberCommand(redisClient
*c
) {
3313 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3315 addReply(c
,shared
.czero
);
3317 if (set
->type
!= REDIS_SET
) {
3318 addReply(c
,shared
.wrongtypeerr
);
3321 if (dictFind(set
->ptr
,c
->argv
[2]))
3322 addReply(c
,shared
.cone
);
3324 addReply(c
,shared
.czero
);
3328 static void scardCommand(redisClient
*c
) {
3332 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3334 addReply(c
,shared
.czero
);
3337 if (o
->type
!= REDIS_SET
) {
3338 addReply(c
,shared
.wrongtypeerr
);
3341 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3347 static void spopCommand(redisClient
*c
) {
3351 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3353 addReply(c
,shared
.nullbulk
);
3355 if (set
->type
!= REDIS_SET
) {
3356 addReply(c
,shared
.wrongtypeerr
);
3359 de
= dictGetRandomKey(set
->ptr
);
3361 addReply(c
,shared
.nullbulk
);
3363 robj
*ele
= dictGetEntryKey(de
);
3365 addReplyBulkLen(c
,ele
);
3367 addReply(c
,shared
.crlf
);
3368 dictDelete(set
->ptr
,ele
);
3369 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3375 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3376 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3378 return dictSize(*d1
)-dictSize(*d2
);
3381 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
) {
3382 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3385 robj
*lenobj
= NULL
, *dstset
= NULL
;
3386 int j
, cardinality
= 0;
3388 if (!dv
) oom("sinterGenericCommand");
3389 for (j
= 0; j
< setsnum
; j
++) {
3393 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3394 lookupKeyRead(c
->db
,setskeys
[j
]);
3398 deleteKey(c
->db
,dstkey
);
3399 addReply(c
,shared
.ok
);
3401 addReply(c
,shared
.nullmultibulk
);
3405 if (setobj
->type
!= REDIS_SET
) {
3407 addReply(c
,shared
.wrongtypeerr
);
3410 dv
[j
] = setobj
->ptr
;
3412 /* Sort sets from the smallest to largest, this will improve our
3413 * algorithm's performance */
3414 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3416 /* The first thing we should output is the total number of elements...
3417 * since this is a multi-bulk write, but at this stage we don't know
3418 * the intersection set size, so we use a trick, append an empty object
3419 * to the output list and save the pointer to later modify it with the
 * right length */
3422 lenobj
= createObject(REDIS_STRING
,NULL
);
3424 decrRefCount(lenobj
);
3426 /* If we have a target key where to store the resulting set
3427 * create this key with an empty set inside */
3428 dstset
= createSetObject();
3431 /* Iterate all the elements of the first (smallest) set, and test
3432 * the element against all the other sets, if at least one set does
3433 * not include the element it is discarded */
3434 di
= dictGetIterator(dv
[0]);
3435 if (!di
) oom("dictGetIterator");
3437 while((de
= dictNext(di
)) != NULL
) {
3440 for (j
= 1; j
< setsnum
; j
++)
3441 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
3443 continue; /* at least one set does not contain the member */
3444 ele
= dictGetEntryKey(de
);
3446 addReplyBulkLen(c
,ele
);
3448 addReply(c
,shared
.crlf
);
3451 dictAdd(dstset
->ptr
,ele
,NULL
);
3455 dictReleaseIterator(di
);
3458 /* Store the resulting set into the target */
3459 deleteKey(c
->db
,dstkey
);
3460 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3461 incrRefCount(dstkey
);
3465 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",cardinality
);
3467 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3468 dictSize((dict
*)dstset
->ptr
)));
3474 static void sinterCommand(redisClient
*c
) {
3475 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
3478 static void sinterstoreCommand(redisClient
*c
) {
3479 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
/* Operation selectors for the `op` argument of sunionDiffGenericCommand():
 * with REDIS_OP_UNION the members of every input set are added to the
 * result set; with REDIS_OP_DIFF the members of the first set are added
 * and the members of the following sets are deleted from the result. */
3482 #define REDIS_OP_UNION 0
3483 #define REDIS_OP_DIFF 1
3485 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
3486 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3489 robj
*dstset
= NULL
;
3490 int j
, cardinality
= 0;
3492 if (!dv
) oom("sunionDiffGenericCommand");
3493 for (j
= 0; j
< setsnum
; j
++) {
3497 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3498 lookupKeyRead(c
->db
,setskeys
[j
]);
3503 if (setobj
->type
!= REDIS_SET
) {
3505 addReply(c
,shared
.wrongtypeerr
);
3508 dv
[j
] = setobj
->ptr
;
3511 /* We need a temp set object to store our union. If the dstkey
3512 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3513 * this set object will be the resulting object to set into the target key*/
3514 dstset
= createSetObject();
3516 /* Iterate all the elements of all the sets, add every element a single
3517 * time to the result set */
3518 for (j
= 0; j
< setsnum
; j
++) {
3519 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
3520 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
3522 di
= dictGetIterator(dv
[j
]);
3523 if (!di
) oom("dictGetIterator");
3525 while((de
= dictNext(di
)) != NULL
) {
3528 /* dictAdd will not add the same element multiple times */
3529 ele
= dictGetEntryKey(de
);
3530 if (op
== REDIS_OP_UNION
|| j
== 0) {
3531 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
3535 } else if (op
== REDIS_OP_DIFF
) {
3536 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
3541 dictReleaseIterator(di
);
3543 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
3546 /* Output the content of the resulting set, if not in STORE mode */
3548 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
3549 di
= dictGetIterator(dstset
->ptr
);
3550 if (!di
) oom("dictGetIterator");
3551 while((de
= dictNext(di
)) != NULL
) {
3554 ele
= dictGetEntryKey(de
);
3555 addReplyBulkLen(c
,ele
);
3557 addReply(c
,shared
.crlf
);
3559 dictReleaseIterator(di
);
3561 /* If we have a target key where to store the resulting set
3562 * create this key with the result set inside */
3563 deleteKey(c
->db
,dstkey
);
3564 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3565 incrRefCount(dstkey
);
3570 decrRefCount(dstset
);
3572 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3573 dictSize((dict
*)dstset
->ptr
)));
3579 static void sunionCommand(redisClient
*c
) {
3580 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
3583 static void sunionstoreCommand(redisClient
*c
) {
3584 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
3587 static void sdiffCommand(redisClient
*c
) {
3588 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
3591 static void sdiffstoreCommand(redisClient
*c
) {
3592 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
3595 static void flushdbCommand(redisClient
*c
) {
3596 server
.dirty
+= dictSize(c
->db
->dict
);
3597 dictEmpty(c
->db
->dict
);
3598 dictEmpty(c
->db
->expires
);
3599 addReply(c
,shared
.ok
);
3602 static void flushallCommand(redisClient
*c
) {
3603 server
.dirty
+= emptyDb();
3604 addReply(c
,shared
.ok
);
3605 rdbSave(server
.dbfilename
);
3609 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
3610 redisSortOperation
*so
= zmalloc(sizeof(*so
));
3611 if (!so
) oom("createSortOperation");
3613 so
->pattern
= pattern
;
3617 /* Return the value associated to the key with a name obtained
3618 * substituting the first occurrence of '*' in 'pattern' with 'subst'.
 *
 * The key name is composed in a stack buffer and looked up with
 * lookupKeyRead(). NULL is returned when the composed name would not fit
 * in REDIS_SORTKEY_MAX bytes or when 'pattern' contains no '*'.
 *
 * A reference to 'subst' (or to its decoded copy) is held while the name
 * is built; it is released on every exit path, including the two early
 * NULL returns, which previously leaked it. */
3619 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
3623 int prefixlen
, sublen
, postfixlen
;
3624 /* Exploit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3627 unsigned short free
;
3628 unsigned short _len
; /* not used here */
3629 char buf
[REDIS_SORTKEY_MAX
+1];
/* Take a reference on 'subst': raw objects get their count incremented,
 * encoded ones are replaced by the object returned by getDecodedObject(). */
3632 if (subst
->encoding
== REDIS_ENCODING_RAW
)
3633 incrRefCount(subst
);
3635 subst
= getDecodedObject(subst
);
3638 spat
= pattern
->ptr
;
3640 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) {
/* Composed key name too long: drop our reference before bailing out */
decrRefCount(subst
);
return NULL
;
}
3641 p
= strchr(spat
,'*');
3642 if (!p
) {
/* No '*' to substitute: drop our reference before bailing out */
decrRefCount(subst
);
return NULL
;
}
3645 sublen
= sdslen(ssub
);
3646 postfixlen
= sdslen(spat
)-(prefixlen
+1);
/* keyname.buf = <pattern prefix> <subst> <pattern postfix> '\0' */
3647 memcpy(keyname
.buf
,spat
,prefixlen
);
3648 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
3649 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
3650 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
3651 keyname
.len
= prefixlen
+sublen
+postfixlen
;
3652 keyname
._len
= USHRT_MAX
;
/* Build a temporary string object on the stack pointing into keyname.
 * NOTE(review): the offset sizeof(long)*2 has to match the position of
 * 'buf' inside keyname; the header fields visible here are unsigned
 * short -- confirm against the sds header layout. */
3654 keyobj
.refcount
= 1;
3655 keyobj
.type
= REDIS_STRING
;
3656 keyobj
.ptr
= ((char*)&keyname
)+(sizeof(long)*2);
3658 decrRefCount(subst
);
3660 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3661 return lookupKeyRead(db
,&keyobj
);
3664 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3665 * the additional parameter is not standard but a BSD-specific we have to
3666 * pass sorting parameters via the global 'server' structure */
3667 static int sortCompare(const void *s1
, const void *s2
) {
3668 const redisSortObject
*so1
= s1
, *so2
= s2
;
3671 if (!server
.sort_alpha
) {
3672 /* Numeric sorting. Here it's trivial as we precomputed scores */
3673 if (so1
->u
.score
> so2
->u
.score
) {
3675 } else if (so1
->u
.score
< so2
->u
.score
) {
3681 /* Alphanumeric sorting */
3682 if (server
.sort_bypattern
) {
3683 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
3684 /* At least one compare object is NULL */
3685 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
3687 else if (so1
->u
.cmpobj
== NULL
)
3692 /* We have both the objects, use strcoll */
3693 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
3696 /* Compare elements directly */
3697 if (so1
->obj
->encoding
== REDIS_ENCODING_RAW
&&
3698 so2
->obj
->encoding
== REDIS_ENCODING_RAW
) {
3699 cmp
= strcoll(so1
->obj
->ptr
,so2
->obj
->ptr
);
3703 dec1
= so1
->obj
->encoding
== REDIS_ENCODING_RAW
?
3704 so1
->obj
: getDecodedObject(so1
->obj
);
3705 dec2
= so2
->obj
->encoding
== REDIS_ENCODING_RAW
?
3706 so2
->obj
: getDecodedObject(so2
->obj
);
3707 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
3708 if (dec1
!= so1
->obj
) decrRefCount(dec1
);
3709 if (dec2
!= so2
->obj
) decrRefCount(dec2
);
3713 return server
.sort_desc
? -cmp
: cmp
;
3716 /* The SORT command is the most complex command in Redis. Warning: this code
3717 * is optimized for speed and a bit less for readability */
/* Outline, as far as visible in this function:
 *  - argv[1] is looked up and must be a list or a set;
 *  - the remaining arguments are parsed as the options ASC, DESC, ALPHA,
 *    LIMIT <start> <count>, BY <pattern> and the per-element operations
 *    GET/DEL/INCR/DECR <pattern>; anything else is a syntax error;
 *  - the elements are loaded in a vector, scores or compare objects are
 *    computed unless the BY pattern has no '*', and the vector is sorted
 *    with qsort()/pqsort() using sortCompare();
 *  - the requested range is sent to the client together with the result
 *    of the per-element operations.
 * Fix: the DECR option was tested against the literal "get", which made the
 * branch unreachable; it is now tested against "decr". */
3718 static void sortCommand(redisClient
*c
) {
3721 int desc
= 0, alpha
= 0;
3722 int limit_start
= 0, limit_count
= -1, start
, end
;
3723 int j
, dontsort
= 0, vectorlen
;
3724 int getop
= 0; /* GET operation counter */
3725 robj
*sortval
, *sortby
= NULL
;
3726 redisSortObject
*vector
; /* Resulting vector to sort */
3728 /* Lookup the key to sort. It must be of the right types */
3729 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
3730 if (sortval
== NULL
) {
3731 addReply(c
,shared
.nokeyerr
);
3734 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
) {
3735 addReply(c
,shared
.wrongtypeerr
);
3739 /* Create a list of operations to perform for every sorted element.
3740 * Operations can be GET/DEL/INCR/DECR */
3741 operations
= listCreate();
3742 listSetFreeMethod(operations
,zfree
);
3745 /* Now we need to protect sortval incrementing its count, in the future
3746 * SORT may have options able to overwrite/delete keys during the sorting
3747 * and the sorted key itself may get destroyed */
3748 incrRefCount(sortval
);
3750 /* The SORT command has an SQL-alike syntax, parse it */
3751 while(j
< c
->argc
) {
3752 int leftargs
= c
->argc
-j
-1;
3753 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
3755 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
3757 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
3759 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
3760 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
3761 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
3763 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
3764 sortby
= c
->argv
[j
+1];
3765 /* If the BY pattern does not contain '*', i.e. it is constant,
3766 * we don't need to sort nor to lookup the weight keys. */
3767 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
3769 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3770 listAddNodeTail(operations
,createSortOperation(
3771 REDIS_SORT_GET
,c
->argv
[j
+1]));
3774 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"del") && leftargs
>= 1) {
3775 listAddNodeTail(operations
,createSortOperation(
3776 REDIS_SORT_DEL
,c
->argv
[j
+1]));
3778 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"incr") && leftargs
>= 1) {
3779 listAddNodeTail(operations
,createSortOperation(
3780 REDIS_SORT_INCR
,c
->argv
[j
+1]));
/* DECR option: this branch used to repeat the "get" test above, so it
 * could never be taken and "decr" ended up in the syntax error path. */
3782 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"decr") && leftargs
>= 1) {
3783 listAddNodeTail(operations
,createSortOperation(
3784 REDIS_SORT_DECR
,c
->argv
[j
+1]));
/* Unknown option: release what was acquired so far and reply with a
 * syntax error */
3787 decrRefCount(sortval
);
3788 listRelease(operations
);
3789 addReply(c
,shared
.syntaxerr
);
3795 /* Load the sorting vector with all the objects to sort */
3796 vectorlen
= (sortval
->type
== REDIS_LIST
) ?
3797 listLength((list
*)sortval
->ptr
) :
3798 dictSize((dict
*)sortval
->ptr
);
3799 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
3800 if (!vector
) oom("allocating objects vector for SORT");
3802 if (sortval
->type
== REDIS_LIST
) {
3803 list
*list
= sortval
->ptr
;
3807 while((ln
= listYield(list
))) {
3808 robj
*ele
= ln
->value
;
3809 vector
[j
].obj
= ele
;
3810 vector
[j
].u
.score
= 0;
3811 vector
[j
].u
.cmpobj
= NULL
;
3815 dict
*set
= sortval
->ptr
;
3819 di
= dictGetIterator(set
);
3820 if (!di
) oom("dictGetIterator");
3821 while((setele
= dictNext(di
)) != NULL
) {
3822 vector
[j
].obj
= dictGetEntryKey(setele
);
3823 vector
[j
].u
.score
= 0;
3824 vector
[j
].u
.cmpobj
= NULL
;
3827 dictReleaseIterator(di
);
3829 assert(j
== vectorlen
);
3831 /* Now it's time to load the right scores in the sorting vector */
3832 if (dontsort
== 0) {
3833 for (j
= 0; j
< vectorlen
; j
++) {
3837 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
3838 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
3840 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
3841 vector
[j
].u
.cmpobj
= byval
;
3842 incrRefCount(byval
);
3844 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
3847 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
3848 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
3850 if (byval
->encoding
== REDIS_ENCODING_INT
)
3851 vector
[j
].u
.score
= (long)byval
->ptr
;
3858 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
3859 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
3861 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
3862 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
3871 /* We are ready to sort the vector... perform a bit of sanity check
3872 * on the LIMIT option too. We'll use a partial version of quicksort. */
3873 start
= (limit_start
< 0) ? 0 : limit_start
;
3874 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
3875 if (start
>= vectorlen
) {
3876 start
= vectorlen
-1;
3879 if (end
>= vectorlen
) end
= vectorlen
-1;
3881 if (dontsort
== 0) {
/* sortCompare() reads its parameters from the global 'server' */
3882 server
.sort_desc
= desc
;
3883 server
.sort_alpha
= alpha
;
3884 server
.sort_bypattern
= sortby
? 1 : 0;
3885 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
3886 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
3888 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
3891 /* Send command output to the output buffer, performing the specified
3892 * GET/DEL/INCR/DECR operations if any. */
3893 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
3894 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
3895 for (j
= start
; j
<= end
; j
++) {
3898 addReplyBulkLen(c
,vector
[j
].obj
);
3899 addReply(c
,vector
[j
].obj
);
3900 addReply(c
,shared
.crlf
);
3902 listRewind(operations
);
3903 while((ln
= listYield(operations
))) {
3904 redisSortOperation
*sop
= ln
->value
;
3905 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
3908 if (sop
->type
== REDIS_SORT_GET
) {
3909 if (!val
|| val
->type
!= REDIS_STRING
) {
3910 addReply(c
,shared
.nullbulk
);
3912 addReplyBulkLen(c
,val
);
3914 addReply(c
,shared
.crlf
);
3916 } else if (sop
->type
== REDIS_SORT_DEL
) {
/* Cleanup: release the protected key, the operations list and the
 * compare objects referenced by the vector */
3923 decrRefCount(sortval
);
3924 listRelease(operations
);
3925 for (j
= 0; j
< vectorlen
; j
++) {
3926 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
3927 decrRefCount(vector
[j
].u
.cmpobj
);
3932 static void infoCommand(redisClient
*c
) {
3934 time_t uptime
= time(NULL
)-server
.stat_starttime
;
3937 info
= sdscatprintf(sdsempty(),
3938 "redis_version:%s\r\n"
3939 "uptime_in_seconds:%d\r\n"
3940 "uptime_in_days:%d\r\n"
3941 "connected_clients:%d\r\n"
3942 "connected_slaves:%d\r\n"
3943 "used_memory:%zu\r\n"
3944 "changes_since_last_save:%lld\r\n"
3945 "bgsave_in_progress:%d\r\n"
3946 "last_save_time:%d\r\n"
3947 "total_connections_received:%lld\r\n"
3948 "total_commands_processed:%lld\r\n"
3953 listLength(server
.clients
)-listLength(server
.slaves
),
3954 listLength(server
.slaves
),
3957 server
.bgsaveinprogress
,
3959 server
.stat_numconnections
,
3960 server
.stat_numcommands
,
3961 server
.masterhost
== NULL
? "master" : "slave"
3963 if (server
.masterhost
) {
3964 info
= sdscatprintf(info
,
3965 "master_host:%s\r\n"
3966 "master_port:%d\r\n"
3967 "master_link_status:%s\r\n"
3968 "master_last_io_seconds_ago:%d\r\n"
3971 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
3973 (int)(time(NULL
)-server
.master
->lastinteraction
)
3976 for (j
= 0; j
< server
.dbnum
; j
++) {
3977 long long keys
, vkeys
;
3979 keys
= dictSize(server
.db
[j
].dict
);
3980 vkeys
= dictSize(server
.db
[j
].expires
);
3981 if (keys
|| vkeys
) {
3982 info
= sdscatprintf(info
, "db%d: keys=%lld,expires=%lld\r\n",
3986 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info
)));
3987 addReplySds(c
,info
);
3988 addReply(c
,shared
.crlf
);
3991 static void monitorCommand(redisClient
*c
) {
3992 /* ignore MONITOR if already slave or in monitor mode */
3993 if (c
->flags
& REDIS_SLAVE
) return;
3995 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
3997 if (!listAddNodeTail(server
.monitors
,c
)) oom("listAddNodeTail");
3998 addReply(c
,shared
.ok
);
4001 /* ================================= Expire ================================= */
4002 static int removeExpire(redisDb
*db
, robj
*key
) {
4003 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
4010 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
4011 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
4019 /* Return the expire time of the specified key, or -1 if no expire
4020 * is associated with this key (i.e. the key is non volatile) */
4021 static time_t getExpire(redisDb
*db
, robj
*key
) {
4024 /* No expire? return ASAP */
4025 if (dictSize(db
->expires
) == 0 ||
4026 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
4028 return (time_t) dictGetEntryVal(de
);
4031 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
4035 /* No expire? return ASAP */
4036 if (dictSize(db
->expires
) == 0 ||
4037 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
4039 /* Lookup the expire */
4040 when
= (time_t) dictGetEntryVal(de
);
4041 if (time(NULL
) <= when
) return 0;
4043 /* Delete the key */
4044 dictDelete(db
->expires
,key
);
4045 return dictDelete(db
->dict
,key
) == DICT_OK
;
4048 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
4051 /* No expire? return ASAP */
4052 if (dictSize(db
->expires
) == 0 ||
4053 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
4055 /* Delete the key */
4057 dictDelete(db
->expires
,key
);
4058 return dictDelete(db
->dict
,key
) == DICT_OK
;
4061 static void expireCommand(redisClient
*c
) {
4063 int seconds
= atoi(c
->argv
[2]->ptr
);
4065 de
= dictFind(c
->db
->dict
,c
->argv
[1]);
4067 addReply(c
,shared
.czero
);
4071 addReply(c
, shared
.czero
);
4074 time_t when
= time(NULL
)+seconds
;
4075 if (setExpire(c
->db
,c
->argv
[1],when
)) {
4076 addReply(c
,shared
.cone
);
4079 addReply(c
,shared
.czero
);
4085 static void ttlCommand(redisClient
*c
) {
4089 expire
= getExpire(c
->db
,c
->argv
[1]);
4091 ttl
= (int) (expire
-time(NULL
));
4092 if (ttl
< 0) ttl
= -1;
4094 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
4097 /* =============================== Replication ============================= */
4099 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
4100 ssize_t nwritten
, ret
= size
;
4101 time_t start
= time(NULL
);
4105 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
4106 nwritten
= write(fd
,ptr
,size
);
4107 if (nwritten
== -1) return -1;
4111 if ((time(NULL
)-start
) > timeout
) {
4119 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
4120 ssize_t nread
, totread
= 0;
4121 time_t start
= time(NULL
);
4125 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
4126 nread
= read(fd
,ptr
,size
);
4127 if (nread
== -1) return -1;
4132 if ((time(NULL
)-start
) > timeout
) {
4140 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
4147 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
4150 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
4161 static void syncCommand(redisClient
*c
) {
4162 /* ignore SYNC if already slave or in monitor mode */
4163 if (c
->flags
& REDIS_SLAVE
) return;
4165 /* SYNC can't be issued when the server has pending data to send to
4166 * the client about already issued commands. We need a fresh reply
4167 * buffer registering the differences between the BGSAVE and the current
4168 * dataset, so that we can copy to other slaves if needed. */
4169 if (listLength(c
->reply
) != 0) {
4170 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4174 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
4175 /* Here we need to check if there is a background saving operation
4176 * in progress, or if it is required to start one */
4177 if (server
.bgsaveinprogress
) {
4178 /* Ok a background save is in progress. Let's check if it is a good
4179 * one for replication, i.e. if there is another slave that is
4180 * registering differences since the server forked to save */
4184 listRewind(server
.slaves
);
4185 while((ln
= listYield(server
.slaves
))) {
4187 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
4190 /* Perfect, the server is already registering differences for
4191 * another slave. Set the right state, and copy the buffer. */
4192 listRelease(c
->reply
);
4193 c
->reply
= listDup(slave
->reply
);
4194 if (!c
->reply
) oom("listDup copying slave reply list");
4195 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4196 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
4198 /* No way, we need to wait for the next BGSAVE in order to
4199 * register differences */
4200 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
4201 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
4204 /* Ok we don't have a BGSAVE in progress, let's start one */
4205 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
4206 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
4207 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
4208 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
4211 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4214 c
->flags
|= REDIS_SLAVE
;
4216 if (!listAddNodeTail(server
.slaves
,c
)) oom("listAddNodeTail");
4220 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
4221 redisClient
*slave
= privdata
;
4223 REDIS_NOTUSED(mask
);
4224 char buf
[REDIS_IOBUF_LEN
];
4225 ssize_t nwritten
, buflen
;
4227 if (slave
->repldboff
== 0) {
4228 /* Write the bulk write count before to transfer the DB. In theory here
4229 * we don't know how much room there is in the output buffer of the
4230 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4231 * operations) will never be smaller than the few bytes we need. */
4234 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4236 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
4244 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
4245 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
4247 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
4248 (buflen
== 0) ? "premature EOF" : strerror(errno
));
4252 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
4253 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
4258 slave
->repldboff
+= nwritten
;
4259 if (slave
->repldboff
== slave
->repldbsize
) {
4260 close(slave
->repldbfd
);
4261 slave
->repldbfd
= -1;
4262 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
4263 slave
->replstate
= REDIS_REPL_ONLINE
;
4264 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
4265 sendReplyToClient
, slave
, NULL
) == AE_ERR
) {
4269 addReplySds(slave
,sdsempty());
4270 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
4274 /* This function is called at the end of every background saving.
4275 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4276 * otherwise REDIS_ERR is passed to the function.
4278 * The goal of this function is to handle slaves waiting for a successful
4279 * background saving in order to perform non-blocking synchronization. */
4280 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
4282 int startbgsave
= 0;
4284 listRewind(server
.slaves
);
4285 while((ln
= listYield(server
.slaves
))) {
4286 redisClient
*slave
= ln
->value
;
4288 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
4290 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4291 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
4292 struct redis_stat buf
;
4294 if (bgsaveerr
!= REDIS_OK
) {
4296 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
4299 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
4300 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
4302 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
4305 slave
->repldboff
= 0;
4306 slave
->repldbsize
= buf
.st_size
;
4307 slave
->replstate
= REDIS_REPL_SEND_BULK
;
4308 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
4309 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
, NULL
) == AE_ERR
) {
4316 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
4317 listRewind(server
.slaves
);
4318 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
4319 while((ln
= listYield(server
.slaves
))) {
4320 redisClient
*slave
= ln
->value
;
4322 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
4329 static int syncWithMaster(void) {
4330 char buf
[1024], tmpfile
[256];
4332 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
4336 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
4340 /* Issue the SYNC command */
4341 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
4343 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
4347 /* Read the bulk write count */
4348 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
4350 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
4354 dumpsize
= atoi(buf
+1);
4355 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
4356 /* Read the bulk write data on a temp file */
4357 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
4358 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
4361 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
4365 int nread
, nwritten
;
4367 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
4369 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
4375 nwritten
= write(dfd
,buf
,nread
);
4376 if (nwritten
== -1) {
4377 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
4385 if (rename(tmpfile
,server
.dbfilename
) == -1) {
4386 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
4392 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
4393 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
4397 server
.master
= createClient(fd
);
4398 server
.master
->flags
|= REDIS_MASTER
;
4399 server
.replstate
= REDIS_REPL_CONNECTED
;
4403 static void slaveofCommand(redisClient
*c
) {
4404 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
4405 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
4406 if (server
.masterhost
) {
4407 sdsfree(server
.masterhost
);
4408 server
.masterhost
= NULL
;
4409 if (server
.master
) freeClient(server
.master
);
4410 server
.replstate
= REDIS_REPL_NONE
;
4411 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
4414 sdsfree(server
.masterhost
);
4415 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
4416 server
.masterport
= atoi(c
->argv
[2]->ptr
);
4417 if (server
.master
) freeClient(server
.master
);
4418 server
.replstate
= REDIS_REPL_CONNECT
;
4419 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
4420 server
.masterhost
, server
.masterport
);
4422 addReply(c
,shared
.ok
);
4425 /* ============================ Maxmemory directive ======================== */
4427 /* This function gets called when 'maxmemory' is set on the config file to limit
4428 * the max memory used by the server, and we are out of memory.
4429 * This function will try to, in order:
4431 * - Free objects from the free list
4432 * - Try to remove keys with an EXPIRE set
4434 * It is not possible to free enough memory to reach used-memory < maxmemory
4435 * the server will start refusing commands that will enlarge even more the
4438 static void freeMemoryIfNeeded(void) {
4439 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
4440 if (listLength(server
.objfreelist
)) {
4443 listNode
*head
= listFirst(server
.objfreelist
);
4444 o
= listNodeValue(head
);
4445 listDelNode(server
.objfreelist
,head
);
4448 int j
, k
, freed
= 0;
4450 for (j
= 0; j
< server
.dbnum
; j
++) {
4452 robj
*minkey
= NULL
;
4453 struct dictEntry
*de
;
4455 if (dictSize(server
.db
[j
].expires
)) {
4457 /* From a sample of three keys drop the one nearest to
4458 * the natural expire */
4459 for (k
= 0; k
< 3; k
++) {
4462 de
= dictGetRandomKey(server
.db
[j
].expires
);
4463 t
= (time_t) dictGetEntryVal(de
);
4464 if (minttl
== -1 || t
< minttl
) {
4465 minkey
= dictGetEntryKey(de
);
4469 deleteKey(server
.db
+j
,minkey
);
4472 if (!freed
) return; /* nothing to free... */
4477 /* ================================= Debugging ============================== */
4479 static void debugCommand(redisClient
*c
) {
4480 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
4482 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
4483 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
4487 addReply(c
,shared
.nokeyerr
);
4490 key
= dictGetEntryKey(de
);
4491 val
= dictGetEntryVal(de
);
4492 addReplySds(c
,sdscatprintf(sdsempty(),
4493 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4494 key
, key
->refcount
, val
, val
->refcount
, val
->encoding
));
4496 addReplySds(c
,sdsnew(
4497 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4501 #ifdef HAVE_BACKTRACE
4502 static struct redisFunctionSym symsTable
[] = {
4503 {"compareStringObjects", (unsigned long)compareStringObjects
},
4504 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong
},
4505 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare
},
4506 {"dictEncObjHash", (unsigned long)dictEncObjHash
},
4507 {"incrDecrCommand", (unsigned long)incrDecrCommand
},
4508 {"freeStringObject", (unsigned long)freeStringObject
},
4509 {"freeListObject", (unsigned long)freeListObject
},
4510 {"freeSetObject", (unsigned long)freeSetObject
},
4511 {"decrRefCount", (unsigned long)decrRefCount
},
4512 {"createObject", (unsigned long)createObject
},
4513 {"freeClient", (unsigned long)freeClient
},
4514 {"rdbLoad", (unsigned long)rdbLoad
},
4515 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject
},
4516 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw
},
4517 {"addReply", (unsigned long)addReply
},
4518 {"addReplySds", (unsigned long)addReplySds
},
4519 {"incrRefCount", (unsigned long)incrRefCount
},
4520 {"rdbSaveBackground", (unsigned long)rdbSaveBackground
},
4521 {"createStringObject", (unsigned long)createStringObject
},
4522 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves
},
4523 {"syncWithMaster", (unsigned long)syncWithMaster
},
4524 {"tryObjectSharing", (unsigned long)tryObjectSharing
},
4525 {"tryObjectEncoding", (unsigned long)tryObjectEncoding
},
4526 {"getDecodedObject", (unsigned long)getDecodedObject
},
4527 {"removeExpire", (unsigned long)removeExpire
},
4528 {"expireIfNeeded", (unsigned long)expireIfNeeded
},
4529 {"deleteIfVolatile", (unsigned long)deleteIfVolatile
},
4530 {"deleteKey", (unsigned long)deleteKey
},
4531 {"getExpire", (unsigned long)getExpire
},
4532 {"setExpire", (unsigned long)setExpire
},
4533 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave
},
4534 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded
},
4535 {"authCommand", (unsigned long)authCommand
},
4536 {"pingCommand", (unsigned long)pingCommand
},
4537 {"echoCommand", (unsigned long)echoCommand
},
4538 {"setCommand", (unsigned long)setCommand
},
4539 {"setnxCommand", (unsigned long)setnxCommand
},
4540 {"getCommand", (unsigned long)getCommand
},
4541 {"delCommand", (unsigned long)delCommand
},
4542 {"existsCommand", (unsigned long)existsCommand
},
4543 {"incrCommand", (unsigned long)incrCommand
},
4544 {"decrCommand", (unsigned long)decrCommand
},
4545 {"incrbyCommand", (unsigned long)incrbyCommand
},
4546 {"decrbyCommand", (unsigned long)decrbyCommand
},
4547 {"selectCommand", (unsigned long)selectCommand
},
4548 {"randomkeyCommand", (unsigned long)randomkeyCommand
},
4549 {"keysCommand", (unsigned long)keysCommand
},
4550 {"dbsizeCommand", (unsigned long)dbsizeCommand
},
4551 {"lastsaveCommand", (unsigned long)lastsaveCommand
},
4552 {"saveCommand", (unsigned long)saveCommand
},
4553 {"bgsaveCommand", (unsigned long)bgsaveCommand
},
4554 {"shutdownCommand", (unsigned long)shutdownCommand
},
4555 {"moveCommand", (unsigned long)moveCommand
},
4556 {"renameCommand", (unsigned long)renameCommand
},
4557 {"renamenxCommand", (unsigned long)renamenxCommand
},
4558 {"lpushCommand", (unsigned long)lpushCommand
},
4559 {"rpushCommand", (unsigned long)rpushCommand
},
4560 {"lpopCommand", (unsigned long)lpopCommand
},
4561 {"rpopCommand", (unsigned long)rpopCommand
},
4562 {"llenCommand", (unsigned long)llenCommand
},
4563 {"lindexCommand", (unsigned long)lindexCommand
},
4564 {"lrangeCommand", (unsigned long)lrangeCommand
},
4565 {"ltrimCommand", (unsigned long)ltrimCommand
},
4566 {"typeCommand", (unsigned long)typeCommand
},
4567 {"lsetCommand", (unsigned long)lsetCommand
},
4568 {"saddCommand", (unsigned long)saddCommand
},
4569 {"sremCommand", (unsigned long)sremCommand
},
4570 {"smoveCommand", (unsigned long)smoveCommand
},
4571 {"sismemberCommand", (unsigned long)sismemberCommand
},
4572 {"scardCommand", (unsigned long)scardCommand
},
4573 {"spopCommand", (unsigned long)spopCommand
},
4574 {"sinterCommand", (unsigned long)sinterCommand
},
4575 {"sinterstoreCommand", (unsigned long)sinterstoreCommand
},
4576 {"sunionCommand", (unsigned long)sunionCommand
},
4577 {"sunionstoreCommand", (unsigned long)sunionstoreCommand
},
4578 {"sdiffCommand", (unsigned long)sdiffCommand
},
4579 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand
},
4580 {"syncCommand", (unsigned long)syncCommand
},
4581 {"flushdbCommand", (unsigned long)flushdbCommand
},
4582 {"flushallCommand", (unsigned long)flushallCommand
},
4583 {"sortCommand", (unsigned long)sortCommand
},
4584 {"lremCommand", (unsigned long)lremCommand
},
4585 {"infoCommand", (unsigned long)infoCommand
},
4586 {"mgetCommand", (unsigned long)mgetCommand
},
4587 {"monitorCommand", (unsigned long)monitorCommand
},
4588 {"expireCommand", (unsigned long)expireCommand
},
4589 {"getSetCommand", (unsigned long)getSetCommand
},
4590 {"ttlCommand", (unsigned long)ttlCommand
},
4591 {"slaveofCommand", (unsigned long)slaveofCommand
},
4592 {"debugCommand", (unsigned long)debugCommand
},
4593 {"processCommand", (unsigned long)processCommand
},
4594 {"setupSigSegvAction", (unsigned long)setupSigSegvAction
},
4595 {"readQueryFromClient", (unsigned long)readQueryFromClient
},
4596 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile
},
4600 /* This function tries to convert a pointer into a function name. It's used in
4601 * order to provide a backtrace under segmentation fault that's able to
4602 * display functions declared as static (otherwise the backtrace is useless). */
4603 static char *findFuncName(void *pointer
, unsigned long *offset
){
4605 unsigned long off
, minoff
= 0;
4607 /* Try to match against the Symbol with the smallest offset */
4608 for (i
=0; symsTable
[i
].pointer
; i
++) {
4609 unsigned long lp
= (unsigned long) pointer
;
4611 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
4612 off
=lp
-symsTable
[i
].pointer
;
4613 if (ret
< 0 || off
< minoff
) {
4619 if (ret
== -1) return NULL
;
4621 return symsTable
[ret
].name
;
4624 static void *getMcontextEip(ucontext_t
*uc
) {
4625 #if defined(__FreeBSD__)
4626 return (void*) uc
->uc_mcontext
.mc_eip
;
4627 #elif defined(__dietlibc__)
4628 return (void*) uc
->uc_mcontext
.eip
;
4629 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
4630 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
4631 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
4632 #ifdef _STRUCT_X86_THREAD_STATE64
4633 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
4635 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
4637 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
4638 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
];
4639 #elif defined(__ia64__) /* Linux IA64 */
4640 return (void*) uc
->uc_mcontext
.sc_ip
;
4646 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
4648 char **messages
= NULL
;
4649 int i
, trace_size
= 0;
4650 unsigned long offset
=0;
4651 time_t uptime
= time(NULL
)-server
.stat_starttime
;
4652 ucontext_t
*uc
= (ucontext_t
*) secret
;
4653 REDIS_NOTUSED(info
);
4655 redisLog(REDIS_WARNING
,
4656 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
4657 redisLog(REDIS_WARNING
, "%s", sdscatprintf(sdsempty(),
4658 "redis_version:%s; "
4659 "uptime_in_seconds:%d; "
4660 "connected_clients:%d; "
4661 "connected_slaves:%d; "
4663 "changes_since_last_save:%lld; "
4664 "bgsave_in_progress:%d; "
4665 "last_save_time:%d; "
4666 "total_connections_received:%lld; "
4667 "total_commands_processed:%lld; "
4671 listLength(server
.clients
)-listLength(server
.slaves
),
4672 listLength(server
.slaves
),
4675 server
.bgsaveinprogress
,
4677 server
.stat_numconnections
,
4678 server
.stat_numcommands
,
4679 server
.masterhost
== NULL
? "master" : "slave"
4682 trace_size
= backtrace(trace
, 100);
4683 /* overwrite sigaction with caller's address */
4684 if (getMcontextEip(uc
) != NULL
) {
4685 trace
[1] = getMcontextEip(uc
);
4687 messages
= backtrace_symbols(trace
, trace_size
);
4689 for (i
=1; i
<trace_size
; ++i
) {
4690 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
4692 p
= strchr(messages
[i
],'+');
4693 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
4694 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
4696 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
4703 static void setupSigSegvAction(void) {
4704 struct sigaction act
;
4706 sigemptyset (&act
.sa_mask
);
4707 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
4708 * is used. Otherwise, sa_handler is used */
4709 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
4710 act
.sa_sigaction
= segvHandler
;
4711 sigaction (SIGSEGV
, &act
, NULL
);
4712 sigaction (SIGBUS
, &act
, NULL
);
4713 sigaction (SIGFPE
, &act
, NULL
);
4714 sigaction (SIGILL
, &act
, NULL
);
4715 sigaction (SIGBUS
, &act
, NULL
);
4718 #else /* HAVE_BACKTRACE */
/* Variant compiled when HAVE_BACKTRACE is not defined: it has an empty body,
 * so calling it installs no signal handlers. */
static void setupSigSegvAction(void) {
    return;
}
4721 #endif /* HAVE_BACKTRACE */
4723 /* =================================== Main! ================================ */
/* Read the overcommit policy from /proc/sys/vm/overcommit_memory.
 *
 * Returns the integer parsed (via atoi) from the first line of that file,
 * or -1 when the file cannot be opened or no line can be read from it. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return value;

    if (fgets(line,sizeof(line),procfile) != NULL) {
        value = atoi(line);
    }
    fclose(procfile);
    return value;
}
4740 void linuxOvercommitMemoryWarning(void) {
4741 if (linuxOvercommitMemoryValue() == 0) {
4742 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
4745 #endif /* __linux__ */
4747 static void daemonize(void) {
4751 if (fork() != 0) exit(0); /* parent exits */
4752 setsid(); /* create a new session */
4754 /* Every output goes to /dev/null. If Redis is daemonized but
4755 * the 'logfile' is set to 'stdout' in the configuration file
4756 * it will not log at all. */
4757 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
4758 dup2(fd
, STDIN_FILENO
);
4759 dup2(fd
, STDOUT_FILENO
);
4760 dup2(fd
, STDERR_FILENO
);
4761 if (fd
> STDERR_FILENO
) close(fd
);
4763 /* Try to write the pid file */
4764 fp
= fopen(server
.pidfile
,"w");
4766 fprintf(fp
,"%d\n",getpid());
4771 int main(int argc
, char **argv
) {
4774 ResetServerSaveParams();
4775 loadServerConfig(argv
[1]);
4776 } else if (argc
> 2) {
4777 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
4780 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4783 if (server
.daemonize
) daemonize();
4784 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
4786 linuxOvercommitMemoryWarning();
4788 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
4789 redisLog(REDIS_NOTICE
,"DB loaded from disk");
4790 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
4791 acceptHandler
, NULL
, NULL
) == AE_ERR
) oom("creating file event");
4792 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
4794 aeDeleteEventLoop(server
.el
);