2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.001"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
62 #include "ae.h" /* Event driven programming library */
63 #include "sds.h" /* Dynamic safe strings */
64 #include "anet.h" /* Networking the easy way */
65 #include "dict.h" /* Hash tables */
66 #include "adlist.h" /* Linked lists */
67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
68 #include "lzf.h" /* LZF compression library */
69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
75 /* Static server configuration */
76 #define REDIS_SERVERPORT 6379 /* TCP port */
77 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
78 #define REDIS_IOBUF_LEN 1024
79 #define REDIS_LOADBUF_LEN 1024
80 #define REDIS_STATIC_ARGS 4
81 #define REDIS_DEFAULT_DBNUM 16
82 #define REDIS_CONFIGLINE_MAX 1024
83 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
84 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
85 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
86 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
87 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
89 /* Hash table parameters */
90 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
93 #define REDIS_CMD_BULK 1 /* Bulk write command */
94 #define REDIS_CMD_INLINE 2 /* Inline command */
95 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
96 this flags will return an error when the 'maxmemory' option is set in the
97 config file and the server is using more than maxmemory bytes of memory.
98 In short this commands are denied on low memory conditions. */
99 #define REDIS_CMD_DENYOOM 4
102 #define REDIS_STRING 0
107 /* Objects encoding */
108 #define REDIS_ENCODING_RAW 0 /* Raw representation */
109 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
111 /* Object types only used for dumping to disk */
112 #define REDIS_EXPIRETIME 253
113 #define REDIS_SELECTDB 254
114 #define REDIS_EOF 255
116 /* Defines related to the dump file format. To store 32 bits lengths for short
117 * keys requires a lot of space, so we check the most significant 2 bits of
118 * the first byte to interpreter the length:
120 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
121 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
122 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
123 * 11|000000 this means: specially encoded object will follow. The six bits
124 * number specify the kind of object that follows.
125 * See the REDIS_RDB_ENC_* defines.
127 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
128 * values, will fit inside. */
129 #define REDIS_RDB_6BITLEN 0
130 #define REDIS_RDB_14BITLEN 1
131 #define REDIS_RDB_32BITLEN 2
132 #define REDIS_RDB_ENCVAL 3
133 #define REDIS_RDB_LENERR UINT_MAX
135 /* When a length of a string object stored on disk has the first two bits
136 * set, the remaining two bits specify a special encoding for the object
137 * accordingly to the following defines: */
138 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
139 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
140 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
141 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
144 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
145 #define REDIS_SLAVE 2 /* This client is a slave server */
146 #define REDIS_MASTER 4 /* This client is a master server */
147 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
149 /* Slave replication state - slave side */
150 #define REDIS_REPL_NONE 0 /* No active replication */
151 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
152 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
154 /* Slave replication state - from the point of view of master
155 * Note that in SEND_BULK and ONLINE state the slave receives new updates
156 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
157 * to start the next background saving in order to send updates to it. */
158 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
159 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
160 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
161 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
163 /* List related stuff */
167 /* Sort operations */
168 #define REDIS_SORT_GET 0
169 #define REDIS_SORT_DEL 1
170 #define REDIS_SORT_INCR 2
171 #define REDIS_SORT_DECR 3
172 #define REDIS_SORT_ASC 4
173 #define REDIS_SORT_DESC 5
174 #define REDIS_SORTKEY_MAX 1024
177 #define REDIS_DEBUG 0
178 #define REDIS_NOTICE 1
179 #define REDIS_WARNING 2
181 /* Anti-warning macro... */
182 #define REDIS_NOTUSED(V) ((void) V)
185 /*================================= Data types ============================== */
187 /* A redis object, that is a type able to hold a string / list / set */
188 typedef struct redisObject
{
191 unsigned char encoding
;
192 unsigned char notused
[2];
196 typedef struct redisDb
{
202 /* With multiplexing we need to take per-clinet state.
203 * Clients are taken in a liked list. */
204 typedef struct redisClient
{
209 robj
**argv
, **mbargv
;
211 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
212 int multibulk
; /* multi bulk command format active */
215 time_t lastinteraction
; /* time of the last interaction, used for timeout */
216 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
217 int slaveseldb
; /* slave selected db, if this client is a slave */
218 int authenticated
; /* when requirepass is non-NULL */
219 int replstate
; /* replication state if this is a slave */
220 int repldbfd
; /* replication DB file descriptor */
221 long repldboff
; /* replication DB file offset */
222 off_t repldbsize
; /* replication DB file size */
230 /* Global server state structure */
236 unsigned int sharingpoolsize
;
237 long long dirty
; /* changes to DB from the last save */
239 list
*slaves
, *monitors
;
240 char neterr
[ANET_ERR_LEN
];
242 int cronloops
; /* number of times the cron function run */
243 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
244 time_t lastsave
; /* Unix time of last save succeeede */
245 size_t usedmemory
; /* Used memory in megabytes */
246 /* Fields used only for stats */
247 time_t stat_starttime
; /* server start time */
248 long long stat_numcommands
; /* number of processed commands */
249 long long stat_numconnections
; /* number of connections received */
257 int bgsaveinprogress
;
258 pid_t bgsavechildpid
;
259 struct saveparam
*saveparams
;
266 /* Replication related */
270 redisClient
*master
; /* client that is master for this slave */
272 unsigned int maxclients
;
273 unsigned long maxmemory
;
274 /* Sort parameters - qsort_r() is only available under BSD so we
275 * have to take this state global, in order to pass it to sortCompare() */
281 typedef void redisCommandProc(redisClient
*c
);
282 struct redisCommand
{
284 redisCommandProc
*proc
;
289 struct redisFunctionSym
{
291 unsigned long pointer
;
294 typedef struct _redisSortObject
{
302 typedef struct _redisSortOperation
{
305 } redisSortOperation
;
307 struct sharedObjectsStruct
{
308 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
309 *colon
, *nullbulk
, *nullmultibulk
,
310 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
311 *outofrangeerr
, *plus
,
312 *select0
, *select1
, *select2
, *select3
, *select4
,
313 *select5
, *select6
, *select7
, *select8
, *select9
;
316 /*================================ Prototypes =============================== */
318 static void freeStringObject(robj
*o
);
319 static void freeListObject(robj
*o
);
320 static void freeSetObject(robj
*o
);
321 static void decrRefCount(void *o
);
322 static robj
*createObject(int type
, void *ptr
);
323 static void freeClient(redisClient
*c
);
324 static int rdbLoad(char *filename
);
325 static void addReply(redisClient
*c
, robj
*obj
);
326 static void addReplySds(redisClient
*c
, sds s
);
327 static void incrRefCount(robj
*o
);
328 static int rdbSaveBackground(char *filename
);
329 static robj
*createStringObject(char *ptr
, size_t len
);
330 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
331 static int syncWithMaster(void);
332 static robj
*tryObjectSharing(robj
*o
);
333 static int tryObjectEncoding(robj
*o
);
334 static robj
*getDecodedObject(const robj
*o
);
335 static int removeExpire(redisDb
*db
, robj
*key
);
336 static int expireIfNeeded(redisDb
*db
, robj
*key
);
337 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
338 static int deleteKey(redisDb
*db
, robj
*key
);
339 static time_t getExpire(redisDb
*db
, robj
*key
);
340 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
341 static void updateSlavesWaitingBgsave(int bgsaveerr
);
342 static void freeMemoryIfNeeded(void);
343 static int processCommand(redisClient
*c
);
344 static void setupSigSegvAction(void);
345 static void rdbRemoveTempFile(pid_t childpid
);
346 static size_t stringObjectLen(robj
*o
);
347 static void processInputBuffer(redisClient
*c
);
349 static void authCommand(redisClient
*c
);
350 static void pingCommand(redisClient
*c
);
351 static void echoCommand(redisClient
*c
);
352 static void setCommand(redisClient
*c
);
353 static void setnxCommand(redisClient
*c
);
354 static void getCommand(redisClient
*c
);
355 static void delCommand(redisClient
*c
);
356 static void existsCommand(redisClient
*c
);
357 static void incrCommand(redisClient
*c
);
358 static void decrCommand(redisClient
*c
);
359 static void incrbyCommand(redisClient
*c
);
360 static void decrbyCommand(redisClient
*c
);
361 static void selectCommand(redisClient
*c
);
362 static void randomkeyCommand(redisClient
*c
);
363 static void keysCommand(redisClient
*c
);
364 static void dbsizeCommand(redisClient
*c
);
365 static void lastsaveCommand(redisClient
*c
);
366 static void saveCommand(redisClient
*c
);
367 static void bgsaveCommand(redisClient
*c
);
368 static void shutdownCommand(redisClient
*c
);
369 static void moveCommand(redisClient
*c
);
370 static void renameCommand(redisClient
*c
);
371 static void renamenxCommand(redisClient
*c
);
372 static void lpushCommand(redisClient
*c
);
373 static void rpushCommand(redisClient
*c
);
374 static void lpopCommand(redisClient
*c
);
375 static void rpopCommand(redisClient
*c
);
376 static void llenCommand(redisClient
*c
);
377 static void lindexCommand(redisClient
*c
);
378 static void lrangeCommand(redisClient
*c
);
379 static void ltrimCommand(redisClient
*c
);
380 static void typeCommand(redisClient
*c
);
381 static void lsetCommand(redisClient
*c
);
382 static void saddCommand(redisClient
*c
);
383 static void sremCommand(redisClient
*c
);
384 static void smoveCommand(redisClient
*c
);
385 static void sismemberCommand(redisClient
*c
);
386 static void scardCommand(redisClient
*c
);
387 static void spopCommand(redisClient
*c
);
388 static void srandmemberCommand(redisClient
*c
);
389 static void sinterCommand(redisClient
*c
);
390 static void sinterstoreCommand(redisClient
*c
);
391 static void sunionCommand(redisClient
*c
);
392 static void sunionstoreCommand(redisClient
*c
);
393 static void sdiffCommand(redisClient
*c
);
394 static void sdiffstoreCommand(redisClient
*c
);
395 static void syncCommand(redisClient
*c
);
396 static void flushdbCommand(redisClient
*c
);
397 static void flushallCommand(redisClient
*c
);
398 static void sortCommand(redisClient
*c
);
399 static void lremCommand(redisClient
*c
);
400 static void infoCommand(redisClient
*c
);
401 static void mgetCommand(redisClient
*c
);
402 static void monitorCommand(redisClient
*c
);
403 static void expireCommand(redisClient
*c
);
404 static void getsetCommand(redisClient
*c
);
405 static void ttlCommand(redisClient
*c
);
406 static void slaveofCommand(redisClient
*c
);
407 static void debugCommand(redisClient
*c
);
408 static void msetCommand(redisClient
*c
);
409 static void msetnxCommand(redisClient
*c
);
411 /*================================= Globals ================================= */
414 static struct redisServer server
; /* server global state */
415 static struct redisCommand cmdTable
[] = {
416 {"get",getCommand
,2,REDIS_CMD_INLINE
},
417 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
418 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
419 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
420 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
421 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
422 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
423 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
424 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
425 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
426 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
427 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
428 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
429 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
430 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
431 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
432 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
433 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
434 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
435 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
436 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
437 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
438 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
439 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
440 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
441 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
442 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
443 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
444 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
445 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
446 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
447 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
448 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
449 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
450 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
451 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
452 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
453 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
454 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
455 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
456 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
457 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
458 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
459 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
460 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
461 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
462 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
463 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
464 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
465 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
466 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
467 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
468 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
469 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
470 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
471 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
472 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
473 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
474 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
475 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
476 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
477 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
480 /*============================ Utility functions ============================ */
482 /* Glob-style pattern matching. */
483 int stringmatchlen(const char *pattern
, int patternLen
,
484 const char *string
, int stringLen
, int nocase
)
489 while (pattern
[1] == '*') {
494 return 1; /* match */
496 if (stringmatchlen(pattern
+1, patternLen
-1,
497 string
, stringLen
, nocase
))
498 return 1; /* match */
502 return 0; /* no match */
506 return 0; /* no match */
516 not = pattern
[0] == '^';
523 if (pattern
[0] == '\\') {
526 if (pattern
[0] == string
[0])
528 } else if (pattern
[0] == ']') {
530 } else if (patternLen
== 0) {
534 } else if (pattern
[1] == '-' && patternLen
>= 3) {
535 int start
= pattern
[0];
536 int end
= pattern
[2];
544 start
= tolower(start
);
550 if (c
>= start
&& c
<= end
)
554 if (pattern
[0] == string
[0])
557 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
567 return 0; /* no match */
573 if (patternLen
>= 2) {
580 if (pattern
[0] != string
[0])
581 return 0; /* no match */
583 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
584 return 0; /* no match */
592 if (stringLen
== 0) {
593 while(*pattern
== '*') {
600 if (patternLen
== 0 && stringLen
== 0)
605 static void redisLog(int level
, const char *fmt
, ...) {
609 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
613 if (level
>= server
.verbosity
) {
619 strftime(buf
,64,"%d %b %H:%M:%S",gmtime(&now
));
620 fprintf(fp
,"%s %c ",buf
,c
[level
]);
621 vfprintf(fp
, fmt
, ap
);
627 if (server
.logfile
) fclose(fp
);
630 /*====================== Hash table type implementation ==================== */
632 /* This is an hash table type that uses the SDS dynamic strings libary as
633 * keys and radis objects as values (objects can hold SDS strings,
636 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
640 DICT_NOTUSED(privdata
);
642 l1
= sdslen((sds
)key1
);
643 l2
= sdslen((sds
)key2
);
644 if (l1
!= l2
) return 0;
645 return memcmp(key1
, key2
, l1
) == 0;
648 static void dictRedisObjectDestructor(void *privdata
, void *val
)
650 DICT_NOTUSED(privdata
);
655 static int dictObjKeyCompare(void *privdata
, const void *key1
,
658 const robj
*o1
= key1
, *o2
= key2
;
659 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
662 static unsigned int dictObjHash(const void *key
) {
664 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
667 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
670 const robj
*o1
= key1
, *o2
= key2
;
672 if (o1
->encoding
== REDIS_ENCODING_RAW
&&
673 o2
->encoding
== REDIS_ENCODING_RAW
)
674 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
679 dec1
= o1
->encoding
!= REDIS_ENCODING_RAW
?
680 getDecodedObject(o1
) : (robj
*)o1
;
681 dec2
= o2
->encoding
!= REDIS_ENCODING_RAW
?
682 getDecodedObject(o2
) : (robj
*)o2
;
683 cmp
= sdsDictKeyCompare(privdata
,dec1
->ptr
,dec2
->ptr
);
684 if (dec1
!= o1
) decrRefCount(dec1
);
685 if (dec2
!= o2
) decrRefCount(dec2
);
690 static unsigned int dictEncObjHash(const void *key
) {
693 if (o
->encoding
== REDIS_ENCODING_RAW
)
694 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
696 robj
*dec
= getDecodedObject(o
);
697 unsigned int hash
= dictGenHashFunction(dec
->ptr
, sdslen((sds
)dec
->ptr
));
703 static dictType setDictType
= {
704 dictEncObjHash
, /* hash function */
707 dictEncObjKeyCompare
, /* key compare */
708 dictRedisObjectDestructor
, /* key destructor */
709 NULL
/* val destructor */
712 static dictType hashDictType
= {
713 dictObjHash
, /* hash function */
716 dictObjKeyCompare
, /* key compare */
717 dictRedisObjectDestructor
, /* key destructor */
718 dictRedisObjectDestructor
/* val destructor */
721 /* ========================= Random utility functions ======================= */
723 /* Redis generally does not try to recover from out of memory conditions
724 * when allocating objects or strings, it is not clear if it will be possible
725 * to report this condition to the client since the networking layer itself
726 * is based on heap allocation for send buffers, so we simply abort.
727 * At least the code will be simpler to read... */
728 static void oom(const char *msg
) {
729 fprintf(stderr
, "%s: Out of memory\n",msg
);
735 /* ====================== Redis server networking stuff ===================== */
736 static void closeTimedoutClients(void) {
739 time_t now
= time(NULL
);
741 listRewind(server
.clients
);
742 while ((ln
= listYield(server
.clients
)) != NULL
) {
743 c
= listNodeValue(ln
);
744 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
745 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
746 (now
- c
->lastinteraction
> server
.maxidletime
)) {
747 redisLog(REDIS_DEBUG
,"Closing idle client");
753 static int htNeedsResize(dict
*dict
) {
754 long long size
, used
;
756 size
= dictSlots(dict
);
757 used
= dictSize(dict
);
758 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
759 (used
*100/size
< REDIS_HT_MINFILL
));
762 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
763 * we resize the hash table to save memory */
764 static void tryResizeHashTables(void) {
767 for (j
= 0; j
< server
.dbnum
; j
++) {
768 if (htNeedsResize(server
.db
[j
].dict
)) {
769 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
770 dictResize(server
.db
[j
].dict
);
771 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
773 if (htNeedsResize(server
.db
[j
].expires
))
774 dictResize(server
.db
[j
].expires
);
778 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
779 int j
, loops
= server
.cronloops
++;
780 REDIS_NOTUSED(eventLoop
);
782 REDIS_NOTUSED(clientData
);
784 /* Update the global state with the amount of used memory */
785 server
.usedmemory
= zmalloc_used_memory();
787 /* Show some info about non-empty databases */
788 for (j
= 0; j
< server
.dbnum
; j
++) {
789 long long size
, used
, vkeys
;
791 size
= dictSlots(server
.db
[j
].dict
);
792 used
= dictSize(server
.db
[j
].dict
);
793 vkeys
= dictSize(server
.db
[j
].expires
);
794 if (!(loops
% 5) && (used
|| vkeys
)) {
795 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
796 /* dictPrintStats(server.dict); */
800 /* We don't want to resize the hash tables while a bacground saving
801 * is in progress: the saving child is created using fork() that is
802 * implemented with a copy-on-write semantic in most modern systems, so
803 * if we resize the HT while there is the saving child at work actually
804 * a lot of memory movements in the parent will cause a lot of pages
806 if (!server
.bgsaveinprogress
) tryResizeHashTables();
808 /* Show information about connected clients */
810 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
811 listLength(server
.clients
)-listLength(server
.slaves
),
812 listLength(server
.slaves
),
814 dictSize(server
.sharingpool
));
817 /* Close connections of timedout clients */
818 if (server
.maxidletime
&& !(loops
% 10))
819 closeTimedoutClients();
821 /* Check if a background saving in progress terminated */
822 if (server
.bgsaveinprogress
) {
824 if (wait4(-1,&statloc
,WNOHANG
,NULL
)) {
825 int exitcode
= WEXITSTATUS(statloc
);
826 int bysignal
= WIFSIGNALED(statloc
);
828 if (!bysignal
&& exitcode
== 0) {
829 redisLog(REDIS_NOTICE
,
830 "Background saving terminated with success");
832 server
.lastsave
= time(NULL
);
833 } else if (!bysignal
&& exitcode
!= 0) {
834 redisLog(REDIS_WARNING
, "Background saving error");
836 redisLog(REDIS_WARNING
,
837 "Background saving terminated by signal");
838 rdbRemoveTempFile(server
.bgsavechildpid
);
840 server
.bgsaveinprogress
= 0;
841 server
.bgsavechildpid
= -1;
842 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
845 /* If there is not a background saving in progress check if
846 * we have to save now */
847 time_t now
= time(NULL
);
848 for (j
= 0; j
< server
.saveparamslen
; j
++) {
849 struct saveparam
*sp
= server
.saveparams
+j
;
851 if (server
.dirty
>= sp
->changes
&&
852 now
-server
.lastsave
> sp
->seconds
) {
853 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
854 sp
->changes
, sp
->seconds
);
855 rdbSaveBackground(server
.dbfilename
);
861 /* Try to expire a few timed out keys */
862 for (j
= 0; j
< server
.dbnum
; j
++) {
863 redisDb
*db
= server
.db
+j
;
864 int num
= dictSize(db
->expires
);
867 time_t now
= time(NULL
);
869 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
870 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
875 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
876 t
= (time_t) dictGetEntryVal(de
);
878 deleteKey(db
,dictGetEntryKey(de
));
884 /* Check if we should connect to a MASTER */
885 if (server
.replstate
== REDIS_REPL_CONNECT
) {
886 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
887 if (syncWithMaster() == REDIS_OK
) {
888 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
894 static void createSharedObjects(void) {
895 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
896 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
897 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
898 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
899 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
900 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
901 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
902 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
903 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
905 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
906 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
907 "-ERR Operation against a key holding the wrong kind of value\r\n"));
908 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
909 "-ERR no such key\r\n"));
910 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
911 "-ERR syntax error\r\n"));
912 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
913 "-ERR source and destination objects are the same\r\n"));
914 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
915 "-ERR index out of range\r\n"));
916 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
917 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
918 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
919 shared
.select0
= createStringObject("select 0\r\n",10);
920 shared
.select1
= createStringObject("select 1\r\n",10);
921 shared
.select2
= createStringObject("select 2\r\n",10);
922 shared
.select3
= createStringObject("select 3\r\n",10);
923 shared
.select4
= createStringObject("select 4\r\n",10);
924 shared
.select5
= createStringObject("select 5\r\n",10);
925 shared
.select6
= createStringObject("select 6\r\n",10);
926 shared
.select7
= createStringObject("select 7\r\n",10);
927 shared
.select8
= createStringObject("select 8\r\n",10);
928 shared
.select9
= createStringObject("select 9\r\n",10);
931 static void appendServerSaveParams(time_t seconds
, int changes
) {
932 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
933 if (server
.saveparams
== NULL
) oom("appendServerSaveParams");
934 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
935 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
936 server
.saveparamslen
++;
939 static void ResetServerSaveParams() {
940 zfree(server
.saveparams
);
941 server
.saveparams
= NULL
;
942 server
.saveparamslen
= 0;
945 static void initServerConfig() {
946 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
947 server
.port
= REDIS_SERVERPORT
;
948 server
.verbosity
= REDIS_DEBUG
;
949 server
.maxidletime
= REDIS_MAXIDLETIME
;
950 server
.saveparams
= NULL
;
951 server
.logfile
= NULL
; /* NULL = log on standard output */
952 server
.bindaddr
= NULL
;
953 server
.glueoutputbuf
= 1;
954 server
.daemonize
= 0;
955 server
.pidfile
= "/var/run/redis.pid";
956 server
.dbfilename
= "dump.rdb";
957 server
.requirepass
= NULL
;
958 server
.shareobjects
= 0;
959 server
.sharingpoolsize
= 1024;
960 server
.maxclients
= 0;
961 server
.maxmemory
= 0;
962 ResetServerSaveParams();
964 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
965 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
966 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
967 /* Replication related */
969 server
.masterhost
= NULL
;
970 server
.masterport
= 6379;
971 server
.master
= NULL
;
972 server
.replstate
= REDIS_REPL_NONE
;
975 static void initServer() {
978 signal(SIGHUP
, SIG_IGN
);
979 signal(SIGPIPE
, SIG_IGN
);
980 setupSigSegvAction();
982 server
.clients
= listCreate();
983 server
.slaves
= listCreate();
984 server
.monitors
= listCreate();
985 server
.objfreelist
= listCreate();
986 createSharedObjects();
987 server
.el
= aeCreateEventLoop();
988 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
989 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
990 if (!server
.db
|| !server
.clients
|| !server
.slaves
|| !server
.monitors
|| !server
.el
|| !server
.objfreelist
)
991 oom("server initialization"); /* Fatal OOM */
992 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
993 if (server
.fd
== -1) {
994 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
997 for (j
= 0; j
< server
.dbnum
; j
++) {
998 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
999 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1000 server
.db
[j
].id
= j
;
1002 server
.cronloops
= 0;
1003 server
.bgsaveinprogress
= 0;
1004 server
.bgsavechildpid
= -1;
1005 server
.lastsave
= time(NULL
);
1007 server
.usedmemory
= 0;
1008 server
.stat_numcommands
= 0;
1009 server
.stat_numconnections
= 0;
1010 server
.stat_starttime
= time(NULL
);
1011 aeCreateTimeEvent(server
.el
, 1000, serverCron
, NULL
, NULL
);
1014 /* Empty the whole database */
1015 static long long emptyDb() {
1017 long long removed
= 0;
1019 for (j
= 0; j
< server
.dbnum
; j
++) {
1020 removed
+= dictSize(server
.db
[j
].dict
);
1021 dictEmpty(server
.db
[j
].dict
);
1022 dictEmpty(server
.db
[j
].expires
);
1027 static int yesnotoi(char *s
) {
1028 if (!strcasecmp(s
,"yes")) return 1;
1029 else if (!strcasecmp(s
,"no")) return 0;
1033 /* I agree, this is a very rudimental way to load a configuration...
1034 will improve later if the config gets more complex */
1035 static void loadServerConfig(char *filename
) {
1037 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1041 if (filename
[0] == '-' && filename
[1] == '\0')
1044 if ((fp
= fopen(filename
,"r")) == NULL
) {
1045 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1050 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1056 line
= sdstrim(line
," \t\r\n");
1058 /* Skip comments and blank lines*/
1059 if (line
[0] == '#' || line
[0] == '\0') {
1064 /* Split into arguments */
1065 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1066 sdstolower(argv
[0]);
1068 /* Execute config directives */
1069 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1070 server
.maxidletime
= atoi(argv
[1]);
1071 if (server
.maxidletime
< 0) {
1072 err
= "Invalid timeout value"; goto loaderr
;
1074 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1075 server
.port
= atoi(argv
[1]);
1076 if (server
.port
< 1 || server
.port
> 65535) {
1077 err
= "Invalid port"; goto loaderr
;
1079 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1080 server
.bindaddr
= zstrdup(argv
[1]);
1081 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1082 int seconds
= atoi(argv
[1]);
1083 int changes
= atoi(argv
[2]);
1084 if (seconds
< 1 || changes
< 0) {
1085 err
= "Invalid save parameters"; goto loaderr
;
1087 appendServerSaveParams(seconds
,changes
);
1088 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1089 if (chdir(argv
[1]) == -1) {
1090 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1091 argv
[1], strerror(errno
));
1094 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1095 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1096 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1097 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1099 err
= "Invalid log level. Must be one of debug, notice, warning";
1102 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1105 server
.logfile
= zstrdup(argv
[1]);
1106 if (!strcasecmp(server
.logfile
,"stdout")) {
1107 zfree(server
.logfile
);
1108 server
.logfile
= NULL
;
1110 if (server
.logfile
) {
1111 /* Test if we are able to open the file. The server will not
1112 * be able to abort just for this problem later... */
1113 logfp
= fopen(server
.logfile
,"a");
1114 if (logfp
== NULL
) {
1115 err
= sdscatprintf(sdsempty(),
1116 "Can't open the log file: %s", strerror(errno
));
1121 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1122 server
.dbnum
= atoi(argv
[1]);
1123 if (server
.dbnum
< 1) {
1124 err
= "Invalid number of databases"; goto loaderr
;
1126 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1127 server
.maxclients
= atoi(argv
[1]);
1128 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1129 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1130 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1131 server
.masterhost
= sdsnew(argv
[1]);
1132 server
.masterport
= atoi(argv
[2]);
1133 server
.replstate
= REDIS_REPL_CONNECT
;
1134 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1135 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1136 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1138 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1139 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1140 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1142 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1143 server
.sharingpoolsize
= atoi(argv
[1]);
1144 if (server
.sharingpoolsize
< 1) {
1145 err
= "invalid object sharing pool size"; goto loaderr
;
1147 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1148 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1149 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1151 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1152 server
.requirepass
= zstrdup(argv
[1]);
1153 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1154 server
.pidfile
= zstrdup(argv
[1]);
1155 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1156 server
.dbfilename
= zstrdup(argv
[1]);
1158 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1160 for (j
= 0; j
< argc
; j
++)
1165 if (fp
!= stdin
) fclose(fp
);
1169 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1170 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1171 fprintf(stderr
, ">>> '%s'\n", line
);
1172 fprintf(stderr
, "%s\n", err
);
1176 static void freeClientArgv(redisClient
*c
) {
1179 for (j
= 0; j
< c
->argc
; j
++)
1180 decrRefCount(c
->argv
[j
]);
1181 for (j
= 0; j
< c
->mbargc
; j
++)
1182 decrRefCount(c
->mbargv
[j
]);
1187 static void freeClient(redisClient
*c
) {
1190 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1191 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1192 sdsfree(c
->querybuf
);
1193 listRelease(c
->reply
);
1196 ln
= listSearchKey(server
.clients
,c
);
1198 listDelNode(server
.clients
,ln
);
1199 if (c
->flags
& REDIS_SLAVE
) {
1200 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1202 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1203 ln
= listSearchKey(l
,c
);
1207 if (c
->flags
& REDIS_MASTER
) {
1208 server
.master
= NULL
;
1209 server
.replstate
= REDIS_REPL_CONNECT
;
1216 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1221 listRewind(c
->reply
);
1222 while((ln
= listYield(c
->reply
))) {
1224 totlen
+= sdslen(o
->ptr
);
1225 /* This optimization makes more sense if we don't have to copy
1227 if (totlen
> 1024) return;
1233 listRewind(c
->reply
);
1234 while((ln
= listYield(c
->reply
))) {
1236 memcpy(buf
+copylen
,o
->ptr
,sdslen(o
->ptr
));
1237 copylen
+= sdslen(o
->ptr
);
1238 listDelNode(c
->reply
,ln
);
1240 /* Now the output buffer is empty, add the new single element */
1241 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,totlen
));
1242 if (!listAddNodeTail(c
->reply
,o
)) oom("listAddNodeTail");
1246 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1247 redisClient
*c
= privdata
;
1248 int nwritten
= 0, totwritten
= 0, objlen
;
1251 REDIS_NOTUSED(mask
);
1253 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1254 glueReplyBuffersIfNeeded(c
);
1255 while(listLength(c
->reply
)) {
1256 o
= listNodeValue(listFirst(c
->reply
));
1257 objlen
= sdslen(o
->ptr
);
1260 listDelNode(c
->reply
,listFirst(c
->reply
));
1264 if (c
->flags
& REDIS_MASTER
) {
1265 /* Don't reply to a master */
1266 nwritten
= objlen
- c
->sentlen
;
1268 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1269 if (nwritten
<= 0) break;
1271 c
->sentlen
+= nwritten
;
1272 totwritten
+= nwritten
;
1273 /* If we fully sent the object on head go to the next one */
1274 if (c
->sentlen
== objlen
) {
1275 listDelNode(c
->reply
,listFirst(c
->reply
));
1278 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1279 * bytes, in a single threaded server it's a good idea to server
1280 * other clients as well, even if a very large request comes from
1281 * super fast link that is always able to accept data (in real world
1282 * terms think to 'KEYS *' against the loopback interfae) */
1283 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1285 if (nwritten
== -1) {
1286 if (errno
== EAGAIN
) {
1289 redisLog(REDIS_DEBUG
,
1290 "Error writing to client: %s", strerror(errno
));
1295 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1296 if (listLength(c
->reply
) == 0) {
1298 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1302 static struct redisCommand
*lookupCommand(char *name
) {
1304 while(cmdTable
[j
].name
!= NULL
) {
1305 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1311 /* resetClient prepare the client to process the next command */
1312 static void resetClient(redisClient
*c
) {
1318 /* If this function gets called we already read a whole
1319 * command, argments are in the client argv/argc fields.
1320 * processCommand() execute the command or prepare the
1321 * server for a bulk read from the client.
1323 * If 1 is returned the client is still alive and valid and
1324 * and other operations can be performed by the caller. Otherwise
1325 * if 0 is returned the client was destroied (i.e. after QUIT). */
1326 static int processCommand(redisClient
*c
) {
1327 struct redisCommand
*cmd
;
1330 /* Free some memory if needed (maxmemory setting) */
1331 if (server
.maxmemory
) freeMemoryIfNeeded();
1333 /* Handle the multi bulk command type. This is an alternative protocol
1334 * supported by Redis in order to receive commands that are composed of
1335 * multiple binary-safe "bulk" arguments. The latency of processing is
1336 * a bit higher but this allows things like multi-sets, so if this
1337 * protocol is used only for MSET and similar commands this is a big win. */
1338 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1339 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1340 if (c
->multibulk
<= 0) {
1344 decrRefCount(c
->argv
[c
->argc
-1]);
1348 } else if (c
->multibulk
) {
1349 if (c
->bulklen
== -1) {
1350 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1351 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1355 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1356 decrRefCount(c
->argv
[0]);
1357 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1359 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1364 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1368 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1369 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1373 if (c
->multibulk
== 0) {
1377 /* Here we need to swap the multi-bulk argc/argv with the
1378 * normal argc/argv of the client structure. */
1380 c
->argv
= c
->mbargv
;
1381 c
->mbargv
= auxargv
;
1384 c
->argc
= c
->mbargc
;
1385 c
->mbargc
= auxargc
;
1387 /* We need to set bulklen to something different than -1
1388 * in order for the code below to process the command without
1389 * to try to read the last argument of a bulk command as
1390 * a special argument. */
1392 /* continue below and process the command */
1399 /* -- end of multi bulk commands processing -- */
1401 /* The QUIT command is handled as a special case. Normal command
1402 * procs are unable to close the client connection safely */
1403 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1407 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1409 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1412 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1413 (c
->argc
< -cmd
->arity
)) {
1414 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
1417 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1418 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1421 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1422 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1424 decrRefCount(c
->argv
[c
->argc
-1]);
1425 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1427 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1432 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1433 /* It is possible that the bulk read is already in the
1434 * buffer. Check this condition and handle it accordingly.
1435 * This is just a fast path, alternative to call processInputBuffer().
1436 * It's a good idea since the code is small and this condition
1437 * happens most of the times. */
1438 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1439 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1441 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1446 /* Let's try to share objects on the command arguments vector */
1447 if (server
.shareobjects
) {
1449 for(j
= 1; j
< c
->argc
; j
++)
1450 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1452 /* Let's try to encode the bulk object to save space. */
1453 if (cmd
->flags
& REDIS_CMD_BULK
)
1454 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1456 /* Check if the user is authenticated */
1457 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1458 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1463 /* Exec the command */
1464 dirty
= server
.dirty
;
1466 if (server
.dirty
-dirty
!= 0 && listLength(server
.slaves
))
1467 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1468 if (listLength(server
.monitors
))
1469 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1470 server
.stat_numcommands
++;
1472 /* Prepare the client for the next command */
1473 if (c
->flags
& REDIS_CLOSE
) {
1481 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1485 /* (args*2)+1 is enough room for args, spaces, newlines */
1486 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1488 if (argc
<= REDIS_STATIC_ARGS
) {
1491 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1492 if (!outv
) oom("replicationFeedSlaves");
1495 for (j
= 0; j
< argc
; j
++) {
1496 if (j
!= 0) outv
[outc
++] = shared
.space
;
1497 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1500 lenobj
= createObject(REDIS_STRING
,
1501 sdscatprintf(sdsempty(),"%d\r\n",
1502 stringObjectLen(argv
[j
])));
1503 lenobj
->refcount
= 0;
1504 outv
[outc
++] = lenobj
;
1506 outv
[outc
++] = argv
[j
];
1508 outv
[outc
++] = shared
.crlf
;
1510 /* Increment all the refcounts at start and decrement at end in order to
1511 * be sure to free objects if there is no slave in a replication state
1512 * able to be feed with commands */
1513 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1515 while((ln
= listYield(slaves
))) {
1516 redisClient
*slave
= ln
->value
;
1518 /* Don't feed slaves that are still waiting for BGSAVE to start */
1519 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1521 /* Feed all the other slaves, MONITORs and so on */
1522 if (slave
->slaveseldb
!= dictid
) {
1526 case 0: selectcmd
= shared
.select0
; break;
1527 case 1: selectcmd
= shared
.select1
; break;
1528 case 2: selectcmd
= shared
.select2
; break;
1529 case 3: selectcmd
= shared
.select3
; break;
1530 case 4: selectcmd
= shared
.select4
; break;
1531 case 5: selectcmd
= shared
.select5
; break;
1532 case 6: selectcmd
= shared
.select6
; break;
1533 case 7: selectcmd
= shared
.select7
; break;
1534 case 8: selectcmd
= shared
.select8
; break;
1535 case 9: selectcmd
= shared
.select9
; break;
1537 selectcmd
= createObject(REDIS_STRING
,
1538 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1539 selectcmd
->refcount
= 0;
1542 addReply(slave
,selectcmd
);
1543 slave
->slaveseldb
= dictid
;
1545 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1547 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1548 if (outv
!= static_outv
) zfree(outv
);
1551 static void processInputBuffer(redisClient
*c
) {
1553 if (c
->bulklen
== -1) {
1554 /* Read the first line of the query */
1555 char *p
= strchr(c
->querybuf
,'\n');
1562 query
= c
->querybuf
;
1563 c
->querybuf
= sdsempty();
1564 querylen
= 1+(p
-(query
));
1565 if (sdslen(query
) > querylen
) {
1566 /* leave data after the first line of the query in the buffer */
1567 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1569 *p
= '\0'; /* remove "\n" */
1570 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1571 sdsupdatelen(query
);
1573 /* Now we can split the query in arguments */
1574 if (sdslen(query
) == 0) {
1575 /* Ignore empty query */
1579 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1580 if (argv
== NULL
) oom("sdssplitlen");
1583 if (c
->argv
) zfree(c
->argv
);
1584 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1585 if (c
->argv
== NULL
) oom("allocating arguments list for client");
1587 for (j
= 0; j
< argc
; j
++) {
1588 if (sdslen(argv
[j
])) {
1589 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1596 /* Execute the command. If the client is still valid
1597 * after processCommand() return and there is something
1598 * on the query buffer try to process the next command. */
1599 if (c
->argc
&& processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1601 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
1602 redisLog(REDIS_DEBUG
, "Client protocol error");
1607 /* Bulk read handling. Note that if we are at this point
1608 the client already sent a command terminated with a newline,
1609 we are reading the bulk data that is actually the last
1610 argument of the command. */
1611 int qbl
= sdslen(c
->querybuf
);
1613 if (c
->bulklen
<= qbl
) {
1614 /* Copy everything but the final CRLF as final argument */
1615 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1617 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1618 /* Process the command. If the client is still valid after
1619 * the processing and there is more data in the buffer
1620 * try to parse it. */
1621 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1627 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1628 redisClient
*c
= (redisClient
*) privdata
;
1629 char buf
[REDIS_IOBUF_LEN
];
1632 REDIS_NOTUSED(mask
);
1634 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1636 if (errno
== EAGAIN
) {
1639 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1643 } else if (nread
== 0) {
1644 redisLog(REDIS_DEBUG
, "Client closed connection");
1649 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1650 c
->lastinteraction
= time(NULL
);
1654 processInputBuffer(c
);
1657 static int selectDb(redisClient
*c
, int id
) {
1658 if (id
< 0 || id
>= server
.dbnum
)
1660 c
->db
= &server
.db
[id
];
1664 static void *dupClientReplyValue(void *o
) {
1665 incrRefCount((robj
*)o
);
1669 static redisClient
*createClient(int fd
) {
1670 redisClient
*c
= zmalloc(sizeof(*c
));
1672 anetNonBlock(NULL
,fd
);
1673 anetTcpNoDelay(NULL
,fd
);
1674 if (!c
) return NULL
;
1677 c
->querybuf
= sdsempty();
1686 c
->lastinteraction
= time(NULL
);
1687 c
->authenticated
= 0;
1688 c
->replstate
= REDIS_REPL_NONE
;
1689 if ((c
->reply
= listCreate()) == NULL
) oom("listCreate");
1690 listSetFreeMethod(c
->reply
,decrRefCount
);
1691 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1692 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1693 readQueryFromClient
, c
, NULL
) == AE_ERR
) {
1697 if (!listAddNodeTail(server
.clients
,c
)) oom("listAddNodeTail");
1701 static void addReply(redisClient
*c
, robj
*obj
) {
1702 if (listLength(c
->reply
) == 0 &&
1703 (c
->replstate
== REDIS_REPL_NONE
||
1704 c
->replstate
== REDIS_REPL_ONLINE
) &&
1705 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
1706 sendReplyToClient
, c
, NULL
) == AE_ERR
) return;
1707 if (obj
->encoding
!= REDIS_ENCODING_RAW
) {
1708 obj
= getDecodedObject(obj
);
1712 if (!listAddNodeTail(c
->reply
,obj
)) oom("listAddNodeTail");
1715 static void addReplySds(redisClient
*c
, sds s
) {
1716 robj
*o
= createObject(REDIS_STRING
,s
);
1721 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
1724 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
1725 len
= sdslen(obj
->ptr
);
1727 long n
= (long)obj
->ptr
;
1734 while((n
= n
/10) != 0) {
1738 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",len
));
1741 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1746 REDIS_NOTUSED(mask
);
1747 REDIS_NOTUSED(privdata
);
1749 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
1750 if (cfd
== AE_ERR
) {
1751 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
1754 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
1755 if ((c
= createClient(cfd
)) == NULL
) {
1756 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
1757 close(cfd
); /* May be already closed, just ingore errors */
1760 /* If maxclient directive is set and this is one client more... close the
1761 * connection. Note that we create the client instead to check before
1762 * for this condition, since now the socket is already set in nonblocking
1763 * mode and we can send an error for free using the Kernel I/O */
1764 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
1765 char *err
= "-ERR max number of clients reached\r\n";
1767 /* That's a best effort error message, don't check write errors */
1768 (void) write(c
->fd
,err
,strlen(err
));
1772 server
.stat_numconnections
++;
1775 /* ======================= Redis objects implementation ===================== */
1777 static robj
*createObject(int type
, void *ptr
) {
1780 if (listLength(server
.objfreelist
)) {
1781 listNode
*head
= listFirst(server
.objfreelist
);
1782 o
= listNodeValue(head
);
1783 listDelNode(server
.objfreelist
,head
);
1785 o
= zmalloc(sizeof(*o
));
1787 if (!o
) oom("createObject");
1789 o
->encoding
= REDIS_ENCODING_RAW
;
1795 static robj
*createStringObject(char *ptr
, size_t len
) {
1796 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
1799 static robj
*createListObject(void) {
1800 list
*l
= listCreate();
1802 if (!l
) oom("listCreate");
1803 listSetFreeMethod(l
,decrRefCount
);
1804 return createObject(REDIS_LIST
,l
);
1807 static robj
*createSetObject(void) {
1808 dict
*d
= dictCreate(&setDictType
,NULL
);
1809 if (!d
) oom("dictCreate");
1810 return createObject(REDIS_SET
,d
);
1813 static void freeStringObject(robj
*o
) {
1814 if (o
->encoding
== REDIS_ENCODING_RAW
) {
1819 static void freeListObject(robj
*o
) {
1820 listRelease((list
*) o
->ptr
);
1823 static void freeSetObject(robj
*o
) {
1824 dictRelease((dict
*) o
->ptr
);
1827 static void freeHashObject(robj
*o
) {
1828 dictRelease((dict
*) o
->ptr
);
1831 static void incrRefCount(robj
*o
) {
1833 #ifdef DEBUG_REFCOUNT
1834 if (o
->type
== REDIS_STRING
)
1835 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
1839 static void decrRefCount(void *obj
) {
1842 #ifdef DEBUG_REFCOUNT
1843 if (o
->type
== REDIS_STRING
)
1844 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
1846 if (--(o
->refcount
) == 0) {
1848 case REDIS_STRING
: freeStringObject(o
); break;
1849 case REDIS_LIST
: freeListObject(o
); break;
1850 case REDIS_SET
: freeSetObject(o
); break;
1851 case REDIS_HASH
: freeHashObject(o
); break;
1852 default: assert(0 != 0); break;
1854 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
1855 !listAddNodeHead(server
.objfreelist
,o
))
1860 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
1861 dictEntry
*de
= dictFind(db
->dict
,key
);
1862 return de
? dictGetEntryVal(de
) : NULL
;
1865 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
1866 expireIfNeeded(db
,key
);
1867 return lookupKey(db
,key
);
1870 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
1871 deleteIfVolatile(db
,key
);
1872 return lookupKey(db
,key
);
1875 static int deleteKey(redisDb
*db
, robj
*key
) {
1878 /* We need to protect key from destruction: after the first dictDelete()
1879 * it may happen that 'key' is no longer valid if we don't increment
1880 * it's count. This may happen when we get the object reference directly
1881 * from the hash table with dictRandomKey() or dict iterators */
1883 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
1884 retval
= dictDelete(db
->dict
,key
);
1887 return retval
== DICT_OK
;
1890 /* Try to share an object against the shared objects pool */
1891 static robj
*tryObjectSharing(robj
*o
) {
1892 struct dictEntry
*de
;
1895 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
1897 assert(o
->type
== REDIS_STRING
);
1898 de
= dictFind(server
.sharingpool
,o
);
1900 robj
*shared
= dictGetEntryKey(de
);
1902 c
= ((unsigned long) dictGetEntryVal(de
))+1;
1903 dictGetEntryVal(de
) = (void*) c
;
1904 incrRefCount(shared
);
1908 /* Here we are using a stream algorihtm: Every time an object is
1909 * shared we increment its count, everytime there is a miss we
1910 * recrement the counter of a random object. If this object reaches
1911 * zero we remove the object and put the current object instead. */
1912 if (dictSize(server
.sharingpool
) >=
1913 server
.sharingpoolsize
) {
1914 de
= dictGetRandomKey(server
.sharingpool
);
1916 c
= ((unsigned long) dictGetEntryVal(de
))-1;
1917 dictGetEntryVal(de
) = (void*) c
;
1919 dictDelete(server
.sharingpool
,de
->key
);
1922 c
= 0; /* If the pool is empty we want to add this object */
1927 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
1928 assert(retval
== DICT_OK
);
1935 /* Check if the nul-terminated string 's' can be represented by a long
1936 * (that is, is a number that fits into long without any other space or
1937 * character before or after the digits).
1939 * If so, the function returns REDIS_OK and *longval is set to the value
1940 * of the number. Otherwise REDIS_ERR is returned */
1941 static int isStringRepresentableAsLong(sds s
, long *longval
) {
1942 char buf
[32], *endptr
;
1946 value
= strtol(s
, &endptr
, 10);
1947 if (endptr
[0] != '\0') return REDIS_ERR
;
1948 slen
= snprintf(buf
,32,"%ld",value
);
1950 /* If the number converted back into a string is not identical
1951 * then it's not possible to encode the string as integer */
1952 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
1953 if (longval
) *longval
= value
;
1957 /* Try to encode a string object in order to save space */
1958 static int tryObjectEncoding(robj
*o
) {
1962 if (o
->encoding
!= REDIS_ENCODING_RAW
)
1963 return REDIS_ERR
; /* Already encoded */
1965 /* It's not save to encode shared objects: shared objects can be shared
1966 * everywhere in the "object space" of Redis. Encoded objects can only
1967 * appear as "values" (and not, for instance, as keys) */
1968 if (o
->refcount
> 1) return REDIS_ERR
;
1970 /* Currently we try to encode only strings */
1971 assert(o
->type
== REDIS_STRING
);
1973 /* Check if we can represent this string as a long integer */
1974 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
1976 /* Ok, this object can be encoded */
1977 o
->encoding
= REDIS_ENCODING_INT
;
1979 o
->ptr
= (void*) value
;
1983 /* Get a decoded version of an encoded object (returned as a new object) */
1984 static robj
*getDecodedObject(const robj
*o
) {
1987 assert(o
->encoding
!= REDIS_ENCODING_RAW
);
1988 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
1991 snprintf(buf
,32,"%ld",(long)o
->ptr
);
1992 dec
= createStringObject(buf
,strlen(buf
));
1999 static int compareStringObjects(robj
*a
, robj
*b
) {
2000 assert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2002 if (a
->encoding
== REDIS_ENCODING_INT
&& b
->encoding
== REDIS_ENCODING_INT
){
2003 return (long)a
->ptr
- (long)b
->ptr
;
2009 if (a
->encoding
!= REDIS_ENCODING_RAW
) a
= getDecodedObject(a
);
2010 if (b
->encoding
!= REDIS_ENCODING_RAW
) b
= getDecodedObject(a
);
2011 retval
= sdscmp(a
->ptr
,b
->ptr
);
2018 static size_t stringObjectLen(robj
*o
) {
2019 assert(o
->type
== REDIS_STRING
);
2020 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2021 return sdslen(o
->ptr
);
2025 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2029 /*============================ DB saving/loading ============================ */
2031 static int rdbSaveType(FILE *fp
, unsigned char type
) {
2032 if (fwrite(&type
,1,1,fp
) == 0) return -1;
2036 static int rdbSaveTime(FILE *fp
, time_t t
) {
2037 int32_t t32
= (int32_t) t
;
2038 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
2042 /* check rdbLoadLen() comments for more info */
2043 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2044 unsigned char buf
[2];
2047 /* Save a 6 bit len */
2048 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2049 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2050 } else if (len
< (1<<14)) {
2051 /* Save a 14 bit len */
2052 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2054 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2056 /* Save a 32 bit len */
2057 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2058 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2060 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2065 /* String objects in the form "2391" "-100" without any space and with a
2066 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2067 * encoded as integers to save space */
2068 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2070 char *endptr
, buf
[32];
2072 /* Check if it's possible to encode this value as a number */
2073 value
= strtoll(s
, &endptr
, 10);
2074 if (endptr
[0] != '\0') return 0;
2075 snprintf(buf
,32,"%lld",value
);
2077 /* If the number converted back into a string is not identical
2078 * then it's not possible to encode the string as integer */
2079 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2081 /* Finally check if it fits in our ranges */
2082 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2083 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2084 enc
[1] = value
&0xFF;
2086 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2087 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2088 enc
[1] = value
&0xFF;
2089 enc
[2] = (value
>>8)&0xFF;
2091 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2092 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2093 enc
[1] = value
&0xFF;
2094 enc
[2] = (value
>>8)&0xFF;
2095 enc
[3] = (value
>>16)&0xFF;
2096 enc
[4] = (value
>>24)&0xFF;
2103 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2104 unsigned int comprlen
, outlen
;
2108 /* We require at least four bytes compression for this to be worth it */
2109 outlen
= sdslen(obj
->ptr
)-4;
2110 if (outlen
<= 0) return 0;
2111 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2112 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2113 if (comprlen
== 0) {
2117 /* Data compressed! Let's save it on disk */
2118 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2119 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2120 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2121 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2122 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2131 /* Save a string objet as [len][data] on disk. If the object is a string
2132 * representation of an integer value we try to safe it in a special form */
2133 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2137 len
= sdslen(obj
->ptr
);
2139 /* Try integer encoding */
2141 unsigned char buf
[5];
2142 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2143 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2148 /* Try LZF compression - under 20 bytes it's unable to compress even
2149 * aaaaaaaaaaaaaaaaaa so skip it */
2153 retval
= rdbSaveLzfStringObject(fp
,obj
);
2154 if (retval
== -1) return -1;
2155 if (retval
> 0) return 0;
2156 /* retval == 0 means data can't be compressed, save the old way */
2159 /* Store verbatim */
2160 if (rdbSaveLen(fp
,len
) == -1) return -1;
2161 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2165 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2166 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2170 if (obj
->encoding
!= REDIS_ENCODING_RAW
) {
2171 dec
= getDecodedObject(obj
);
2172 retval
= rdbSaveStringObjectRaw(fp
,dec
);
2176 return rdbSaveStringObjectRaw(fp
,obj
);
2180 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2181 static int rdbSave(char *filename
) {
2182 dictIterator
*di
= NULL
;
2187 time_t now
= time(NULL
);
2189 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2190 fp
= fopen(tmpfile
,"w");
2192 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2195 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2196 for (j
= 0; j
< server
.dbnum
; j
++) {
2197 redisDb
*db
= server
.db
+j
;
2199 if (dictSize(d
) == 0) continue;
2200 di
= dictGetIterator(d
);
2206 /* Write the SELECT DB opcode */
2207 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2208 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2210 /* Iterate this DB writing every entry */
2211 while((de
= dictNext(di
)) != NULL
) {
2212 robj
*key
= dictGetEntryKey(de
);
2213 robj
*o
= dictGetEntryVal(de
);
2214 time_t expiretime
= getExpire(db
,key
);
2216 /* Save the expire time */
2217 if (expiretime
!= -1) {
2218 /* If this key is already expired skip it */
2219 if (expiretime
< now
) continue;
2220 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2221 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2223 /* Save the key and associated value */
2224 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2225 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2226 if (o
->type
== REDIS_STRING
) {
2227 /* Save a string value */
2228 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
2229 } else if (o
->type
== REDIS_LIST
) {
2230 /* Save a list value */
2231 list
*list
= o
->ptr
;
2235 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
2236 while((ln
= listYield(list
))) {
2237 robj
*eleobj
= listNodeValue(ln
);
2239 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2241 } else if (o
->type
== REDIS_SET
) {
2242 /* Save a set value */
2244 dictIterator
*di
= dictGetIterator(set
);
2247 if (!set
) oom("dictGetIteraotr");
2248 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
2249 while((de
= dictNext(di
)) != NULL
) {
2250 robj
*eleobj
= dictGetEntryKey(de
);
2252 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2254 dictReleaseIterator(di
);
2259 dictReleaseIterator(di
);
2262 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2264 /* Make sure data will not remain on the OS's output buffers */
2269 /* Use RENAME to make sure the DB file is changed atomically only
2270 * if the generate DB file is ok. */
2271 if (rename(tmpfile
,filename
) == -1) {
2272 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destionation: %s", strerror(errno
));
2276 redisLog(REDIS_NOTICE
,"DB saved on disk");
2278 server
.lastsave
= time(NULL
);
2284 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2285 if (di
) dictReleaseIterator(di
);
2289 static int rdbSaveBackground(char *filename
) {
2292 if (server
.bgsaveinprogress
) return REDIS_ERR
;
2293 if ((childpid
= fork()) == 0) {
2296 if (rdbSave(filename
) == REDIS_OK
) {
2303 if (childpid
== -1) {
2304 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2308 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2309 server
.bgsaveinprogress
= 1;
2310 server
.bgsavechildpid
= childpid
;
2313 return REDIS_OK
; /* unreached */
2316 static void rdbRemoveTempFile(pid_t childpid
) {
2319 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
2323 static int rdbLoadType(FILE *fp
) {
2325 if (fread(&type
,1,1,fp
) == 0) return -1;
2329 static time_t rdbLoadTime(FILE *fp
) {
2331 if (fread(&t32
,4,1,fp
) == 0) return -1;
2332 return (time_t) t32
;
2335 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2336 * of this file for a description of how this are stored on disk.
2338 * isencoded is set to 1 if the readed length is not actually a length but
2339 * an "encoding type", check the above comments for more info */
2340 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2341 unsigned char buf
[2];
2344 if (isencoded
) *isencoded
= 0;
2346 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2351 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2352 type
= (buf
[0]&0xC0)>>6;
2353 if (type
== REDIS_RDB_6BITLEN
) {
2354 /* Read a 6 bit len */
2356 } else if (type
== REDIS_RDB_ENCVAL
) {
2357 /* Read a 6 bit len encoding type */
2358 if (isencoded
) *isencoded
= 1;
2360 } else if (type
== REDIS_RDB_14BITLEN
) {
2361 /* Read a 14 bit len */
2362 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2363 return ((buf
[0]&0x3F)<<8)|buf
[1];
2365 /* Read a 32 bit len */
2366 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2372 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2373 unsigned char enc
[4];
2376 if (enctype
== REDIS_RDB_ENC_INT8
) {
2377 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2378 val
= (signed char)enc
[0];
2379 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2381 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2382 v
= enc
[0]|(enc
[1]<<8);
2384 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2386 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2387 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2390 val
= 0; /* anti-warning */
2393 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2396 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2397 unsigned int len
, clen
;
2398 unsigned char *c
= NULL
;
2401 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2402 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2403 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2404 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2405 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2406 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2408 return createObject(REDIS_STRING
,val
);
2415 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2420 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2423 case REDIS_RDB_ENC_INT8
:
2424 case REDIS_RDB_ENC_INT16
:
2425 case REDIS_RDB_ENC_INT32
:
2426 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2427 case REDIS_RDB_ENC_LZF
:
2428 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2434 if (len
== REDIS_RDB_LENERR
) return NULL
;
2435 val
= sdsnewlen(NULL
,len
);
2436 if (len
&& fread(val
,len
,1,fp
) == 0) {
2440 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2443 static int rdbLoad(char *filename
) {
2445 robj
*keyobj
= NULL
;
2447 int type
, retval
, rdbver
;
2448 dict
*d
= server
.db
[0].dict
;
2449 redisDb
*db
= server
.db
+0;
2451 time_t expiretime
= -1, now
= time(NULL
);
2453 fp
= fopen(filename
,"r");
2454 if (!fp
) return REDIS_ERR
;
2455 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2457 if (memcmp(buf
,"REDIS",5) != 0) {
2459 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2462 rdbver
= atoi(buf
+5);
2465 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2472 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2473 if (type
== REDIS_EXPIRETIME
) {
2474 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2475 /* We read the time so we need to read the object type again */
2476 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2478 if (type
== REDIS_EOF
) break;
2479 /* Handle SELECT DB opcode as a special case */
2480 if (type
== REDIS_SELECTDB
) {
2481 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2483 if (dbid
>= (unsigned)server
.dbnum
) {
2484 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2487 db
= server
.db
+dbid
;
2492 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2494 if (type
== REDIS_STRING
) {
2495 /* Read string value */
2496 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2497 tryObjectEncoding(o
);
2498 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2499 /* Read list/set value */
2502 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2504 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2505 /* Load every single element of the list/set */
2509 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2510 tryObjectEncoding(ele
);
2511 if (type
== REDIS_LIST
) {
2512 if (!listAddNodeTail((list
*)o
->ptr
,ele
))
2513 oom("listAddNodeTail");
2515 if (dictAdd((dict
*)o
->ptr
,ele
,NULL
) == DICT_ERR
)
2522 /* Add the new object in the hash table */
2523 retval
= dictAdd(d
,keyobj
,o
);
2524 if (retval
== DICT_ERR
) {
2525 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2528 /* Set the expire time if needed */
2529 if (expiretime
!= -1) {
2530 setExpire(db
,keyobj
,expiretime
);
2531 /* Delete this key if already expired */
2532 if (expiretime
< now
) deleteKey(db
,keyobj
);
2540 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2541 if (keyobj
) decrRefCount(keyobj
);
2542 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2544 return REDIS_ERR
; /* Just to avoid warning */
2547 /*================================== Commands =============================== */
2549 static void authCommand(redisClient
*c
) {
2550 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2551 c
->authenticated
= 1;
2552 addReply(c
,shared
.ok
);
2554 c
->authenticated
= 0;
2555 addReply(c
,shared
.err
);
2559 static void pingCommand(redisClient
*c
) {
2560 addReply(c
,shared
.pong
);
2563 static void echoCommand(redisClient
*c
) {
2564 addReplyBulkLen(c
,c
->argv
[1]);
2565 addReply(c
,c
->argv
[1]);
2566 addReply(c
,shared
.crlf
);
2569 /*=================================== Strings =============================== */
2571 static void setGenericCommand(redisClient
*c
, int nx
) {
2574 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2575 if (retval
== DICT_ERR
) {
2577 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2578 incrRefCount(c
->argv
[2]);
2580 addReply(c
,shared
.czero
);
2584 incrRefCount(c
->argv
[1]);
2585 incrRefCount(c
->argv
[2]);
2588 removeExpire(c
->db
,c
->argv
[1]);
2589 addReply(c
, nx
? shared
.cone
: shared
.ok
);
2592 static void setCommand(redisClient
*c
) {
2593 setGenericCommand(c
,0);
2596 static void setnxCommand(redisClient
*c
) {
2597 setGenericCommand(c
,1);
2600 static void getCommand(redisClient
*c
) {
2601 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2604 addReply(c
,shared
.nullbulk
);
2606 if (o
->type
!= REDIS_STRING
) {
2607 addReply(c
,shared
.wrongtypeerr
);
2609 addReplyBulkLen(c
,o
);
2611 addReply(c
,shared
.crlf
);
2616 static void getsetCommand(redisClient
*c
) {
2618 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
2619 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2621 incrRefCount(c
->argv
[1]);
2623 incrRefCount(c
->argv
[2]);
2625 removeExpire(c
->db
,c
->argv
[1]);
2628 static void mgetCommand(redisClient
*c
) {
2631 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
2632 for (j
= 1; j
< c
->argc
; j
++) {
2633 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
2635 addReply(c
,shared
.nullbulk
);
2637 if (o
->type
!= REDIS_STRING
) {
2638 addReply(c
,shared
.nullbulk
);
2640 addReplyBulkLen(c
,o
);
2642 addReply(c
,shared
.crlf
);
2648 static void incrDecrCommand(redisClient
*c
, long long incr
) {
2653 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2657 if (o
->type
!= REDIS_STRING
) {
2662 if (o
->encoding
== REDIS_ENCODING_RAW
)
2663 value
= strtoll(o
->ptr
, &eptr
, 10);
2664 else if (o
->encoding
== REDIS_ENCODING_INT
)
2665 value
= (long)o
->ptr
;
2672 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
2673 tryObjectEncoding(o
);
2674 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
2675 if (retval
== DICT_ERR
) {
2676 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
2677 removeExpire(c
->db
,c
->argv
[1]);
2679 incrRefCount(c
->argv
[1]);
2682 addReply(c
,shared
.colon
);
2684 addReply(c
,shared
.crlf
);
2687 static void incrCommand(redisClient
*c
) {
2688 incrDecrCommand(c
,1);
2691 static void decrCommand(redisClient
*c
) {
2692 incrDecrCommand(c
,-1);
2695 static void incrbyCommand(redisClient
*c
) {
2696 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2697 incrDecrCommand(c
,incr
);
2700 static void decrbyCommand(redisClient
*c
) {
2701 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2702 incrDecrCommand(c
,-incr
);
2705 /* ========================= Type agnostic commands ========================= */
2707 static void delCommand(redisClient
*c
) {
2710 for (j
= 1; j
< c
->argc
; j
++) {
2711 if (deleteKey(c
->db
,c
->argv
[j
])) {
2718 addReply(c
,shared
.czero
);
2721 addReply(c
,shared
.cone
);
2724 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
2729 static void existsCommand(redisClient
*c
) {
2730 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
2733 static void selectCommand(redisClient
*c
) {
2734 int id
= atoi(c
->argv
[1]->ptr
);
2736 if (selectDb(c
,id
) == REDIS_ERR
) {
2737 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
2739 addReply(c
,shared
.ok
);
2743 static void randomkeyCommand(redisClient
*c
) {
2747 de
= dictGetRandomKey(c
->db
->dict
);
2748 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
2751 addReply(c
,shared
.plus
);
2752 addReply(c
,shared
.crlf
);
2754 addReply(c
,shared
.plus
);
2755 addReply(c
,dictGetEntryKey(de
));
2756 addReply(c
,shared
.crlf
);
2760 static void keysCommand(redisClient
*c
) {
2763 sds pattern
= c
->argv
[1]->ptr
;
2764 int plen
= sdslen(pattern
);
2765 int numkeys
= 0, keyslen
= 0;
2766 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
2768 di
= dictGetIterator(c
->db
->dict
);
2769 if (!di
) oom("dictGetIterator");
2771 decrRefCount(lenobj
);
2772 while((de
= dictNext(di
)) != NULL
) {
2773 robj
*keyobj
= dictGetEntryKey(de
);
2775 sds key
= keyobj
->ptr
;
2776 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
2777 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
2778 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
2780 addReply(c
,shared
.space
);
2783 keyslen
+= sdslen(key
);
2787 dictReleaseIterator(di
);
2788 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
2789 addReply(c
,shared
.crlf
);
2792 static void dbsizeCommand(redisClient
*c
) {
2794 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
2797 static void lastsaveCommand(redisClient
*c
) {
2799 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
2802 static void typeCommand(redisClient
*c
) {
2806 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2811 case REDIS_STRING
: type
= "+string"; break;
2812 case REDIS_LIST
: type
= "+list"; break;
2813 case REDIS_SET
: type
= "+set"; break;
2814 default: type
= "unknown"; break;
2817 addReplySds(c
,sdsnew(type
));
2818 addReply(c
,shared
.crlf
);
2821 static void saveCommand(redisClient
*c
) {
2822 if (server
.bgsaveinprogress
) {
2823 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
2826 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2827 addReply(c
,shared
.ok
);
2829 addReply(c
,shared
.err
);
2833 static void bgsaveCommand(redisClient
*c
) {
2834 if (server
.bgsaveinprogress
) {
2835 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
2838 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
2839 addReply(c
,shared
.ok
);
2841 addReply(c
,shared
.err
);
2845 static void shutdownCommand(redisClient
*c
) {
2846 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
2847 /* Kill the saving child if there is a background saving in progress.
2848 We want to avoid race conditions, for instance our saving child may
2849 overwrite the synchronous saving did by SHUTDOWN. */
2850 if (server
.bgsaveinprogress
) {
2851 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
2852 kill(server
.bgsavechildpid
,SIGKILL
);
2853 rdbRemoveTempFile(server
.bgsavechildpid
);
2856 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2857 if (server
.daemonize
)
2858 unlink(server
.pidfile
);
2859 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
2860 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
2863 /* Ooops.. error saving! The best we can do is to continue operating.
2864 * Note that if there was a background saving process, in the next
2865 * cron() Redis will be notified that the background saving aborted,
2866 * handling special stuff like slaves pending for synchronization... */
2867 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
2868 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2872 static void renameGenericCommand(redisClient
*c
, int nx
) {
2875 /* To use the same key as src and dst is probably an error */
2876 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
2877 addReply(c
,shared
.sameobjecterr
);
2881 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2883 addReply(c
,shared
.nokeyerr
);
2887 deleteIfVolatile(c
->db
,c
->argv
[2]);
2888 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
2891 addReply(c
,shared
.czero
);
2894 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
2896 incrRefCount(c
->argv
[2]);
2898 deleteKey(c
->db
,c
->argv
[1]);
2900 addReply(c
,nx
? shared
.cone
: shared
.ok
);
2903 static void renameCommand(redisClient
*c
) {
2904 renameGenericCommand(c
,0);
2907 static void renamenxCommand(redisClient
*c
) {
2908 renameGenericCommand(c
,1);
2911 static void moveCommand(redisClient
*c
) {
2916 /* Obtain source and target DB pointers */
2919 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
2920 addReply(c
,shared
.outofrangeerr
);
2924 selectDb(c
,srcid
); /* Back to the source DB */
2926 /* If the user is moving using as target the same
2927 * DB as the source DB it is probably an error. */
2929 addReply(c
,shared
.sameobjecterr
);
2933 /* Check if the element exists and get a reference */
2934 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2936 addReply(c
,shared
.czero
);
2940 /* Try to add the element to the target DB */
2941 deleteIfVolatile(dst
,c
->argv
[1]);
2942 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
2943 addReply(c
,shared
.czero
);
2946 incrRefCount(c
->argv
[1]);
2949 /* OK! key moved, free the entry in the source DB */
2950 deleteKey(src
,c
->argv
[1]);
2952 addReply(c
,shared
.cone
);
2955 /* =================================== Lists ================================ */
2956 static void pushGenericCommand(redisClient
*c
, int where
) {
2960 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2962 lobj
= createListObject();
2964 if (where
== REDIS_HEAD
) {
2965 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2967 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2969 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
2970 incrRefCount(c
->argv
[1]);
2971 incrRefCount(c
->argv
[2]);
2973 if (lobj
->type
!= REDIS_LIST
) {
2974 addReply(c
,shared
.wrongtypeerr
);
2978 if (where
== REDIS_HEAD
) {
2979 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2981 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2983 incrRefCount(c
->argv
[2]);
2986 addReply(c
,shared
.ok
);
2989 static void lpushCommand(redisClient
*c
) {
2990 pushGenericCommand(c
,REDIS_HEAD
);
2993 static void rpushCommand(redisClient
*c
) {
2994 pushGenericCommand(c
,REDIS_TAIL
);
2997 static void llenCommand(redisClient
*c
) {
3001 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3003 addReply(c
,shared
.czero
);
3006 if (o
->type
!= REDIS_LIST
) {
3007 addReply(c
,shared
.wrongtypeerr
);
3010 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3015 static void lindexCommand(redisClient
*c
) {
3017 int index
= atoi(c
->argv
[2]->ptr
);
3019 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3021 addReply(c
,shared
.nullbulk
);
3023 if (o
->type
!= REDIS_LIST
) {
3024 addReply(c
,shared
.wrongtypeerr
);
3026 list
*list
= o
->ptr
;
3029 ln
= listIndex(list
, index
);
3031 addReply(c
,shared
.nullbulk
);
3033 robj
*ele
= listNodeValue(ln
);
3034 addReplyBulkLen(c
,ele
);
3036 addReply(c
,shared
.crlf
);
3042 static void lsetCommand(redisClient
*c
) {
3044 int index
= atoi(c
->argv
[2]->ptr
);
3046 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3048 addReply(c
,shared
.nokeyerr
);
3050 if (o
->type
!= REDIS_LIST
) {
3051 addReply(c
,shared
.wrongtypeerr
);
3053 list
*list
= o
->ptr
;
3056 ln
= listIndex(list
, index
);
3058 addReply(c
,shared
.outofrangeerr
);
3060 robj
*ele
= listNodeValue(ln
);
3063 listNodeValue(ln
) = c
->argv
[3];
3064 incrRefCount(c
->argv
[3]);
3065 addReply(c
,shared
.ok
);
3072 static void popGenericCommand(redisClient
*c
, int where
) {
3075 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3077 addReply(c
,shared
.nullbulk
);
3079 if (o
->type
!= REDIS_LIST
) {
3080 addReply(c
,shared
.wrongtypeerr
);
3082 list
*list
= o
->ptr
;
3085 if (where
== REDIS_HEAD
)
3086 ln
= listFirst(list
);
3088 ln
= listLast(list
);
3091 addReply(c
,shared
.nullbulk
);
3093 robj
*ele
= listNodeValue(ln
);
3094 addReplyBulkLen(c
,ele
);
3096 addReply(c
,shared
.crlf
);
3097 listDelNode(list
,ln
);
3104 static void lpopCommand(redisClient
*c
) {
3105 popGenericCommand(c
,REDIS_HEAD
);
3108 static void rpopCommand(redisClient
*c
) {
3109 popGenericCommand(c
,REDIS_TAIL
);
3112 static void lrangeCommand(redisClient
*c
) {
3114 int start
= atoi(c
->argv
[2]->ptr
);
3115 int end
= atoi(c
->argv
[3]->ptr
);
3117 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3119 addReply(c
,shared
.nullmultibulk
);
3121 if (o
->type
!= REDIS_LIST
) {
3122 addReply(c
,shared
.wrongtypeerr
);
3124 list
*list
= o
->ptr
;
3126 int llen
= listLength(list
);
3130 /* convert negative indexes */
3131 if (start
< 0) start
= llen
+start
;
3132 if (end
< 0) end
= llen
+end
;
3133 if (start
< 0) start
= 0;
3134 if (end
< 0) end
= 0;
3136 /* indexes sanity checks */
3137 if (start
> end
|| start
>= llen
) {
3138 /* Out of range start or start > end result in empty list */
3139 addReply(c
,shared
.emptymultibulk
);
3142 if (end
>= llen
) end
= llen
-1;
3143 rangelen
= (end
-start
)+1;
3145 /* Return the result in form of a multi-bulk reply */
3146 ln
= listIndex(list
, start
);
3147 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3148 for (j
= 0; j
< rangelen
; j
++) {
3149 ele
= listNodeValue(ln
);
3150 addReplyBulkLen(c
,ele
);
3152 addReply(c
,shared
.crlf
);
3159 static void ltrimCommand(redisClient
*c
) {
3161 int start
= atoi(c
->argv
[2]->ptr
);
3162 int end
= atoi(c
->argv
[3]->ptr
);
3164 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3166 addReply(c
,shared
.nokeyerr
);
3168 if (o
->type
!= REDIS_LIST
) {
3169 addReply(c
,shared
.wrongtypeerr
);
3171 list
*list
= o
->ptr
;
3173 int llen
= listLength(list
);
3174 int j
, ltrim
, rtrim
;
3176 /* convert negative indexes */
3177 if (start
< 0) start
= llen
+start
;
3178 if (end
< 0) end
= llen
+end
;
3179 if (start
< 0) start
= 0;
3180 if (end
< 0) end
= 0;
3182 /* indexes sanity checks */
3183 if (start
> end
|| start
>= llen
) {
3184 /* Out of range start or start > end result in empty list */
3188 if (end
>= llen
) end
= llen
-1;
3193 /* Remove list elements to perform the trim */
3194 for (j
= 0; j
< ltrim
; j
++) {
3195 ln
= listFirst(list
);
3196 listDelNode(list
,ln
);
3198 for (j
= 0; j
< rtrim
; j
++) {
3199 ln
= listLast(list
);
3200 listDelNode(list
,ln
);
3203 addReply(c
,shared
.ok
);
3208 static void lremCommand(redisClient
*c
) {
3211 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3213 addReply(c
,shared
.czero
);
3215 if (o
->type
!= REDIS_LIST
) {
3216 addReply(c
,shared
.wrongtypeerr
);
3218 list
*list
= o
->ptr
;
3219 listNode
*ln
, *next
;
3220 int toremove
= atoi(c
->argv
[2]->ptr
);
3225 toremove
= -toremove
;
3228 ln
= fromtail
? list
->tail
: list
->head
;
3230 robj
*ele
= listNodeValue(ln
);
3232 next
= fromtail
? ln
->prev
: ln
->next
;
3233 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3234 listDelNode(list
,ln
);
3237 if (toremove
&& removed
== toremove
) break;
3241 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3246 /* ==================================== Sets ================================ */
3248 static void saddCommand(redisClient
*c
) {
3251 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3253 set
= createSetObject();
3254 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3255 incrRefCount(c
->argv
[1]);
3257 if (set
->type
!= REDIS_SET
) {
3258 addReply(c
,shared
.wrongtypeerr
);
3262 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3263 incrRefCount(c
->argv
[2]);
3265 addReply(c
,shared
.cone
);
3267 addReply(c
,shared
.czero
);
3271 static void sremCommand(redisClient
*c
) {
3274 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3276 addReply(c
,shared
.czero
);
3278 if (set
->type
!= REDIS_SET
) {
3279 addReply(c
,shared
.wrongtypeerr
);
3282 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
3284 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3285 addReply(c
,shared
.cone
);
3287 addReply(c
,shared
.czero
);
3292 static void smoveCommand(redisClient
*c
) {
3293 robj
*srcset
, *dstset
;
3295 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3296 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3298 /* If the source key does not exist return 0, if it's of the wrong type
3300 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
3301 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
3304 /* Error if the destination key is not a set as well */
3305 if (dstset
&& dstset
->type
!= REDIS_SET
) {
3306 addReply(c
,shared
.wrongtypeerr
);
3309 /* Remove the element from the source set */
3310 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
3311 /* Key not found in the src set! return zero */
3312 addReply(c
,shared
.czero
);
3316 /* Add the element to the destination set */
3318 dstset
= createSetObject();
3319 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
3320 incrRefCount(c
->argv
[2]);
3322 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
3323 incrRefCount(c
->argv
[3]);
3324 addReply(c
,shared
.cone
);
3327 static void sismemberCommand(redisClient
*c
) {
3330 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3332 addReply(c
,shared
.czero
);
3334 if (set
->type
!= REDIS_SET
) {
3335 addReply(c
,shared
.wrongtypeerr
);
3338 if (dictFind(set
->ptr
,c
->argv
[2]))
3339 addReply(c
,shared
.cone
);
3341 addReply(c
,shared
.czero
);
3345 static void scardCommand(redisClient
*c
) {
3349 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3351 addReply(c
,shared
.czero
);
3354 if (o
->type
!= REDIS_SET
) {
3355 addReply(c
,shared
.wrongtypeerr
);
3358 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3364 static void spopCommand(redisClient
*c
) {
3368 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3370 addReply(c
,shared
.nullbulk
);
3372 if (set
->type
!= REDIS_SET
) {
3373 addReply(c
,shared
.wrongtypeerr
);
3376 de
= dictGetRandomKey(set
->ptr
);
3378 addReply(c
,shared
.nullbulk
);
3380 robj
*ele
= dictGetEntryKey(de
);
3382 addReplyBulkLen(c
,ele
);
3384 addReply(c
,shared
.crlf
);
3385 dictDelete(set
->ptr
,ele
);
3386 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3392 static void srandmemberCommand(redisClient
*c
) {
3396 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3398 addReply(c
,shared
.nullbulk
);
3400 if (set
->type
!= REDIS_SET
) {
3401 addReply(c
,shared
.wrongtypeerr
);
3404 de
= dictGetRandomKey(set
->ptr
);
3406 addReply(c
,shared
.nullbulk
);
3408 robj
*ele
= dictGetEntryKey(de
);
3410 addReplyBulkLen(c
,ele
);
3412 addReply(c
,shared
.crlf
);
3417 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3418 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3420 return dictSize(*d1
)-dictSize(*d2
);
3423 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
) {
3424 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3427 robj
*lenobj
= NULL
, *dstset
= NULL
;
3428 int j
, cardinality
= 0;
3430 if (!dv
) oom("sinterGenericCommand");
3431 for (j
= 0; j
< setsnum
; j
++) {
3435 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3436 lookupKeyRead(c
->db
,setskeys
[j
]);
3440 deleteKey(c
->db
,dstkey
);
3441 addReply(c
,shared
.ok
);
3443 addReply(c
,shared
.nullmultibulk
);
3447 if (setobj
->type
!= REDIS_SET
) {
3449 addReply(c
,shared
.wrongtypeerr
);
3452 dv
[j
] = setobj
->ptr
;
3454 /* Sort sets from the smallest to largest, this will improve our
3455 * algorithm's performace */
3456 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3458 /* The first thing we should output is the total number of elements...
3459 * since this is a multi-bulk write, but at this stage we don't know
3460 * the intersection set size, so we use a trick, append an empty object
3461 * to the output list and save the pointer to later modify it with the
3464 lenobj
= createObject(REDIS_STRING
,NULL
);
3466 decrRefCount(lenobj
);
3468 /* If we have a target key where to store the resulting set
3469 * create this key with an empty set inside */
3470 dstset
= createSetObject();
3473 /* Iterate all the elements of the first (smallest) set, and test
3474 * the element against all the other sets, if at least one set does
3475 * not include the element it is discarded */
3476 di
= dictGetIterator(dv
[0]);
3477 if (!di
) oom("dictGetIterator");
3479 while((de
= dictNext(di
)) != NULL
) {
3482 for (j
= 1; j
< setsnum
; j
++)
3483 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
3485 continue; /* at least one set does not contain the member */
3486 ele
= dictGetEntryKey(de
);
3488 addReplyBulkLen(c
,ele
);
3490 addReply(c
,shared
.crlf
);
3493 dictAdd(dstset
->ptr
,ele
,NULL
);
3497 dictReleaseIterator(di
);
3500 /* Store the resulting set into the target */
3501 deleteKey(c
->db
,dstkey
);
3502 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3503 incrRefCount(dstkey
);
3507 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",cardinality
);
3509 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3510 dictSize((dict
*)dstset
->ptr
)));
3516 static void sinterCommand(redisClient
*c
) {
3517 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
3520 static void sinterstoreCommand(redisClient
*c
) {
3521 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
3524 #define REDIS_OP_UNION 0
3525 #define REDIS_OP_DIFF 1
3527 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
3528 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3531 robj
*dstset
= NULL
;
3532 int j
, cardinality
= 0;
3534 if (!dv
) oom("sunionDiffGenericCommand");
3535 for (j
= 0; j
< setsnum
; j
++) {
3539 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3540 lookupKeyRead(c
->db
,setskeys
[j
]);
3545 if (setobj
->type
!= REDIS_SET
) {
3547 addReply(c
,shared
.wrongtypeerr
);
3550 dv
[j
] = setobj
->ptr
;
3553 /* We need a temp set object to store our union. If the dstkey
3554 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3555 * this set object will be the resulting object to set into the target key*/
3556 dstset
= createSetObject();
3558 /* Iterate all the elements of all the sets, add every element a single
3559 * time to the result set */
3560 for (j
= 0; j
< setsnum
; j
++) {
3561 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
3562 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
3564 di
= dictGetIterator(dv
[j
]);
3565 if (!di
) oom("dictGetIterator");
3567 while((de
= dictNext(di
)) != NULL
) {
3570 /* dictAdd will not add the same element multiple times */
3571 ele
= dictGetEntryKey(de
);
3572 if (op
== REDIS_OP_UNION
|| j
== 0) {
3573 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
3577 } else if (op
== REDIS_OP_DIFF
) {
3578 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
3583 dictReleaseIterator(di
);
3585 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
3588 /* Output the content of the resulting set, if not in STORE mode */
3590 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
3591 di
= dictGetIterator(dstset
->ptr
);
3592 if (!di
) oom("dictGetIterator");
3593 while((de
= dictNext(di
)) != NULL
) {
3596 ele
= dictGetEntryKey(de
);
3597 addReplyBulkLen(c
,ele
);
3599 addReply(c
,shared
.crlf
);
3601 dictReleaseIterator(di
);
3603 /* If we have a target key where to store the resulting set
3604 * create this key with the result set inside */
3605 deleteKey(c
->db
,dstkey
);
3606 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3607 incrRefCount(dstkey
);
3612 decrRefCount(dstset
);
3614 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3615 dictSize((dict
*)dstset
->ptr
)));
3621 static void sunionCommand(redisClient
*c
) {
3622 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
3625 static void sunionstoreCommand(redisClient
*c
) {
3626 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
3629 static void sdiffCommand(redisClient
*c
) {
3630 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
3633 static void sdiffstoreCommand(redisClient
*c
) {
3634 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
3637 static void flushdbCommand(redisClient
*c
) {
3638 server
.dirty
+= dictSize(c
->db
->dict
);
3639 dictEmpty(c
->db
->dict
);
3640 dictEmpty(c
->db
->expires
);
3641 addReply(c
,shared
.ok
);
3644 static void flushallCommand(redisClient
*c
) {
3645 server
.dirty
+= emptyDb();
3646 addReply(c
,shared
.ok
);
3647 rdbSave(server
.dbfilename
);
3651 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
3652 redisSortOperation
*so
= zmalloc(sizeof(*so
));
3653 if (!so
) oom("createSortOperation");
3655 so
->pattern
= pattern
;
3659 /* Return the value associated to the key with a name obtained
3660 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3661 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
3665 int prefixlen
, sublen
, postfixlen
;
3666 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3670 char buf
[REDIS_SORTKEY_MAX
+1];
3673 if (subst
->encoding
== REDIS_ENCODING_RAW
)
3674 incrRefCount(subst
);
3676 subst
= getDecodedObject(subst
);
3679 spat
= pattern
->ptr
;
3681 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
3682 p
= strchr(spat
,'*');
3683 if (!p
) return NULL
;
3686 sublen
= sdslen(ssub
);
3687 postfixlen
= sdslen(spat
)-(prefixlen
+1);
3688 memcpy(keyname
.buf
,spat
,prefixlen
);
3689 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
3690 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
3691 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
3692 keyname
.len
= prefixlen
+sublen
+postfixlen
;
3694 keyobj
.refcount
= 1;
3695 keyobj
.type
= REDIS_STRING
;
3696 keyobj
.ptr
= ((char*)&keyname
)+(sizeof(long)*2);
3698 decrRefCount(subst
);
3700 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3701 return lookupKeyRead(db
,&keyobj
);
3704 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3705 * the additional parameter is not standard but a BSD-specific we have to
3706 * pass sorting parameters via the global 'server' structure */
3707 static int sortCompare(const void *s1
, const void *s2
) {
3708 const redisSortObject
*so1
= s1
, *so2
= s2
;
3711 if (!server
.sort_alpha
) {
3712 /* Numeric sorting. Here it's trivial as we precomputed scores */
3713 if (so1
->u
.score
> so2
->u
.score
) {
3715 } else if (so1
->u
.score
< so2
->u
.score
) {
3721 /* Alphanumeric sorting */
3722 if (server
.sort_bypattern
) {
3723 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
3724 /* At least one compare object is NULL */
3725 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
3727 else if (so1
->u
.cmpobj
== NULL
)
3732 /* We have both the objects, use strcoll */
3733 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
3736 /* Compare elements directly */
3737 if (so1
->obj
->encoding
== REDIS_ENCODING_RAW
&&
3738 so2
->obj
->encoding
== REDIS_ENCODING_RAW
) {
3739 cmp
= strcoll(so1
->obj
->ptr
,so2
->obj
->ptr
);
3743 dec1
= so1
->obj
->encoding
== REDIS_ENCODING_RAW
?
3744 so1
->obj
: getDecodedObject(so1
->obj
);
3745 dec2
= so2
->obj
->encoding
== REDIS_ENCODING_RAW
?
3746 so2
->obj
: getDecodedObject(so2
->obj
);
3747 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
3748 if (dec1
!= so1
->obj
) decrRefCount(dec1
);
3749 if (dec2
!= so2
->obj
) decrRefCount(dec2
);
3753 return server
.sort_desc
? -cmp
: cmp
;
3756 /* The SORT command is the most complex command in Redis. Warning: this code
3757 * is optimized for speed and a bit less for readability */
3758 static void sortCommand(redisClient
*c
) {
3761 int desc
= 0, alpha
= 0;
3762 int limit_start
= 0, limit_count
= -1, start
, end
;
3763 int j
, dontsort
= 0, vectorlen
;
3764 int getop
= 0; /* GET operation counter */
3765 robj
*sortval
, *sortby
= NULL
;
3766 redisSortObject
*vector
; /* Resulting vector to sort */
3768 /* Lookup the key to sort. It must be of the right types */
3769 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
3770 if (sortval
== NULL
) {
3771 addReply(c
,shared
.nokeyerr
);
3774 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
) {
3775 addReply(c
,shared
.wrongtypeerr
);
3779 /* Create a list of operations to perform for every sorted element.
3780 * Operations can be GET/DEL/INCR/DECR */
3781 operations
= listCreate();
3782 listSetFreeMethod(operations
,zfree
);
3785 /* Now we need to protect sortval incrementing its count, in the future
3786 * SORT may have options able to overwrite/delete keys during the sorting
3787 * and the sorted key itself may get destroied */
3788 incrRefCount(sortval
);
3790 /* The SORT command has an SQL-alike syntax, parse it */
3791 while(j
< c
->argc
) {
3792 int leftargs
= c
->argc
-j
-1;
3793 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
3795 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
3797 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
3799 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
3800 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
3801 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
3803 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
3804 sortby
= c
->argv
[j
+1];
3805 /* If the BY pattern does not contain '*', i.e. it is constant,
3806 * we don't need to sort nor to lookup the weight keys. */
3807 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
3809 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3810 listAddNodeTail(operations
,createSortOperation(
3811 REDIS_SORT_GET
,c
->argv
[j
+1]));
3814 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"del") && leftargs
>= 1) {
3815 listAddNodeTail(operations
,createSortOperation(
3816 REDIS_SORT_DEL
,c
->argv
[j
+1]));
3818 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"incr") && leftargs
>= 1) {
3819 listAddNodeTail(operations
,createSortOperation(
3820 REDIS_SORT_INCR
,c
->argv
[j
+1]));
3822 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3823 listAddNodeTail(operations
,createSortOperation(
3824 REDIS_SORT_DECR
,c
->argv
[j
+1]));
3827 decrRefCount(sortval
);
3828 listRelease(operations
);
3829 addReply(c
,shared
.syntaxerr
);
3835 /* Load the sorting vector with all the objects to sort */
3836 vectorlen
= (sortval
->type
== REDIS_LIST
) ?
3837 listLength((list
*)sortval
->ptr
) :
3838 dictSize((dict
*)sortval
->ptr
);
3839 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
3840 if (!vector
) oom("allocating objects vector for SORT");
3842 if (sortval
->type
== REDIS_LIST
) {
3843 list
*list
= sortval
->ptr
;
3847 while((ln
= listYield(list
))) {
3848 robj
*ele
= ln
->value
;
3849 vector
[j
].obj
= ele
;
3850 vector
[j
].u
.score
= 0;
3851 vector
[j
].u
.cmpobj
= NULL
;
3855 dict
*set
= sortval
->ptr
;
3859 di
= dictGetIterator(set
);
3860 if (!di
) oom("dictGetIterator");
3861 while((setele
= dictNext(di
)) != NULL
) {
3862 vector
[j
].obj
= dictGetEntryKey(setele
);
3863 vector
[j
].u
.score
= 0;
3864 vector
[j
].u
.cmpobj
= NULL
;
3867 dictReleaseIterator(di
);
3869 assert(j
== vectorlen
);
3871 /* Now it's time to load the right scores in the sorting vector */
3872 if (dontsort
== 0) {
3873 for (j
= 0; j
< vectorlen
; j
++) {
3877 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
3878 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
3880 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
3881 vector
[j
].u
.cmpobj
= byval
;
3882 incrRefCount(byval
);
3884 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
3887 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
3888 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
3890 if (byval
->encoding
== REDIS_ENCODING_INT
) {
3891 vector
[j
].u
.score
= (long)byval
->ptr
;
3898 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
3899 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
3901 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
3902 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
3911 /* We are ready to sort the vector... perform a bit of sanity check
3912 * on the LIMIT option too. We'll use a partial version of quicksort. */
3913 start
= (limit_start
< 0) ? 0 : limit_start
;
3914 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
3915 if (start
>= vectorlen
) {
3916 start
= vectorlen
-1;
3919 if (end
>= vectorlen
) end
= vectorlen
-1;
3921 if (dontsort
== 0) {
3922 server
.sort_desc
= desc
;
3923 server
.sort_alpha
= alpha
;
3924 server
.sort_bypattern
= sortby
? 1 : 0;
3925 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
3926 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
3928 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
3931 /* Send command output to the output buffer, performing the specified
3932 * GET/DEL/INCR/DECR operations if any. */
3933 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
3934 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
3935 for (j
= start
; j
<= end
; j
++) {
3938 addReplyBulkLen(c
,vector
[j
].obj
);
3939 addReply(c
,vector
[j
].obj
);
3940 addReply(c
,shared
.crlf
);
3942 listRewind(operations
);
3943 while((ln
= listYield(operations
))) {
3944 redisSortOperation
*sop
= ln
->value
;
3945 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
3948 if (sop
->type
== REDIS_SORT_GET
) {
3949 if (!val
|| val
->type
!= REDIS_STRING
) {
3950 addReply(c
,shared
.nullbulk
);
3952 addReplyBulkLen(c
,val
);
3954 addReply(c
,shared
.crlf
);
3956 } else if (sop
->type
== REDIS_SORT_DEL
) {
3963 decrRefCount(sortval
);
3964 listRelease(operations
);
3965 for (j
= 0; j
< vectorlen
; j
++) {
3966 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
3967 decrRefCount(vector
[j
].u
.cmpobj
);
3972 static void infoCommand(redisClient
*c
) {
3974 time_t uptime
= time(NULL
)-server
.stat_starttime
;
3977 info
= sdscatprintf(sdsempty(),
3978 "redis_version:%s\r\n"
3980 "uptime_in_seconds:%d\r\n"
3981 "uptime_in_days:%d\r\n"
3982 "connected_clients:%d\r\n"
3983 "connected_slaves:%d\r\n"
3984 "used_memory:%zu\r\n"
3985 "changes_since_last_save:%lld\r\n"
3986 "bgsave_in_progress:%d\r\n"
3987 "last_save_time:%d\r\n"
3988 "total_connections_received:%lld\r\n"
3989 "total_commands_processed:%lld\r\n"
3992 (sizeof(long) == 8) ? "64" : "32",
3995 listLength(server
.clients
)-listLength(server
.slaves
),
3996 listLength(server
.slaves
),
3999 server
.bgsaveinprogress
,
4001 server
.stat_numconnections
,
4002 server
.stat_numcommands
,
4003 server
.masterhost
== NULL
? "master" : "slave"
4005 if (server
.masterhost
) {
4006 info
= sdscatprintf(info
,
4007 "master_host:%s\r\n"
4008 "master_port:%d\r\n"
4009 "master_link_status:%s\r\n"
4010 "master_last_io_seconds_ago:%d\r\n"
4013 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
4015 (int)(time(NULL
)-server
.master
->lastinteraction
)
4018 for (j
= 0; j
< server
.dbnum
; j
++) {
4019 long long keys
, vkeys
;
4021 keys
= dictSize(server
.db
[j
].dict
);
4022 vkeys
= dictSize(server
.db
[j
].expires
);
4023 if (keys
|| vkeys
) {
4024 info
= sdscatprintf(info
, "db%d: keys=%lld,expires=%lld\r\n",
4028 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info
)));
4029 addReplySds(c
,info
);
4030 addReply(c
,shared
.crlf
);
4033 static void monitorCommand(redisClient
*c
) {
4034 /* ignore MONITOR if aleady slave or in monitor mode */
4035 if (c
->flags
& REDIS_SLAVE
) return;
4037 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
4039 if (!listAddNodeTail(server
.monitors
,c
)) oom("listAddNodeTail");
4040 addReply(c
,shared
.ok
);
4043 /* ================================= Expire ================================= */
4044 static int removeExpire(redisDb
*db
, robj
*key
) {
4045 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
4052 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
4053 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
4061 /* Return the expire time of the specified key, or -1 if no expire
4062 * is associated with this key (i.e. the key is non volatile) */
4063 static time_t getExpire(redisDb
*db
, robj
*key
) {
4066 /* No expire? return ASAP */
4067 if (dictSize(db
->expires
) == 0 ||
4068 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
4070 return (time_t) dictGetEntryVal(de
);
4073 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
4077 /* No expire? return ASAP */
4078 if (dictSize(db
->expires
) == 0 ||
4079 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
4081 /* Lookup the expire */
4082 when
= (time_t) dictGetEntryVal(de
);
4083 if (time(NULL
) <= when
) return 0;
4085 /* Delete the key */
4086 dictDelete(db
->expires
,key
);
4087 return dictDelete(db
->dict
,key
) == DICT_OK
;
4090 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
4093 /* No expire? return ASAP */
4094 if (dictSize(db
->expires
) == 0 ||
4095 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
4097 /* Delete the key */
4099 dictDelete(db
->expires
,key
);
4100 return dictDelete(db
->dict
,key
) == DICT_OK
;
4103 static void expireCommand(redisClient
*c
) {
4105 int seconds
= atoi(c
->argv
[2]->ptr
);
4107 de
= dictFind(c
->db
->dict
,c
->argv
[1]);
4109 addReply(c
,shared
.czero
);
4113 addReply(c
, shared
.czero
);
4116 time_t when
= time(NULL
)+seconds
;
4117 if (setExpire(c
->db
,c
->argv
[1],when
)) {
4118 addReply(c
,shared
.cone
);
4121 addReply(c
,shared
.czero
);
4127 static void ttlCommand(redisClient
*c
) {
4131 expire
= getExpire(c
->db
,c
->argv
[1]);
4133 ttl
= (int) (expire
-time(NULL
));
4134 if (ttl
< 0) ttl
= -1;
4136 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
4139 static void msetGenericCommand(redisClient
*c
, int nx
) {
4142 if ((c
->argc
% 2) == 0) {
4143 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
4146 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4147 * set nothing at all if at least one already key exists. */
4149 for (j
= 1; j
< c
->argc
; j
+= 2) {
4150 if (dictFind(c
->db
->dict
,c
->argv
[j
]) != NULL
) {
4151 addReply(c
, shared
.czero
);
4157 for (j
= 1; j
< c
->argc
; j
+= 2) {
4160 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
4161 if (retval
== DICT_ERR
) {
4162 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
4163 incrRefCount(c
->argv
[j
+1]);
4165 incrRefCount(c
->argv
[j
]);
4166 incrRefCount(c
->argv
[j
+1]);
4168 removeExpire(c
->db
,c
->argv
[j
]);
4170 server
.dirty
+= (c
->argc
-1)/2;
4171 addReply(c
, nx
? shared
.cone
: shared
.ok
);
4174 static void msetCommand(redisClient
*c
) {
4175 msetGenericCommand(c
,0);
4178 static void msetnxCommand(redisClient
*c
) {
4179 msetGenericCommand(c
,1);
4182 /* =============================== Replication ============================= */
4184 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
4185 ssize_t nwritten
, ret
= size
;
4186 time_t start
= time(NULL
);
4190 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
4191 nwritten
= write(fd
,ptr
,size
);
4192 if (nwritten
== -1) return -1;
4196 if ((time(NULL
)-start
) > timeout
) {
4204 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
4205 ssize_t nread
, totread
= 0;
4206 time_t start
= time(NULL
);
4210 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
4211 nread
= read(fd
,ptr
,size
);
4212 if (nread
== -1) return -1;
4217 if ((time(NULL
)-start
) > timeout
) {
4225 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
4232 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
4235 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
4246 static void syncCommand(redisClient
*c
) {
4247 /* ignore SYNC if aleady slave or in monitor mode */
4248 if (c
->flags
& REDIS_SLAVE
) return;
4250 /* SYNC can't be issued when the server has pending data to send to
4251 * the client about already issued commands. We need a fresh reply
4252 * buffer registering the differences between the BGSAVE and the current
4253 * dataset, so that we can copy to other slaves if needed. */
4254 if (listLength(c
->reply
) != 0) {
4255 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4259 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
4260 /* Here we need to check if there is a background saving operation
4261 * in progress, or if it is required to start one */
4262 if (server
.bgsaveinprogress
) {
4263 /* Ok a background save is in progress. Let's check if it is a good
4264 * one for replication, i.e. if there is another slave that is
4265 * registering differences since the server forked to save */
4269 listRewind(server
.slaves
);
4270 while((ln
= listYield(server
.slaves
))) {
4272 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
4275 /* Perfect, the server is already registering differences for
4276 * another slave. Set the right state, and copy the buffer. */
4277 listRelease(c
->reply
);
4278 c
->reply
= listDup(slave
->reply
);
4279 if (!c
->reply
) oom("listDup copying slave reply list");
4280 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4281 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
4283 /* No way, we need to wait for the next BGSAVE in order to
4284 * register differences */
4285 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
4286 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
4289 /* Ok we don't have a BGSAVE in progress, let's start one */
4290 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
4291 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
4292 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
4293 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
4296 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4299 c
->flags
|= REDIS_SLAVE
;
4301 if (!listAddNodeTail(server
.slaves
,c
)) oom("listAddNodeTail");
4305 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
4306 redisClient
*slave
= privdata
;
4308 REDIS_NOTUSED(mask
);
4309 char buf
[REDIS_IOBUF_LEN
];
4310 ssize_t nwritten
, buflen
;
4312 if (slave
->repldboff
== 0) {
4313 /* Write the bulk write count before to transfer the DB. In theory here
4314 * we don't know how much room there is in the output buffer of the
4315 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4316 * operations) will never be smaller than the few bytes we need. */
4319 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4321 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
4329 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
4330 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
4332 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
4333 (buflen
== 0) ? "premature EOF" : strerror(errno
));
4337 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
4338 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
4343 slave
->repldboff
+= nwritten
;
4344 if (slave
->repldboff
== slave
->repldbsize
) {
4345 close(slave
->repldbfd
);
4346 slave
->repldbfd
= -1;
4347 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
4348 slave
->replstate
= REDIS_REPL_ONLINE
;
4349 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
4350 sendReplyToClient
, slave
, NULL
) == AE_ERR
) {
4354 addReplySds(slave
,sdsempty());
4355 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
4359 /* This function is called at the end of every backgrond saving.
4360 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4361 * otherwise REDIS_ERR is passed to the function.
4363 * The goal of this function is to handle slaves waiting for a successful
4364 * background saving in order to perform non-blocking synchronization. */
4365 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
4367 int startbgsave
= 0;
4369 listRewind(server
.slaves
);
4370 while((ln
= listYield(server
.slaves
))) {
4371 redisClient
*slave
= ln
->value
;
4373 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
4375 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4376 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
4377 struct redis_stat buf
;
4379 if (bgsaveerr
!= REDIS_OK
) {
4381 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
4384 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
4385 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
4387 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
4390 slave
->repldboff
= 0;
4391 slave
->repldbsize
= buf
.st_size
;
4392 slave
->replstate
= REDIS_REPL_SEND_BULK
;
4393 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
4394 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
, NULL
) == AE_ERR
) {
4401 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
4402 listRewind(server
.slaves
);
4403 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
4404 while((ln
= listYield(server
.slaves
))) {
4405 redisClient
*slave
= ln
->value
;
4407 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
4414 static int syncWithMaster(void) {
4415 char buf
[1024], tmpfile
[256];
4417 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
4421 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
4425 /* Issue the SYNC command */
4426 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
4428 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
4432 /* Read the bulk write count */
4433 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
4435 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
4439 dumpsize
= atoi(buf
+1);
4440 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
4441 /* Read the bulk write data on a temp file */
4442 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
4443 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
4446 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
4450 int nread
, nwritten
;
4452 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
4454 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
4460 nwritten
= write(dfd
,buf
,nread
);
4461 if (nwritten
== -1) {
4462 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
4470 if (rename(tmpfile
,server
.dbfilename
) == -1) {
4471 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
4477 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
4478 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
4482 server
.master
= createClient(fd
);
4483 server
.master
->flags
|= REDIS_MASTER
;
4484 server
.replstate
= REDIS_REPL_CONNECTED
;
4488 static void slaveofCommand(redisClient
*c
) {
4489 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
4490 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
4491 if (server
.masterhost
) {
4492 sdsfree(server
.masterhost
);
4493 server
.masterhost
= NULL
;
4494 if (server
.master
) freeClient(server
.master
);
4495 server
.replstate
= REDIS_REPL_NONE
;
4496 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
4499 sdsfree(server
.masterhost
);
4500 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
4501 server
.masterport
= atoi(c
->argv
[2]->ptr
);
4502 if (server
.master
) freeClient(server
.master
);
4503 server
.replstate
= REDIS_REPL_CONNECT
;
4504 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
4505 server
.masterhost
, server
.masterport
);
4507 addReply(c
,shared
.ok
);
4510 /* ============================ Maxmemory directive ======================== */
4512 /* This function gets called when 'maxmemory' is set on the config file to limit
4513 * the max memory used by the server, and we are out of memory.
4514 * This function will try to, in order:
4516 * - Free objects from the free list
4517 * - Try to remove keys with an EXPIRE set
4519 * It is not possible to free enough memory to reach used-memory < maxmemory
4520 * the server will start refusing commands that will enlarge even more the
4523 static void freeMemoryIfNeeded(void) {
4524 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
4525 if (listLength(server
.objfreelist
)) {
4528 listNode
*head
= listFirst(server
.objfreelist
);
4529 o
= listNodeValue(head
);
4530 listDelNode(server
.objfreelist
,head
);
4533 int j
, k
, freed
= 0;
4535 for (j
= 0; j
< server
.dbnum
; j
++) {
4537 robj
*minkey
= NULL
;
4538 struct dictEntry
*de
;
4540 if (dictSize(server
.db
[j
].expires
)) {
4542 /* From a sample of three keys drop the one nearest to
4543 * the natural expire */
4544 for (k
= 0; k
< 3; k
++) {
4547 de
= dictGetRandomKey(server
.db
[j
].expires
);
4548 t
= (time_t) dictGetEntryVal(de
);
4549 if (minttl
== -1 || t
< minttl
) {
4550 minkey
= dictGetEntryKey(de
);
4554 deleteKey(server
.db
+j
,minkey
);
4557 if (!freed
) return; /* nothing to free... */
4562 /* ================================= Debugging ============================== */
4564 static void debugCommand(redisClient
*c
) {
4565 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
4567 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
4568 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
4572 addReply(c
,shared
.nokeyerr
);
4575 key
= dictGetEntryKey(de
);
4576 val
= dictGetEntryVal(de
);
4577 addReplySds(c
,sdscatprintf(sdsempty(),
4578 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4579 key
, key
->refcount
, val
, val
->refcount
, val
->encoding
));
4581 addReplySds(c
,sdsnew(
4582 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4586 #ifdef HAVE_BACKTRACE
4587 static struct redisFunctionSym symsTable
[] = {
4588 {"compareStringObjects", (unsigned long)compareStringObjects
},
4589 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong
},
4590 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare
},
4591 {"dictEncObjHash", (unsigned long)dictEncObjHash
},
4592 {"incrDecrCommand", (unsigned long)incrDecrCommand
},
4593 {"freeStringObject", (unsigned long)freeStringObject
},
4594 {"freeListObject", (unsigned long)freeListObject
},
4595 {"freeSetObject", (unsigned long)freeSetObject
},
4596 {"decrRefCount", (unsigned long)decrRefCount
},
4597 {"createObject", (unsigned long)createObject
},
4598 {"freeClient", (unsigned long)freeClient
},
4599 {"rdbLoad", (unsigned long)rdbLoad
},
4600 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject
},
4601 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw
},
4602 {"addReply", (unsigned long)addReply
},
4603 {"addReplySds", (unsigned long)addReplySds
},
4604 {"incrRefCount", (unsigned long)incrRefCount
},
4605 {"rdbSaveBackground", (unsigned long)rdbSaveBackground
},
4606 {"createStringObject", (unsigned long)createStringObject
},
4607 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves
},
4608 {"syncWithMaster", (unsigned long)syncWithMaster
},
4609 {"tryObjectSharing", (unsigned long)tryObjectSharing
},
4610 {"tryObjectEncoding", (unsigned long)tryObjectEncoding
},
4611 {"getDecodedObject", (unsigned long)getDecodedObject
},
4612 {"removeExpire", (unsigned long)removeExpire
},
4613 {"expireIfNeeded", (unsigned long)expireIfNeeded
},
4614 {"deleteIfVolatile", (unsigned long)deleteIfVolatile
},
4615 {"deleteKey", (unsigned long)deleteKey
},
4616 {"getExpire", (unsigned long)getExpire
},
4617 {"setExpire", (unsigned long)setExpire
},
4618 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave
},
4619 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded
},
4620 {"authCommand", (unsigned long)authCommand
},
4621 {"pingCommand", (unsigned long)pingCommand
},
4622 {"echoCommand", (unsigned long)echoCommand
},
4623 {"setCommand", (unsigned long)setCommand
},
4624 {"setnxCommand", (unsigned long)setnxCommand
},
4625 {"getCommand", (unsigned long)getCommand
},
4626 {"delCommand", (unsigned long)delCommand
},
4627 {"existsCommand", (unsigned long)existsCommand
},
4628 {"incrCommand", (unsigned long)incrCommand
},
4629 {"decrCommand", (unsigned long)decrCommand
},
4630 {"incrbyCommand", (unsigned long)incrbyCommand
},
4631 {"decrbyCommand", (unsigned long)decrbyCommand
},
4632 {"selectCommand", (unsigned long)selectCommand
},
4633 {"randomkeyCommand", (unsigned long)randomkeyCommand
},
4634 {"keysCommand", (unsigned long)keysCommand
},
4635 {"dbsizeCommand", (unsigned long)dbsizeCommand
},
4636 {"lastsaveCommand", (unsigned long)lastsaveCommand
},
4637 {"saveCommand", (unsigned long)saveCommand
},
4638 {"bgsaveCommand", (unsigned long)bgsaveCommand
},
4639 {"shutdownCommand", (unsigned long)shutdownCommand
},
4640 {"moveCommand", (unsigned long)moveCommand
},
4641 {"renameCommand", (unsigned long)renameCommand
},
4642 {"renamenxCommand", (unsigned long)renamenxCommand
},
4643 {"lpushCommand", (unsigned long)lpushCommand
},
4644 {"rpushCommand", (unsigned long)rpushCommand
},
4645 {"lpopCommand", (unsigned long)lpopCommand
},
4646 {"rpopCommand", (unsigned long)rpopCommand
},
4647 {"llenCommand", (unsigned long)llenCommand
},
4648 {"lindexCommand", (unsigned long)lindexCommand
},
4649 {"lrangeCommand", (unsigned long)lrangeCommand
},
4650 {"ltrimCommand", (unsigned long)ltrimCommand
},
4651 {"typeCommand", (unsigned long)typeCommand
},
4652 {"lsetCommand", (unsigned long)lsetCommand
},
4653 {"saddCommand", (unsigned long)saddCommand
},
4654 {"sremCommand", (unsigned long)sremCommand
},
4655 {"smoveCommand", (unsigned long)smoveCommand
},
4656 {"sismemberCommand", (unsigned long)sismemberCommand
},
4657 {"scardCommand", (unsigned long)scardCommand
},
4658 {"spopCommand", (unsigned long)spopCommand
},
4659 {"srandmemberCommand", (unsigned long)srandmemberCommand
},
4660 {"sinterCommand", (unsigned long)sinterCommand
},
4661 {"sinterstoreCommand", (unsigned long)sinterstoreCommand
},
4662 {"sunionCommand", (unsigned long)sunionCommand
},
4663 {"sunionstoreCommand", (unsigned long)sunionstoreCommand
},
4664 {"sdiffCommand", (unsigned long)sdiffCommand
},
4665 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand
},
4666 {"syncCommand", (unsigned long)syncCommand
},
4667 {"flushdbCommand", (unsigned long)flushdbCommand
},
4668 {"flushallCommand", (unsigned long)flushallCommand
},
4669 {"sortCommand", (unsigned long)sortCommand
},
4670 {"lremCommand", (unsigned long)lremCommand
},
4671 {"infoCommand", (unsigned long)infoCommand
},
4672 {"mgetCommand", (unsigned long)mgetCommand
},
4673 {"monitorCommand", (unsigned long)monitorCommand
},
4674 {"expireCommand", (unsigned long)expireCommand
},
4675 {"getsetCommand", (unsigned long)getsetCommand
},
4676 {"ttlCommand", (unsigned long)ttlCommand
},
4677 {"slaveofCommand", (unsigned long)slaveofCommand
},
4678 {"debugCommand", (unsigned long)debugCommand
},
4679 {"processCommand", (unsigned long)processCommand
},
4680 {"setupSigSegvAction", (unsigned long)setupSigSegvAction
},
4681 {"readQueryFromClient", (unsigned long)readQueryFromClient
},
4682 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile
},
4683 {"msetGenericCommand", (unsigned long)msetGenericCommand
},
4684 {"msetCommand", (unsigned long)msetCommand
},
4685 {"msetnxCommand", (unsigned long)msetnxCommand
},
4689 /* This function try to convert a pointer into a function name. It's used in
4690 * oreder to provide a backtrace under segmentation fault that's able to
4691 * display functions declared as static (otherwise the backtrace is useless). */
4692 static char *findFuncName(void *pointer
, unsigned long *offset
){
4694 unsigned long off
, minoff
= 0;
4696 /* Try to match against the Symbol with the smallest offset */
4697 for (i
=0; symsTable
[i
].pointer
; i
++) {
4698 unsigned long lp
= (unsigned long) pointer
;
4700 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
4701 off
=lp
-symsTable
[i
].pointer
;
4702 if (ret
< 0 || off
< minoff
) {
4708 if (ret
== -1) return NULL
;
4710 return symsTable
[ret
].name
;
4713 static void *getMcontextEip(ucontext_t
*uc
) {
4714 #if defined(__FreeBSD__)
4715 return (void*) uc
->uc_mcontext
.mc_eip
;
4716 #elif defined(__dietlibc__)
4717 return (void*) uc
->uc_mcontext
.eip
;
4718 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
4719 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
4720 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
4721 #ifdef _STRUCT_X86_THREAD_STATE64
4722 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
4724 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
4726 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
4727 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
];
4728 #elif defined(__ia64__) /* Linux IA64 */
4729 return (void*) uc
->uc_mcontext
.sc_ip
;
4735 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
4737 char **messages
= NULL
;
4738 int i
, trace_size
= 0;
4739 unsigned long offset
=0;
4740 time_t uptime
= time(NULL
)-server
.stat_starttime
;
4741 ucontext_t
*uc
= (ucontext_t
*) secret
;
4742 REDIS_NOTUSED(info
);
4744 redisLog(REDIS_WARNING
,
4745 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
4746 redisLog(REDIS_WARNING
, "%s", sdscatprintf(sdsempty(),
4747 "redis_version:%s; "
4748 "uptime_in_seconds:%d; "
4749 "connected_clients:%d; "
4750 "connected_slaves:%d; "
4752 "changes_since_last_save:%lld; "
4753 "bgsave_in_progress:%d; "
4754 "last_save_time:%d; "
4755 "total_connections_received:%lld; "
4756 "total_commands_processed:%lld; "
4760 listLength(server
.clients
)-listLength(server
.slaves
),
4761 listLength(server
.slaves
),
4764 server
.bgsaveinprogress
,
4766 server
.stat_numconnections
,
4767 server
.stat_numcommands
,
4768 server
.masterhost
== NULL
? "master" : "slave"
4771 trace_size
= backtrace(trace
, 100);
4772 /* overwrite sigaction with caller's address */
4773 if (getMcontextEip(uc
) != NULL
) {
4774 trace
[1] = getMcontextEip(uc
);
4776 messages
= backtrace_symbols(trace
, trace_size
);
4778 for (i
=1; i
<trace_size
; ++i
) {
4779 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
4781 p
= strchr(messages
[i
],'+');
4782 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
4783 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
4785 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
4792 static void setupSigSegvAction(void) {
4793 struct sigaction act
;
4795 sigemptyset (&act
.sa_mask
);
4796 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
4797 * is used. Otherwise, sa_handler is used */
4798 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
4799 act
.sa_sigaction
= segvHandler
;
4800 sigaction (SIGSEGV
, &act
, NULL
);
4801 sigaction (SIGBUS
, &act
, NULL
);
4802 sigaction (SIGFPE
, &act
, NULL
);
4803 sigaction (SIGILL
, &act
, NULL
);
4804 sigaction (SIGBUS
, &act
, NULL
);
4807 #else /* HAVE_BACKTRACE */
4808 static void setupSigSegvAction(void) {
4810 #endif /* HAVE_BACKTRACE */
4812 /* =================================== Main! ================================ */
4815 int linuxOvercommitMemoryValue(void) {
4816 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
4820 if (fgets(buf
,64,fp
) == NULL
) {
4829 void linuxOvercommitMemoryWarning(void) {
4830 if (linuxOvercommitMemoryValue() == 0) {
4831 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
4834 #endif /* __linux__ */
4836 static void daemonize(void) {
4840 if (fork() != 0) exit(0); /* parent exits */
4841 setsid(); /* create a new session */
4843 /* Every output goes to /dev/null. If Redis is daemonized but
4844 * the 'logfile' is set to 'stdout' in the configuration file
4845 * it will not log at all. */
4846 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
4847 dup2(fd
, STDIN_FILENO
);
4848 dup2(fd
, STDOUT_FILENO
);
4849 dup2(fd
, STDERR_FILENO
);
4850 if (fd
> STDERR_FILENO
) close(fd
);
4852 /* Try to write the pid file */
4853 fp
= fopen(server
.pidfile
,"w");
4855 fprintf(fp
,"%d\n",getpid());
4860 int main(int argc
, char **argv
) {
4863 ResetServerSaveParams();
4864 loadServerConfig(argv
[1]);
4865 } else if (argc
> 2) {
4866 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
4869 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4872 if (server
.daemonize
) daemonize();
4873 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
4875 linuxOvercommitMemoryWarning();
4877 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
4878 redisLog(REDIS_NOTICE
,"DB loaded from disk");
4879 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
4880 acceptHandler
, NULL
, NULL
) == AE_ERR
) oom("creating file event");
4881 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
4883 aeDeleteEventLoop(server
.el
);