2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "0.101"
46 #include <arpa/inet.h>
50 #include <sys/resource.h>
54 #include "ae.h" /* Event driven programming library */
55 #include "sds.h" /* Dynamic safe strings */
56 #include "anet.h" /* Networking the easy way */
57 #include "dict.h" /* Hash tables */
58 #include "adlist.h" /* Linked lists */
59 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
60 #include "lzf.h" /* LZF compression library */
61 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
67 /* Static server configuration */
68 #define REDIS_SERVERPORT 6379 /* TCP port */
69 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
70 #define REDIS_IOBUF_LEN 1024
71 #define REDIS_LOADBUF_LEN 1024
72 #define REDIS_STATIC_ARGS 4
73 #define REDIS_DEFAULT_DBNUM 16
74 #define REDIS_CONFIGLINE_MAX 1024
75 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
76 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
77 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
79 /* Hash table parameters */
80 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
81 #define REDIS_HT_MINSLOTS 16384 /* Never resize the HT under this */
84 #define REDIS_CMD_BULK 1 /* Bulk write command */
85 #define REDIS_CMD_INLINE 2 /* Inline command */
86 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
87 this flag will return an error when the 'maxmemory' option is set in the
88 config file and the server is using more than maxmemory bytes of memory.
89 In short, these commands are denied in low memory conditions. */
90 #define REDIS_CMD_DENYOOM 4
93 #define REDIS_STRING 0
98 /* Object types only used for dumping to disk */
99 #define REDIS_EXPIRETIME 253
100 #define REDIS_SELECTDB 254
101 #define REDIS_EOF 255
103 /* Defines related to the dump file format. To store 32 bits lengths for short
104 * keys requires a lot of space, so we check the most significant 2 bits of
105 * the first byte to interpret the length:
107 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
108 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
109 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
110 * 11|000000 this means: specially encoded object will follow. The six bits
111 * number specify the kind of object that follows.
112 * See the REDIS_RDB_ENC_* defines.
114 * Lengths up to 63 are stored using a single byte, most DB keys, and many
115 * values, will fit inside. */
116 #define REDIS_RDB_6BITLEN 0
117 #define REDIS_RDB_14BITLEN 1
118 #define REDIS_RDB_32BITLEN 2
119 #define REDIS_RDB_ENCVAL 3
120 #define REDIS_RDB_LENERR UINT_MAX
122 /* When a length of a string object stored on disk has the first two bits
123 * set, the remaining six bits specify a special encoding for the object
124 * according to the following defines: */
125 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
126 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
127 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
128 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
131 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
132 #define REDIS_SLAVE 2 /* This client is a slave server */
133 #define REDIS_MASTER 4 /* This client is a master server */
134 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
136 /* Slave replication state - slave side */
137 #define REDIS_REPL_NONE 0 /* No active replication */
138 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
139 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
141 /* Slave replication state - from the point of view of master
142 * Note that in SEND_BULK and ONLINE state the slave receives new updates
143 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
144 * to start the next background saving in order to send updates to it. */
145 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
146 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
147 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
148 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
150 /* List related stuff */
154 /* Sort operations */
155 #define REDIS_SORT_GET 0
156 #define REDIS_SORT_DEL 1
157 #define REDIS_SORT_INCR 2
158 #define REDIS_SORT_DECR 3
159 #define REDIS_SORT_ASC 4
160 #define REDIS_SORT_DESC 5
161 #define REDIS_SORTKEY_MAX 1024
164 #define REDIS_DEBUG 0
165 #define REDIS_NOTICE 1
166 #define REDIS_WARNING 2
168 /* Anti-warning macro... */
169 #define REDIS_NOTUSED(V) ((void) V)
171 /*================================= Data types ============================== */
173 /* A redis object, that is a type able to hold a string / list / set */
174 typedef struct redisObject
{
180 typedef struct redisDb
{
186 /* With multiplexing we need to keep per-client state.
187 * Clients are kept in a linked list. */
188 typedef struct redisClient
{
195 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
198 time_t lastinteraction
; /* time of the last interaction, used for timeout */
199 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
200 int slaveseldb
; /* slave selected db, if this client is a slave */
201 int authenticated
; /* when requirepass is non-NULL */
202 int replstate
; /* replication state if this is a slave */
203 int repldbfd
; /* replication DB file descriptor */
204 long repldboff
; /* replication DB file offset */
205 off_t repldbsize
; /* replication DB file size */
213 /* Global server state structure */
219 unsigned int sharingpoolsize
;
220 long long dirty
; /* changes to DB from the last save */
222 list
*slaves
, *monitors
;
223 char neterr
[ANET_ERR_LEN
];
225 int cronloops
; /* number of times the cron function run */
226 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
227 time_t lastsave
; /* Unix time of last save succeeede */
228 size_t usedmemory
; /* Used memory in megabytes */
229 /* Fields used only for stats */
230 time_t stat_starttime
; /* server start time */
231 long long stat_numcommands
; /* number of processed commands */
232 long long stat_numconnections
; /* number of connections received */
240 int bgsaveinprogress
;
241 struct saveparam
*saveparams
;
248 /* Replication related */
252 redisClient
*master
; /* client that is master for this slave */
254 unsigned int maxclients
;
255 unsigned int maxmemory
;
256 /* Sort parameters - qsort_r() is only available under BSD so we
257 * have to take this state global, in order to pass it to sortCompare() */
263 typedef void redisCommandProc(redisClient
*c
);
264 struct redisCommand
{
266 redisCommandProc
*proc
;
271 typedef struct _redisSortObject
{
279 typedef struct _redisSortOperation
{
282 } redisSortOperation
;
284 struct sharedObjectsStruct
{
285 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
286 *colon
, *nullbulk
, *nullmultibulk
,
287 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
288 *outofrangeerr
, *plus
,
289 *select0
, *select1
, *select2
, *select3
, *select4
,
290 *select5
, *select6
, *select7
, *select8
, *select9
;
293 /*================================ Prototypes =============================== */
295 static void freeStringObject(robj
*o
);
296 static void freeListObject(robj
*o
);
297 static void freeSetObject(robj
*o
);
298 static void decrRefCount(void *o
);
299 static robj
*createObject(int type
, void *ptr
);
300 static void freeClient(redisClient
*c
);
301 static int rdbLoad(char *filename
);
302 static void addReply(redisClient
*c
, robj
*obj
);
303 static void addReplySds(redisClient
*c
, sds s
);
304 static void incrRefCount(robj
*o
);
305 static int rdbSaveBackground(char *filename
);
306 static robj
*createStringObject(char *ptr
, size_t len
);
307 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
308 static int syncWithMaster(void);
309 static robj
*tryObjectSharing(robj
*o
);
310 static int removeExpire(redisDb
*db
, robj
*key
);
311 static int expireIfNeeded(redisDb
*db
, robj
*key
);
312 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
313 static int deleteKey(redisDb
*db
, robj
*key
);
314 static time_t getExpire(redisDb
*db
, robj
*key
);
315 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
316 static void updateSalvesWaitingBgsave(int bgsaveerr
);
317 static void freeMemoryIfNeeded(void);
318 static void onSigsegv(int sig
);
320 static void authCommand(redisClient
*c
);
321 static void pingCommand(redisClient
*c
);
322 static void echoCommand(redisClient
*c
);
323 static void setCommand(redisClient
*c
);
324 static void setnxCommand(redisClient
*c
);
325 static void getCommand(redisClient
*c
);
326 static void delCommand(redisClient
*c
);
327 static void existsCommand(redisClient
*c
);
328 static void incrCommand(redisClient
*c
);
329 static void decrCommand(redisClient
*c
);
330 static void incrbyCommand(redisClient
*c
);
331 static void decrbyCommand(redisClient
*c
);
332 static void selectCommand(redisClient
*c
);
333 static void randomkeyCommand(redisClient
*c
);
334 static void keysCommand(redisClient
*c
);
335 static void dbsizeCommand(redisClient
*c
);
336 static void lastsaveCommand(redisClient
*c
);
337 static void saveCommand(redisClient
*c
);
338 static void bgsaveCommand(redisClient
*c
);
339 static void shutdownCommand(redisClient
*c
);
340 static void moveCommand(redisClient
*c
);
341 static void renameCommand(redisClient
*c
);
342 static void renamenxCommand(redisClient
*c
);
343 static void lpushCommand(redisClient
*c
);
344 static void rpushCommand(redisClient
*c
);
345 static void lpopCommand(redisClient
*c
);
346 static void rpopCommand(redisClient
*c
);
347 static void llenCommand(redisClient
*c
);
348 static void lindexCommand(redisClient
*c
);
349 static void lrangeCommand(redisClient
*c
);
350 static void ltrimCommand(redisClient
*c
);
351 static void typeCommand(redisClient
*c
);
352 static void lsetCommand(redisClient
*c
);
353 static void saddCommand(redisClient
*c
);
354 static void sremCommand(redisClient
*c
);
355 static void smoveCommand(redisClient
*c
);
356 static void sismemberCommand(redisClient
*c
);
357 static void scardCommand(redisClient
*c
);
358 static void sinterCommand(redisClient
*c
);
359 static void sinterstoreCommand(redisClient
*c
);
360 static void sunionCommand(redisClient
*c
);
361 static void sunionstoreCommand(redisClient
*c
);
362 static void sdiffCommand(redisClient
*c
);
363 static void sdiffstoreCommand(redisClient
*c
);
364 static void syncCommand(redisClient
*c
);
365 static void flushdbCommand(redisClient
*c
);
366 static void flushallCommand(redisClient
*c
);
367 static void sortCommand(redisClient
*c
);
368 static void lremCommand(redisClient
*c
);
369 static void infoCommand(redisClient
*c
);
370 static void mgetCommand(redisClient
*c
);
371 static void monitorCommand(redisClient
*c
);
372 static void expireCommand(redisClient
*c
);
373 static void getSetCommand(redisClient
*c
);
374 static void ttlCommand(redisClient
*c
);
375 static void slaveofCommand(redisClient
*c
);
376 static void debugCommand(redisClient
*c
);
378 /*================================= Globals ================================= */
381 static struct redisServer server
; /* server global state */
382 static struct redisCommand cmdTable
[] = {
383 {"get",getCommand
,2,REDIS_CMD_INLINE
},
384 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
385 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
386 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
387 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
388 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
389 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
390 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
391 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
392 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
393 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
394 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
395 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
396 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
397 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
398 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
399 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
400 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
401 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
402 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
403 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
404 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
405 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
406 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
407 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
408 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
409 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
410 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
411 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
412 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
413 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
414 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
415 {"getset",getSetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
416 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
417 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
418 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
419 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
420 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
421 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
422 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
423 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
424 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
425 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
426 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
427 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
428 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
429 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
430 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
431 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
432 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
433 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
434 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
435 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
436 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
437 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
438 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
439 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
440 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
444 /*============================ Utility functions ============================ */
446 /* Glob-style pattern matching. */
447 int stringmatchlen(const char *pattern
, int patternLen
,
448 const char *string
, int stringLen
, int nocase
)
453 while (pattern
[1] == '*') {
458 return 1; /* match */
460 if (stringmatchlen(pattern
+1, patternLen
-1,
461 string
, stringLen
, nocase
))
462 return 1; /* match */
466 return 0; /* no match */
470 return 0; /* no match */
480 not = pattern
[0] == '^';
487 if (pattern
[0] == '\\') {
490 if (pattern
[0] == string
[0])
492 } else if (pattern
[0] == ']') {
494 } else if (patternLen
== 0) {
498 } else if (pattern
[1] == '-' && patternLen
>= 3) {
499 int start
= pattern
[0];
500 int end
= pattern
[2];
508 start
= tolower(start
);
514 if (c
>= start
&& c
<= end
)
518 if (pattern
[0] == string
[0])
521 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
531 return 0; /* no match */
537 if (patternLen
>= 2) {
544 if (pattern
[0] != string
[0])
545 return 0; /* no match */
547 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
548 return 0; /* no match */
556 if (stringLen
== 0) {
557 while(*pattern
== '*') {
564 if (patternLen
== 0 && stringLen
== 0)
569 void redisLog(int level
, const char *fmt
, ...)
574 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
578 if (level
>= server
.verbosity
) {
584 strftime(buf
,64,"%d %b %H:%M:%S",gmtime(&now
));
585 fprintf(fp
,"%s %c ",buf
,c
[level
]);
586 vfprintf(fp
, fmt
, ap
);
592 if (server
.logfile
) fclose(fp
);
595 /*====================== Hash table type implementation ==================== */
597 /* This is a hash table type that uses the SDS dynamic strings library as
598 * keys and redis objects as values (objects can hold SDS strings,
601 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
605 DICT_NOTUSED(privdata
);
607 l1
= sdslen((sds
)key1
);
608 l2
= sdslen((sds
)key2
);
609 if (l1
!= l2
) return 0;
610 return memcmp(key1
, key2
, l1
) == 0;
613 static void dictRedisObjectDestructor(void *privdata
, void *val
)
615 DICT_NOTUSED(privdata
);
620 static int dictSdsKeyCompare(void *privdata
, const void *key1
,
623 const robj
*o1
= key1
, *o2
= key2
;
624 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
627 static unsigned int dictSdsHash(const void *key
) {
629 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
632 static dictType setDictType
= {
633 dictSdsHash
, /* hash function */
636 dictSdsKeyCompare
, /* key compare */
637 dictRedisObjectDestructor
, /* key destructor */
638 NULL
/* val destructor */
641 static dictType hashDictType
= {
642 dictSdsHash
, /* hash function */
645 dictSdsKeyCompare
, /* key compare */
646 dictRedisObjectDestructor
, /* key destructor */
647 dictRedisObjectDestructor
/* val destructor */
650 /* ========================= Random utility functions ======================= */
652 /* Redis generally does not try to recover from out of memory conditions
653 * when allocating objects or strings, it is not clear if it will be possible
654 * to report this condition to the client since the networking layer itself
655 * is based on heap allocation for send buffers, so we simply abort.
656 * At least the code will be simpler to read... */
657 static void oom(const char *msg
) {
658 fprintf(stderr
, "%s: Out of memory\n",msg
);
664 /* ====================== Redis server networking stuff ===================== */
665 void closeTimedoutClients(void) {
668 time_t now
= time(NULL
);
670 listRewind(server
.clients
);
671 while ((ln
= listYield(server
.clients
)) != NULL
) {
672 c
= listNodeValue(ln
);
673 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
674 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
675 (now
- c
->lastinteraction
> server
.maxidletime
)) {
676 redisLog(REDIS_DEBUG
,"Closing idle client");
682 /* If the percentage of used slots in the HT drops below REDIS_HT_MINFILL
683 * we resize the hash table to save memory */
684 void tryResizeHashTables(void) {
687 for (j
= 0; j
< server
.dbnum
; j
++) {
688 long long size
, used
;
690 size
= dictSlots(server
.db
[j
].dict
);
691 used
= dictSize(server
.db
[j
].dict
);
692 if (size
&& used
&& size
> REDIS_HT_MINSLOTS
&&
693 (used
*100/size
< REDIS_HT_MINFILL
)) {
694 redisLog(REDIS_NOTICE
,"The hash table %d is too sparse, resize it...",j
);
695 dictResize(server
.db
[j
].dict
);
696 redisLog(REDIS_NOTICE
,"Hash table %d resized.",j
);
701 int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
702 int j
, loops
= server
.cronloops
++;
703 REDIS_NOTUSED(eventLoop
);
705 REDIS_NOTUSED(clientData
);
707 /* Update the global state with the amount of used memory */
708 server
.usedmemory
= zmalloc_used_memory();
710 /* Show some info about non-empty databases */
711 for (j
= 0; j
< server
.dbnum
; j
++) {
712 long long size
, used
, vkeys
;
714 size
= dictSlots(server
.db
[j
].dict
);
715 used
= dictSize(server
.db
[j
].dict
);
716 vkeys
= dictSize(server
.db
[j
].expires
);
717 if (!(loops
% 5) && used
> 0) {
718 redisLog(REDIS_DEBUG
,"DB %d: %d keys (%d volatile) in %d slots HT.",j
,used
,vkeys
,size
);
719 /* dictPrintStats(server.dict); */
723 /* We don't want to resize the hash tables while a background saving
724 * is in progress: the saving child is created using fork() that is
725 * implemented with a copy-on-write semantic in most modern systems, so
726 * if we resize the HT while there is the saving child at work actually
727 * a lot of memory movements in the parent will cause a lot of pages
729 if (!server
.bgsaveinprogress
) tryResizeHashTables();
731 /* Show information about connected clients */
733 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use",
734 listLength(server
.clients
)-listLength(server
.slaves
),
735 listLength(server
.slaves
),
737 dictSize(server
.sharingpool
));
740 /* Close connections of timedout clients */
741 if (server
.maxidletime
&& !(loops
% 10))
742 closeTimedoutClients();
744 /* Check if a background saving in progress terminated */
745 if (server
.bgsaveinprogress
) {
747 /* XXX: TODO handle the case of the saving child killed */
748 if (wait4(-1,&statloc
,WNOHANG
,NULL
)) {
749 int exitcode
= WEXITSTATUS(statloc
);
751 redisLog(REDIS_NOTICE
,
752 "Background saving terminated with success");
754 server
.lastsave
= time(NULL
);
756 redisLog(REDIS_WARNING
,
757 "Background saving error");
759 server
.bgsaveinprogress
= 0;
760 updateSalvesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
763 /* If there is not a background saving in progress check if
764 * we have to save now */
765 time_t now
= time(NULL
);
766 for (j
= 0; j
< server
.saveparamslen
; j
++) {
767 struct saveparam
*sp
= server
.saveparams
+j
;
769 if (server
.dirty
>= sp
->changes
&&
770 now
-server
.lastsave
> sp
->seconds
) {
771 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
772 sp
->changes
, sp
->seconds
);
773 rdbSaveBackground(server
.dbfilename
);
779 /* Try to expire a few timed out keys */
780 for (j
= 0; j
< server
.dbnum
; j
++) {
781 redisDb
*db
= server
.db
+j
;
782 int num
= dictSize(db
->expires
);
785 time_t now
= time(NULL
);
787 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
788 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
793 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
794 t
= (time_t) dictGetEntryVal(de
);
796 deleteKey(db
,dictGetEntryKey(de
));
802 /* Check if we should connect to a MASTER */
803 if (server
.replstate
== REDIS_REPL_CONNECT
) {
804 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
805 if (syncWithMaster() == REDIS_OK
) {
806 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
812 static void createSharedObjects(void) {
813 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
814 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
815 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
816 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
817 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
818 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
819 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
820 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
821 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
823 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
824 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
825 "-ERR Operation against a key holding the wrong kind of value\r\n"));
826 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
827 "-ERR no such key\r\n"));
828 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
829 "-ERR syntax error\r\n"));
830 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
831 "-ERR source and destination objects are the same\r\n"));
832 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
833 "-ERR index out of range\r\n"));
834 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
835 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
836 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
837 shared
.select0
= createStringObject("select 0\r\n",10);
838 shared
.select1
= createStringObject("select 1\r\n",10);
839 shared
.select2
= createStringObject("select 2\r\n",10);
840 shared
.select3
= createStringObject("select 3\r\n",10);
841 shared
.select4
= createStringObject("select 4\r\n",10);
842 shared
.select5
= createStringObject("select 5\r\n",10);
843 shared
.select6
= createStringObject("select 6\r\n",10);
844 shared
.select7
= createStringObject("select 7\r\n",10);
845 shared
.select8
= createStringObject("select 8\r\n",10);
846 shared
.select9
= createStringObject("select 9\r\n",10);
849 static void appendServerSaveParams(time_t seconds
, int changes
) {
850 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
851 if (server
.saveparams
== NULL
) oom("appendServerSaveParams");
852 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
853 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
854 server
.saveparamslen
++;
857 static void ResetServerSaveParams() {
858 zfree(server
.saveparams
);
859 server
.saveparams
= NULL
;
860 server
.saveparamslen
= 0;
863 static void initServerConfig() {
864 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
865 server
.port
= REDIS_SERVERPORT
;
866 server
.verbosity
= REDIS_DEBUG
;
867 server
.maxidletime
= REDIS_MAXIDLETIME
;
868 server
.saveparams
= NULL
;
869 server
.logfile
= NULL
; /* NULL = log on standard output */
870 server
.bindaddr
= NULL
;
871 server
.glueoutputbuf
= 1;
872 server
.daemonize
= 0;
873 server
.pidfile
= "/var/run/redis.pid";
874 server
.dbfilename
= "dump.rdb";
875 server
.requirepass
= NULL
;
876 server
.shareobjects
= 0;
877 server
.maxclients
= 0;
878 server
.maxmemory
= 0;
879 ResetServerSaveParams();
881 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
882 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
883 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
884 /* Replication related */
886 server
.masterhost
= NULL
;
887 server
.masterport
= 6379;
888 server
.master
= NULL
;
889 server
.replstate
= REDIS_REPL_NONE
;
892 static void initServer() {
895 signal(SIGHUP
, SIG_IGN
);
896 signal(SIGPIPE
, SIG_IGN
);
897 signal(SIGSEGV
, onSigsegv
);
898 signal(SIGBUS
, onSigsegv
);
900 server
.clients
= listCreate();
901 server
.slaves
= listCreate();
902 server
.monitors
= listCreate();
903 server
.objfreelist
= listCreate();
904 createSharedObjects();
905 server
.el
= aeCreateEventLoop();
906 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
907 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
908 server
.sharingpoolsize
= 1024;
909 if (!server
.db
|| !server
.clients
|| !server
.slaves
|| !server
.monitors
|| !server
.el
|| !server
.objfreelist
)
910 oom("server initialization"); /* Fatal OOM */
911 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
912 if (server
.fd
== -1) {
913 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
916 for (j
= 0; j
< server
.dbnum
; j
++) {
917 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
918 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
921 server
.cronloops
= 0;
922 server
.bgsaveinprogress
= 0;
923 server
.lastsave
= time(NULL
);
925 server
.usedmemory
= 0;
926 server
.stat_numcommands
= 0;
927 server
.stat_numconnections
= 0;
928 server
.stat_starttime
= time(NULL
);
929 aeCreateTimeEvent(server
.el
, 1000, serverCron
, NULL
, NULL
);
932 /* Empty the whole database */
933 static long long emptyDb() {
935 long long removed
= 0;
937 for (j
= 0; j
< server
.dbnum
; j
++) {
938 removed
+= dictSize(server
.db
[j
].dict
);
939 dictEmpty(server
.db
[j
].dict
);
940 dictEmpty(server
.db
[j
].expires
);
945 static int yesnotoi(char *s
) {
946 if (!strcasecmp(s
,"yes")) return 1;
947 else if (!strcasecmp(s
,"no")) return 0;
951 /* I agree, this is a very rudimentary way to load a configuration...
952 will improve later if the config gets more complex */
953 static void loadServerConfig(char *filename
) {
954 FILE *fp
= fopen(filename
,"r");
955 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
960 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
963 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
969 line
= sdstrim(line
," \t\r\n");
971 /* Skip comments and blank lines*/
972 if (line
[0] == '#' || line
[0] == '\0') {
977 /* Split into arguments */
978 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
981 /* Execute config directives */
982 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
983 server
.maxidletime
= atoi(argv
[1]);
984 if (server
.maxidletime
< 0) {
985 err
= "Invalid timeout value"; goto loaderr
;
987 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
988 server
.port
= atoi(argv
[1]);
989 if (server
.port
< 1 || server
.port
> 65535) {
990 err
= "Invalid port"; goto loaderr
;
992 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
993 server
.bindaddr
= zstrdup(argv
[1]);
994 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
995 int seconds
= atoi(argv
[1]);
996 int changes
= atoi(argv
[2]);
997 if (seconds
< 1 || changes
< 0) {
998 err
= "Invalid save parameters"; goto loaderr
;
1000 appendServerSaveParams(seconds
,changes
);
1001 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1002 if (chdir(argv
[1]) == -1) {
1003 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1004 argv
[1], strerror(errno
));
1007 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1008 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1009 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1010 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1012 err
= "Invalid log level. Must be one of debug, notice, warning";
1015 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1018 server
.logfile
= zstrdup(argv
[1]);
1019 if (!strcasecmp(server
.logfile
,"stdout")) {
1020 zfree(server
.logfile
);
1021 server
.logfile
= NULL
;
1023 if (server
.logfile
) {
1024 /* Test if we are able to open the file. The server will not
1025 * be able to abort just for this problem later... */
1026 fp
= fopen(server
.logfile
,"a");
1028 err
= sdscatprintf(sdsempty(),
1029 "Can't open the log file: %s", strerror(errno
));
1034 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1035 server
.dbnum
= atoi(argv
[1]);
1036 if (server
.dbnum
< 1) {
1037 err
= "Invalid number of databases"; goto loaderr
;
1039 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1040 server
.maxclients
= atoi(argv
[1]);
1041 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1042 server
.maxmemory
= atoi(argv
[1]);
1043 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1044 server
.masterhost
= sdsnew(argv
[1]);
1045 server
.masterport
= atoi(argv
[2]);
1046 server
.replstate
= REDIS_REPL_CONNECT
;
1047 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1048 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1049 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1051 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1052 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1053 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1055 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1056 server
.sharingpoolsize
= atoi(argv
[1]);
1057 if (server
.sharingpoolsize
< 1) {
1058 err
= "invalid object sharing pool size"; goto loaderr
;
1060 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1061 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1062 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1064 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1065 server
.requirepass
= zstrdup(argv
[1]);
1066 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1067 server
.pidfile
= zstrdup(argv
[1]);
1068 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1069 server
.dbfilename
= zstrdup(argv
[1]);
1071 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1073 for (j
= 0; j
< argc
; j
++)
1082 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1083 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1084 fprintf(stderr
, ">>> '%s'\n", line
);
1085 fprintf(stderr
, "%s\n", err
);
1089 static void freeClientArgv(redisClient
*c
) {
1092 for (j
= 0; j
< c
->argc
; j
++)
1093 decrRefCount(c
->argv
[j
]);
1097 static void freeClient(redisClient
*c
) {
1100 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1101 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1102 sdsfree(c
->querybuf
);
1103 listRelease(c
->reply
);
1106 ln
= listSearchKey(server
.clients
,c
);
1108 listDelNode(server
.clients
,ln
);
1109 if (c
->flags
& REDIS_SLAVE
) {
1110 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1112 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1113 ln
= listSearchKey(l
,c
);
1117 if (c
->flags
& REDIS_MASTER
) {
1118 server
.master
= NULL
;
1119 server
.replstate
= REDIS_REPL_CONNECT
;
1125 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1130 listRewind(c
->reply
);
1131 while((ln
= listYield(c
->reply
))) {
1133 totlen
+= sdslen(o
->ptr
);
1134 /* This optimization makes more sense if we don't have to copy
1136 if (totlen
> 1024) return;
1142 listRewind(c
->reply
);
1143 while((ln
= listYield(c
->reply
))) {
1145 memcpy(buf
+copylen
,o
->ptr
,sdslen(o
->ptr
));
1146 copylen
+= sdslen(o
->ptr
);
1147 listDelNode(c
->reply
,ln
);
1149 /* Now the output buffer is empty, add the new single element */
1150 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,totlen
));
1151 if (!listAddNodeTail(c
->reply
,o
)) oom("listAddNodeTail");
1155 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1156 redisClient
*c
= privdata
;
1157 int nwritten
= 0, totwritten
= 0, objlen
;
1160 REDIS_NOTUSED(mask
);
1162 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1163 glueReplyBuffersIfNeeded(c
);
1164 while(listLength(c
->reply
)) {
1165 o
= listNodeValue(listFirst(c
->reply
));
1166 objlen
= sdslen(o
->ptr
);
1169 listDelNode(c
->reply
,listFirst(c
->reply
));
1173 if (c
->flags
& REDIS_MASTER
) {
1174 nwritten
= objlen
- c
->sentlen
;
1176 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1177 if (nwritten
<= 0) break;
1179 c
->sentlen
+= nwritten
;
1180 totwritten
+= nwritten
;
1181 /* If we fully sent the object on head go to the next one */
1182 if (c
->sentlen
== objlen
) {
1183 listDelNode(c
->reply
,listFirst(c
->reply
));
1187 if (nwritten
== -1) {
1188 if (errno
== EAGAIN
) {
1191 redisLog(REDIS_DEBUG
,
1192 "Error writing to client: %s", strerror(errno
));
1197 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1198 if (listLength(c
->reply
) == 0) {
1200 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1204 static struct redisCommand
*lookupCommand(char *name
) {
1206 while(cmdTable
[j
].name
!= NULL
) {
1207 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1213 /* resetClient prepare the client to process the next command */
1214 static void resetClient(redisClient
*c
) {
1219 /* If this function gets called we already read a whole
1220 * command, argments are in the client argv/argc fields.
1221 * processCommand() execute the command or prepare the
1222 * server for a bulk read from the client.
1224 * If 1 is returned the client is still alive and valid and
1225 * and other operations can be performed by the caller. Otherwise
1226 * if 0 is returned the client was destroied (i.e. after QUIT). */
1227 static int processCommand(redisClient
*c
) {
1228 struct redisCommand
*cmd
;
1231 /* Free some memory if needed (maxmemory setting) */
1232 if (server
.maxmemory
) freeMemoryIfNeeded();
1234 /* The QUIT command is handled as a special case. Normal command
1235 * procs are unable to close the client connection safely */
1236 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1240 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1242 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1245 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1246 (c
->argc
< -cmd
->arity
)) {
1247 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
1250 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1251 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1254 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1255 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1257 decrRefCount(c
->argv
[c
->argc
-1]);
1258 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1260 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1265 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1266 /* It is possible that the bulk read is already in the
1267 * buffer. Check this condition and handle it accordingly */
1268 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1269 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1271 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1276 /* Let's try to share objects on the command arguments vector */
1277 if (server
.shareobjects
) {
1279 for(j
= 1; j
< c
->argc
; j
++)
1280 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1282 /* Check if the user is authenticated */
1283 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1284 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1289 /* Exec the command */
1290 dirty
= server
.dirty
;
1292 if (server
.dirty
-dirty
!= 0 && listLength(server
.slaves
))
1293 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1294 if (listLength(server
.monitors
))
1295 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1296 server
.stat_numcommands
++;
1298 /* Prepare the client for the next command */
1299 if (c
->flags
& REDIS_CLOSE
) {
1307 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1311 /* (args*2)+1 is enough room for args, spaces, newlines */
1312 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1314 if (argc
<= REDIS_STATIC_ARGS
) {
1317 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1318 if (!outv
) oom("replicationFeedSlaves");
1321 for (j
= 0; j
< argc
; j
++) {
1322 if (j
!= 0) outv
[outc
++] = shared
.space
;
1323 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1326 lenobj
= createObject(REDIS_STRING
,
1327 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv
[j
]->ptr
)));
1328 lenobj
->refcount
= 0;
1329 outv
[outc
++] = lenobj
;
1331 outv
[outc
++] = argv
[j
];
1333 outv
[outc
++] = shared
.crlf
;
1335 /* Increment all the refcounts at start and decrement at end in order to
1336 * be sure to free objects if there is no slave in a replication state
1337 * able to be feed with commands */
1338 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1340 while((ln
= listYield(slaves
))) {
1341 redisClient
*slave
= ln
->value
;
1343 /* Don't feed slaves that are still waiting for BGSAVE to start */
1344 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1346 /* Feed all the other slaves, MONITORs and so on */
1347 if (slave
->slaveseldb
!= dictid
) {
1351 case 0: selectcmd
= shared
.select0
; break;
1352 case 1: selectcmd
= shared
.select1
; break;
1353 case 2: selectcmd
= shared
.select2
; break;
1354 case 3: selectcmd
= shared
.select3
; break;
1355 case 4: selectcmd
= shared
.select4
; break;
1356 case 5: selectcmd
= shared
.select5
; break;
1357 case 6: selectcmd
= shared
.select6
; break;
1358 case 7: selectcmd
= shared
.select7
; break;
1359 case 8: selectcmd
= shared
.select8
; break;
1360 case 9: selectcmd
= shared
.select9
; break;
1362 selectcmd
= createObject(REDIS_STRING
,
1363 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1364 selectcmd
->refcount
= 0;
1367 addReply(slave
,selectcmd
);
1368 slave
->slaveseldb
= dictid
;
1370 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1372 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1373 if (outv
!= static_outv
) zfree(outv
);
1376 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1377 redisClient
*c
= (redisClient
*) privdata
;
1378 char buf
[REDIS_IOBUF_LEN
];
1381 REDIS_NOTUSED(mask
);
1383 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1385 if (errno
== EAGAIN
) {
1388 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1392 } else if (nread
== 0) {
1393 redisLog(REDIS_DEBUG
, "Client closed connection");
1398 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1399 c
->lastinteraction
= time(NULL
);
1405 if (c
->bulklen
== -1) {
1406 /* Read the first line of the query */
1407 char *p
= strchr(c
->querybuf
,'\n');
1413 query
= c
->querybuf
;
1414 c
->querybuf
= sdsempty();
1415 querylen
= 1+(p
-(query
));
1416 if (sdslen(query
) > querylen
) {
1417 /* leave data after the first line of the query in the buffer */
1418 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1420 *p
= '\0'; /* remove "\n" */
1421 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1422 sdsupdatelen(query
);
1424 /* Now we can split the query in arguments */
1425 if (sdslen(query
) == 0) {
1426 /* Ignore empty query */
1430 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1431 if (argv
== NULL
) oom("sdssplitlen");
1434 if (c
->argv
) zfree(c
->argv
);
1435 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1436 if (c
->argv
== NULL
) oom("allocating arguments list for client");
1438 for (j
= 0; j
< argc
; j
++) {
1439 if (sdslen(argv
[j
])) {
1440 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1447 /* Execute the command. If the client is still valid
1448 * after processCommand() return and there is something
1449 * on the query buffer try to process the next command. */
1450 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1452 } else if (sdslen(c
->querybuf
) >= 1024*32) {
1453 redisLog(REDIS_DEBUG
, "Client protocol error");
1458 /* Bulk read handling. Note that if we are at this point
1459 the client already sent a command terminated with a newline,
1460 we are reading the bulk data that is actually the last
1461 argument of the command. */
1462 int qbl
= sdslen(c
->querybuf
);
1464 if (c
->bulklen
<= qbl
) {
1465 /* Copy everything but the final CRLF as final argument */
1466 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1468 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1475 static int selectDb(redisClient
*c
, int id
) {
1476 if (id
< 0 || id
>= server
.dbnum
)
1478 c
->db
= &server
.db
[id
];
1482 static void *dupClientReplyValue(void *o
) {
1483 incrRefCount((robj
*)o
);
1487 static redisClient
*createClient(int fd
) {
1488 redisClient
*c
= zmalloc(sizeof(*c
));
1490 anetNonBlock(NULL
,fd
);
1491 anetTcpNoDelay(NULL
,fd
);
1492 if (!c
) return NULL
;
1495 c
->querybuf
= sdsempty();
1501 c
->lastinteraction
= time(NULL
);
1502 c
->authenticated
= 0;
1503 c
->replstate
= REDIS_REPL_NONE
;
1504 if ((c
->reply
= listCreate()) == NULL
) oom("listCreate");
1505 listSetFreeMethod(c
->reply
,decrRefCount
);
1506 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1507 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1508 readQueryFromClient
, c
, NULL
) == AE_ERR
) {
1512 if (!listAddNodeTail(server
.clients
,c
)) oom("listAddNodeTail");
1516 static void addReply(redisClient
*c
, robj
*obj
) {
1517 if (listLength(c
->reply
) == 0 &&
1518 (c
->replstate
== REDIS_REPL_NONE
||
1519 c
->replstate
== REDIS_REPL_ONLINE
) &&
1520 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
1521 sendReplyToClient
, c
, NULL
) == AE_ERR
) return;
1522 if (!listAddNodeTail(c
->reply
,obj
)) oom("listAddNodeTail");
1526 static void addReplySds(redisClient
*c
, sds s
) {
1527 robj
*o
= createObject(REDIS_STRING
,s
);
1532 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1537 REDIS_NOTUSED(mask
);
1538 REDIS_NOTUSED(privdata
);
1540 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
1541 if (cfd
== AE_ERR
) {
1542 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
1545 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
1546 if ((c
= createClient(cfd
)) == NULL
) {
1547 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
1548 close(cfd
); /* May be already closed, just ingore errors */
1551 /* If maxclient directive is set and this is one client more... close the
1552 * connection. Note that we create the client instead to check before
1553 * for this condition, since now the socket is already set in nonblocking
1554 * mode and we can send an error for free using the Kernel I/O */
1555 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
1556 char *err
= "-ERR max number of clients reached\r\n";
1558 /* That's a best effort error message, don't check write errors */
1559 (void) write(c
->fd
,err
,strlen(err
));
1563 server
.stat_numconnections
++;
1566 /* ======================= Redis objects implementation ===================== */
1568 static robj
*createObject(int type
, void *ptr
) {
1571 if (listLength(server
.objfreelist
)) {
1572 listNode
*head
= listFirst(server
.objfreelist
);
1573 o
= listNodeValue(head
);
1574 listDelNode(server
.objfreelist
,head
);
1576 o
= zmalloc(sizeof(*o
));
1578 if (!o
) oom("createObject");
1585 static robj
*createStringObject(char *ptr
, size_t len
) {
1586 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
1589 static robj
*createListObject(void) {
1590 list
*l
= listCreate();
1592 if (!l
) oom("listCreate");
1593 listSetFreeMethod(l
,decrRefCount
);
1594 return createObject(REDIS_LIST
,l
);
1597 static robj
*createSetObject(void) {
1598 dict
*d
= dictCreate(&setDictType
,NULL
);
1599 if (!d
) oom("dictCreate");
1600 return createObject(REDIS_SET
,d
);
1603 static void freeStringObject(robj
*o
) {
1607 static void freeListObject(robj
*o
) {
1608 listRelease((list
*) o
->ptr
);
1611 static void freeSetObject(robj
*o
) {
1612 dictRelease((dict
*) o
->ptr
);
1615 static void freeHashObject(robj
*o
) {
1616 dictRelease((dict
*) o
->ptr
);
1619 static void incrRefCount(robj
*o
) {
1621 #ifdef DEBUG_REFCOUNT
1622 if (o
->type
== REDIS_STRING
)
1623 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
1627 static void decrRefCount(void *obj
) {
1630 #ifdef DEBUG_REFCOUNT
1631 if (o
->type
== REDIS_STRING
)
1632 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
1634 if (--(o
->refcount
) == 0) {
1636 case REDIS_STRING
: freeStringObject(o
); break;
1637 case REDIS_LIST
: freeListObject(o
); break;
1638 case REDIS_SET
: freeSetObject(o
); break;
1639 case REDIS_HASH
: freeHashObject(o
); break;
1640 default: assert(0 != 0); break;
1642 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
1643 !listAddNodeHead(server
.objfreelist
,o
))
1648 /* Try to share an object against the shared objects pool */
1649 static robj
*tryObjectSharing(robj
*o
) {
1650 struct dictEntry
*de
;
1653 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
1655 assert(o
->type
== REDIS_STRING
);
1656 de
= dictFind(server
.sharingpool
,o
);
1658 robj
*shared
= dictGetEntryKey(de
);
1660 c
= ((unsigned long) dictGetEntryVal(de
))+1;
1661 dictGetEntryVal(de
) = (void*) c
;
1662 incrRefCount(shared
);
1666 /* Here we are using a stream algorihtm: Every time an object is
1667 * shared we increment its count, everytime there is a miss we
1668 * recrement the counter of a random object. If this object reaches
1669 * zero we remove the object and put the current object instead. */
1670 if (dictSize(server
.sharingpool
) >=
1671 server
.sharingpoolsize
) {
1672 de
= dictGetRandomKey(server
.sharingpool
);
1674 c
= ((unsigned long) dictGetEntryVal(de
))-1;
1675 dictGetEntryVal(de
) = (void*) c
;
1677 dictDelete(server
.sharingpool
,de
->key
);
1680 c
= 0; /* If the pool is empty we want to add this object */
1685 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
1686 assert(retval
== DICT_OK
);
1693 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
1694 dictEntry
*de
= dictFind(db
->dict
,key
);
1695 return de
? dictGetEntryVal(de
) : NULL
;
1698 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
1699 expireIfNeeded(db
,key
);
1700 return lookupKey(db
,key
);
1703 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
1704 deleteIfVolatile(db
,key
);
1705 return lookupKey(db
,key
);
1708 static int deleteKey(redisDb
*db
, robj
*key
) {
1711 /* We need to protect key from destruction: after the first dictDelete()
1712 * it may happen that 'key' is no longer valid if we don't increment
1713 * it's count. This may happen when we get the object reference directly
1714 * from the hash table with dictRandomKey() or dict iterators */
1716 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
1717 retval
= dictDelete(db
->dict
,key
);
1720 return retval
== DICT_OK
;
1723 /*============================ DB saving/loading ============================ */
/* Write the one-byte 'type' opcode to 'fp'.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
/* Write the time 't' to 'fp' as a 32 bit signed integer in host byte
 * order. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32;

    t32 = (int32_t) t;
    return (fwrite(&t32,4,1,fp) == 0) ? -1 : 0;
}
1736 /* check rdbLoadLen() comments for more info */
1737 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
1738 unsigned char buf
[2];
1741 /* Save a 6 bit len */
1742 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
1743 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1744 } else if (len
< (1<<14)) {
1745 /* Save a 14 bit len */
1746 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
1748 if (fwrite(buf
,2,1,fp
) == 0) return -1;
1750 /* Save a 32 bit len */
1751 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
1752 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1754 if (fwrite(&len
,4,1,fp
) == 0) return -1;
1759 /* String objects in the form "2391" "-100" without any space and with a
1760 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1761 * encoded as integers to save space */
1762 int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
1764 char *endptr
, buf
[32];
1766 /* Check if it's possible to encode this value as a number */
1767 value
= strtoll(s
, &endptr
, 10);
1768 if (endptr
[0] != '\0') return 0;
1769 snprintf(buf
,32,"%lld",value
);
1771 /* If the number converted back into a string is not identical
1772 * then it's not possible to encode the string as integer */
1773 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
1775 /* Finally check if it fits in our ranges */
1776 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
1777 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
1778 enc
[1] = value
&0xFF;
1780 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
1781 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
1782 enc
[1] = value
&0xFF;
1783 enc
[2] = (value
>>8)&0xFF;
1785 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
1786 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
1787 enc
[1] = value
&0xFF;
1788 enc
[2] = (value
>>8)&0xFF;
1789 enc
[3] = (value
>>16)&0xFF;
1790 enc
[4] = (value
>>24)&0xFF;
1797 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
1798 unsigned int comprlen
, outlen
;
1802 /* We require at least four bytes compression for this to be worth it */
1803 outlen
= sdslen(obj
->ptr
)-4;
1804 if (outlen
<= 0) return 0;
1805 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
1806 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
1807 if (comprlen
== 0) {
1811 /* Data compressed! Let's save it on disk */
1812 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
1813 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
1814 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
1815 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
1816 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
1825 /* Save a string objet as [len][data] on disk. If the object is a string
1826 * representation of an integer value we try to safe it in a special form */
1827 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
1828 size_t len
= sdslen(obj
->ptr
);
1831 /* Try integer encoding */
1833 unsigned char buf
[5];
1834 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
1835 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
1840 /* Try LZF compression - under 20 bytes it's unable to compress even
1841 * aaaaaaaaaaaaaaaaaa so skip it */
1842 if (1 && len
> 20) {
1845 retval
= rdbSaveLzfStringObject(fp
,obj
);
1846 if (retval
== -1) return -1;
1847 if (retval
> 0) return 0;
1848 /* retval == 0 means data can't be compressed, save the old way */
1851 /* Store verbatim */
1852 if (rdbSaveLen(fp
,len
) == -1) return -1;
1853 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
1857 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1858 static int rdbSave(char *filename
) {
1859 dictIterator
*di
= NULL
;
1864 time_t now
= time(NULL
);
1866 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
1867 fp
= fopen(tmpfile
,"w");
1869 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
1872 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
1873 for (j
= 0; j
< server
.dbnum
; j
++) {
1874 redisDb
*db
= server
.db
+j
;
1876 if (dictSize(d
) == 0) continue;
1877 di
= dictGetIterator(d
);
1883 /* Write the SELECT DB opcode */
1884 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
1885 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
1887 /* Iterate this DB writing every entry */
1888 while((de
= dictNext(di
)) != NULL
) {
1889 robj
*key
= dictGetEntryKey(de
);
1890 robj
*o
= dictGetEntryVal(de
);
1891 time_t expiretime
= getExpire(db
,key
);
1893 /* Save the expire time */
1894 if (expiretime
!= -1) {
1895 /* If this key is already expired skip it */
1896 if (expiretime
< now
) continue;
1897 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
1898 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
1900 /* Save the key and associated value */
1901 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
1902 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
1903 if (o
->type
== REDIS_STRING
) {
1904 /* Save a string value */
1905 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
1906 } else if (o
->type
== REDIS_LIST
) {
1907 /* Save a list value */
1908 list
*list
= o
->ptr
;
1912 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
1913 while((ln
= listYield(list
))) {
1914 robj
*eleobj
= listNodeValue(ln
);
1916 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
1918 } else if (o
->type
== REDIS_SET
) {
1919 /* Save a set value */
1921 dictIterator
*di
= dictGetIterator(set
);
1924 if (!set
) oom("dictGetIteraotr");
1925 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
1926 while((de
= dictNext(di
)) != NULL
) {
1927 robj
*eleobj
= dictGetEntryKey(de
);
1929 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
1931 dictReleaseIterator(di
);
1936 dictReleaseIterator(di
);
1939 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
1941 /* Make sure data will not remain on the OS's output buffers */
1946 /* Use RENAME to make sure the DB file is changed atomically only
1947 * if the generate DB file is ok. */
1948 if (rename(tmpfile
,filename
) == -1) {
1949 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destionation: %s", strerror(errno
));
1953 redisLog(REDIS_NOTICE
,"DB saved on disk");
1955 server
.lastsave
= time(NULL
);
1961 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
1962 if (di
) dictReleaseIterator(di
);
1966 static int rdbSaveBackground(char *filename
) {
1969 if (server
.bgsaveinprogress
) return REDIS_ERR
;
1970 if ((childpid
= fork()) == 0) {
1973 if (rdbSave(filename
) == REDIS_OK
) {
1980 if (childpid
== -1) {
1981 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
1985 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
1986 server
.bgsaveinprogress
= 1;
1989 return REDIS_OK
; /* unreached */
/* Read a one-byte type opcode from 'fp'.
 * Returns the byte as a value in 0..255, or -1 if it can't be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char opcode;

    return (fread(&opcode,1,1,fp) == 0) ? -1 : opcode;
}
/* Read a time stored as a 32 bit signed integer in host byte order.
 * Returns -1 if the four bytes can't be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stored;

    if (fread(&stored,4,1,fp) == 0) return -1;
    return (time_t) stored;
}
2004 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2005 * of this file for a description of how this are stored on disk.
2007 * isencoded is set to 1 if the readed length is not actually a length but
2008 * an "encoding type", check the above comments for more info */
2009 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2010 unsigned char buf
[2];
2013 if (isencoded
) *isencoded
= 0;
2015 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2020 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2021 type
= (buf
[0]&0xC0)>>6;
2022 if (type
== REDIS_RDB_6BITLEN
) {
2023 /* Read a 6 bit len */
2025 } else if (type
== REDIS_RDB_ENCVAL
) {
2026 /* Read a 6 bit len encoding type */
2027 if (isencoded
) *isencoded
= 1;
2029 } else if (type
== REDIS_RDB_14BITLEN
) {
2030 /* Read a 14 bit len */
2031 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2032 return ((buf
[0]&0x3F)<<8)|buf
[1];
2034 /* Read a 32 bit len */
2035 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2041 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2042 unsigned char enc
[4];
2045 if (enctype
== REDIS_RDB_ENC_INT8
) {
2046 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2047 val
= (signed char)enc
[0];
2048 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2050 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2051 v
= enc
[0]|(enc
[1]<<8);
2053 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2055 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2056 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2059 val
= 0; /* anti-warning */
2062 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2065 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2066 unsigned int len
, clen
;
2067 unsigned char *c
= NULL
;
2070 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2071 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2072 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2073 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2074 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2075 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2077 return createObject(REDIS_STRING
,val
);
2084 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2089 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2092 case REDIS_RDB_ENC_INT8
:
2093 case REDIS_RDB_ENC_INT16
:
2094 case REDIS_RDB_ENC_INT32
:
2095 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2096 case REDIS_RDB_ENC_LZF
:
2097 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2103 if (len
== REDIS_RDB_LENERR
) return NULL
;
2104 val
= sdsnewlen(NULL
,len
);
2105 if (len
&& fread(val
,len
,1,fp
) == 0) {
2109 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2112 static int rdbLoad(char *filename
) {
2114 robj
*keyobj
= NULL
;
2116 int type
, retval
, rdbver
;
2117 dict
*d
= server
.db
[0].dict
;
2118 redisDb
*db
= server
.db
+0;
2120 time_t expiretime
= -1, now
= time(NULL
);
2122 fp
= fopen(filename
,"r");
2123 if (!fp
) return REDIS_ERR
;
2124 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2126 if (memcmp(buf
,"REDIS",5) != 0) {
2128 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2131 rdbver
= atoi(buf
+5);
2134 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2141 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2142 if (type
== REDIS_EXPIRETIME
) {
2143 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2144 /* We read the time so we need to read the object type again */
2145 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2147 if (type
== REDIS_EOF
) break;
2148 /* Handle SELECT DB opcode as a special case */
2149 if (type
== REDIS_SELECTDB
) {
2150 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2152 if (dbid
>= (unsigned)server
.dbnum
) {
2153 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2156 db
= server
.db
+dbid
;
2161 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2163 if (type
== REDIS_STRING
) {
2164 /* Read string value */
2165 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2166 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2167 /* Read list/set value */
2170 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2172 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2173 /* Load every single element of the list/set */
2177 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2178 if (type
== REDIS_LIST
) {
2179 if (!listAddNodeTail((list
*)o
->ptr
,ele
))
2180 oom("listAddNodeTail");
2182 if (dictAdd((dict
*)o
->ptr
,ele
,NULL
) == DICT_ERR
)
2189 /* Add the new object in the hash table */
2190 retval
= dictAdd(d
,keyobj
,o
);
2191 if (retval
== DICT_ERR
) {
2192 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2195 /* Set the expire time if needed */
2196 if (expiretime
!= -1) {
2197 setExpire(db
,keyobj
,expiretime
);
2198 /* Delete this key if already expired */
2199 if (expiretime
< now
) deleteKey(db
,keyobj
);
2207 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2208 if (keyobj
) decrRefCount(keyobj
);
2209 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2211 return REDIS_ERR
; /* Just to avoid warning */
2214 /*================================== Commands =============================== */
2216 static void authCommand(redisClient
*c
) {
2217 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2218 c
->authenticated
= 1;
2219 addReply(c
,shared
.ok
);
2221 c
->authenticated
= 0;
2222 addReply(c
,shared
.err
);
2226 static void pingCommand(redisClient
*c
) {
2227 addReply(c
,shared
.pong
);
2230 static void echoCommand(redisClient
*c
) {
2231 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
2232 (int)sdslen(c
->argv
[1]->ptr
)));
2233 addReply(c
,c
->argv
[1]);
2234 addReply(c
,shared
.crlf
);
2237 /*=================================== Strings =============================== */
2239 static void setGenericCommand(redisClient
*c
, int nx
) {
2242 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2243 if (retval
== DICT_ERR
) {
2245 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2246 incrRefCount(c
->argv
[2]);
2248 addReply(c
,shared
.czero
);
2252 incrRefCount(c
->argv
[1]);
2253 incrRefCount(c
->argv
[2]);
2256 removeExpire(c
->db
,c
->argv
[1]);
2257 addReply(c
, nx
? shared
.cone
: shared
.ok
);
2260 static void setCommand(redisClient
*c
) {
2261 setGenericCommand(c
,0);
2264 static void setnxCommand(redisClient
*c
) {
2265 setGenericCommand(c
,1);
2268 static void getCommand(redisClient
*c
) {
2269 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2272 addReply(c
,shared
.nullbulk
);
2274 if (o
->type
!= REDIS_STRING
) {
2275 addReply(c
,shared
.wrongtypeerr
);
2277 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o
->ptr
)));
2279 addReply(c
,shared
.crlf
);
2284 static void getSetCommand(redisClient
*c
) {
2286 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
2287 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2289 incrRefCount(c
->argv
[1]);
2291 incrRefCount(c
->argv
[2]);
2293 removeExpire(c
->db
,c
->argv
[1]);
2296 static void mgetCommand(redisClient
*c
) {
2299 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
2300 for (j
= 1; j
< c
->argc
; j
++) {
2301 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
2303 addReply(c
,shared
.nullbulk
);
2305 if (o
->type
!= REDIS_STRING
) {
2306 addReply(c
,shared
.nullbulk
);
2308 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o
->ptr
)));
2310 addReply(c
,shared
.crlf
);
2316 static void incrDecrCommand(redisClient
*c
, long long incr
) {
2321 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2325 if (o
->type
!= REDIS_STRING
) {
2330 value
= strtoll(o
->ptr
, &eptr
, 10);
2335 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
2336 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
2337 if (retval
== DICT_ERR
) {
2338 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
2339 removeExpire(c
->db
,c
->argv
[1]);
2341 incrRefCount(c
->argv
[1]);
2344 addReply(c
,shared
.colon
);
2346 addReply(c
,shared
.crlf
);
2349 static void incrCommand(redisClient
*c
) {
2350 incrDecrCommand(c
,1);
2353 static void decrCommand(redisClient
*c
) {
2354 incrDecrCommand(c
,-1);
2357 static void incrbyCommand(redisClient
*c
) {
2358 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2359 incrDecrCommand(c
,incr
);
2362 static void decrbyCommand(redisClient
*c
) {
2363 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2364 incrDecrCommand(c
,-incr
);
2367 /* ========================= Type agnostic commands ========================= */
2369 static void delCommand(redisClient
*c
) {
2372 for (j
= 1; j
< c
->argc
; j
++) {
2373 if (deleteKey(c
->db
,c
->argv
[j
])) {
2380 addReply(c
,shared
.czero
);
2383 addReply(c
,shared
.cone
);
2386 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
2391 static void existsCommand(redisClient
*c
) {
2392 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
2395 static void selectCommand(redisClient
*c
) {
2396 int id
= atoi(c
->argv
[1]->ptr
);
2398 if (selectDb(c
,id
) == REDIS_ERR
) {
2399 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
2401 addReply(c
,shared
.ok
);
2405 static void randomkeyCommand(redisClient
*c
) {
2409 de
= dictGetRandomKey(c
->db
->dict
);
2410 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
2413 addReply(c
,shared
.plus
);
2414 addReply(c
,shared
.crlf
);
2416 addReply(c
,shared
.plus
);
2417 addReply(c
,dictGetEntryKey(de
));
2418 addReply(c
,shared
.crlf
);
2422 static void keysCommand(redisClient
*c
) {
2425 sds pattern
= c
->argv
[1]->ptr
;
2426 int plen
= sdslen(pattern
);
2427 int numkeys
= 0, keyslen
= 0;
2428 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
2430 di
= dictGetIterator(c
->db
->dict
);
2431 if (!di
) oom("dictGetIterator");
2433 decrRefCount(lenobj
);
2434 while((de
= dictNext(di
)) != NULL
) {
2435 robj
*keyobj
= dictGetEntryKey(de
);
2437 sds key
= keyobj
->ptr
;
2438 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
2439 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
2440 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
2442 addReply(c
,shared
.space
);
2445 keyslen
+= sdslen(key
);
2449 dictReleaseIterator(di
);
2450 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
2451 addReply(c
,shared
.crlf
);
2454 static void dbsizeCommand(redisClient
*c
) {
2456 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
2459 static void lastsaveCommand(redisClient
*c
) {
2461 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
2464 static void typeCommand(redisClient
*c
) {
2468 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2473 case REDIS_STRING
: type
= "+string"; break;
2474 case REDIS_LIST
: type
= "+list"; break;
2475 case REDIS_SET
: type
= "+set"; break;
2476 default: type
= "unknown"; break;
2479 addReplySds(c
,sdsnew(type
));
2480 addReply(c
,shared
.crlf
);
2483 static void saveCommand(redisClient
*c
) {
2484 if (server
.bgsaveinprogress
) {
2485 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
2488 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2489 addReply(c
,shared
.ok
);
2491 addReply(c
,shared
.err
);
2495 static void bgsaveCommand(redisClient
*c
) {
2496 if (server
.bgsaveinprogress
) {
2497 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
2500 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
2501 addReply(c
,shared
.ok
);
2503 addReply(c
,shared
.err
);
2507 static void shutdownCommand(redisClient
*c
) {
2508 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
2509 /* XXX: TODO kill the child if there is a bgsave in progress */
2510 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2511 if (server
.daemonize
) {
2512 unlink(server
.pidfile
);
2514 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
2515 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
2518 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
2519 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2523 static void renameGenericCommand(redisClient
*c
, int nx
) {
2526 /* To use the same key as src and dst is probably an error */
2527 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
2528 addReply(c
,shared
.sameobjecterr
);
2532 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2534 addReply(c
,shared
.nokeyerr
);
2538 deleteIfVolatile(c
->db
,c
->argv
[2]);
2539 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
2542 addReply(c
,shared
.czero
);
2545 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
2547 incrRefCount(c
->argv
[2]);
2549 deleteKey(c
->db
,c
->argv
[1]);
2551 addReply(c
,nx
? shared
.cone
: shared
.ok
);
2554 static void renameCommand(redisClient
*c
) {
2555 renameGenericCommand(c
,0);
2558 static void renamenxCommand(redisClient
*c
) {
2559 renameGenericCommand(c
,1);
2562 static void moveCommand(redisClient
*c
) {
2567 /* Obtain source and target DB pointers */
2570 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
2571 addReply(c
,shared
.outofrangeerr
);
2575 selectDb(c
,srcid
); /* Back to the source DB */
2577 /* If the user is moving using as target the same
2578 * DB as the source DB it is probably an error. */
2580 addReply(c
,shared
.sameobjecterr
);
2584 /* Check if the element exists and get a reference */
2585 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2587 addReply(c
,shared
.czero
);
2591 /* Try to add the element to the target DB */
2592 deleteIfVolatile(dst
,c
->argv
[1]);
2593 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
2594 addReply(c
,shared
.czero
);
2597 incrRefCount(c
->argv
[1]);
2600 /* OK! key moved, free the entry in the source DB */
2601 deleteKey(src
,c
->argv
[1]);
2603 addReply(c
,shared
.cone
);
2606 /* =================================== Lists ================================ */
2607 static void pushGenericCommand(redisClient
*c
, int where
) {
2611 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2613 lobj
= createListObject();
2615 if (where
== REDIS_HEAD
) {
2616 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2618 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2620 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
2621 incrRefCount(c
->argv
[1]);
2622 incrRefCount(c
->argv
[2]);
2624 if (lobj
->type
!= REDIS_LIST
) {
2625 addReply(c
,shared
.wrongtypeerr
);
2629 if (where
== REDIS_HEAD
) {
2630 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2632 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2634 incrRefCount(c
->argv
[2]);
2637 addReply(c
,shared
.ok
);
2640 static void lpushCommand(redisClient
*c
) {
2641 pushGenericCommand(c
,REDIS_HEAD
);
2644 static void rpushCommand(redisClient
*c
) {
2645 pushGenericCommand(c
,REDIS_TAIL
);
2648 static void llenCommand(redisClient
*c
) {
2652 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2654 addReply(c
,shared
.czero
);
2657 if (o
->type
!= REDIS_LIST
) {
2658 addReply(c
,shared
.wrongtypeerr
);
2661 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
2666 static void lindexCommand(redisClient
*c
) {
2668 int index
= atoi(c
->argv
[2]->ptr
);
2670 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2672 addReply(c
,shared
.nullbulk
);
2674 if (o
->type
!= REDIS_LIST
) {
2675 addReply(c
,shared
.wrongtypeerr
);
2677 list
*list
= o
->ptr
;
2680 ln
= listIndex(list
, index
);
2682 addReply(c
,shared
.nullbulk
);
2684 robj
*ele
= listNodeValue(ln
);
2685 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2687 addReply(c
,shared
.crlf
);
2693 static void lsetCommand(redisClient
*c
) {
2695 int index
= atoi(c
->argv
[2]->ptr
);
2697 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2699 addReply(c
,shared
.nokeyerr
);
2701 if (o
->type
!= REDIS_LIST
) {
2702 addReply(c
,shared
.wrongtypeerr
);
2704 list
*list
= o
->ptr
;
2707 ln
= listIndex(list
, index
);
2709 addReply(c
,shared
.outofrangeerr
);
2711 robj
*ele
= listNodeValue(ln
);
2714 listNodeValue(ln
) = c
->argv
[3];
2715 incrRefCount(c
->argv
[3]);
2716 addReply(c
,shared
.ok
);
2723 static void popGenericCommand(redisClient
*c
, int where
) {
2726 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2728 addReply(c
,shared
.nullbulk
);
2730 if (o
->type
!= REDIS_LIST
) {
2731 addReply(c
,shared
.wrongtypeerr
);
2733 list
*list
= o
->ptr
;
2736 if (where
== REDIS_HEAD
)
2737 ln
= listFirst(list
);
2739 ln
= listLast(list
);
2742 addReply(c
,shared
.nullbulk
);
2744 robj
*ele
= listNodeValue(ln
);
2745 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2747 addReply(c
,shared
.crlf
);
2748 listDelNode(list
,ln
);
2755 static void lpopCommand(redisClient
*c
) {
2756 popGenericCommand(c
,REDIS_HEAD
);
2759 static void rpopCommand(redisClient
*c
) {
2760 popGenericCommand(c
,REDIS_TAIL
);
2763 static void lrangeCommand(redisClient
*c
) {
2765 int start
= atoi(c
->argv
[2]->ptr
);
2766 int end
= atoi(c
->argv
[3]->ptr
);
2768 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2770 addReply(c
,shared
.nullmultibulk
);
2772 if (o
->type
!= REDIS_LIST
) {
2773 addReply(c
,shared
.wrongtypeerr
);
2775 list
*list
= o
->ptr
;
2777 int llen
= listLength(list
);
2781 /* convert negative indexes */
2782 if (start
< 0) start
= llen
+start
;
2783 if (end
< 0) end
= llen
+end
;
2784 if (start
< 0) start
= 0;
2785 if (end
< 0) end
= 0;
2787 /* indexes sanity checks */
2788 if (start
> end
|| start
>= llen
) {
2789 /* Out of range start or start > end result in empty list */
2790 addReply(c
,shared
.emptymultibulk
);
2793 if (end
>= llen
) end
= llen
-1;
2794 rangelen
= (end
-start
)+1;
2796 /* Return the result in form of a multi-bulk reply */
2797 ln
= listIndex(list
, start
);
2798 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
2799 for (j
= 0; j
< rangelen
; j
++) {
2800 ele
= listNodeValue(ln
);
2801 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2803 addReply(c
,shared
.crlf
);
2810 static void ltrimCommand(redisClient
*c
) {
2812 int start
= atoi(c
->argv
[2]->ptr
);
2813 int end
= atoi(c
->argv
[3]->ptr
);
2815 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2817 addReply(c
,shared
.nokeyerr
);
2819 if (o
->type
!= REDIS_LIST
) {
2820 addReply(c
,shared
.wrongtypeerr
);
2822 list
*list
= o
->ptr
;
2824 int llen
= listLength(list
);
2825 int j
, ltrim
, rtrim
;
2827 /* convert negative indexes */
2828 if (start
< 0) start
= llen
+start
;
2829 if (end
< 0) end
= llen
+end
;
2830 if (start
< 0) start
= 0;
2831 if (end
< 0) end
= 0;
2833 /* indexes sanity checks */
2834 if (start
> end
|| start
>= llen
) {
2835 /* Out of range start or start > end result in empty list */
2839 if (end
>= llen
) end
= llen
-1;
2844 /* Remove list elements to perform the trim */
2845 for (j
= 0; j
< ltrim
; j
++) {
2846 ln
= listFirst(list
);
2847 listDelNode(list
,ln
);
2849 for (j
= 0; j
< rtrim
; j
++) {
2850 ln
= listLast(list
);
2851 listDelNode(list
,ln
);
2853 addReply(c
,shared
.ok
);
2859 static void lremCommand(redisClient
*c
) {
2862 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2864 addReply(c
,shared
.nokeyerr
);
2866 if (o
->type
!= REDIS_LIST
) {
2867 addReply(c
,shared
.wrongtypeerr
);
2869 list
*list
= o
->ptr
;
2870 listNode
*ln
, *next
;
2871 int toremove
= atoi(c
->argv
[2]->ptr
);
2876 toremove
= -toremove
;
2879 ln
= fromtail
? list
->tail
: list
->head
;
2881 robj
*ele
= listNodeValue(ln
);
2883 next
= fromtail
? ln
->prev
: ln
->next
;
2884 if (sdscmp(ele
->ptr
,c
->argv
[3]->ptr
) == 0) {
2885 listDelNode(list
,ln
);
2888 if (toremove
&& removed
== toremove
) break;
2892 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
2897 /* ==================================== Sets ================================ */
2899 static void saddCommand(redisClient
*c
) {
2902 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2904 set
= createSetObject();
2905 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
2906 incrRefCount(c
->argv
[1]);
2908 if (set
->type
!= REDIS_SET
) {
2909 addReply(c
,shared
.wrongtypeerr
);
2913 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
2914 incrRefCount(c
->argv
[2]);
2916 addReply(c
,shared
.cone
);
2918 addReply(c
,shared
.czero
);
2922 static void sremCommand(redisClient
*c
) {
2925 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2927 addReply(c
,shared
.czero
);
2929 if (set
->type
!= REDIS_SET
) {
2930 addReply(c
,shared
.wrongtypeerr
);
2933 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
2935 addReply(c
,shared
.cone
);
2937 addReply(c
,shared
.czero
);
2942 static void smoveCommand(redisClient
*c
) {
2943 robj
*srcset
, *dstset
;
2945 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2946 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
2948 /* If the source key does not exist return 0, if it's of the wrong type
2950 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
2951 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
2954 /* Error if the destination key is not a set as well */
2955 if (dstset
&& dstset
->type
!= REDIS_SET
) {
2956 addReply(c
,shared
.wrongtypeerr
);
2959 /* Remove the element from the source set */
2960 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
2961 /* Key not found in the src set! return zero */
2962 addReply(c
,shared
.czero
);
2966 /* Add the element to the destination set */
2968 dstset
= createSetObject();
2969 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
2970 incrRefCount(c
->argv
[2]);
2972 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
2973 incrRefCount(c
->argv
[3]);
2974 addReply(c
,shared
.cone
);
2977 static void sismemberCommand(redisClient
*c
) {
2980 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
2982 addReply(c
,shared
.czero
);
2984 if (set
->type
!= REDIS_SET
) {
2985 addReply(c
,shared
.wrongtypeerr
);
2988 if (dictFind(set
->ptr
,c
->argv
[2]))
2989 addReply(c
,shared
.cone
);
2991 addReply(c
,shared
.czero
);
2995 static void scardCommand(redisClient
*c
) {
2999 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3001 addReply(c
,shared
.czero
);
3004 if (o
->type
!= REDIS_SET
) {
3005 addReply(c
,shared
.wrongtypeerr
);
3008 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3014 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3015 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3017 return dictSize(*d1
)-dictSize(*d2
);
3020 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
) {
3021 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3024 robj
*lenobj
= NULL
, *dstset
= NULL
;
3025 int j
, cardinality
= 0;
3027 if (!dv
) oom("sinterGenericCommand");
3028 for (j
= 0; j
< setsnum
; j
++) {
3032 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3033 lookupKeyRead(c
->db
,setskeys
[j
]);
3037 deleteKey(c
->db
,dstkey
);
3038 addReply(c
,shared
.ok
);
3040 addReply(c
,shared
.nullmultibulk
);
3044 if (setobj
->type
!= REDIS_SET
) {
3046 addReply(c
,shared
.wrongtypeerr
);
3049 dv
[j
] = setobj
->ptr
;
3051 /* Sort sets from the smallest to largest, this will improve our
3052 * algorithm's performace */
3053 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3055 /* The first thing we should output is the total number of elements...
3056 * since this is a multi-bulk write, but at this stage we don't know
3057 * the intersection set size, so we use a trick, append an empty object
3058 * to the output list and save the pointer to later modify it with the
3061 lenobj
= createObject(REDIS_STRING
,NULL
);
3063 decrRefCount(lenobj
);
3065 /* If we have a target key where to store the resulting set
3066 * create this key with an empty set inside */
3067 dstset
= createSetObject();
3070 /* Iterate all the elements of the first (smallest) set, and test
3071 * the element against all the other sets, if at least one set does
3072 * not include the element it is discarded */
3073 di
= dictGetIterator(dv
[0]);
3074 if (!di
) oom("dictGetIterator");
3076 while((de
= dictNext(di
)) != NULL
) {
3079 for (j
= 1; j
< setsnum
; j
++)
3080 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
3082 continue; /* at least one set does not contain the member */
3083 ele
= dictGetEntryKey(de
);
3085 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele
->ptr
)));
3087 addReply(c
,shared
.crlf
);
3090 dictAdd(dstset
->ptr
,ele
,NULL
);
3094 dictReleaseIterator(di
);
3097 /* Store the resulting set into the target */
3098 deleteKey(c
->db
,dstkey
);
3099 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3100 incrRefCount(dstkey
);
3104 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",cardinality
);
3106 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3107 dictSize((dict
*)dstset
->ptr
)));
3113 static void sinterCommand(redisClient
*c
) {
3114 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
3117 static void sinterstoreCommand(redisClient
*c
) {
3118 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
3121 #define REDIS_OP_UNION 0
3122 #define REDIS_OP_DIFF 1
3124 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
3125 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3128 robj
*dstset
= NULL
;
3129 int j
, cardinality
= 0;
3131 if (!dv
) oom("sunionDiffGenericCommand");
3132 for (j
= 0; j
< setsnum
; j
++) {
3136 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3137 lookupKeyRead(c
->db
,setskeys
[j
]);
3142 if (setobj
->type
!= REDIS_SET
) {
3144 addReply(c
,shared
.wrongtypeerr
);
3147 dv
[j
] = setobj
->ptr
;
3150 /* We need a temp set object to store our union. If the dstkey
3151 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3152 * this set object will be the resulting object to set into the target key*/
3153 dstset
= createSetObject();
3155 /* Iterate all the elements of all the sets, add every element a single
3156 * time to the result set */
3157 for (j
= 0; j
< setsnum
; j
++) {
3158 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
3159 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
3161 di
= dictGetIterator(dv
[j
]);
3162 if (!di
) oom("dictGetIterator");
3164 while((de
= dictNext(di
)) != NULL
) {
3167 /* dictAdd will not add the same element multiple times */
3168 ele
= dictGetEntryKey(de
);
3169 if (op
== REDIS_OP_UNION
|| j
== 0) {
3170 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
3174 } else if (op
== REDIS_OP_DIFF
) {
3175 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
3180 dictReleaseIterator(di
);
3182 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
3185 /* Output the content of the resulting set, if not in STORE mode */
3187 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
3188 di
= dictGetIterator(dstset
->ptr
);
3189 if (!di
) oom("dictGetIterator");
3190 while((de
= dictNext(di
)) != NULL
) {
3193 ele
= dictGetEntryKey(de
);
3194 addReplySds(c
,sdscatprintf(sdsempty(),
3195 "$%d\r\n",sdslen(ele
->ptr
)));
3197 addReply(c
,shared
.crlf
);
3199 dictReleaseIterator(di
);
3201 /* If we have a target key where to store the resulting set
3202 * create this key with the result set inside */
3203 deleteKey(c
->db
,dstkey
);
3204 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3205 incrRefCount(dstkey
);
3210 decrRefCount(dstset
);
3212 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3213 dictSize((dict
*)dstset
->ptr
)));
3219 static void sunionCommand(redisClient
*c
) {
3220 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
3223 static void sunionstoreCommand(redisClient
*c
) {
3224 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
3227 static void sdiffCommand(redisClient
*c
) {
3228 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
3231 static void sdiffstoreCommand(redisClient
*c
) {
3232 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
3235 static void flushdbCommand(redisClient
*c
) {
3236 server
.dirty
+= dictSize(c
->db
->dict
);
3237 dictEmpty(c
->db
->dict
);
3238 dictEmpty(c
->db
->expires
);
3239 addReply(c
,shared
.ok
);
3242 static void flushallCommand(redisClient
*c
) {
3243 server
.dirty
+= emptyDb();
3244 addReply(c
,shared
.ok
);
3245 rdbSave(server
.dbfilename
);
3249 redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
3250 redisSortOperation
*so
= zmalloc(sizeof(*so
));
3251 if (!so
) oom("createSortOperation");
3253 so
->pattern
= pattern
;
3257 /* Return the value associated to the key with a name obtained
3258 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3259 robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
3263 int prefixlen
, sublen
, postfixlen
;
3264 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3268 char buf
[REDIS_SORTKEY_MAX
+1];
3271 spat
= pattern
->ptr
;
3273 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
3274 p
= strchr(spat
,'*');
3275 if (!p
) return NULL
;
3278 sublen
= sdslen(ssub
);
3279 postfixlen
= sdslen(spat
)-(prefixlen
+1);
3280 memcpy(keyname
.buf
,spat
,prefixlen
);
3281 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
3282 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
3283 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
3284 keyname
.len
= prefixlen
+sublen
+postfixlen
;
3286 keyobj
.refcount
= 1;
3287 keyobj
.type
= REDIS_STRING
;
3288 keyobj
.ptr
= ((char*)&keyname
)+(sizeof(long)*2);
3290 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3291 return lookupKeyRead(db
,&keyobj
);
3294 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3295 * the additional parameter is not standard but a BSD-specific we have to
3296 * pass sorting parameters via the global 'server' structure */
3297 static int sortCompare(const void *s1
, const void *s2
) {
3298 const redisSortObject
*so1
= s1
, *so2
= s2
;
3301 if (!server
.sort_alpha
) {
3302 /* Numeric sorting. Here it's trivial as we precomputed scores */
3303 if (so1
->u
.score
> so2
->u
.score
) {
3305 } else if (so1
->u
.score
< so2
->u
.score
) {
3311 /* Alphanumeric sorting */
3312 if (server
.sort_bypattern
) {
3313 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
3314 /* At least one compare object is NULL */
3315 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
3317 else if (so1
->u
.cmpobj
== NULL
)
3322 /* We have both the objects, use strcoll */
3323 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
3326 /* Compare elements directly */
3327 cmp
= strcoll(so1
->obj
->ptr
,so2
->obj
->ptr
);
3330 return server
.sort_desc
? -cmp
: cmp
;
3333 /* The SORT command is the most complex command in Redis. Warning: this code
3334 * is optimized for speed and a bit less for readability */
3335 static void sortCommand(redisClient
*c
) {
3338 int desc
= 0, alpha
= 0;
3339 int limit_start
= 0, limit_count
= -1, start
, end
;
3340 int j
, dontsort
= 0, vectorlen
;
3341 int getop
= 0; /* GET operation counter */
3342 robj
*sortval
, *sortby
= NULL
;
3343 redisSortObject
*vector
; /* Resulting vector to sort */
3345 /* Lookup the key to sort. It must be of the right types */
3346 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
3347 if (sortval
== NULL
) {
3348 addReply(c
,shared
.nokeyerr
);
3351 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
) {
3352 addReply(c
,shared
.wrongtypeerr
);
3356 /* Create a list of operations to perform for every sorted element.
3357 * Operations can be GET/DEL/INCR/DECR */
3358 operations
= listCreate();
3359 listSetFreeMethod(operations
,zfree
);
3362 /* Now we need to protect sortval incrementing its count, in the future
3363 * SORT may have options able to overwrite/delete keys during the sorting
3364 * and the sorted key itself may get destroied */
3365 incrRefCount(sortval
);
3367 /* The SORT command has an SQL-alike syntax, parse it */
3368 while(j
< c
->argc
) {
3369 int leftargs
= c
->argc
-j
-1;
3370 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
3372 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
3374 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
3376 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
3377 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
3378 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
3380 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
3381 sortby
= c
->argv
[j
+1];
3382 /* If the BY pattern does not contain '*', i.e. it is constant,
3383 * we don't need to sort nor to lookup the weight keys. */
3384 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
3386 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3387 listAddNodeTail(operations
,createSortOperation(
3388 REDIS_SORT_GET
,c
->argv
[j
+1]));
3391 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"del") && leftargs
>= 1) {
3392 listAddNodeTail(operations
,createSortOperation(
3393 REDIS_SORT_DEL
,c
->argv
[j
+1]));
3395 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"incr") && leftargs
>= 1) {
3396 listAddNodeTail(operations
,createSortOperation(
3397 REDIS_SORT_INCR
,c
->argv
[j
+1]));
3399 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3400 listAddNodeTail(operations
,createSortOperation(
3401 REDIS_SORT_DECR
,c
->argv
[j
+1]));
3404 decrRefCount(sortval
);
3405 listRelease(operations
);
3406 addReply(c
,shared
.syntaxerr
);
3412 /* Load the sorting vector with all the objects to sort */
3413 vectorlen
= (sortval
->type
== REDIS_LIST
) ?
3414 listLength((list
*)sortval
->ptr
) :
3415 dictSize((dict
*)sortval
->ptr
);
3416 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
3417 if (!vector
) oom("allocating objects vector for SORT");
3419 if (sortval
->type
== REDIS_LIST
) {
3420 list
*list
= sortval
->ptr
;
3424 while((ln
= listYield(list
))) {
3425 robj
*ele
= ln
->value
;
3426 vector
[j
].obj
= ele
;
3427 vector
[j
].u
.score
= 0;
3428 vector
[j
].u
.cmpobj
= NULL
;
3432 dict
*set
= sortval
->ptr
;
3436 di
= dictGetIterator(set
);
3437 if (!di
) oom("dictGetIterator");
3438 while((setele
= dictNext(di
)) != NULL
) {
3439 vector
[j
].obj
= dictGetEntryKey(setele
);
3440 vector
[j
].u
.score
= 0;
3441 vector
[j
].u
.cmpobj
= NULL
;
3444 dictReleaseIterator(di
);
3446 assert(j
== vectorlen
);
3448 /* Now it's time to load the right scores in the sorting vector */
3449 if (dontsort
== 0) {
3450 for (j
= 0; j
< vectorlen
; j
++) {
3454 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
3455 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
3457 vector
[j
].u
.cmpobj
= byval
;
3458 incrRefCount(byval
);
3460 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
3463 if (!alpha
) vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
3468 /* We are ready to sort the vector... perform a bit of sanity check
3469 * on the LIMIT option too. We'll use a partial version of quicksort. */
3470 start
= (limit_start
< 0) ? 0 : limit_start
;
3471 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
3472 if (start
>= vectorlen
) {
3473 start
= vectorlen
-1;
3476 if (end
>= vectorlen
) end
= vectorlen
-1;
3478 if (dontsort
== 0) {
3479 server
.sort_desc
= desc
;
3480 server
.sort_alpha
= alpha
;
3481 server
.sort_bypattern
= sortby
? 1 : 0;
3482 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
3483 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
3485 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
3488 /* Send command output to the output buffer, performing the specified
3489 * GET/DEL/INCR/DECR operations if any. */
3490 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
3491 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
3492 for (j
= start
; j
<= end
; j
++) {
3495 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
3496 sdslen(vector
[j
].obj
->ptr
)));
3497 addReply(c
,vector
[j
].obj
);
3498 addReply(c
,shared
.crlf
);
3500 listRewind(operations
);
3501 while((ln
= listYield(operations
))) {
3502 redisSortOperation
*sop
= ln
->value
;
3503 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
3506 if (sop
->type
== REDIS_SORT_GET
) {
3507 if (!val
|| val
->type
!= REDIS_STRING
) {
3508 addReply(c
,shared
.nullbulk
);
3510 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
3513 addReply(c
,shared
.crlf
);
3515 } else if (sop
->type
== REDIS_SORT_DEL
) {
3522 decrRefCount(sortval
);
3523 listRelease(operations
);
3524 for (j
= 0; j
< vectorlen
; j
++) {
3525 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
3526 decrRefCount(vector
[j
].u
.cmpobj
);
/* INFO command: formats a set of server statistics into a single sds string
 * and sends it to the client as a bulk reply ("$<len>\r\n", the payload,
 * then CRLF). When server.masterhost is set, details about the link with
 * the master are appended to the same string before it is sent. */
3531 static void infoCommand(redisClient
*c
) {
/* Seconds elapsed since server.stat_starttime. */
3533 time_t uptime
= time(NULL
)-server
.stat_starttime
;
3535 info
= sdscatprintf(sdsempty(),
3536 "redis_version:%s\r\n"
3537 "uptime_in_seconds:%d\r\n"
3538 "uptime_in_days:%d\r\n"
3539 "connected_clients:%d\r\n"
3540 "connected_slaves:%d\r\n"
3541 "used_memory:%zu\r\n"
3542 "changes_since_last_save:%lld\r\n"
3543 "bgsave_in_progress:%d\r\n"
3544 "last_save_time:%d\r\n"
3545 "total_connections_received:%lld\r\n"
3546 "total_commands_processed:%lld\r\n"
/* Length of the clients list minus length of the slaves list. */
3551 listLength(server
.clients
)-listLength(server
.slaves
),
3552 listLength(server
.slaves
),
3555 server
.bgsaveinprogress
,
3557 server
.stat_numconnections
,
3558 server
.stat_numcommands
,
/* "master" when no master host is configured, "slave" otherwise. */
3559 server
.masterhost
== NULL
? "master" : "slave"
/* Replication details are only reported when a master host is set. */
3561 if (server
.masterhost
) {
3562 info
= sdscatprintf(info
,
3563 "master_host:%s\r\n"
3564 "master_port:%d\r\n"
3565 "master_link_status:%s\r\n"
3566 "master_last_io_seconds_ago:%d\r\n"
3569 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
/* Seconds since the last interaction recorded on the master client. */
3571 (int)(time(NULL
)-server
.master
->lastinteraction
)
/* Bulk reply: length header, the info payload itself, trailing CRLF. */
3574 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info
)));
3575 addReplySds(c
,info
);
3576 addReply(c
,shared
.crlf
);
/* MONITOR command: flags the client with both REDIS_SLAVE and REDIS_MONITOR,
 * appends it to server.monitors and replies with shared.ok. A client that
 * already has REDIS_SLAVE set is ignored and gets no reply. */
3579 static void monitorCommand(redisClient
*c
) {
3580 /* ignore MONITOR if already slave or in monitor mode */
3581 if (c
->flags
& REDIS_SLAVE
) return;
3583 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
/* A failed append to the monitors list is reported through oom(). */
3585 if (!listAddNodeTail(server
.monitors
,c
)) oom("listAddNodeTail");
3586 addReply(c
,shared
.ok
);
3589 /* ================================= Expire ================================= */
/* Remove the expire associated with 'key' by deleting its entry from
 * db->expires. The dictDelete() result is compared against DICT_OK to
 * select the value returned to the caller. */
3590 static int removeExpire(redisDb
*db
, robj
*key
) {
3591 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
/* Associate the expire time 'when' with 'key' by adding an entry to
 * db->expires. The time_t is not allocated separately: it is stored
 * directly in the entry's value slot, cast to a pointer. A DICT_ERR result
 * from dictAdd() selects the failure path. */
3598 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
3599 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
3607 /* Return the expire time of the specified key, or -1 if no expire
3608 * is associated with this key (i.e. the key is non volatile).
 * The lookup is done in db->expires only; db->dict is never consulted. */
3609 static time_t getExpire(redisDb
*db
, robj
*key
) {
3612 /* No expire? return ASAP (empty expires table, or key not in it) */
3613 if (dictSize(db
->expires
) == 0 ||
3614 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
/* The entry value is the time_t itself, cast back from the stored pointer. */
3616 return (time_t) dictGetEntryVal(de
);
/* Delete 'key' if it has an expire time that is already in the past.
 * Returns 0 when the key has no entry in db->expires or when the current
 * time has not gone beyond the stored expire time. Otherwise the key is
 * removed from both db->expires and db->dict, and the return value is 1
 * only if the deletion from db->dict succeeded. */
3619 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
3623 /* No expire? return ASAP */
3624 if (dictSize(db
->expires
) == 0 ||
3625 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3627 /* Lookup the expire */
3628 when
= (time_t) dictGetEntryVal(de
);
/* Still valid while the current time is not past 'when'. */
3629 if (time(NULL
) <= when
) return 0;
3631 /* Delete the key */
3632 dictDelete(db
->expires
,key
);
3633 return dictDelete(db
->dict
,key
) == DICT_OK
;
/* Delete 'key' if it has any expire set, without checking whether the
 * expire time has been reached. Returns 0 when the key has no entry in
 * db->expires; otherwise the key is removed from both db->expires and
 * db->dict, and the return value is 1 only if the deletion from db->dict
 * succeeded. */
3636 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
3639 /* No expire? return ASAP */
3640 if (dictSize(db
->expires
) == 0 ||
3641 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3643 /* Delete the key */
3645 dictDelete(db
->expires
,key
);
3646 return dictDelete(db
->dict
,key
) == DICT_OK
;
/* EXPIRE command: argv[1] is the key and argv[2] the number of seconds,
 * parsed with atoi(). The key is looked up in c->db->dict, the absolute
 * expire time is computed as now + seconds and registered via setExpire().
 * The reply is shared.cone when setExpire() returns non-zero, and
 * shared.czero on the other reply paths. */
3649 static void expireCommand(redisClient
*c
) {
/* atoi() performs no validation: non numeric input is parsed as 0. */
3651 int seconds
= atoi(c
->argv
[2]->ptr
);
3653 de
= dictFind(c
->db
->dict
,c
->argv
[1]);
3655 addReply(c
,shared
.czero
);
3659 addReply(c
, shared
.czero
);
/* Expires are stored as absolute unix times, not as relative TTLs. */
3662 time_t when
= time(NULL
)+seconds
;
3663 if (setExpire(c
->db
,c
->argv
[1],when
))
3664 addReply(c
,shared
.cone
);
3666 addReply(c
,shared
.czero
);
/* TTL command: fetches the expire time of argv[1] with getExpire(),
 * converts it to the number of seconds left from now, and sends it as an
 * integer reply (":<ttl>\r\n"). A negative remaining time is reported
 * as -1. */
3671 static void ttlCommand(redisClient
*c
) {
3675 expire
= getExpire(c
->db
,c
->argv
[1]);
/* Remaining seconds = absolute expire time minus current time. */
3677 ttl
= (int) (expire
-time(NULL
));
3678 if (ttl
< 0) ttl
= -1;
3680 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
3683 /* =============================== Replication ============================= */
/* Write data from 'ptr' to 'fd', waiting for the descriptor to become
 * writable through aeWait() before calling write(). Returns -1 if write()
 * fails. The time elapsed since the function was entered is compared
 * against 'timeout', which is therefore expressed in seconds (it is
 * compared with a difference of time() values). */
3685 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
/* 'ret' starts as the requested size. */
3686 ssize_t nwritten
, ret
= size
;
3687 time_t start
= time(NULL
);
/* Only attempt the write when aeWait() reports the fd as writable. */
3691 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
3692 nwritten
= write(fd
,ptr
,size
);
3693 if (nwritten
== -1) return -1;
3697 if ((time(NULL
)-start
) > timeout
) {
/* Read data from 'fd' into 'ptr', waiting for the descriptor to become
 * readable through aeWait() before calling read(). Returns -1 if read()
 * fails. The time elapsed since the function was entered is compared
 * against 'timeout', which is therefore expressed in seconds (it is
 * compared with a difference of time() values). */
3705 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
/* 'totread' starts at zero. */
3706 ssize_t nread
, totread
= 0;
3707 time_t start
= time(NULL
);
/* Only attempt the read when aeWait() reports the fd as readable. */
3711 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
3712 nread
= read(fd
,ptr
,size
);
3713 if (nread
== -1) return -1;
3718 if ((time(NULL
)-start
) > timeout
) {
/* Read a line from 'fd' into 'ptr', fetching the input one byte at a time
 * through syncRead() with the given 'timeout'. A syncRead() failure is
 * propagated as -1. */
3726 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3733 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
/* When 'nread' is non-zero and the byte just before 'ptr' is a carriage
 * return, overwrite it with the string terminator. */
3736 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
/* SYNC command: registers the client as a slave and arranges for it to
 * receive a dump of the dataset.
 *
 * - Clients already flagged REDIS_SLAVE are ignored.
 * - A client with a non-empty reply list is answered with an error.
 * - If a BGSAVE is in progress and another slave is already in
 *   REDIS_REPL_WAIT_BGSAVE_END, that slave's reply list is duplicated into
 *   this client, which enters the same state; otherwise the client is put
 *   in REDIS_REPL_WAIT_BGSAVE_START.
 * - If no BGSAVE is in progress one is started with rdbSaveBackground().
 *
 * Finally the client gets the REDIS_SLAVE flag and is appended to
 * server.slaves. */
3747 static void syncCommand(redisClient
*c
) {
3748 /* ignore SYNC if already slave or in monitor mode */
3749 if (c
->flags
& REDIS_SLAVE
) return;
3751 /* SYNC can't be issued when the server has pending data to send to
3752 * the client about already issued commands. We need a fresh reply
3753 * buffer registering the differences between the BGSAVE and the current
3754 * dataset, so that we can copy to other slaves if needed. */
3755 if (listLength(c
->reply
) != 0) {
3756 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3760 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
3761 /* Here we need to check if there is a background saving operation
3762 * in progress, or if it is required to start one */
3763 if (server
.bgsaveinprogress
) {
3764 /* Ok a background save is in progress. Let's check if it is a good
3765 * one for replication, i.e. if there is another slave that is
3766 * registering differences since the server forked to save */
3770 listRewind(server
.slaves
);
3771 while((ln
= listYield(server
.slaves
))) {
/* Stop at the first slave that is waiting for the BGSAVE to end. */
3773 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
3776 /* Perfect, the server is already registering differences for
3777 * another slave. Set the right state, and copy the buffer. */
3778 listRelease(c
->reply
);
3779 c
->reply
= listDup(slave
->reply
);
3780 if (!c
->reply
) oom("listDup copying slave reply list");
3781 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3782 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
3784 /* No way, we need to wait for the next BGSAVE in order to
3785 * register differences */
3786 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
3787 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
3790 /* Ok we don't have a BGSAVE in progress, let's start one */
3791 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
3792 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
3793 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
/* NOTE(review): "Unalbe" in the reply below is a typo for "Unable"; it is
 * runtime output, so it is left untouched by this documentation pass. */
3794 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
3797 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
/* From now on the client is handled as a slave. */
3800 c
->flags
|= REDIS_SLAVE
;
3802 if (!listAddNodeTail(server
.slaves
,c
)) oom("listAddNodeTail");
/* Writable-event handler that streams the dump file to a slave.
 * 'privdata' is the slave's redisClient; 'mask' is unused.
 *
 * On the first call (slave->repldboff == 0) the bulk length header is
 * written to the socket. Every call then seeks the dump file to the
 * current offset, reads up to REDIS_IOBUF_LEN bytes and writes them to
 * 'fd', advancing slave->repldboff by the number of bytes actually
 * written. Once the offset equals slave->repldbsize the dump file is
 * closed, the slave is switched to REDIS_REPL_ONLINE and this handler is
 * replaced by sendReplyToClient. */
3806 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
3807 redisClient
*slave
= privdata
;
3809 REDIS_NOTUSED(mask
);
3810 char buf
[REDIS_IOBUF_LEN
];
3811 ssize_t nwritten
, buflen
;
3813 if (slave
->repldboff
== 0) {
3814 /* Write the bulk write count before to transfer the DB. In theory here
3815 * we don't know how much room there is in the output buffer of the
3816 * socket, but in practice SO_SNDLOWAT (the minimum count for output
3817 * operations) will never be smaller than the few bytes we need. */
3820 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
/* The header must go out in a single write(): a short write is an error. */
3822 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
/* Re-seek on every call: the offset only advances by what was written. */
3830 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
3831 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
3833 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
3834 (buflen
== 0) ? "premature EOF" : strerror(errno
));
3838 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
3839 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
3844 slave
->repldboff
+= nwritten
;
/* Whole file transferred: release it and go back to normal replies. */
3845 if (slave
->repldboff
== slave
->repldbsize
) {
3846 close(slave
->repldbfd
);
3847 slave
->repldbfd
= -1;
3848 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
3849 slave
->replstate
= REDIS_REPL_ONLINE
;
3850 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
3851 sendReplyToClient
, slave
, NULL
) == AE_ERR
) {
3855 addReplySds(slave
,sdsempty());
3856 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
/* Update the state of every slave in server.slaves according to the
 * outcome of a background save; 'bgsaveerr' is compared against REDIS_OK.
 *
 * - Slaves in REDIS_REPL_WAIT_BGSAVE_START are moved to
 *   REDIS_REPL_WAIT_BGSAVE_END.
 * - Slaves in REDIS_REPL_WAIT_BGSAVE_END either hit the error path (when
 *   the save failed, or the dump file can't be opened/stat'ed) or get the
 *   dump file opened, their transfer offset/size initialised and
 *   sendBulkToSlave installed as writable handler on their socket.
 *
 * A new rdbSaveBackground() is also attempted here; if that fails a
 * warning is logged and the slave list is scanned again for slaves in
 * REDIS_REPL_WAIT_BGSAVE_START. */
3860 static void updateSalvesWaitingBgsave(int bgsaveerr
) {
3862 int startbgsave
= 0;
3864 listRewind(server
.slaves
);
3865 while((ln
= listYield(server
.slaves
))) {
3866 redisClient
*slave
= ln
->value
;
3868 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
3870 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3871 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
3874 if (bgsaveerr
!= REDIS_OK
) {
3876 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
/* Open the freshly written dump and fetch its size for the transfer. */
3879 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
3880 fstat(slave
->repldbfd
,&buf
) == -1) {
3882 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
3885 slave
->repldboff
= 0;
3886 slave
->repldbsize
= buf
.st_size
;
3887 slave
->replstate
= REDIS_REPL_SEND_BULK
;
/* Replace the current writable handler with the bulk transfer one. */
3888 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
3889 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
, NULL
) == AE_ERR
) {
3896 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
3897 listRewind(server
.slaves
);
3898 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
3899 while((ln
= listYield(server
.slaves
))) {
3900 redisClient
*slave
= ln
->value
;
3902 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
/* Perform a full synchronization with the configured master:
 *
 * 1. Connect to server.masterhost:server.masterport.
 * 2. Send the SYNC command.
 * 3. Read the bulk length line and parse the dump size from it.
 * 4. Receive the dump into a temporary file created in the current
 *    directory.
 * 5. Rename the temporary file to server.dbfilename and load it with
 *    rdbLoad().
 * 6. Create the master client on the same socket, flag it REDIS_MASTER and
 *    set server.replstate to REDIS_REPL_CONNECTED.
 *
 * Each failing step logs a REDIS_WARNING message. */
3909 static int syncWithMaster(void) {
3910 char buf
[1024], tmpfile
[256];
3912 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
3916 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
3920 /* Issue the SYNC command (7 bytes, timeout argument of 5) */
3921 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
3923 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
3927 /* Read the bulk write count (timeout argument of 3600) */
3928 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
3930 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
/* Skip the first character of the line before parsing the length. */
3934 dumpsize
= atoi(buf
+1);
3935 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
3936 /* Read the bulk write data on a temp file */
3937 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
3938 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
3941 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
3945 int nread
, nwritten
;
/* Never ask for more than what is left of the dump, 1024 bytes at most. */
3947 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
3949 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
3955 nwritten
= write(dfd
,buf
,nread
);
3956 if (nwritten
== -1) {
3957 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
/* The received dump replaces the DB file only once fully written. */
3965 if (rename(tmpfile
,server
.dbfilename
) == -1) {
3966 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
3972 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
3973 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
/* The socket used for the transfer becomes the master client link. */
3977 server
.master
= createClient(fd
);
3978 server
.master
->flags
|= REDIS_MASTER
;
3979 server
.replstate
= REDIS_REPL_CONNECTED
;
/* SLAVEOF command.
 *
 * "SLAVEOF no one" (case insensitive): if a master host is configured it is
 * released, the master client (if any) is freed and server.replstate is
 * set to REDIS_REPL_NONE.
 *
 * Any other arguments are taken as host (argv[1]) and port (argv[2],
 * parsed with atoi()): the previous master host string is released, the
 * new host/port are stored, the current master client (if any) is freed
 * and server.replstate is set to REDIS_REPL_CONNECT.
 *
 * The reply is shared.ok. */
3983 static void slaveofCommand(redisClient
*c
) {
3984 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
3985 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
/* Nothing to tear down when no master host is configured. */
3986 if (server
.masterhost
) {
3987 sdsfree(server
.masterhost
);
3988 server
.masterhost
= NULL
;
3989 if (server
.master
) freeClient(server
.master
);
3990 server
.replstate
= REDIS_REPL_NONE
;
3991 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
/* Switch to the new master: drop the old host string and link first. */
3994 sdsfree(server
.masterhost
);
3995 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
3996 server
.masterport
= atoi(c
->argv
[2]->ptr
);
3997 if (server
.master
) freeClient(server
.master
);
3998 server
.replstate
= REDIS_REPL_CONNECT
;
3999 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
4000 server
.masterhost
, server
.masterport
);
4002 addReply(c
,shared
.ok
);
4005 /* ============================ Maxmemory directive ======================== */
4007 /* This function gets called when 'maxmemory' is set on the config file to limit
4008 * the max memory used by the server, and we are out of memory.
4009 * This function will try to, in order:
4011 * - Free objects from the free list
4012 * - Try to remove keys with an EXPIRE set
4014 * If it is not possible to free enough memory to reach used-memory < maxmemory
4015 * the server will start refusing commands that will enlarge even more the
/* Loop while server.maxmemory is set and zmalloc_used_memory() is above it.
 * Each iteration first tries the object free list: if it is not empty, its
 * first node is taken and removed from the list. Otherwise every database
 * with a non-empty expires table is visited: three random entries are
 * sampled from it, the key with the smallest expire time among them is
 * selected and deleted with deleteKey(). The function returns when an
 * iteration was unable to free anything. */
4018 static void freeMemoryIfNeeded(void) {
4019 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
/* Cheapest option first: objects cached in the free list. */
4020 if (listLength(server
.objfreelist
)) {
4023 listNode
*head
= listFirst(server
.objfreelist
);
4024 o
= listNodeValue(head
);
4025 listDelNode(server
.objfreelist
,head
);
4028 int j
, k
, freed
= 0;
4030 for (j
= 0; j
< server
.dbnum
; j
++) {
4032 robj
*minkey
= NULL
;
4033 struct dictEntry
*de
;
/* Only databases that have keys with an expire set can contribute. */
4035 if (dictSize(server
.db
[j
].expires
)) {
4037 /* From a sample of three keys drop the one nearest to
4038 * the natural expire */
4039 for (k
= 0; k
< 3; k
++) {
4042 de
= dictGetRandomKey(server
.db
[j
].expires
);
4043 t
= (time_t) dictGetEntryVal(de
);
/* minttl == -1 means that no candidate was recorded yet. */
4044 if (minttl
== -1 || t
< minttl
) {
4045 minkey
= dictGetEntryKey(de
);
4049 deleteKey(server
.db
+j
,minkey
);
4052 if (!freed
) return; /* nothing to free... */
4057 /* ================================= Debugging ============================== */
/* DEBUG command: the subcommand in argv[1] is compared case insensitively
 * against "segfault". The syntax error reply below names DEBUG SEGFAULT as
 * the form to use. */
4059 static void debugCommand(redisClient
*c
) {
4060 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
4063 addReplySds(c
,sdsnew("-ERR Syntax error, try DEBUG SEGFAULT\r\n"));
/* Crash signal handler: logs which signal was received (SIGSEGV, anything
 * else is reported as SIGBUS) followed by one log line for every captured
 * backtrace frame (25 frames at most).
 *
 * 'sig' is the signal number delivered to the handler. */
4067 static void onSigsegv(int sig
) {
4069 int n
= backtrace(trace
, 25);
/* backtrace_symbols() returns NULL when it cannot allocate the result. */
4070 char **symbols
= backtrace_symbols(trace
, n
);
4072 redisLog(REDIS_WARNING
,"Got %s!!! Redis crashed, backtrace:",
4073 sig
== SIGSEGV
? "SIGSEGV" : "SIGBUS");
/* Skip the frame dump entirely if no symbol table could be obtained. */
4074 for (int i
= 0; symbols && i
< n
; i
++)
/* Symbol strings are data, not format strings: a '%' inside a symbol
 * name must never be interpreted by the logger. */
4075 redisLog(REDIS_WARNING
,"%s",symbols
[i
]);
4079 /* =================================== Main! ================================ */
/* Read the current overcommit setting from
 * /proc/sys/vm/overcommit_memory. Up to 64 bytes of the first line are
 * fetched with fgets(); a NULL result selects the failure path. */
4082 int linuxOvercommitMemoryValue(void) {
4083 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
4087 if (fgets(buf
,64,fp
) == NULL
) {
/* Log a REDIS_WARNING message when linuxOvercommitMemoryValue() reports 0.
 * The message explains that background saves may fail and how to change
 * the setting. */
4096 void linuxOvercommitMemoryWarning(void) {
4097 if (linuxOvercommitMemoryValue() == 0) {
4098 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4101 #endif /* __linux__ */
/* Turn the process into a daemon: fork and let the parent exit, start a
 * new session, redirect stdin/stdout/stderr to /dev/null and write the
 * process id into the file named by server.pidfile. */
4103 static void daemonize(void) {
/* NOTE(review): a failed fork() returns -1, which also takes the exit(0)
 * path below - confirm that exiting with status 0 is intended then. */
4107 if (fork() != 0) exit(0); /* parent exits */
4108 setsid(); /* create a new session */
4110 /* Every output goes to /dev/null. If Redis is daemonized but
4111 * the 'logfile' is set to 'stdout' in the configuration file
4112 * it will not log at all. */
4113 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
4114 dup2(fd
, STDIN_FILENO
);
4115 dup2(fd
, STDOUT_FILENO
);
4116 dup2(fd
, STDERR_FILENO
);
/* Close the extra descriptor unless it is one of the standard three. */
4117 if (fd
> STDERR_FILENO
) close(fd
);
4119 /* Try to write the pid file */
4120 fp
= fopen(server
.pidfile
,"w");
4122 fprintf(fp
,"%d\n",getpid());
4127 int main(int argc
, char **argv
) {
4129 linuxOvercommitMemoryWarning();
4134 ResetServerSaveParams();
4135 loadServerConfig(argv
[1]);
4136 } else if (argc
> 2) {
4137 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
4140 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4143 if (server
.daemonize
) daemonize();
4144 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
4145 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
4146 redisLog(REDIS_NOTICE
,"DB loaded from disk");
4147 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
4148 acceptHandler
, NULL
, NULL
) == AE_ERR
) oom("creating file event");
4149 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
4151 aeDeleteEventLoop(server
.el
);