2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.001"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
62 #include "ae.h" /* Event driven programming library */
63 #include "sds.h" /* Dynamic safe strings */
64 #include "anet.h" /* Networking the easy way */
65 #include "dict.h" /* Hash tables */
66 #include "adlist.h" /* Linked lists */
67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
68 #include "lzf.h" /* LZF compression library */
69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
75 /* Static server configuration */
76 #define REDIS_SERVERPORT 6379 /* TCP port */
77 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
78 #define REDIS_IOBUF_LEN 1024
79 #define REDIS_LOADBUF_LEN 1024
80 #define REDIS_STATIC_ARGS 4
81 #define REDIS_DEFAULT_DBNUM 16
82 #define REDIS_CONFIGLINE_MAX 1024
83 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
84 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
85 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
86 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
87 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
89 /* Hash table parameters */
90 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
93 #define REDIS_CMD_BULK 1 /* Bulk write command */
94 #define REDIS_CMD_INLINE 2 /* Inline command */
95 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
96 this flags will return an error when the 'maxmemory' option is set in the
97 config file and the server is using more than maxmemory bytes of memory.
98 In short this commands are denied on low memory conditions. */
99 #define REDIS_CMD_DENYOOM 4
102 #define REDIS_STRING 0
107 /* Objects encoding */
108 #define REDIS_ENCODING_RAW 0 /* Raw representation */
109 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
111 /* Object types only used for dumping to disk */
112 #define REDIS_EXPIRETIME 253
113 #define REDIS_SELECTDB 254
114 #define REDIS_EOF 255
116 /* Defines related to the dump file format. To store 32 bits lengths for short
117 * keys requires a lot of space, so we check the most significant 2 bits of
118 * the first byte to interpreter the length:
120 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
121 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
122 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
123 * 11|000000 this means: specially encoded object will follow. The six bits
124 * number specify the kind of object that follows.
125 * See the REDIS_RDB_ENC_* defines.
127 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
128 * values, will fit inside. */
129 #define REDIS_RDB_6BITLEN 0
130 #define REDIS_RDB_14BITLEN 1
131 #define REDIS_RDB_32BITLEN 2
132 #define REDIS_RDB_ENCVAL 3
133 #define REDIS_RDB_LENERR UINT_MAX
135 /* When a length of a string object stored on disk has the first two bits
136 * set, the remaining two bits specify a special encoding for the object
137 * accordingly to the following defines: */
138 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
139 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
140 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
141 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
144 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
145 #define REDIS_SLAVE 2 /* This client is a slave server */
146 #define REDIS_MASTER 4 /* This client is a master server */
147 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
149 /* Slave replication state - slave side */
150 #define REDIS_REPL_NONE 0 /* No active replication */
151 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
152 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
154 /* Slave replication state - from the point of view of master
155 * Note that in SEND_BULK and ONLINE state the slave receives new updates
156 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
157 * to start the next background saving in order to send updates to it. */
158 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
159 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
160 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
161 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
163 /* List related stuff */
167 /* Sort operations */
168 #define REDIS_SORT_GET 0
169 #define REDIS_SORT_DEL 1
170 #define REDIS_SORT_INCR 2
171 #define REDIS_SORT_DECR 3
172 #define REDIS_SORT_ASC 4
173 #define REDIS_SORT_DESC 5
174 #define REDIS_SORTKEY_MAX 1024
177 #define REDIS_DEBUG 0
178 #define REDIS_NOTICE 1
179 #define REDIS_WARNING 2
181 /* Anti-warning macro... */
182 #define REDIS_NOTUSED(V) ((void) V)
185 /*================================= Data types ============================== */
187 /* A redis object, that is a type able to hold a string / list / set */
188 typedef struct redisObject
{
191 unsigned char encoding
;
192 unsigned char notused
[2];
196 typedef struct redisDb
{
202 /* With multiplexing we need to take per-clinet state.
203 * Clients are taken in a liked list. */
204 typedef struct redisClient
{
211 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
214 time_t lastinteraction
; /* time of the last interaction, used for timeout */
215 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
216 int slaveseldb
; /* slave selected db, if this client is a slave */
217 int authenticated
; /* when requirepass is non-NULL */
218 int replstate
; /* replication state if this is a slave */
219 int repldbfd
; /* replication DB file descriptor */
220 long repldboff
; /* replication DB file offset */
221 off_t repldbsize
; /* replication DB file size */
229 /* Global server state structure */
235 unsigned int sharingpoolsize
;
236 long long dirty
; /* changes to DB from the last save */
238 list
*slaves
, *monitors
;
239 char neterr
[ANET_ERR_LEN
];
241 int cronloops
; /* number of times the cron function run */
242 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
243 time_t lastsave
; /* Unix time of last save succeeede */
244 size_t usedmemory
; /* Used memory in megabytes */
245 /* Fields used only for stats */
246 time_t stat_starttime
; /* server start time */
247 long long stat_numcommands
; /* number of processed commands */
248 long long stat_numconnections
; /* number of connections received */
256 int bgsaveinprogress
;
257 pid_t bgsavechildpid
;
258 struct saveparam
*saveparams
;
265 /* Replication related */
269 redisClient
*master
; /* client that is master for this slave */
271 unsigned int maxclients
;
272 unsigned long maxmemory
;
273 /* Sort parameters - qsort_r() is only available under BSD so we
274 * have to take this state global, in order to pass it to sortCompare() */
280 typedef void redisCommandProc(redisClient
*c
);
281 struct redisCommand
{
283 redisCommandProc
*proc
;
288 struct redisFunctionSym
{
290 unsigned long pointer
;
293 typedef struct _redisSortObject
{
301 typedef struct _redisSortOperation
{
304 } redisSortOperation
;
306 struct sharedObjectsStruct
{
307 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
308 *colon
, *nullbulk
, *nullmultibulk
,
309 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
310 *outofrangeerr
, *plus
,
311 *select0
, *select1
, *select2
, *select3
, *select4
,
312 *select5
, *select6
, *select7
, *select8
, *select9
;
315 /*================================ Prototypes =============================== */
317 static void freeStringObject(robj
*o
);
318 static void freeListObject(robj
*o
);
319 static void freeSetObject(robj
*o
);
320 static void decrRefCount(void *o
);
321 static robj
*createObject(int type
, void *ptr
);
322 static void freeClient(redisClient
*c
);
323 static int rdbLoad(char *filename
);
324 static void addReply(redisClient
*c
, robj
*obj
);
325 static void addReplySds(redisClient
*c
, sds s
);
326 static void incrRefCount(robj
*o
);
327 static int rdbSaveBackground(char *filename
);
328 static robj
*createStringObject(char *ptr
, size_t len
);
329 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
330 static int syncWithMaster(void);
331 static robj
*tryObjectSharing(robj
*o
);
332 static int tryObjectEncoding(robj
*o
);
333 static robj
*getDecodedObject(const robj
*o
);
334 static int removeExpire(redisDb
*db
, robj
*key
);
335 static int expireIfNeeded(redisDb
*db
, robj
*key
);
336 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
337 static int deleteKey(redisDb
*db
, robj
*key
);
338 static time_t getExpire(redisDb
*db
, robj
*key
);
339 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
340 static void updateSlavesWaitingBgsave(int bgsaveerr
);
341 static void freeMemoryIfNeeded(void);
342 static int processCommand(redisClient
*c
);
343 static void setupSigSegvAction(void);
344 static void rdbRemoveTempFile(pid_t childpid
);
345 static size_t stringObjectLen(robj
*o
);
347 static void authCommand(redisClient
*c
);
348 static void pingCommand(redisClient
*c
);
349 static void echoCommand(redisClient
*c
);
350 static void setCommand(redisClient
*c
);
351 static void setnxCommand(redisClient
*c
);
352 static void getCommand(redisClient
*c
);
353 static void delCommand(redisClient
*c
);
354 static void existsCommand(redisClient
*c
);
355 static void incrCommand(redisClient
*c
);
356 static void decrCommand(redisClient
*c
);
357 static void incrbyCommand(redisClient
*c
);
358 static void decrbyCommand(redisClient
*c
);
359 static void selectCommand(redisClient
*c
);
360 static void randomkeyCommand(redisClient
*c
);
361 static void keysCommand(redisClient
*c
);
362 static void dbsizeCommand(redisClient
*c
);
363 static void lastsaveCommand(redisClient
*c
);
364 static void saveCommand(redisClient
*c
);
365 static void bgsaveCommand(redisClient
*c
);
366 static void shutdownCommand(redisClient
*c
);
367 static void moveCommand(redisClient
*c
);
368 static void renameCommand(redisClient
*c
);
369 static void renamenxCommand(redisClient
*c
);
370 static void lpushCommand(redisClient
*c
);
371 static void rpushCommand(redisClient
*c
);
372 static void lpopCommand(redisClient
*c
);
373 static void rpopCommand(redisClient
*c
);
374 static void llenCommand(redisClient
*c
);
375 static void lindexCommand(redisClient
*c
);
376 static void lrangeCommand(redisClient
*c
);
377 static void ltrimCommand(redisClient
*c
);
378 static void typeCommand(redisClient
*c
);
379 static void lsetCommand(redisClient
*c
);
380 static void saddCommand(redisClient
*c
);
381 static void sremCommand(redisClient
*c
);
382 static void smoveCommand(redisClient
*c
);
383 static void sismemberCommand(redisClient
*c
);
384 static void scardCommand(redisClient
*c
);
385 static void spopCommand(redisClient
*c
);
386 static void sinterCommand(redisClient
*c
);
387 static void sinterstoreCommand(redisClient
*c
);
388 static void sunionCommand(redisClient
*c
);
389 static void sunionstoreCommand(redisClient
*c
);
390 static void sdiffCommand(redisClient
*c
);
391 static void sdiffstoreCommand(redisClient
*c
);
392 static void syncCommand(redisClient
*c
);
393 static void flushdbCommand(redisClient
*c
);
394 static void flushallCommand(redisClient
*c
);
395 static void sortCommand(redisClient
*c
);
396 static void lremCommand(redisClient
*c
);
397 static void infoCommand(redisClient
*c
);
398 static void mgetCommand(redisClient
*c
);
399 static void monitorCommand(redisClient
*c
);
400 static void expireCommand(redisClient
*c
);
401 static void getSetCommand(redisClient
*c
);
402 static void ttlCommand(redisClient
*c
);
403 static void slaveofCommand(redisClient
*c
);
404 static void debugCommand(redisClient
*c
);
405 /*================================= Globals ================================= */
408 static struct redisServer server
; /* server global state */
409 static struct redisCommand cmdTable
[] = {
410 {"get",getCommand
,2,REDIS_CMD_INLINE
},
411 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
412 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
413 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
414 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
415 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
416 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
417 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
418 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
419 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
420 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
421 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
422 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
423 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
424 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
425 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
426 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
427 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
428 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
429 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
430 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
431 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
432 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
433 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
434 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
435 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
436 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
437 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
438 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
439 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
440 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
441 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
442 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
443 {"getset",getSetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
444 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
445 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
446 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
447 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
448 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
449 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
450 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
451 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
452 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
453 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
454 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
455 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
456 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
457 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
458 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
459 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
460 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
461 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
462 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
463 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
464 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
465 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
466 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
467 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
468 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
471 /*============================ Utility functions ============================ */
473 /* Glob-style pattern matching. */
474 int stringmatchlen(const char *pattern
, int patternLen
,
475 const char *string
, int stringLen
, int nocase
)
480 while (pattern
[1] == '*') {
485 return 1; /* match */
487 if (stringmatchlen(pattern
+1, patternLen
-1,
488 string
, stringLen
, nocase
))
489 return 1; /* match */
493 return 0; /* no match */
497 return 0; /* no match */
507 not = pattern
[0] == '^';
514 if (pattern
[0] == '\\') {
517 if (pattern
[0] == string
[0])
519 } else if (pattern
[0] == ']') {
521 } else if (patternLen
== 0) {
525 } else if (pattern
[1] == '-' && patternLen
>= 3) {
526 int start
= pattern
[0];
527 int end
= pattern
[2];
535 start
= tolower(start
);
541 if (c
>= start
&& c
<= end
)
545 if (pattern
[0] == string
[0])
548 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
558 return 0; /* no match */
564 if (patternLen
>= 2) {
571 if (pattern
[0] != string
[0])
572 return 0; /* no match */
574 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
575 return 0; /* no match */
583 if (stringLen
== 0) {
584 while(*pattern
== '*') {
591 if (patternLen
== 0 && stringLen
== 0)
596 static void redisLog(int level
, const char *fmt
, ...) {
600 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
604 if (level
>= server
.verbosity
) {
610 strftime(buf
,64,"%d %b %H:%M:%S",gmtime(&now
));
611 fprintf(fp
,"%s %c ",buf
,c
[level
]);
612 vfprintf(fp
, fmt
, ap
);
618 if (server
.logfile
) fclose(fp
);
621 /*====================== Hash table type implementation ==================== */
623 /* This is an hash table type that uses the SDS dynamic strings libary as
624 * keys and radis objects as values (objects can hold SDS strings,
627 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
631 DICT_NOTUSED(privdata
);
633 l1
= sdslen((sds
)key1
);
634 l2
= sdslen((sds
)key2
);
635 if (l1
!= l2
) return 0;
636 return memcmp(key1
, key2
, l1
) == 0;
639 static void dictRedisObjectDestructor(void *privdata
, void *val
)
641 DICT_NOTUSED(privdata
);
646 static int dictObjKeyCompare(void *privdata
, const void *key1
,
649 const robj
*o1
= key1
, *o2
= key2
;
650 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
653 static unsigned int dictObjHash(const void *key
) {
655 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
658 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
661 const robj
*o1
= key1
, *o2
= key2
;
663 if (o1
->encoding
== REDIS_ENCODING_RAW
&&
664 o2
->encoding
== REDIS_ENCODING_RAW
)
665 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
670 dec1
= o1
->encoding
!= REDIS_ENCODING_RAW
?
671 getDecodedObject(o1
) : (robj
*)o1
;
672 dec2
= o2
->encoding
!= REDIS_ENCODING_RAW
?
673 getDecodedObject(o2
) : (robj
*)o2
;
674 cmp
= sdsDictKeyCompare(privdata
,dec1
->ptr
,dec2
->ptr
);
675 if (dec1
!= o1
) decrRefCount(dec1
);
676 if (dec2
!= o2
) decrRefCount(dec2
);
681 static unsigned int dictEncObjHash(const void *key
) {
684 if (o
->encoding
== REDIS_ENCODING_RAW
)
685 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
687 robj
*dec
= getDecodedObject(o
);
688 unsigned int hash
= dictGenHashFunction(dec
->ptr
, sdslen((sds
)dec
->ptr
));
694 static dictType setDictType
= {
695 dictEncObjHash
, /* hash function */
698 dictEncObjKeyCompare
, /* key compare */
699 dictRedisObjectDestructor
, /* key destructor */
700 NULL
/* val destructor */
703 static dictType hashDictType
= {
704 dictObjHash
, /* hash function */
707 dictObjKeyCompare
, /* key compare */
708 dictRedisObjectDestructor
, /* key destructor */
709 dictRedisObjectDestructor
/* val destructor */
712 /* ========================= Random utility functions ======================= */
714 /* Redis generally does not try to recover from out of memory conditions
715 * when allocating objects or strings, it is not clear if it will be possible
716 * to report this condition to the client since the networking layer itself
717 * is based on heap allocation for send buffers, so we simply abort.
718 * At least the code will be simpler to read... */
719 static void oom(const char *msg
) {
720 fprintf(stderr
, "%s: Out of memory\n",msg
);
726 /* ====================== Redis server networking stuff ===================== */
727 static void closeTimedoutClients(void) {
730 time_t now
= time(NULL
);
732 listRewind(server
.clients
);
733 while ((ln
= listYield(server
.clients
)) != NULL
) {
734 c
= listNodeValue(ln
);
735 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
736 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
737 (now
- c
->lastinteraction
> server
.maxidletime
)) {
738 redisLog(REDIS_DEBUG
,"Closing idle client");
744 static int htNeedsResize(dict
*dict
) {
745 long long size
, used
;
747 size
= dictSlots(dict
);
748 used
= dictSize(dict
);
749 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
750 (used
*100/size
< REDIS_HT_MINFILL
));
753 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
754 * we resize the hash table to save memory */
755 static void tryResizeHashTables(void) {
758 for (j
= 0; j
< server
.dbnum
; j
++) {
759 if (htNeedsResize(server
.db
[j
].dict
)) {
760 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
761 dictResize(server
.db
[j
].dict
);
762 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
764 if (htNeedsResize(server
.db
[j
].expires
))
765 dictResize(server
.db
[j
].expires
);
769 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
770 int j
, loops
= server
.cronloops
++;
771 REDIS_NOTUSED(eventLoop
);
773 REDIS_NOTUSED(clientData
);
775 /* Update the global state with the amount of used memory */
776 server
.usedmemory
= zmalloc_used_memory();
778 /* Show some info about non-empty databases */
779 for (j
= 0; j
< server
.dbnum
; j
++) {
780 long long size
, used
, vkeys
;
782 size
= dictSlots(server
.db
[j
].dict
);
783 used
= dictSize(server
.db
[j
].dict
);
784 vkeys
= dictSize(server
.db
[j
].expires
);
785 if (!(loops
% 5) && (used
|| vkeys
)) {
786 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
787 /* dictPrintStats(server.dict); */
791 /* We don't want to resize the hash tables while a bacground saving
792 * is in progress: the saving child is created using fork() that is
793 * implemented with a copy-on-write semantic in most modern systems, so
794 * if we resize the HT while there is the saving child at work actually
795 * a lot of memory movements in the parent will cause a lot of pages
797 if (!server
.bgsaveinprogress
) tryResizeHashTables();
799 /* Show information about connected clients */
801 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
802 listLength(server
.clients
)-listLength(server
.slaves
),
803 listLength(server
.slaves
),
805 dictSize(server
.sharingpool
));
808 /* Close connections of timedout clients */
809 if (server
.maxidletime
&& !(loops
% 10))
810 closeTimedoutClients();
812 /* Check if a background saving in progress terminated */
813 if (server
.bgsaveinprogress
) {
815 if (wait4(-1,&statloc
,WNOHANG
,NULL
)) {
816 int exitcode
= WEXITSTATUS(statloc
);
817 int bysignal
= WIFSIGNALED(statloc
);
819 if (!bysignal
&& exitcode
== 0) {
820 redisLog(REDIS_NOTICE
,
821 "Background saving terminated with success");
823 server
.lastsave
= time(NULL
);
824 } else if (!bysignal
&& exitcode
!= 0) {
825 redisLog(REDIS_WARNING
, "Background saving error");
827 redisLog(REDIS_WARNING
,
828 "Background saving terminated by signal");
829 rdbRemoveTempFile(server
.bgsavechildpid
);
831 server
.bgsaveinprogress
= 0;
832 server
.bgsavechildpid
= -1;
833 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
836 /* If there is not a background saving in progress check if
837 * we have to save now */
838 time_t now
= time(NULL
);
839 for (j
= 0; j
< server
.saveparamslen
; j
++) {
840 struct saveparam
*sp
= server
.saveparams
+j
;
842 if (server
.dirty
>= sp
->changes
&&
843 now
-server
.lastsave
> sp
->seconds
) {
844 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
845 sp
->changes
, sp
->seconds
);
846 rdbSaveBackground(server
.dbfilename
);
852 /* Try to expire a few timed out keys */
853 for (j
= 0; j
< server
.dbnum
; j
++) {
854 redisDb
*db
= server
.db
+j
;
855 int num
= dictSize(db
->expires
);
858 time_t now
= time(NULL
);
860 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
861 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
866 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
867 t
= (time_t) dictGetEntryVal(de
);
869 deleteKey(db
,dictGetEntryKey(de
));
875 /* Check if we should connect to a MASTER */
876 if (server
.replstate
== REDIS_REPL_CONNECT
) {
877 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
878 if (syncWithMaster() == REDIS_OK
) {
879 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
885 static void createSharedObjects(void) {
886 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
887 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
888 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
889 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
890 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
891 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
892 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
893 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
894 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
896 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
897 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
898 "-ERR Operation against a key holding the wrong kind of value\r\n"));
899 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
900 "-ERR no such key\r\n"));
901 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
902 "-ERR syntax error\r\n"));
903 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
904 "-ERR source and destination objects are the same\r\n"));
905 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
906 "-ERR index out of range\r\n"));
907 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
908 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
909 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
910 shared
.select0
= createStringObject("select 0\r\n",10);
911 shared
.select1
= createStringObject("select 1\r\n",10);
912 shared
.select2
= createStringObject("select 2\r\n",10);
913 shared
.select3
= createStringObject("select 3\r\n",10);
914 shared
.select4
= createStringObject("select 4\r\n",10);
915 shared
.select5
= createStringObject("select 5\r\n",10);
916 shared
.select6
= createStringObject("select 6\r\n",10);
917 shared
.select7
= createStringObject("select 7\r\n",10);
918 shared
.select8
= createStringObject("select 8\r\n",10);
919 shared
.select9
= createStringObject("select 9\r\n",10);
922 static void appendServerSaveParams(time_t seconds
, int changes
) {
923 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
924 if (server
.saveparams
== NULL
) oom("appendServerSaveParams");
925 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
926 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
927 server
.saveparamslen
++;
930 static void ResetServerSaveParams() {
931 zfree(server
.saveparams
);
932 server
.saveparams
= NULL
;
933 server
.saveparamslen
= 0;
936 static void initServerConfig() {
937 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
938 server
.port
= REDIS_SERVERPORT
;
939 server
.verbosity
= REDIS_DEBUG
;
940 server
.maxidletime
= REDIS_MAXIDLETIME
;
941 server
.saveparams
= NULL
;
942 server
.logfile
= NULL
; /* NULL = log on standard output */
943 server
.bindaddr
= NULL
;
944 server
.glueoutputbuf
= 1;
945 server
.daemonize
= 0;
946 server
.pidfile
= "/var/run/redis.pid";
947 server
.dbfilename
= "dump.rdb";
948 server
.requirepass
= NULL
;
949 server
.shareobjects
= 0;
950 server
.sharingpoolsize
= 1024;
951 server
.maxclients
= 0;
952 server
.maxmemory
= 0;
953 ResetServerSaveParams();
955 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
956 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
957 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
958 /* Replication related */
960 server
.masterhost
= NULL
;
961 server
.masterport
= 6379;
962 server
.master
= NULL
;
963 server
.replstate
= REDIS_REPL_NONE
;
966 static void initServer() {
969 signal(SIGHUP
, SIG_IGN
);
970 signal(SIGPIPE
, SIG_IGN
);
971 setupSigSegvAction();
973 server
.clients
= listCreate();
974 server
.slaves
= listCreate();
975 server
.monitors
= listCreate();
976 server
.objfreelist
= listCreate();
977 createSharedObjects();
978 server
.el
= aeCreateEventLoop();
979 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
980 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
981 if (!server
.db
|| !server
.clients
|| !server
.slaves
|| !server
.monitors
|| !server
.el
|| !server
.objfreelist
)
982 oom("server initialization"); /* Fatal OOM */
983 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
984 if (server
.fd
== -1) {
985 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
988 for (j
= 0; j
< server
.dbnum
; j
++) {
989 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
990 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
993 server
.cronloops
= 0;
994 server
.bgsaveinprogress
= 0;
995 server
.bgsavechildpid
= -1;
996 server
.lastsave
= time(NULL
);
998 server
.usedmemory
= 0;
999 server
.stat_numcommands
= 0;
1000 server
.stat_numconnections
= 0;
1001 server
.stat_starttime
= time(NULL
);
1002 aeCreateTimeEvent(server
.el
, 1000, serverCron
, NULL
, NULL
);
1005 /* Empty the whole database */
1006 static long long emptyDb() {
1008 long long removed
= 0;
1010 for (j
= 0; j
< server
.dbnum
; j
++) {
1011 removed
+= dictSize(server
.db
[j
].dict
);
1012 dictEmpty(server
.db
[j
].dict
);
1013 dictEmpty(server
.db
[j
].expires
);
1018 static int yesnotoi(char *s
) {
1019 if (!strcasecmp(s
,"yes")) return 1;
1020 else if (!strcasecmp(s
,"no")) return 0;
1024 /* I agree, this is a very rudimental way to load a configuration...
1025 will improve later if the config gets more complex */
1026 static void loadServerConfig(char *filename
) {
1028 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1032 if (filename
[0] == '-' && filename
[1] == '\0')
1035 if ((fp
= fopen(filename
,"r")) == NULL
) {
1036 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1041 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1047 line
= sdstrim(line
," \t\r\n");
1049 /* Skip comments and blank lines*/
1050 if (line
[0] == '#' || line
[0] == '\0') {
1055 /* Split into arguments */
1056 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1057 sdstolower(argv
[0]);
1059 /* Execute config directives */
1060 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1061 server
.maxidletime
= atoi(argv
[1]);
1062 if (server
.maxidletime
< 0) {
1063 err
= "Invalid timeout value"; goto loaderr
;
1065 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1066 server
.port
= atoi(argv
[1]);
1067 if (server
.port
< 1 || server
.port
> 65535) {
1068 err
= "Invalid port"; goto loaderr
;
1070 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1071 server
.bindaddr
= zstrdup(argv
[1]);
1072 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1073 int seconds
= atoi(argv
[1]);
1074 int changes
= atoi(argv
[2]);
1075 if (seconds
< 1 || changes
< 0) {
1076 err
= "Invalid save parameters"; goto loaderr
;
1078 appendServerSaveParams(seconds
,changes
);
1079 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1080 if (chdir(argv
[1]) == -1) {
1081 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1082 argv
[1], strerror(errno
));
1085 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1086 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1087 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1088 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1090 err
= "Invalid log level. Must be one of debug, notice, warning";
1093 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1096 server
.logfile
= zstrdup(argv
[1]);
1097 if (!strcasecmp(server
.logfile
,"stdout")) {
1098 zfree(server
.logfile
);
1099 server
.logfile
= NULL
;
1101 if (server
.logfile
) {
1102 /* Test if we are able to open the file. The server will not
1103 * be able to abort just for this problem later... */
1104 logfp
= fopen(server
.logfile
,"a");
1105 if (logfp
== NULL
) {
1106 err
= sdscatprintf(sdsempty(),
1107 "Can't open the log file: %s", strerror(errno
));
1112 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1113 server
.dbnum
= atoi(argv
[1]);
1114 if (server
.dbnum
< 1) {
1115 err
= "Invalid number of databases"; goto loaderr
;
1117 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1118 server
.maxclients
= atoi(argv
[1]);
1119 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1120 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1121 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1122 server
.masterhost
= sdsnew(argv
[1]);
1123 server
.masterport
= atoi(argv
[2]);
1124 server
.replstate
= REDIS_REPL_CONNECT
;
1125 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1126 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1127 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1129 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1130 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1131 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1133 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1134 server
.sharingpoolsize
= atoi(argv
[1]);
1135 if (server
.sharingpoolsize
< 1) {
1136 err
= "invalid object sharing pool size"; goto loaderr
;
1138 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1139 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1140 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1142 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1143 server
.requirepass
= zstrdup(argv
[1]);
1144 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1145 server
.pidfile
= zstrdup(argv
[1]);
1146 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1147 server
.dbfilename
= zstrdup(argv
[1]);
1149 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1151 for (j
= 0; j
< argc
; j
++)
1156 if (fp
!= stdin
) fclose(fp
);
1160 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1161 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1162 fprintf(stderr
, ">>> '%s'\n", line
);
1163 fprintf(stderr
, "%s\n", err
);
1167 static void freeClientArgv(redisClient
*c
) {
1170 for (j
= 0; j
< c
->argc
; j
++)
1171 decrRefCount(c
->argv
[j
]);
1175 static void freeClient(redisClient
*c
) {
1178 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1179 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1180 sdsfree(c
->querybuf
);
1181 listRelease(c
->reply
);
1184 ln
= listSearchKey(server
.clients
,c
);
1186 listDelNode(server
.clients
,ln
);
1187 if (c
->flags
& REDIS_SLAVE
) {
1188 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1190 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1191 ln
= listSearchKey(l
,c
);
1195 if (c
->flags
& REDIS_MASTER
) {
1196 server
.master
= NULL
;
1197 server
.replstate
= REDIS_REPL_CONNECT
;
1203 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1208 listRewind(c
->reply
);
1209 while((ln
= listYield(c
->reply
))) {
1211 totlen
+= sdslen(o
->ptr
);
1212 /* This optimization makes more sense if we don't have to copy
1214 if (totlen
> 1024) return;
1220 listRewind(c
->reply
);
1221 while((ln
= listYield(c
->reply
))) {
1223 memcpy(buf
+copylen
,o
->ptr
,sdslen(o
->ptr
));
1224 copylen
+= sdslen(o
->ptr
);
1225 listDelNode(c
->reply
,ln
);
1227 /* Now the output buffer is empty, add the new single element */
1228 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,totlen
));
1229 if (!listAddNodeTail(c
->reply
,o
)) oom("listAddNodeTail");
1233 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1234 redisClient
*c
= privdata
;
1235 int nwritten
= 0, totwritten
= 0, objlen
;
1238 REDIS_NOTUSED(mask
);
1240 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1241 glueReplyBuffersIfNeeded(c
);
1242 while(listLength(c
->reply
)) {
1243 o
= listNodeValue(listFirst(c
->reply
));
1244 objlen
= sdslen(o
->ptr
);
1247 listDelNode(c
->reply
,listFirst(c
->reply
));
1251 if (c
->flags
& REDIS_MASTER
) {
1252 /* Don't reply to a master */
1253 nwritten
= objlen
- c
->sentlen
;
1255 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1256 if (nwritten
<= 0) break;
1258 c
->sentlen
+= nwritten
;
1259 totwritten
+= nwritten
;
1260 /* If we fully sent the object on head go to the next one */
1261 if (c
->sentlen
== objlen
) {
1262 listDelNode(c
->reply
,listFirst(c
->reply
));
1265 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1266 * bytes, in a single threaded server it's a good idea to server
1267 * other clients as well, even if a very large request comes from
1268 * super fast link that is always able to accept data (in real world
1269 * terms think to 'KEYS *' against the loopback interfae) */
1270 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1272 if (nwritten
== -1) {
1273 if (errno
== EAGAIN
) {
1276 redisLog(REDIS_DEBUG
,
1277 "Error writing to client: %s", strerror(errno
));
1282 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1283 if (listLength(c
->reply
) == 0) {
1285 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1289 static struct redisCommand
*lookupCommand(char *name
) {
1291 while(cmdTable
[j
].name
!= NULL
) {
1292 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1298 /* resetClient prepare the client to process the next command */
1299 static void resetClient(redisClient
*c
) {
1304 /* If this function gets called we already read a whole
1305 * command, argments are in the client argv/argc fields.
1306 * processCommand() execute the command or prepare the
1307 * server for a bulk read from the client.
1309 * If 1 is returned the client is still alive and valid and
1310 * and other operations can be performed by the caller. Otherwise
1311 * if 0 is returned the client was destroied (i.e. after QUIT). */
1312 static int processCommand(redisClient
*c
) {
1313 struct redisCommand
*cmd
;
1316 /* Free some memory if needed (maxmemory setting) */
1317 if (server
.maxmemory
) freeMemoryIfNeeded();
1319 /* The QUIT command is handled as a special case. Normal command
1320 * procs are unable to close the client connection safely */
1321 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1325 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1327 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1330 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1331 (c
->argc
< -cmd
->arity
)) {
1332 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
1335 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1336 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1339 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1340 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1342 decrRefCount(c
->argv
[c
->argc
-1]);
1343 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1345 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1350 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1351 /* It is possible that the bulk read is already in the
1352 * buffer. Check this condition and handle it accordingly */
1353 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1354 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1356 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1361 /* Let's try to share objects on the command arguments vector */
1362 if (server
.shareobjects
) {
1364 for(j
= 1; j
< c
->argc
; j
++)
1365 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1367 /* Let's try to encode the bulk object to save space. */
1368 if (cmd
->flags
& REDIS_CMD_BULK
)
1369 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1371 /* Check if the user is authenticated */
1372 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1373 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1378 /* Exec the command */
1379 dirty
= server
.dirty
;
1381 if (server
.dirty
-dirty
!= 0 && listLength(server
.slaves
))
1382 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1383 if (listLength(server
.monitors
))
1384 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1385 server
.stat_numcommands
++;
1387 /* Prepare the client for the next command */
1388 if (c
->flags
& REDIS_CLOSE
) {
1396 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1400 /* (args*2)+1 is enough room for args, spaces, newlines */
1401 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1403 if (argc
<= REDIS_STATIC_ARGS
) {
1406 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1407 if (!outv
) oom("replicationFeedSlaves");
1410 for (j
= 0; j
< argc
; j
++) {
1411 if (j
!= 0) outv
[outc
++] = shared
.space
;
1412 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1415 lenobj
= createObject(REDIS_STRING
,
1416 sdscatprintf(sdsempty(),"%d\r\n",
1417 stringObjectLen(argv
[j
])));
1418 lenobj
->refcount
= 0;
1419 outv
[outc
++] = lenobj
;
1421 outv
[outc
++] = argv
[j
];
1423 outv
[outc
++] = shared
.crlf
;
1425 /* Increment all the refcounts at start and decrement at end in order to
1426 * be sure to free objects if there is no slave in a replication state
1427 * able to be feed with commands */
1428 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1430 while((ln
= listYield(slaves
))) {
1431 redisClient
*slave
= ln
->value
;
1433 /* Don't feed slaves that are still waiting for BGSAVE to start */
1434 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1436 /* Feed all the other slaves, MONITORs and so on */
1437 if (slave
->slaveseldb
!= dictid
) {
1441 case 0: selectcmd
= shared
.select0
; break;
1442 case 1: selectcmd
= shared
.select1
; break;
1443 case 2: selectcmd
= shared
.select2
; break;
1444 case 3: selectcmd
= shared
.select3
; break;
1445 case 4: selectcmd
= shared
.select4
; break;
1446 case 5: selectcmd
= shared
.select5
; break;
1447 case 6: selectcmd
= shared
.select6
; break;
1448 case 7: selectcmd
= shared
.select7
; break;
1449 case 8: selectcmd
= shared
.select8
; break;
1450 case 9: selectcmd
= shared
.select9
; break;
1452 selectcmd
= createObject(REDIS_STRING
,
1453 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1454 selectcmd
->refcount
= 0;
1457 addReply(slave
,selectcmd
);
1458 slave
->slaveseldb
= dictid
;
1460 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1462 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1463 if (outv
!= static_outv
) zfree(outv
);
1466 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1467 redisClient
*c
= (redisClient
*) privdata
;
1468 char buf
[REDIS_IOBUF_LEN
];
1471 REDIS_NOTUSED(mask
);
1473 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1475 if (errno
== EAGAIN
) {
1478 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1482 } else if (nread
== 0) {
1483 redisLog(REDIS_DEBUG
, "Client closed connection");
1488 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1489 c
->lastinteraction
= time(NULL
);
1495 if (c
->bulklen
== -1) {
1496 /* Read the first line of the query */
1497 char *p
= strchr(c
->querybuf
,'\n');
1504 query
= c
->querybuf
;
1505 c
->querybuf
= sdsempty();
1506 querylen
= 1+(p
-(query
));
1507 if (sdslen(query
) > querylen
) {
1508 /* leave data after the first line of the query in the buffer */
1509 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1511 *p
= '\0'; /* remove "\n" */
1512 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1513 sdsupdatelen(query
);
1515 /* Now we can split the query in arguments */
1516 if (sdslen(query
) == 0) {
1517 /* Ignore empty query */
1521 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1522 if (argv
== NULL
) oom("sdssplitlen");
1525 if (c
->argv
) zfree(c
->argv
);
1526 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1527 if (c
->argv
== NULL
) oom("allocating arguments list for client");
1529 for (j
= 0; j
< argc
; j
++) {
1530 if (sdslen(argv
[j
])) {
1531 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1538 /* Execute the command. If the client is still valid
1539 * after processCommand() return and there is something
1540 * on the query buffer try to process the next command. */
1541 if (c
->argc
&& processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1543 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
1544 redisLog(REDIS_DEBUG
, "Client protocol error");
1549 /* Bulk read handling. Note that if we are at this point
1550 the client already sent a command terminated with a newline,
1551 we are reading the bulk data that is actually the last
1552 argument of the command. */
1553 int qbl
= sdslen(c
->querybuf
);
1555 if (c
->bulklen
<= qbl
) {
1556 /* Copy everything but the final CRLF as final argument */
1557 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1559 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1566 static int selectDb(redisClient
*c
, int id
) {
1567 if (id
< 0 || id
>= server
.dbnum
)
1569 c
->db
= &server
.db
[id
];
1573 static void *dupClientReplyValue(void *o
) {
1574 incrRefCount((robj
*)o
);
1578 static redisClient
*createClient(int fd
) {
1579 redisClient
*c
= zmalloc(sizeof(*c
));
1581 anetNonBlock(NULL
,fd
);
1582 anetTcpNoDelay(NULL
,fd
);
1583 if (!c
) return NULL
;
1586 c
->querybuf
= sdsempty();
1592 c
->lastinteraction
= time(NULL
);
1593 c
->authenticated
= 0;
1594 c
->replstate
= REDIS_REPL_NONE
;
1595 if ((c
->reply
= listCreate()) == NULL
) oom("listCreate");
1596 listSetFreeMethod(c
->reply
,decrRefCount
);
1597 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1598 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1599 readQueryFromClient
, c
, NULL
) == AE_ERR
) {
1603 if (!listAddNodeTail(server
.clients
,c
)) oom("listAddNodeTail");
1607 static void addReply(redisClient
*c
, robj
*obj
) {
1608 if (listLength(c
->reply
) == 0 &&
1609 (c
->replstate
== REDIS_REPL_NONE
||
1610 c
->replstate
== REDIS_REPL_ONLINE
) &&
1611 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
1612 sendReplyToClient
, c
, NULL
) == AE_ERR
) return;
1613 if (obj
->encoding
!= REDIS_ENCODING_RAW
) {
1614 obj
= getDecodedObject(obj
);
1618 if (!listAddNodeTail(c
->reply
,obj
)) oom("listAddNodeTail");
1621 static void addReplySds(redisClient
*c
, sds s
) {
1622 robj
*o
= createObject(REDIS_STRING
,s
);
1627 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
1630 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
1631 len
= sdslen(obj
->ptr
);
1633 long n
= (long)obj
->ptr
;
1640 while((n
= n
/10) != 0) {
1644 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",len
));
1647 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1652 REDIS_NOTUSED(mask
);
1653 REDIS_NOTUSED(privdata
);
1655 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
1656 if (cfd
== AE_ERR
) {
1657 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
1660 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
1661 if ((c
= createClient(cfd
)) == NULL
) {
1662 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
1663 close(cfd
); /* May be already closed, just ingore errors */
1666 /* If maxclient directive is set and this is one client more... close the
1667 * connection. Note that we create the client instead to check before
1668 * for this condition, since now the socket is already set in nonblocking
1669 * mode and we can send an error for free using the Kernel I/O */
1670 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
1671 char *err
= "-ERR max number of clients reached\r\n";
1673 /* That's a best effort error message, don't check write errors */
1674 (void) write(c
->fd
,err
,strlen(err
));
1678 server
.stat_numconnections
++;
1681 /* ======================= Redis objects implementation ===================== */
1683 static robj
*createObject(int type
, void *ptr
) {
1686 if (listLength(server
.objfreelist
)) {
1687 listNode
*head
= listFirst(server
.objfreelist
);
1688 o
= listNodeValue(head
);
1689 listDelNode(server
.objfreelist
,head
);
1691 o
= zmalloc(sizeof(*o
));
1693 if (!o
) oom("createObject");
1695 o
->encoding
= REDIS_ENCODING_RAW
;
1701 static robj
*createStringObject(char *ptr
, size_t len
) {
1702 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
1705 static robj
*createListObject(void) {
1706 list
*l
= listCreate();
1708 if (!l
) oom("listCreate");
1709 listSetFreeMethod(l
,decrRefCount
);
1710 return createObject(REDIS_LIST
,l
);
1713 static robj
*createSetObject(void) {
1714 dict
*d
= dictCreate(&setDictType
,NULL
);
1715 if (!d
) oom("dictCreate");
1716 return createObject(REDIS_SET
,d
);
1719 static void freeStringObject(robj
*o
) {
1720 if (o
->encoding
== REDIS_ENCODING_RAW
) {
1725 static void freeListObject(robj
*o
) {
1726 listRelease((list
*) o
->ptr
);
1729 static void freeSetObject(robj
*o
) {
1730 dictRelease((dict
*) o
->ptr
);
1733 static void freeHashObject(robj
*o
) {
1734 dictRelease((dict
*) o
->ptr
);
1737 static void incrRefCount(robj
*o
) {
1739 #ifdef DEBUG_REFCOUNT
1740 if (o
->type
== REDIS_STRING
)
1741 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
1745 static void decrRefCount(void *obj
) {
1748 #ifdef DEBUG_REFCOUNT
1749 if (o
->type
== REDIS_STRING
)
1750 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
1752 if (--(o
->refcount
) == 0) {
1754 case REDIS_STRING
: freeStringObject(o
); break;
1755 case REDIS_LIST
: freeListObject(o
); break;
1756 case REDIS_SET
: freeSetObject(o
); break;
1757 case REDIS_HASH
: freeHashObject(o
); break;
1758 default: assert(0 != 0); break;
1760 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
1761 !listAddNodeHead(server
.objfreelist
,o
))
1766 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
1767 dictEntry
*de
= dictFind(db
->dict
,key
);
1768 return de
? dictGetEntryVal(de
) : NULL
;
1771 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
1772 expireIfNeeded(db
,key
);
1773 return lookupKey(db
,key
);
1776 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
1777 deleteIfVolatile(db
,key
);
1778 return lookupKey(db
,key
);
1781 static int deleteKey(redisDb
*db
, robj
*key
) {
1784 /* We need to protect key from destruction: after the first dictDelete()
1785 * it may happen that 'key' is no longer valid if we don't increment
1786 * it's count. This may happen when we get the object reference directly
1787 * from the hash table with dictRandomKey() or dict iterators */
1789 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
1790 retval
= dictDelete(db
->dict
,key
);
1793 return retval
== DICT_OK
;
1796 /* Try to share an object against the shared objects pool */
1797 static robj
*tryObjectSharing(robj
*o
) {
1798 struct dictEntry
*de
;
1801 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
1803 assert(o
->type
== REDIS_STRING
);
1804 de
= dictFind(server
.sharingpool
,o
);
1806 robj
*shared
= dictGetEntryKey(de
);
1808 c
= ((unsigned long) dictGetEntryVal(de
))+1;
1809 dictGetEntryVal(de
) = (void*) c
;
1810 incrRefCount(shared
);
1814 /* Here we are using a stream algorihtm: Every time an object is
1815 * shared we increment its count, everytime there is a miss we
1816 * recrement the counter of a random object. If this object reaches
1817 * zero we remove the object and put the current object instead. */
1818 if (dictSize(server
.sharingpool
) >=
1819 server
.sharingpoolsize
) {
1820 de
= dictGetRandomKey(server
.sharingpool
);
1822 c
= ((unsigned long) dictGetEntryVal(de
))-1;
1823 dictGetEntryVal(de
) = (void*) c
;
1825 dictDelete(server
.sharingpool
,de
->key
);
1828 c
= 0; /* If the pool is empty we want to add this object */
1833 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
1834 assert(retval
== DICT_OK
);
1841 /* Check if the nul-terminated string 's' can be represented by a long
1842 * (that is, is a number that fits into long without any other space or
1843 * character before or after the digits).
1845 * If so, the function returns REDIS_OK and *longval is set to the value
1846 * of the number. Otherwise REDIS_ERR is returned */
1847 static int isStringRepresentableAsLong(char *s
, long *longval
) {
1848 char buf
[32], *endptr
;
1852 value
= strtol(s
, &endptr
, 10);
1853 if (endptr
[0] != '\0') return REDIS_ERR
;
1854 slen
= snprintf(buf
,32,"%ld",value
);
1856 /* If the number converted back into a string is not identical
1857 * then it's not possible to encode the string as integer */
1858 if (strlen(buf
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
1859 if (longval
) *longval
= value
;
1863 /* Try to encode a string object in order to save space */
1864 static int tryObjectEncoding(robj
*o
) {
1868 if (o
->encoding
!= REDIS_ENCODING_RAW
)
1869 return REDIS_ERR
; /* Already encoded */
1871 /* It's not save to encode shared objects: shared objects can be shared
1872 * everywhere in the "object space" of Redis. Encoded objects can only
1873 * appear as "values" (and not, for instance, as keys) */
1874 if (o
->refcount
> 1) return REDIS_ERR
;
1876 /* Currently we try to encode only strings */
1877 assert(o
->type
== REDIS_STRING
);
1879 /* Check if we can represent this string as a long integer */
1880 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
1882 /* Ok, this object can be encoded */
1883 o
->encoding
= REDIS_ENCODING_INT
;
1885 o
->ptr
= (void*) value
;
1889 /* Get a decoded version of an encoded object (returned as a new object) */
1890 static robj
*getDecodedObject(const robj
*o
) {
1893 assert(o
->encoding
!= REDIS_ENCODING_RAW
);
1894 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
1897 snprintf(buf
,32,"%ld",(long)o
->ptr
);
1898 dec
= createStringObject(buf
,strlen(buf
));
1905 static int compareStringObjects(robj
*a
, robj
*b
) {
1906 assert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
1908 if (a
->encoding
== REDIS_ENCODING_INT
&& b
->encoding
== REDIS_ENCODING_INT
){
1909 return (long)a
->ptr
- (long)b
->ptr
;
1915 if (a
->encoding
!= REDIS_ENCODING_RAW
) a
= getDecodedObject(a
);
1916 if (b
->encoding
!= REDIS_ENCODING_RAW
) b
= getDecodedObject(a
);
1917 retval
= sdscmp(a
->ptr
,b
->ptr
);
1924 static size_t stringObjectLen(robj
*o
) {
1925 assert(o
->type
== REDIS_STRING
);
1926 if (o
->encoding
== REDIS_ENCODING_RAW
) {
1927 return sdslen(o
->ptr
);
1931 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
1935 /*============================ DB saving/loading ============================ */
1937 static int rdbSaveType(FILE *fp
, unsigned char type
) {
1938 if (fwrite(&type
,1,1,fp
) == 0) return -1;
1942 static int rdbSaveTime(FILE *fp
, time_t t
) {
1943 int32_t t32
= (int32_t) t
;
1944 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
1948 /* check rdbLoadLen() comments for more info */
1949 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
1950 unsigned char buf
[2];
1953 /* Save a 6 bit len */
1954 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
1955 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1956 } else if (len
< (1<<14)) {
1957 /* Save a 14 bit len */
1958 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
1960 if (fwrite(buf
,2,1,fp
) == 0) return -1;
1962 /* Save a 32 bit len */
1963 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
1964 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1966 if (fwrite(&len
,4,1,fp
) == 0) return -1;
1971 /* String objects in the form "2391" "-100" without any space and with a
1972 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1973 * encoded as integers to save space */
1974 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
1976 char *endptr
, buf
[32];
1978 /* Check if it's possible to encode this value as a number */
1979 value
= strtoll(s
, &endptr
, 10);
1980 if (endptr
[0] != '\0') return 0;
1981 snprintf(buf
,32,"%lld",value
);
1983 /* If the number converted back into a string is not identical
1984 * then it's not possible to encode the string as integer */
1985 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
1987 /* Finally check if it fits in our ranges */
1988 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
1989 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
1990 enc
[1] = value
&0xFF;
1992 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
1993 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
1994 enc
[1] = value
&0xFF;
1995 enc
[2] = (value
>>8)&0xFF;
1997 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
1998 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
1999 enc
[1] = value
&0xFF;
2000 enc
[2] = (value
>>8)&0xFF;
2001 enc
[3] = (value
>>16)&0xFF;
2002 enc
[4] = (value
>>24)&0xFF;
2009 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2010 unsigned int comprlen
, outlen
;
2014 /* We require at least four bytes compression for this to be worth it */
2015 outlen
= sdslen(obj
->ptr
)-4;
2016 if (outlen
<= 0) return 0;
2017 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2018 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2019 if (comprlen
== 0) {
2023 /* Data compressed! Let's save it on disk */
2024 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2025 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2026 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2027 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2028 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2037 /* Save a string objet as [len][data] on disk. If the object is a string
2038 * representation of an integer value we try to safe it in a special form */
2039 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2043 len
= sdslen(obj
->ptr
);
2045 /* Try integer encoding */
2047 unsigned char buf
[5];
2048 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2049 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2054 /* Try LZF compression - under 20 bytes it's unable to compress even
2055 * aaaaaaaaaaaaaaaaaa so skip it */
2059 retval
= rdbSaveLzfStringObject(fp
,obj
);
2060 if (retval
== -1) return -1;
2061 if (retval
> 0) return 0;
2062 /* retval == 0 means data can't be compressed, save the old way */
2065 /* Store verbatim */
2066 if (rdbSaveLen(fp
,len
) == -1) return -1;
2067 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2071 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2072 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2076 if (obj
->encoding
!= REDIS_ENCODING_RAW
) {
2077 dec
= getDecodedObject(obj
);
2078 retval
= rdbSaveStringObjectRaw(fp
,dec
);
2082 return rdbSaveStringObjectRaw(fp
,obj
);
2086 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2087 static int rdbSave(char *filename
) {
2088 dictIterator
*di
= NULL
;
2093 time_t now
= time(NULL
);
2095 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2096 fp
= fopen(tmpfile
,"w");
2098 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2101 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2102 for (j
= 0; j
< server
.dbnum
; j
++) {
2103 redisDb
*db
= server
.db
+j
;
2105 if (dictSize(d
) == 0) continue;
2106 di
= dictGetIterator(d
);
2112 /* Write the SELECT DB opcode */
2113 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2114 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2116 /* Iterate this DB writing every entry */
2117 while((de
= dictNext(di
)) != NULL
) {
2118 robj
*key
= dictGetEntryKey(de
);
2119 robj
*o
= dictGetEntryVal(de
);
2120 time_t expiretime
= getExpire(db
,key
);
2122 /* Save the expire time */
2123 if (expiretime
!= -1) {
2124 /* If this key is already expired skip it */
2125 if (expiretime
< now
) continue;
2126 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2127 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2129 /* Save the key and associated value */
2130 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2131 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2132 if (o
->type
== REDIS_STRING
) {
2133 /* Save a string value */
2134 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
2135 } else if (o
->type
== REDIS_LIST
) {
2136 /* Save a list value */
2137 list
*list
= o
->ptr
;
2141 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
2142 while((ln
= listYield(list
))) {
2143 robj
*eleobj
= listNodeValue(ln
);
2145 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2147 } else if (o
->type
== REDIS_SET
) {
2148 /* Save a set value */
2150 dictIterator
*di
= dictGetIterator(set
);
2153 if (!set
) oom("dictGetIteraotr");
2154 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
2155 while((de
= dictNext(di
)) != NULL
) {
2156 robj
*eleobj
= dictGetEntryKey(de
);
2158 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2160 dictReleaseIterator(di
);
2165 dictReleaseIterator(di
);
2168 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2170 /* Make sure data will not remain on the OS's output buffers */
2175 /* Use RENAME to make sure the DB file is changed atomically only
2176 * if the generate DB file is ok. */
2177 if (rename(tmpfile
,filename
) == -1) {
2178 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destionation: %s", strerror(errno
));
2182 redisLog(REDIS_NOTICE
,"DB saved on disk");
2184 server
.lastsave
= time(NULL
);
2190 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2191 if (di
) dictReleaseIterator(di
);
2195 static int rdbSaveBackground(char *filename
) {
2198 if (server
.bgsaveinprogress
) return REDIS_ERR
;
2199 if ((childpid
= fork()) == 0) {
2202 if (rdbSave(filename
) == REDIS_OK
) {
2209 if (childpid
== -1) {
2210 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2214 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2215 server
.bgsaveinprogress
= 1;
2216 server
.bgsavechildpid
= childpid
;
2219 return REDIS_OK
; /* unreached */
2222 static void rdbRemoveTempFile(pid_t childpid
) {
2225 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
2229 static int rdbLoadType(FILE *fp
) {
2231 if (fread(&type
,1,1,fp
) == 0) return -1;
2235 static time_t rdbLoadTime(FILE *fp
) {
2237 if (fread(&t32
,4,1,fp
) == 0) return -1;
2238 return (time_t) t32
;
2241 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2242 * of this file for a description of how this are stored on disk.
2244 * isencoded is set to 1 if the readed length is not actually a length but
2245 * an "encoding type", check the above comments for more info */
2246 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2247 unsigned char buf
[2];
2250 if (isencoded
) *isencoded
= 0;
2252 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2257 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2258 type
= (buf
[0]&0xC0)>>6;
2259 if (type
== REDIS_RDB_6BITLEN
) {
2260 /* Read a 6 bit len */
2262 } else if (type
== REDIS_RDB_ENCVAL
) {
2263 /* Read a 6 bit len encoding type */
2264 if (isencoded
) *isencoded
= 1;
2266 } else if (type
== REDIS_RDB_14BITLEN
) {
2267 /* Read a 14 bit len */
2268 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2269 return ((buf
[0]&0x3F)<<8)|buf
[1];
2271 /* Read a 32 bit len */
2272 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2278 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2279 unsigned char enc
[4];
2282 if (enctype
== REDIS_RDB_ENC_INT8
) {
2283 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2284 val
= (signed char)enc
[0];
2285 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2287 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2288 v
= enc
[0]|(enc
[1]<<8);
2290 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2292 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2293 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2296 val
= 0; /* anti-warning */
2299 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2302 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2303 unsigned int len
, clen
;
2304 unsigned char *c
= NULL
;
2307 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2308 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2309 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2310 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2311 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2312 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2314 return createObject(REDIS_STRING
,val
);
2321 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2326 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2329 case REDIS_RDB_ENC_INT8
:
2330 case REDIS_RDB_ENC_INT16
:
2331 case REDIS_RDB_ENC_INT32
:
2332 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2333 case REDIS_RDB_ENC_LZF
:
2334 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2340 if (len
== REDIS_RDB_LENERR
) return NULL
;
2341 val
= sdsnewlen(NULL
,len
);
2342 if (len
&& fread(val
,len
,1,fp
) == 0) {
2346 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2349 static int rdbLoad(char *filename
) {
2351 robj
*keyobj
= NULL
;
2353 int type
, retval
, rdbver
;
2354 dict
*d
= server
.db
[0].dict
;
2355 redisDb
*db
= server
.db
+0;
2357 time_t expiretime
= -1, now
= time(NULL
);
2359 fp
= fopen(filename
,"r");
2360 if (!fp
) return REDIS_ERR
;
2361 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2363 if (memcmp(buf
,"REDIS",5) != 0) {
2365 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2368 rdbver
= atoi(buf
+5);
2371 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2378 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2379 if (type
== REDIS_EXPIRETIME
) {
2380 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2381 /* We read the time so we need to read the object type again */
2382 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2384 if (type
== REDIS_EOF
) break;
2385 /* Handle SELECT DB opcode as a special case */
2386 if (type
== REDIS_SELECTDB
) {
2387 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2389 if (dbid
>= (unsigned)server
.dbnum
) {
2390 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2393 db
= server
.db
+dbid
;
2398 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2400 if (type
== REDIS_STRING
) {
2401 /* Read string value */
2402 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2403 tryObjectEncoding(o
);
2404 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2405 /* Read list/set value */
2408 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2410 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2411 /* Load every single element of the list/set */
2415 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2416 tryObjectEncoding(ele
);
2417 if (type
== REDIS_LIST
) {
2418 if (!listAddNodeTail((list
*)o
->ptr
,ele
))
2419 oom("listAddNodeTail");
2421 if (dictAdd((dict
*)o
->ptr
,ele
,NULL
) == DICT_ERR
)
2428 /* Add the new object in the hash table */
2429 retval
= dictAdd(d
,keyobj
,o
);
2430 if (retval
== DICT_ERR
) {
2431 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2434 /* Set the expire time if needed */
2435 if (expiretime
!= -1) {
2436 setExpire(db
,keyobj
,expiretime
);
2437 /* Delete this key if already expired */
2438 if (expiretime
< now
) deleteKey(db
,keyobj
);
2446 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2447 if (keyobj
) decrRefCount(keyobj
);
2448 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2450 return REDIS_ERR
; /* Just to avoid warning */
2453 /*================================== Commands =============================== */
2455 static void authCommand(redisClient
*c
) {
2456 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2457 c
->authenticated
= 1;
2458 addReply(c
,shared
.ok
);
2460 c
->authenticated
= 0;
2461 addReply(c
,shared
.err
);
2465 static void pingCommand(redisClient
*c
) {
2466 addReply(c
,shared
.pong
);
2469 static void echoCommand(redisClient
*c
) {
2470 addReplyBulkLen(c
,c
->argv
[1]);
2471 addReply(c
,c
->argv
[1]);
2472 addReply(c
,shared
.crlf
);
2475 /*=================================== Strings =============================== */
2477 static void setGenericCommand(redisClient
*c
, int nx
) {
2480 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2481 if (retval
== DICT_ERR
) {
2483 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2484 incrRefCount(c
->argv
[2]);
2486 addReply(c
,shared
.czero
);
2490 incrRefCount(c
->argv
[1]);
2491 incrRefCount(c
->argv
[2]);
2494 removeExpire(c
->db
,c
->argv
[1]);
2495 addReply(c
, nx
? shared
.cone
: shared
.ok
);
2498 static void setCommand(redisClient
*c
) {
2499 setGenericCommand(c
,0);
2502 static void setnxCommand(redisClient
*c
) {
2503 setGenericCommand(c
,1);
2506 static void getCommand(redisClient
*c
) {
2507 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2510 addReply(c
,shared
.nullbulk
);
2512 if (o
->type
!= REDIS_STRING
) {
2513 addReply(c
,shared
.wrongtypeerr
);
2515 addReplyBulkLen(c
,o
);
2517 addReply(c
,shared
.crlf
);
2522 static void getSetCommand(redisClient
*c
) {
2524 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
2525 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2527 incrRefCount(c
->argv
[1]);
2529 incrRefCount(c
->argv
[2]);
2531 removeExpire(c
->db
,c
->argv
[1]);
2534 static void mgetCommand(redisClient
*c
) {
2537 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
2538 for (j
= 1; j
< c
->argc
; j
++) {
2539 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
2541 addReply(c
,shared
.nullbulk
);
2543 if (o
->type
!= REDIS_STRING
) {
2544 addReply(c
,shared
.nullbulk
);
2546 addReplyBulkLen(c
,o
);
2548 addReply(c
,shared
.crlf
);
2554 static void incrDecrCommand(redisClient
*c
, long long incr
) {
2559 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2563 if (o
->type
!= REDIS_STRING
) {
2568 if (o
->encoding
== REDIS_ENCODING_RAW
)
2569 value
= strtoll(o
->ptr
, &eptr
, 10);
2570 else if (o
->encoding
== REDIS_ENCODING_INT
)
2571 value
= (long)o
->ptr
;
2578 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
2579 tryObjectEncoding(o
);
2580 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
2581 if (retval
== DICT_ERR
) {
2582 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
2583 removeExpire(c
->db
,c
->argv
[1]);
2585 incrRefCount(c
->argv
[1]);
2588 addReply(c
,shared
.colon
);
2590 addReply(c
,shared
.crlf
);
2593 static void incrCommand(redisClient
*c
) {
2594 incrDecrCommand(c
,1);
2597 static void decrCommand(redisClient
*c
) {
2598 incrDecrCommand(c
,-1);
2601 static void incrbyCommand(redisClient
*c
) {
2602 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2603 incrDecrCommand(c
,incr
);
2606 static void decrbyCommand(redisClient
*c
) {
2607 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2608 incrDecrCommand(c
,-incr
);
2611 /* ========================= Type agnostic commands ========================= */
2613 static void delCommand(redisClient
*c
) {
2616 for (j
= 1; j
< c
->argc
; j
++) {
2617 if (deleteKey(c
->db
,c
->argv
[j
])) {
2624 addReply(c
,shared
.czero
);
2627 addReply(c
,shared
.cone
);
2630 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
2635 static void existsCommand(redisClient
*c
) {
2636 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
2639 static void selectCommand(redisClient
*c
) {
2640 int id
= atoi(c
->argv
[1]->ptr
);
2642 if (selectDb(c
,id
) == REDIS_ERR
) {
2643 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
2645 addReply(c
,shared
.ok
);
2649 static void randomkeyCommand(redisClient
*c
) {
2653 de
= dictGetRandomKey(c
->db
->dict
);
2654 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
2657 addReply(c
,shared
.plus
);
2658 addReply(c
,shared
.crlf
);
2660 addReply(c
,shared
.plus
);
2661 addReply(c
,dictGetEntryKey(de
));
2662 addReply(c
,shared
.crlf
);
2666 static void keysCommand(redisClient
*c
) {
2669 sds pattern
= c
->argv
[1]->ptr
;
2670 int plen
= sdslen(pattern
);
2671 int numkeys
= 0, keyslen
= 0;
2672 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
2674 di
= dictGetIterator(c
->db
->dict
);
2675 if (!di
) oom("dictGetIterator");
2677 decrRefCount(lenobj
);
2678 while((de
= dictNext(di
)) != NULL
) {
2679 robj
*keyobj
= dictGetEntryKey(de
);
2681 sds key
= keyobj
->ptr
;
2682 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
2683 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
2684 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
2686 addReply(c
,shared
.space
);
2689 keyslen
+= sdslen(key
);
2693 dictReleaseIterator(di
);
2694 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
2695 addReply(c
,shared
.crlf
);
2698 static void dbsizeCommand(redisClient
*c
) {
2700 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
2703 static void lastsaveCommand(redisClient
*c
) {
2705 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
2708 static void typeCommand(redisClient
*c
) {
2712 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2717 case REDIS_STRING
: type
= "+string"; break;
2718 case REDIS_LIST
: type
= "+list"; break;
2719 case REDIS_SET
: type
= "+set"; break;
2720 default: type
= "unknown"; break;
2723 addReplySds(c
,sdsnew(type
));
2724 addReply(c
,shared
.crlf
);
2727 static void saveCommand(redisClient
*c
) {
2728 if (server
.bgsaveinprogress
) {
2729 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
2732 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2733 addReply(c
,shared
.ok
);
2735 addReply(c
,shared
.err
);
2739 static void bgsaveCommand(redisClient
*c
) {
2740 if (server
.bgsaveinprogress
) {
2741 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
2744 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
2745 addReply(c
,shared
.ok
);
2747 addReply(c
,shared
.err
);
2751 static void shutdownCommand(redisClient
*c
) {
2752 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
2753 /* Kill the saving child if there is a background saving in progress.
2754 We want to avoid race conditions, for instance our saving child may
2755 overwrite the synchronous saving did by SHUTDOWN. */
2756 if (server
.bgsaveinprogress
) {
2757 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
2758 kill(server
.bgsavechildpid
,SIGKILL
);
2759 rdbRemoveTempFile(server
.bgsavechildpid
);
2762 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2763 if (server
.daemonize
)
2764 unlink(server
.pidfile
);
2765 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
2766 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
2769 /* Ooops.. error saving! The best we can do is to continue operating.
2770 * Note that if there was a background saving process, in the next
2771 * cron() Redis will be notified that the background saving aborted,
2772 * handling special stuff like slaves pending for synchronization... */
2773 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
2774 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2778 static void renameGenericCommand(redisClient
*c
, int nx
) {
2781 /* To use the same key as src and dst is probably an error */
2782 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
2783 addReply(c
,shared
.sameobjecterr
);
2787 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2789 addReply(c
,shared
.nokeyerr
);
2793 deleteIfVolatile(c
->db
,c
->argv
[2]);
2794 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
2797 addReply(c
,shared
.czero
);
2800 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
2802 incrRefCount(c
->argv
[2]);
2804 deleteKey(c
->db
,c
->argv
[1]);
2806 addReply(c
,nx
? shared
.cone
: shared
.ok
);
2809 static void renameCommand(redisClient
*c
) {
2810 renameGenericCommand(c
,0);
2813 static void renamenxCommand(redisClient
*c
) {
2814 renameGenericCommand(c
,1);
2817 static void moveCommand(redisClient
*c
) {
2822 /* Obtain source and target DB pointers */
2825 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
2826 addReply(c
,shared
.outofrangeerr
);
2830 selectDb(c
,srcid
); /* Back to the source DB */
2832 /* If the user is moving using as target the same
2833 * DB as the source DB it is probably an error. */
2835 addReply(c
,shared
.sameobjecterr
);
2839 /* Check if the element exists and get a reference */
2840 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2842 addReply(c
,shared
.czero
);
2846 /* Try to add the element to the target DB */
2847 deleteIfVolatile(dst
,c
->argv
[1]);
2848 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
2849 addReply(c
,shared
.czero
);
2852 incrRefCount(c
->argv
[1]);
2855 /* OK! key moved, free the entry in the source DB */
2856 deleteKey(src
,c
->argv
[1]);
2858 addReply(c
,shared
.cone
);
2861 /* =================================== Lists ================================ */
2862 static void pushGenericCommand(redisClient
*c
, int where
) {
2866 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2868 lobj
= createListObject();
2870 if (where
== REDIS_HEAD
) {
2871 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2873 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2875 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
2876 incrRefCount(c
->argv
[1]);
2877 incrRefCount(c
->argv
[2]);
2879 if (lobj
->type
!= REDIS_LIST
) {
2880 addReply(c
,shared
.wrongtypeerr
);
2884 if (where
== REDIS_HEAD
) {
2885 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2887 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2889 incrRefCount(c
->argv
[2]);
2892 addReply(c
,shared
.ok
);
2895 static void lpushCommand(redisClient
*c
) {
2896 pushGenericCommand(c
,REDIS_HEAD
);
2899 static void rpushCommand(redisClient
*c
) {
2900 pushGenericCommand(c
,REDIS_TAIL
);
2903 static void llenCommand(redisClient
*c
) {
2907 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2909 addReply(c
,shared
.czero
);
2912 if (o
->type
!= REDIS_LIST
) {
2913 addReply(c
,shared
.wrongtypeerr
);
2916 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
2921 static void lindexCommand(redisClient
*c
) {
2923 int index
= atoi(c
->argv
[2]->ptr
);
2925 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2927 addReply(c
,shared
.nullbulk
);
2929 if (o
->type
!= REDIS_LIST
) {
2930 addReply(c
,shared
.wrongtypeerr
);
2932 list
*list
= o
->ptr
;
2935 ln
= listIndex(list
, index
);
2937 addReply(c
,shared
.nullbulk
);
2939 robj
*ele
= listNodeValue(ln
);
2940 addReplyBulkLen(c
,ele
);
2942 addReply(c
,shared
.crlf
);
2948 static void lsetCommand(redisClient
*c
) {
2950 int index
= atoi(c
->argv
[2]->ptr
);
2952 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2954 addReply(c
,shared
.nokeyerr
);
2956 if (o
->type
!= REDIS_LIST
) {
2957 addReply(c
,shared
.wrongtypeerr
);
2959 list
*list
= o
->ptr
;
2962 ln
= listIndex(list
, index
);
2964 addReply(c
,shared
.outofrangeerr
);
2966 robj
*ele
= listNodeValue(ln
);
2969 listNodeValue(ln
) = c
->argv
[3];
2970 incrRefCount(c
->argv
[3]);
2971 addReply(c
,shared
.ok
);
2978 static void popGenericCommand(redisClient
*c
, int where
) {
2981 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2983 addReply(c
,shared
.nullbulk
);
2985 if (o
->type
!= REDIS_LIST
) {
2986 addReply(c
,shared
.wrongtypeerr
);
2988 list
*list
= o
->ptr
;
2991 if (where
== REDIS_HEAD
)
2992 ln
= listFirst(list
);
2994 ln
= listLast(list
);
2997 addReply(c
,shared
.nullbulk
);
2999 robj
*ele
= listNodeValue(ln
);
3000 addReplyBulkLen(c
,ele
);
3002 addReply(c
,shared
.crlf
);
3003 listDelNode(list
,ln
);
3010 static void lpopCommand(redisClient
*c
) {
3011 popGenericCommand(c
,REDIS_HEAD
);
3014 static void rpopCommand(redisClient
*c
) {
3015 popGenericCommand(c
,REDIS_TAIL
);
3018 static void lrangeCommand(redisClient
*c
) {
3020 int start
= atoi(c
->argv
[2]->ptr
);
3021 int end
= atoi(c
->argv
[3]->ptr
);
3023 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3025 addReply(c
,shared
.nullmultibulk
);
3027 if (o
->type
!= REDIS_LIST
) {
3028 addReply(c
,shared
.wrongtypeerr
);
3030 list
*list
= o
->ptr
;
3032 int llen
= listLength(list
);
3036 /* convert negative indexes */
3037 if (start
< 0) start
= llen
+start
;
3038 if (end
< 0) end
= llen
+end
;
3039 if (start
< 0) start
= 0;
3040 if (end
< 0) end
= 0;
3042 /* indexes sanity checks */
3043 if (start
> end
|| start
>= llen
) {
3044 /* Out of range start or start > end result in empty list */
3045 addReply(c
,shared
.emptymultibulk
);
3048 if (end
>= llen
) end
= llen
-1;
3049 rangelen
= (end
-start
)+1;
3051 /* Return the result in form of a multi-bulk reply */
3052 ln
= listIndex(list
, start
);
3053 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3054 for (j
= 0; j
< rangelen
; j
++) {
3055 ele
= listNodeValue(ln
);
3056 addReplyBulkLen(c
,ele
);
3058 addReply(c
,shared
.crlf
);
3065 static void ltrimCommand(redisClient
*c
) {
3067 int start
= atoi(c
->argv
[2]->ptr
);
3068 int end
= atoi(c
->argv
[3]->ptr
);
3070 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3072 addReply(c
,shared
.nokeyerr
);
3074 if (o
->type
!= REDIS_LIST
) {
3075 addReply(c
,shared
.wrongtypeerr
);
3077 list
*list
= o
->ptr
;
3079 int llen
= listLength(list
);
3080 int j
, ltrim
, rtrim
;
3082 /* convert negative indexes */
3083 if (start
< 0) start
= llen
+start
;
3084 if (end
< 0) end
= llen
+end
;
3085 if (start
< 0) start
= 0;
3086 if (end
< 0) end
= 0;
3088 /* indexes sanity checks */
3089 if (start
> end
|| start
>= llen
) {
3090 /* Out of range start or start > end result in empty list */
3094 if (end
>= llen
) end
= llen
-1;
3099 /* Remove list elements to perform the trim */
3100 for (j
= 0; j
< ltrim
; j
++) {
3101 ln
= listFirst(list
);
3102 listDelNode(list
,ln
);
3104 for (j
= 0; j
< rtrim
; j
++) {
3105 ln
= listLast(list
);
3106 listDelNode(list
,ln
);
3109 addReply(c
,shared
.ok
);
3114 static void lremCommand(redisClient
*c
) {
3117 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3119 addReply(c
,shared
.czero
);
3121 if (o
->type
!= REDIS_LIST
) {
3122 addReply(c
,shared
.wrongtypeerr
);
3124 list
*list
= o
->ptr
;
3125 listNode
*ln
, *next
;
3126 int toremove
= atoi(c
->argv
[2]->ptr
);
3131 toremove
= -toremove
;
3134 ln
= fromtail
? list
->tail
: list
->head
;
3136 robj
*ele
= listNodeValue(ln
);
3138 next
= fromtail
? ln
->prev
: ln
->next
;
3139 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3140 listDelNode(list
,ln
);
3143 if (toremove
&& removed
== toremove
) break;
3147 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3152 /* ==================================== Sets ================================ */
3154 static void saddCommand(redisClient
*c
) {
3157 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3159 set
= createSetObject();
3160 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3161 incrRefCount(c
->argv
[1]);
3163 if (set
->type
!= REDIS_SET
) {
3164 addReply(c
,shared
.wrongtypeerr
);
3168 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3169 incrRefCount(c
->argv
[2]);
3171 addReply(c
,shared
.cone
);
3173 addReply(c
,shared
.czero
);
3177 static void sremCommand(redisClient
*c
) {
3180 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3182 addReply(c
,shared
.czero
);
3184 if (set
->type
!= REDIS_SET
) {
3185 addReply(c
,shared
.wrongtypeerr
);
3188 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
3190 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3191 addReply(c
,shared
.cone
);
3193 addReply(c
,shared
.czero
);
3198 static void smoveCommand(redisClient
*c
) {
3199 robj
*srcset
, *dstset
;
3201 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3202 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3204 /* If the source key does not exist return 0, if it's of the wrong type
3206 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
3207 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
3210 /* Error if the destination key is not a set as well */
3211 if (dstset
&& dstset
->type
!= REDIS_SET
) {
3212 addReply(c
,shared
.wrongtypeerr
);
3215 /* Remove the element from the source set */
3216 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
3217 /* Key not found in the src set! return zero */
3218 addReply(c
,shared
.czero
);
3222 /* Add the element to the destination set */
3224 dstset
= createSetObject();
3225 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
3226 incrRefCount(c
->argv
[2]);
3228 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
3229 incrRefCount(c
->argv
[3]);
3230 addReply(c
,shared
.cone
);
3233 static void sismemberCommand(redisClient
*c
) {
3236 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3238 addReply(c
,shared
.czero
);
3240 if (set
->type
!= REDIS_SET
) {
3241 addReply(c
,shared
.wrongtypeerr
);
3244 if (dictFind(set
->ptr
,c
->argv
[2]))
3245 addReply(c
,shared
.cone
);
3247 addReply(c
,shared
.czero
);
3251 static void scardCommand(redisClient
*c
) {
3255 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3257 addReply(c
,shared
.czero
);
3260 if (o
->type
!= REDIS_SET
) {
3261 addReply(c
,shared
.wrongtypeerr
);
3264 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3270 static void spopCommand(redisClient
*c
) {
3274 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3276 addReply(c
,shared
.nullbulk
);
3278 if (set
->type
!= REDIS_SET
) {
3279 addReply(c
,shared
.wrongtypeerr
);
3282 de
= dictGetRandomKey(set
->ptr
);
3284 addReply(c
,shared
.nullbulk
);
3286 robj
*ele
= dictGetEntryKey(de
);
3288 addReplyBulkLen(c
,ele
);
3290 addReply(c
,shared
.crlf
);
3291 dictDelete(set
->ptr
,ele
);
3292 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3298 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3299 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3301 return dictSize(*d1
)-dictSize(*d2
);
3304 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
) {
3305 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3308 robj
*lenobj
= NULL
, *dstset
= NULL
;
3309 int j
, cardinality
= 0;
3311 if (!dv
) oom("sinterGenericCommand");
3312 for (j
= 0; j
< setsnum
; j
++) {
3316 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3317 lookupKeyRead(c
->db
,setskeys
[j
]);
3321 deleteKey(c
->db
,dstkey
);
3322 addReply(c
,shared
.ok
);
3324 addReply(c
,shared
.nullmultibulk
);
3328 if (setobj
->type
!= REDIS_SET
) {
3330 addReply(c
,shared
.wrongtypeerr
);
3333 dv
[j
] = setobj
->ptr
;
3335 /* Sort sets from the smallest to largest, this will improve our
3336 * algorithm's performace */
3337 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3339 /* The first thing we should output is the total number of elements...
3340 * since this is a multi-bulk write, but at this stage we don't know
3341 * the intersection set size, so we use a trick, append an empty object
3342 * to the output list and save the pointer to later modify it with the
3345 lenobj
= createObject(REDIS_STRING
,NULL
);
3347 decrRefCount(lenobj
);
3349 /* If we have a target key where to store the resulting set
3350 * create this key with an empty set inside */
3351 dstset
= createSetObject();
3354 /* Iterate all the elements of the first (smallest) set, and test
3355 * the element against all the other sets, if at least one set does
3356 * not include the element it is discarded */
3357 di
= dictGetIterator(dv
[0]);
3358 if (!di
) oom("dictGetIterator");
3360 while((de
= dictNext(di
)) != NULL
) {
3363 for (j
= 1; j
< setsnum
; j
++)
3364 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
3366 continue; /* at least one set does not contain the member */
3367 ele
= dictGetEntryKey(de
);
3369 addReplyBulkLen(c
,ele
);
3371 addReply(c
,shared
.crlf
);
3374 dictAdd(dstset
->ptr
,ele
,NULL
);
3378 dictReleaseIterator(di
);
3381 /* Store the resulting set into the target */
3382 deleteKey(c
->db
,dstkey
);
3383 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3384 incrRefCount(dstkey
);
3388 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",cardinality
);
3390 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3391 dictSize((dict
*)dstset
->ptr
)));
3397 static void sinterCommand(redisClient
*c
) {
3398 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
3401 static void sinterstoreCommand(redisClient
*c
) {
3402 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
3405 #define REDIS_OP_UNION 0
3406 #define REDIS_OP_DIFF 1
3408 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
3409 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3412 robj
*dstset
= NULL
;
3413 int j
, cardinality
= 0;
3415 if (!dv
) oom("sunionDiffGenericCommand");
3416 for (j
= 0; j
< setsnum
; j
++) {
3420 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3421 lookupKeyRead(c
->db
,setskeys
[j
]);
3426 if (setobj
->type
!= REDIS_SET
) {
3428 addReply(c
,shared
.wrongtypeerr
);
3431 dv
[j
] = setobj
->ptr
;
3434 /* We need a temp set object to store our union. If the dstkey
3435 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3436 * this set object will be the resulting object to set into the target key*/
3437 dstset
= createSetObject();
3439 /* Iterate all the elements of all the sets, add every element a single
3440 * time to the result set */
3441 for (j
= 0; j
< setsnum
; j
++) {
3442 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
3443 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
3445 di
= dictGetIterator(dv
[j
]);
3446 if (!di
) oom("dictGetIterator");
3448 while((de
= dictNext(di
)) != NULL
) {
3451 /* dictAdd will not add the same element multiple times */
3452 ele
= dictGetEntryKey(de
);
3453 if (op
== REDIS_OP_UNION
|| j
== 0) {
3454 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
3458 } else if (op
== REDIS_OP_DIFF
) {
3459 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
3464 dictReleaseIterator(di
);
3466 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
3469 /* Output the content of the resulting set, if not in STORE mode */
3471 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
3472 di
= dictGetIterator(dstset
->ptr
);
3473 if (!di
) oom("dictGetIterator");
3474 while((de
= dictNext(di
)) != NULL
) {
3477 ele
= dictGetEntryKey(de
);
3478 addReplyBulkLen(c
,ele
);
3480 addReply(c
,shared
.crlf
);
3482 dictReleaseIterator(di
);
3484 /* If we have a target key where to store the resulting set
3485 * create this key with the result set inside */
3486 deleteKey(c
->db
,dstkey
);
3487 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3488 incrRefCount(dstkey
);
3493 decrRefCount(dstset
);
3495 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3496 dictSize((dict
*)dstset
->ptr
)));
3502 static void sunionCommand(redisClient
*c
) {
3503 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
3506 static void sunionstoreCommand(redisClient
*c
) {
3507 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
3510 static void sdiffCommand(redisClient
*c
) {
3511 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
3514 static void sdiffstoreCommand(redisClient
*c
) {
3515 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
3518 static void flushdbCommand(redisClient
*c
) {
3519 server
.dirty
+= dictSize(c
->db
->dict
);
3520 dictEmpty(c
->db
->dict
);
3521 dictEmpty(c
->db
->expires
);
3522 addReply(c
,shared
.ok
);
3525 static void flushallCommand(redisClient
*c
) {
3526 server
.dirty
+= emptyDb();
3527 addReply(c
,shared
.ok
);
3528 rdbSave(server
.dbfilename
);
3532 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
3533 redisSortOperation
*so
= zmalloc(sizeof(*so
));
3534 if (!so
) oom("createSortOperation");
3536 so
->pattern
= pattern
;
3540 /* Return the value associated to the key with a name obtained
3541 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3542 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
3546 int prefixlen
, sublen
, postfixlen
;
3547 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3550 unsigned short free
;
3551 unsigned short _len
; /* not used here */
3552 char buf
[REDIS_SORTKEY_MAX
+1];
3555 if (subst
->encoding
== REDIS_ENCODING_RAW
)
3556 incrRefCount(subst
);
3558 subst
= getDecodedObject(subst
);
3561 spat
= pattern
->ptr
;
3563 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
3564 p
= strchr(spat
,'*');
3565 if (!p
) return NULL
;
3568 sublen
= sdslen(ssub
);
3569 postfixlen
= sdslen(spat
)-(prefixlen
+1);
3570 memcpy(keyname
.buf
,spat
,prefixlen
);
3571 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
3572 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
3573 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
3574 keyname
.len
= prefixlen
+sublen
+postfixlen
;
3575 keyname
._len
= USHRT_MAX
;
3577 keyobj
.refcount
= 1;
3578 keyobj
.type
= REDIS_STRING
;
3579 keyobj
.ptr
= ((char*)&keyname
)+(sizeof(long)*2);
3581 decrRefCount(subst
);
3583 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3584 return lookupKeyRead(db
,&keyobj
);
3587 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3588 * the additional parameter is not standard but a BSD-specific we have to
3589 * pass sorting parameters via the global 'server' structure */
3590 static int sortCompare(const void *s1
, const void *s2
) {
3591 const redisSortObject
*so1
= s1
, *so2
= s2
;
3594 if (!server
.sort_alpha
) {
3595 /* Numeric sorting. Here it's trivial as we precomputed scores */
3596 if (so1
->u
.score
> so2
->u
.score
) {
3598 } else if (so1
->u
.score
< so2
->u
.score
) {
3604 /* Alphanumeric sorting */
3605 if (server
.sort_bypattern
) {
3606 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
3607 /* At least one compare object is NULL */
3608 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
3610 else if (so1
->u
.cmpobj
== NULL
)
3615 /* We have both the objects, use strcoll */
3616 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
3619 /* Compare elements directly */
3620 if (so1
->obj
->encoding
== REDIS_ENCODING_RAW
&&
3621 so2
->obj
->encoding
== REDIS_ENCODING_RAW
) {
3622 cmp
= strcoll(so1
->obj
->ptr
,so2
->obj
->ptr
);
3626 dec1
= so1
->obj
->encoding
== REDIS_ENCODING_RAW
?
3627 so1
->obj
: getDecodedObject(so1
->obj
);
3628 dec2
= so2
->obj
->encoding
== REDIS_ENCODING_RAW
?
3629 so2
->obj
: getDecodedObject(so2
->obj
);
3630 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
3631 if (dec1
!= so1
->obj
) decrRefCount(dec1
);
3632 if (dec2
!= so2
->obj
) decrRefCount(dec2
);
3636 return server
.sort_desc
? -cmp
: cmp
;
3639 /* The SORT command is the most complex command in Redis. Warning: this code
3640 * is optimized for speed and a bit less for readability */
3641 static void sortCommand(redisClient
*c
) {
3644 int desc
= 0, alpha
= 0;
3645 int limit_start
= 0, limit_count
= -1, start
, end
;
3646 int j
, dontsort
= 0, vectorlen
;
3647 int getop
= 0; /* GET operation counter */
3648 robj
*sortval
, *sortby
= NULL
;
3649 redisSortObject
*vector
; /* Resulting vector to sort */
3651 /* Lookup the key to sort. It must be of the right types */
3652 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
3653 if (sortval
== NULL
) {
3654 addReply(c
,shared
.nokeyerr
);
3657 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
) {
3658 addReply(c
,shared
.wrongtypeerr
);
3662 /* Create a list of operations to perform for every sorted element.
3663 * Operations can be GET/DEL/INCR/DECR */
3664 operations
= listCreate();
3665 listSetFreeMethod(operations
,zfree
);
3668 /* Now we need to protect sortval incrementing its count, in the future
3669 * SORT may have options able to overwrite/delete keys during the sorting
3670 * and the sorted key itself may get destroied */
3671 incrRefCount(sortval
);
3673 /* The SORT command has an SQL-alike syntax, parse it */
3674 while(j
< c
->argc
) {
3675 int leftargs
= c
->argc
-j
-1;
3676 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
3678 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
3680 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
3682 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
3683 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
3684 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
3686 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
3687 sortby
= c
->argv
[j
+1];
3688 /* If the BY pattern does not contain '*', i.e. it is constant,
3689 * we don't need to sort nor to lookup the weight keys. */
3690 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
3692 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3693 listAddNodeTail(operations
,createSortOperation(
3694 REDIS_SORT_GET
,c
->argv
[j
+1]));
3697 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"del") && leftargs
>= 1) {
3698 listAddNodeTail(operations
,createSortOperation(
3699 REDIS_SORT_DEL
,c
->argv
[j
+1]));
3701 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"incr") && leftargs
>= 1) {
3702 listAddNodeTail(operations
,createSortOperation(
3703 REDIS_SORT_INCR
,c
->argv
[j
+1]));
3705 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3706 listAddNodeTail(operations
,createSortOperation(
3707 REDIS_SORT_DECR
,c
->argv
[j
+1]));
3710 decrRefCount(sortval
);
3711 listRelease(operations
);
3712 addReply(c
,shared
.syntaxerr
);
3718 /* Load the sorting vector with all the objects to sort */
3719 vectorlen
= (sortval
->type
== REDIS_LIST
) ?
3720 listLength((list
*)sortval
->ptr
) :
3721 dictSize((dict
*)sortval
->ptr
);
3722 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
3723 if (!vector
) oom("allocating objects vector for SORT");
3725 if (sortval
->type
== REDIS_LIST
) {
3726 list
*list
= sortval
->ptr
;
3730 while((ln
= listYield(list
))) {
3731 robj
*ele
= ln
->value
;
3732 vector
[j
].obj
= ele
;
3733 vector
[j
].u
.score
= 0;
3734 vector
[j
].u
.cmpobj
= NULL
;
3738 dict
*set
= sortval
->ptr
;
3742 di
= dictGetIterator(set
);
3743 if (!di
) oom("dictGetIterator");
3744 while((setele
= dictNext(di
)) != NULL
) {
3745 vector
[j
].obj
= dictGetEntryKey(setele
);
3746 vector
[j
].u
.score
= 0;
3747 vector
[j
].u
.cmpobj
= NULL
;
3750 dictReleaseIterator(di
);
3752 assert(j
== vectorlen
);
3754 /* Now it's time to load the right scores in the sorting vector */
3755 if (dontsort
== 0) {
3756 for (j
= 0; j
< vectorlen
; j
++) {
3760 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
3761 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
3763 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
3764 vector
[j
].u
.cmpobj
= byval
;
3765 incrRefCount(byval
);
3767 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
3770 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
3771 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
3773 if (byval
->encoding
== REDIS_ENCODING_INT
)
3774 vector
[j
].u
.score
= (long)byval
->ptr
;
3781 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
3782 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
3784 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
3785 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
3794 /* We are ready to sort the vector... perform a bit of sanity check
3795 * on the LIMIT option too. We'll use a partial version of quicksort. */
3796 start
= (limit_start
< 0) ? 0 : limit_start
;
3797 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
3798 if (start
>= vectorlen
) {
3799 start
= vectorlen
-1;
3802 if (end
>= vectorlen
) end
= vectorlen
-1;
3804 if (dontsort
== 0) {
3805 server
.sort_desc
= desc
;
3806 server
.sort_alpha
= alpha
;
3807 server
.sort_bypattern
= sortby
? 1 : 0;
3808 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
3809 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
3811 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
3814 /* Send command output to the output buffer, performing the specified
3815 * GET/DEL/INCR/DECR operations if any. */
3816 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
3817 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
3818 for (j
= start
; j
<= end
; j
++) {
3821 addReplyBulkLen(c
,vector
[j
].obj
);
3822 addReply(c
,vector
[j
].obj
);
3823 addReply(c
,shared
.crlf
);
3825 listRewind(operations
);
3826 while((ln
= listYield(operations
))) {
3827 redisSortOperation
*sop
= ln
->value
;
3828 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
3831 if (sop
->type
== REDIS_SORT_GET
) {
3832 if (!val
|| val
->type
!= REDIS_STRING
) {
3833 addReply(c
,shared
.nullbulk
);
3835 addReplyBulkLen(c
,val
);
3837 addReply(c
,shared
.crlf
);
3839 } else if (sop
->type
== REDIS_SORT_DEL
) {
3846 decrRefCount(sortval
);
3847 listRelease(operations
);
3848 for (j
= 0; j
< vectorlen
; j
++) {
3849 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
3850 decrRefCount(vector
[j
].u
.cmpobj
);
3855 static void infoCommand(redisClient
*c
) {
3857 time_t uptime
= time(NULL
)-server
.stat_starttime
;
3860 info
= sdscatprintf(sdsempty(),
3861 "redis_version:%s\r\n"
3862 "uptime_in_seconds:%d\r\n"
3863 "uptime_in_days:%d\r\n"
3864 "connected_clients:%d\r\n"
3865 "connected_slaves:%d\r\n"
3866 "used_memory:%zu\r\n"
3867 "changes_since_last_save:%lld\r\n"
3868 "bgsave_in_progress:%d\r\n"
3869 "last_save_time:%d\r\n"
3870 "total_connections_received:%lld\r\n"
3871 "total_commands_processed:%lld\r\n"
3876 listLength(server
.clients
)-listLength(server
.slaves
),
3877 listLength(server
.slaves
),
3880 server
.bgsaveinprogress
,
3882 server
.stat_numconnections
,
3883 server
.stat_numcommands
,
3884 server
.masterhost
== NULL
? "master" : "slave"
3886 if (server
.masterhost
) {
3887 info
= sdscatprintf(info
,
3888 "master_host:%s\r\n"
3889 "master_port:%d\r\n"
3890 "master_link_status:%s\r\n"
3891 "master_last_io_seconds_ago:%d\r\n"
3894 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
3896 (int)(time(NULL
)-server
.master
->lastinteraction
)
3899 for (j
= 0; j
< server
.dbnum
; j
++) {
3900 long long keys
, vkeys
;
3902 keys
= dictSize(server
.db
[j
].dict
);
3903 vkeys
= dictSize(server
.db
[j
].expires
);
3904 if (keys
|| vkeys
) {
3905 info
= sdscatprintf(info
, "db%d: keys=%lld,expires=%lld\r\n",
3909 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info
)));
3910 addReplySds(c
,info
);
3911 addReply(c
,shared
.crlf
);
3914 static void monitorCommand(redisClient
*c
) {
3915 /* ignore MONITOR if aleady slave or in monitor mode */
3916 if (c
->flags
& REDIS_SLAVE
) return;
3918 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
3920 if (!listAddNodeTail(server
.monitors
,c
)) oom("listAddNodeTail");
3921 addReply(c
,shared
.ok
);
3924 /* ================================= Expire ================================= */
3925 static int removeExpire(redisDb
*db
, robj
*key
) {
3926 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
3933 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
3934 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
3942 /* Return the expire time of the specified key, or -1 if no expire
3943 * is associated with this key (i.e. the key is non volatile) */
3944 static time_t getExpire(redisDb
*db
, robj
*key
) {
3947 /* No expire? return ASAP */
3948 if (dictSize(db
->expires
) == 0 ||
3949 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
3951 return (time_t) dictGetEntryVal(de
);
3954 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
3958 /* No expire? return ASAP */
3959 if (dictSize(db
->expires
) == 0 ||
3960 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3962 /* Lookup the expire */
3963 when
= (time_t) dictGetEntryVal(de
);
3964 if (time(NULL
) <= when
) return 0;
3966 /* Delete the key */
3967 dictDelete(db
->expires
,key
);
3968 return dictDelete(db
->dict
,key
) == DICT_OK
;
3971 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
3974 /* No expire? return ASAP */
3975 if (dictSize(db
->expires
) == 0 ||
3976 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3978 /* Delete the key */
3980 dictDelete(db
->expires
,key
);
3981 return dictDelete(db
->dict
,key
) == DICT_OK
;
3984 static void expireCommand(redisClient
*c
) {
3986 int seconds
= atoi(c
->argv
[2]->ptr
);
3988 de
= dictFind(c
->db
->dict
,c
->argv
[1]);
3990 addReply(c
,shared
.czero
);
3994 addReply(c
, shared
.czero
);
3997 time_t when
= time(NULL
)+seconds
;
3998 if (setExpire(c
->db
,c
->argv
[1],when
)) {
3999 addReply(c
,shared
.cone
);
4002 addReply(c
,shared
.czero
);
4008 static void ttlCommand(redisClient
*c
) {
4012 expire
= getExpire(c
->db
,c
->argv
[1]);
4014 ttl
= (int) (expire
-time(NULL
));
4015 if (ttl
< 0) ttl
= -1;
4017 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
4020 /* =============================== Replication ============================= */
4022 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
4023 ssize_t nwritten
, ret
= size
;
4024 time_t start
= time(NULL
);
4028 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
4029 nwritten
= write(fd
,ptr
,size
);
4030 if (nwritten
== -1) return -1;
4034 if ((time(NULL
)-start
) > timeout
) {
4042 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
4043 ssize_t nread
, totread
= 0;
4044 time_t start
= time(NULL
);
4048 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
4049 nread
= read(fd
,ptr
,size
);
4050 if (nread
== -1) return -1;
4055 if ((time(NULL
)-start
) > timeout
) {
4063 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
4070 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
4073 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
4084 static void syncCommand(redisClient
*c
) {
4085 /* ignore SYNC if aleady slave or in monitor mode */
4086 if (c
->flags
& REDIS_SLAVE
) return;
4088 /* SYNC can't be issued when the server has pending data to send to
4089 * the client about already issued commands. We need a fresh reply
4090 * buffer registering the differences between the BGSAVE and the current
4091 * dataset, so that we can copy to other slaves if needed. */
4092 if (listLength(c
->reply
) != 0) {
4093 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4097 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
4098 /* Here we need to check if there is a background saving operation
4099 * in progress, or if it is required to start one */
4100 if (server
.bgsaveinprogress
) {
4101 /* Ok a background save is in progress. Let's check if it is a good
4102 * one for replication, i.e. if there is another slave that is
4103 * registering differences since the server forked to save */
4107 listRewind(server
.slaves
);
4108 while((ln
= listYield(server
.slaves
))) {
4110 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
4113 /* Perfect, the server is already registering differences for
4114 * another slave. Set the right state, and copy the buffer. */
4115 listRelease(c
->reply
);
4116 c
->reply
= listDup(slave
->reply
);
4117 if (!c
->reply
) oom("listDup copying slave reply list");
4118 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4119 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
4121 /* No way, we need to wait for the next BGSAVE in order to
4122 * register differences */
4123 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
4124 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
4127 /* Ok we don't have a BGSAVE in progress, let's start one */
4128 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
4129 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
4130 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
4131 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
4134 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4137 c
->flags
|= REDIS_SLAVE
;
4139 if (!listAddNodeTail(server
.slaves
,c
)) oom("listAddNodeTail");
4143 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
4144 redisClient
*slave
= privdata
;
4146 REDIS_NOTUSED(mask
);
4147 char buf
[REDIS_IOBUF_LEN
];
4148 ssize_t nwritten
, buflen
;
4150 if (slave
->repldboff
== 0) {
4151 /* Write the bulk write count before to transfer the DB. In theory here
4152 * we don't know how much room there is in the output buffer of the
4153 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4154 * operations) will never be smaller than the few bytes we need. */
4157 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4159 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
4167 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
4168 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
4170 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
4171 (buflen
== 0) ? "premature EOF" : strerror(errno
));
4175 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
4176 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
4181 slave
->repldboff
+= nwritten
;
4182 if (slave
->repldboff
== slave
->repldbsize
) {
4183 close(slave
->repldbfd
);
4184 slave
->repldbfd
= -1;
4185 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
4186 slave
->replstate
= REDIS_REPL_ONLINE
;
4187 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
4188 sendReplyToClient
, slave
, NULL
) == AE_ERR
) {
4192 addReplySds(slave
,sdsempty());
4193 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
4197 /* This function is called at the end of every backgrond saving.
4198 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4199 * otherwise REDIS_ERR is passed to the function.
4201 * The goal of this function is to handle slaves waiting for a successful
4202 * background saving in order to perform non-blocking synchronization. */
4203 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
4205 int startbgsave
= 0;
4207 listRewind(server
.slaves
);
4208 while((ln
= listYield(server
.slaves
))) {
4209 redisClient
*slave
= ln
->value
;
4211 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
4213 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4214 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
4215 struct redis_stat buf
;
4217 if (bgsaveerr
!= REDIS_OK
) {
4219 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
4222 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
4223 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
4225 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
4228 slave
->repldboff
= 0;
4229 slave
->repldbsize
= buf
.st_size
;
4230 slave
->replstate
= REDIS_REPL_SEND_BULK
;
4231 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
4232 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
, NULL
) == AE_ERR
) {
4239 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
4240 listRewind(server
.slaves
);
4241 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
4242 while((ln
= listYield(server
.slaves
))) {
4243 redisClient
*slave
= ln
->value
;
4245 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
4252 static int syncWithMaster(void) {
4253 char buf
[1024], tmpfile
[256];
4255 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
4259 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
4263 /* Issue the SYNC command */
4264 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
4266 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
4270 /* Read the bulk write count */
4271 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
4273 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
4277 dumpsize
= atoi(buf
+1);
4278 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
4279 /* Read the bulk write data on a temp file */
4280 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
4281 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
4284 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
4288 int nread
, nwritten
;
4290 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
4292 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
4298 nwritten
= write(dfd
,buf
,nread
);
4299 if (nwritten
== -1) {
4300 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
4308 if (rename(tmpfile
,server
.dbfilename
) == -1) {
4309 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
4315 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
4316 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
4320 server
.master
= createClient(fd
);
4321 server
.master
->flags
|= REDIS_MASTER
;
4322 server
.replstate
= REDIS_REPL_CONNECTED
;
4326 static void slaveofCommand(redisClient
*c
) {
4327 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
4328 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
4329 if (server
.masterhost
) {
4330 sdsfree(server
.masterhost
);
4331 server
.masterhost
= NULL
;
4332 if (server
.master
) freeClient(server
.master
);
4333 server
.replstate
= REDIS_REPL_NONE
;
4334 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
4337 sdsfree(server
.masterhost
);
4338 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
4339 server
.masterport
= atoi(c
->argv
[2]->ptr
);
4340 if (server
.master
) freeClient(server
.master
);
4341 server
.replstate
= REDIS_REPL_CONNECT
;
4342 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
4343 server
.masterhost
, server
.masterport
);
4345 addReply(c
,shared
.ok
);
4348 /* ============================ Maxmemory directive ======================== */
4350 /* This function gets called when 'maxmemory' is set on the config file to limit
4351 * the max memory used by the server, and we are out of memory.
4352 * This function will try to, in order:
4354 * - Free objects from the free list
4355 * - Try to remove keys with an EXPIRE set
4357 * It is not possible to free enough memory to reach used-memory < maxmemory
4358 * the server will start refusing commands that will enlarge even more the
4361 static void freeMemoryIfNeeded(void) {
4362 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
4363 if (listLength(server
.objfreelist
)) {
4366 listNode
*head
= listFirst(server
.objfreelist
);
4367 o
= listNodeValue(head
);
4368 listDelNode(server
.objfreelist
,head
);
4371 int j
, k
, freed
= 0;
4373 for (j
= 0; j
< server
.dbnum
; j
++) {
4375 robj
*minkey
= NULL
;
4376 struct dictEntry
*de
;
4378 if (dictSize(server
.db
[j
].expires
)) {
4380 /* From a sample of three keys drop the one nearest to
4381 * the natural expire */
4382 for (k
= 0; k
< 3; k
++) {
4385 de
= dictGetRandomKey(server
.db
[j
].expires
);
4386 t
= (time_t) dictGetEntryVal(de
);
4387 if (minttl
== -1 || t
< minttl
) {
4388 minkey
= dictGetEntryKey(de
);
4392 deleteKey(server
.db
+j
,minkey
);
4395 if (!freed
) return; /* nothing to free... */
4400 /* ================================= Debugging ============================== */
4402 static void debugCommand(redisClient
*c
) {
4403 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
4405 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
4406 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
4410 addReply(c
,shared
.nokeyerr
);
4413 key
= dictGetEntryKey(de
);
4414 val
= dictGetEntryVal(de
);
4415 addReplySds(c
,sdscatprintf(sdsempty(),
4416 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4417 key
, key
->refcount
, val
, val
->refcount
, val
->encoding
));
4419 addReplySds(c
,sdsnew(
4420 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4424 #ifdef HAVE_BACKTRACE
4425 static struct redisFunctionSym symsTable
[] = {
4426 {"compareStringObjects", (unsigned long)compareStringObjects
},
4427 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong
},
4428 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare
},
4429 {"dictEncObjHash", (unsigned long)dictEncObjHash
},
4430 {"incrDecrCommand", (unsigned long)incrDecrCommand
},
4431 {"freeStringObject", (unsigned long)freeStringObject
},
4432 {"freeListObject", (unsigned long)freeListObject
},
4433 {"freeSetObject", (unsigned long)freeSetObject
},
4434 {"decrRefCount", (unsigned long)decrRefCount
},
4435 {"createObject", (unsigned long)createObject
},
4436 {"freeClient", (unsigned long)freeClient
},
4437 {"rdbLoad", (unsigned long)rdbLoad
},
4438 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject
},
4439 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw
},
4440 {"addReply", (unsigned long)addReply
},
4441 {"addReplySds", (unsigned long)addReplySds
},
4442 {"incrRefCount", (unsigned long)incrRefCount
},
4443 {"rdbSaveBackground", (unsigned long)rdbSaveBackground
},
4444 {"createStringObject", (unsigned long)createStringObject
},
4445 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves
},
4446 {"syncWithMaster", (unsigned long)syncWithMaster
},
4447 {"tryObjectSharing", (unsigned long)tryObjectSharing
},
4448 {"tryObjectEncoding", (unsigned long)tryObjectEncoding
},
4449 {"getDecodedObject", (unsigned long)getDecodedObject
},
4450 {"removeExpire", (unsigned long)removeExpire
},
4451 {"expireIfNeeded", (unsigned long)expireIfNeeded
},
4452 {"deleteIfVolatile", (unsigned long)deleteIfVolatile
},
4453 {"deleteKey", (unsigned long)deleteKey
},
4454 {"getExpire", (unsigned long)getExpire
},
4455 {"setExpire", (unsigned long)setExpire
},
4456 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave
},
4457 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded
},
4458 {"authCommand", (unsigned long)authCommand
},
4459 {"pingCommand", (unsigned long)pingCommand
},
4460 {"echoCommand", (unsigned long)echoCommand
},
4461 {"setCommand", (unsigned long)setCommand
},
4462 {"setnxCommand", (unsigned long)setnxCommand
},
4463 {"getCommand", (unsigned long)getCommand
},
4464 {"delCommand", (unsigned long)delCommand
},
4465 {"existsCommand", (unsigned long)existsCommand
},
4466 {"incrCommand", (unsigned long)incrCommand
},
4467 {"decrCommand", (unsigned long)decrCommand
},
4468 {"incrbyCommand", (unsigned long)incrbyCommand
},
4469 {"decrbyCommand", (unsigned long)decrbyCommand
},
4470 {"selectCommand", (unsigned long)selectCommand
},
4471 {"randomkeyCommand", (unsigned long)randomkeyCommand
},
4472 {"keysCommand", (unsigned long)keysCommand
},
4473 {"dbsizeCommand", (unsigned long)dbsizeCommand
},
4474 {"lastsaveCommand", (unsigned long)lastsaveCommand
},
4475 {"saveCommand", (unsigned long)saveCommand
},
4476 {"bgsaveCommand", (unsigned long)bgsaveCommand
},
4477 {"shutdownCommand", (unsigned long)shutdownCommand
},
4478 {"moveCommand", (unsigned long)moveCommand
},
4479 {"renameCommand", (unsigned long)renameCommand
},
4480 {"renamenxCommand", (unsigned long)renamenxCommand
},
4481 {"lpushCommand", (unsigned long)lpushCommand
},
4482 {"rpushCommand", (unsigned long)rpushCommand
},
4483 {"lpopCommand", (unsigned long)lpopCommand
},
4484 {"rpopCommand", (unsigned long)rpopCommand
},
4485 {"llenCommand", (unsigned long)llenCommand
},
4486 {"lindexCommand", (unsigned long)lindexCommand
},
4487 {"lrangeCommand", (unsigned long)lrangeCommand
},
4488 {"ltrimCommand", (unsigned long)ltrimCommand
},
4489 {"typeCommand", (unsigned long)typeCommand
},
4490 {"lsetCommand", (unsigned long)lsetCommand
},
4491 {"saddCommand", (unsigned long)saddCommand
},
4492 {"sremCommand", (unsigned long)sremCommand
},
4493 {"smoveCommand", (unsigned long)smoveCommand
},
4494 {"sismemberCommand", (unsigned long)sismemberCommand
},
4495 {"scardCommand", (unsigned long)scardCommand
},
4496 {"spopCommand", (unsigned long)spopCommand
},
4497 {"sinterCommand", (unsigned long)sinterCommand
},
4498 {"sinterstoreCommand", (unsigned long)sinterstoreCommand
},
4499 {"sunionCommand", (unsigned long)sunionCommand
},
4500 {"sunionstoreCommand", (unsigned long)sunionstoreCommand
},
4501 {"sdiffCommand", (unsigned long)sdiffCommand
},
4502 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand
},
4503 {"syncCommand", (unsigned long)syncCommand
},
4504 {"flushdbCommand", (unsigned long)flushdbCommand
},
4505 {"flushallCommand", (unsigned long)flushallCommand
},
4506 {"sortCommand", (unsigned long)sortCommand
},
4507 {"lremCommand", (unsigned long)lremCommand
},
4508 {"infoCommand", (unsigned long)infoCommand
},
4509 {"mgetCommand", (unsigned long)mgetCommand
},
4510 {"monitorCommand", (unsigned long)monitorCommand
},
4511 {"expireCommand", (unsigned long)expireCommand
},
4512 {"getSetCommand", (unsigned long)getSetCommand
},
4513 {"ttlCommand", (unsigned long)ttlCommand
},
4514 {"slaveofCommand", (unsigned long)slaveofCommand
},
4515 {"debugCommand", (unsigned long)debugCommand
},
4516 {"processCommand", (unsigned long)processCommand
},
4517 {"setupSigSegvAction", (unsigned long)setupSigSegvAction
},
4518 {"readQueryFromClient", (unsigned long)readQueryFromClient
},
4519 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile
},
4523 /* This function try to convert a pointer into a function name. It's used in
4524 * oreder to provide a backtrace under segmentation fault that's able to
4525 * display functions declared as static (otherwise the backtrace is useless). */
4526 static char *findFuncName(void *pointer
, unsigned long *offset
){
4528 unsigned long off
, minoff
= 0;
4530 /* Try to match against the Symbol with the smallest offset */
4531 for (i
=0; symsTable
[i
].pointer
; i
++) {
4532 unsigned long lp
= (unsigned long) pointer
;
4534 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
4535 off
=lp
-symsTable
[i
].pointer
;
4536 if (ret
< 0 || off
< minoff
) {
4542 if (ret
== -1) return NULL
;
4544 return symsTable
[ret
].name
;
4547 static void *getMcontextEip(ucontext_t
*uc
) {
4548 #if defined(__FreeBSD__)
4549 return (void*) uc
->uc_mcontext
.mc_eip
;
4550 #elif defined(__dietlibc__)
4551 return (void*) uc
->uc_mcontext
.eip
;
4552 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
4553 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
4554 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
4555 #ifdef _STRUCT_X86_THREAD_STATE64
4556 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
4558 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
4560 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
4561 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
];
4562 #elif defined(__ia64__) /* Linux IA64 */
4563 return (void*) uc
->uc_mcontext
.sc_ip
;
4569 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
4571 char **messages
= NULL
;
4572 int i
, trace_size
= 0;
4573 unsigned long offset
=0;
4574 time_t uptime
= time(NULL
)-server
.stat_starttime
;
4575 ucontext_t
*uc
= (ucontext_t
*) secret
;
4576 REDIS_NOTUSED(info
);
4578 redisLog(REDIS_WARNING
,
4579 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
4580 redisLog(REDIS_WARNING
, "%s", sdscatprintf(sdsempty(),
4581 "redis_version:%s; "
4582 "uptime_in_seconds:%d; "
4583 "connected_clients:%d; "
4584 "connected_slaves:%d; "
4586 "changes_since_last_save:%lld; "
4587 "bgsave_in_progress:%d; "
4588 "last_save_time:%d; "
4589 "total_connections_received:%lld; "
4590 "total_commands_processed:%lld; "
4594 listLength(server
.clients
)-listLength(server
.slaves
),
4595 listLength(server
.slaves
),
4598 server
.bgsaveinprogress
,
4600 server
.stat_numconnections
,
4601 server
.stat_numcommands
,
4602 server
.masterhost
== NULL
? "master" : "slave"
4605 trace_size
= backtrace(trace
, 100);
4606 /* overwrite sigaction with caller's address */
4607 if (getMcontextEip(uc
) != NULL
) {
4608 trace
[1] = getMcontextEip(uc
);
4610 messages
= backtrace_symbols(trace
, trace_size
);
4612 for (i
=1; i
<trace_size
; ++i
) {
4613 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
4615 p
= strchr(messages
[i
],'+');
4616 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
4617 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
4619 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
4626 static void setupSigSegvAction(void) {
4627 struct sigaction act
;
4629 sigemptyset (&act
.sa_mask
);
4630 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
4631 * is used. Otherwise, sa_handler is used */
4632 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
4633 act
.sa_sigaction
= segvHandler
;
4634 sigaction (SIGSEGV
, &act
, NULL
);
4635 sigaction (SIGBUS
, &act
, NULL
);
4636 sigaction (SIGFPE
, &act
, NULL
);
4637 sigaction (SIGILL
, &act
, NULL
);
4638 sigaction (SIGBUS
, &act
, NULL
);
4641 #else /* HAVE_BACKTRACE */
4642 static void setupSigSegvAction(void) {
4644 #endif /* HAVE_BACKTRACE */
4646 /* =================================== Main! ================================ */
4649 int linuxOvercommitMemoryValue(void) {
4650 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
4654 if (fgets(buf
,64,fp
) == NULL
) {
4663 void linuxOvercommitMemoryWarning(void) {
4664 if (linuxOvercommitMemoryValue() == 0) {
4665 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
4668 #endif /* __linux__ */
4670 static void daemonize(void) {
4674 if (fork() != 0) exit(0); /* parent exits */
4675 setsid(); /* create a new session */
4677 /* Every output goes to /dev/null. If Redis is daemonized but
4678 * the 'logfile' is set to 'stdout' in the configuration file
4679 * it will not log at all. */
4680 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
4681 dup2(fd
, STDIN_FILENO
);
4682 dup2(fd
, STDOUT_FILENO
);
4683 dup2(fd
, STDERR_FILENO
);
4684 if (fd
> STDERR_FILENO
) close(fd
);
4686 /* Try to write the pid file */
4687 fp
= fopen(server
.pidfile
,"w");
4689 fprintf(fp
,"%d\n",getpid());
4694 int main(int argc
, char **argv
) {
4697 ResetServerSaveParams();
4698 loadServerConfig(argv
[1]);
4699 } else if (argc
> 2) {
4700 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
4703 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4706 if (server
.daemonize
) daemonize();
4707 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
4709 linuxOvercommitMemoryWarning();
4711 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
4712 redisLog(REDIS_NOTICE
,"DB loaded from disk");
4713 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
4714 acceptHandler
, NULL
, NULL
) == AE_ERR
) oom("creating file event");
4715 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
4717 aeDeleteEventLoop(server
.el
);