2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "0.101"
46 #include <arpa/inet.h>
50 #include <sys/resource.h>
54 #include "ae.h" /* Event driven programming library */
55 #include "sds.h" /* Dynamic safe strings */
56 #include "anet.h" /* Networking the easy way */
57 #include "dict.h" /* Hash tables */
58 #include "adlist.h" /* Linked lists */
59 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
60 #include "lzf.h" /* LZF compression library */
61 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
69 /* Static server configuration */
70 #define REDIS_SERVERPORT 6379 /* TCP port */
71 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
72 #define REDIS_IOBUF_LEN 1024
73 #define REDIS_LOADBUF_LEN 1024
74 #define REDIS_STATIC_ARGS 4
75 #define REDIS_DEFAULT_DBNUM 16
76 #define REDIS_CONFIGLINE_MAX 1024
77 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
78 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
79 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
81 /* Hash table parameters */
82 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
83 #define REDIS_HT_MINSLOTS 16384 /* Never resize the HT under this */
86 #define REDIS_CMD_BULK 1 /* Bulk write command */
87 #define REDIS_CMD_INLINE 2 /* Inline command */
88 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
89 this flag will return an error when the 'maxmemory' option is set in the
90 config file and the server is using more than maxmemory bytes of memory.
91 In short these commands are denied on low memory conditions. */
92 #define REDIS_CMD_DENYOOM 4
95 #define REDIS_STRING 0
100 /* Object types only used for dumping to disk */
101 #define REDIS_EXPIRETIME 253
102 #define REDIS_SELECTDB 254
103 #define REDIS_EOF 255
105 /* Defines related to the dump file format. To store 32 bits lengths for short
106 * keys requires a lot of space, so we check the most significant 2 bits of
107 * the first byte to interpret the length:
109 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
110 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
111 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
112 * 11|000000 this means: specially encoded object will follow. The six bits
113 * number specify the kind of object that follows.
114 * See the REDIS_RDB_ENC_* defines.
116 * Lengths up to 63 are stored using a single byte, most DB keys, and many
117 * values, will fit inside. */
118 #define REDIS_RDB_6BITLEN 0
119 #define REDIS_RDB_14BITLEN 1
120 #define REDIS_RDB_32BITLEN 2
121 #define REDIS_RDB_ENCVAL 3
122 #define REDIS_RDB_LENERR UINT_MAX
124 /* When a length of a string object stored on disk has the first two bits
125 * set, the remaining six bits specify a special encoding for the object
126 * according to the following defines: */
127 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
128 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
129 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
130 #define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
133 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
134 #define REDIS_SLAVE 2 /* This client is a slave server */
135 #define REDIS_MASTER 4 /* This client is a master server */
136 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
138 /* Slave replication state - slave side */
139 #define REDIS_REPL_NONE 0 /* No active replication */
140 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
141 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
143 /* Slave replication state - from the point of view of master
144 * Note that in SEND_BULK and ONLINE state the slave receives new updates
145 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
146 * to start the next background saving in order to send updates to it. */
147 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
148 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
149 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
150 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
152 /* List related stuff */
156 /* Sort operations */
157 #define REDIS_SORT_GET 0
158 #define REDIS_SORT_DEL 1
159 #define REDIS_SORT_INCR 2
160 #define REDIS_SORT_DECR 3
161 #define REDIS_SORT_ASC 4
162 #define REDIS_SORT_DESC 5
163 #define REDIS_SORTKEY_MAX 1024
166 #define REDIS_DEBUG 0
167 #define REDIS_NOTICE 1
168 #define REDIS_WARNING 2
170 /* Anti-warning macro... */
171 #define REDIS_NOTUSED(V) ((void) V)
173 /*================================= Data types ============================== */
175 /* A redis object, that is a type able to hold a string / list / set */
176 typedef struct redisObject
{
182 typedef struct redisDb
{
188 /* With multiplexing we need to keep per-client state.
189 * Clients are kept in a linked list. */
190 typedef struct redisClient
{
197 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
200 time_t lastinteraction
; /* time of the last interaction, used for timeout */
201 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
202 int slaveseldb
; /* slave selected db, if this client is a slave */
203 int authenticated
; /* when requirepass is non-NULL */
204 int replstate
; /* replication state if this is a slave */
205 int repldbfd
; /* replication DB file descriptor */
206 long repldboff
; /* replication DB file offset */
207 off_t repldbsize
; /* replication DB file size */
215 /* Global server state structure */
221 unsigned int sharingpoolsize
;
222 long long dirty
; /* changes to DB from the last save */
224 list
*slaves
, *monitors
;
225 char neterr
[ANET_ERR_LEN
];
227 int cronloops
; /* number of times the cron function run */
228 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
229 time_t lastsave
; /* Unix time of the last successful save */
230 size_t usedmemory
; /* Used memory in bytes */
231 /* Fields used only for stats */
232 time_t stat_starttime
; /* server start time */
233 long long stat_numcommands
; /* number of processed commands */
234 long long stat_numconnections
; /* number of connections received */
242 int bgsaveinprogress
;
243 pid_t bgsavechildpid
;
244 struct saveparam
*saveparams
;
251 /* Replication related */
255 redisClient
*master
; /* client that is master for this slave */
257 unsigned int maxclients
;
258 unsigned int maxmemory
;
259 /* Sort parameters - qsort_r() is only available under BSD so we
260 * have to take this state global, in order to pass it to sortCompare() */
266 typedef void redisCommandProc(redisClient
*c
);
267 struct redisCommand
{
269 redisCommandProc
*proc
;
274 typedef struct _redisSortObject
{
282 typedef struct _redisSortOperation
{
285 } redisSortOperation
;
287 struct sharedObjectsStruct
{
288 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
289 *colon
, *nullbulk
, *nullmultibulk
,
290 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
291 *outofrangeerr
, *plus
,
292 *select0
, *select1
, *select2
, *select3
, *select4
,
293 *select5
, *select6
, *select7
, *select8
, *select9
;
296 /*================================ Prototypes =============================== */
298 static void freeStringObject(robj
*o
);
299 static void freeListObject(robj
*o
);
300 static void freeSetObject(robj
*o
);
301 static void decrRefCount(void *o
);
302 static robj
*createObject(int type
, void *ptr
);
303 static void freeClient(redisClient
*c
);
304 static int rdbLoad(char *filename
);
305 static void addReply(redisClient
*c
, robj
*obj
);
306 static void addReplySds(redisClient
*c
, sds s
);
307 static void incrRefCount(robj
*o
);
308 static int rdbSaveBackground(char *filename
);
309 static robj
*createStringObject(char *ptr
, size_t len
);
310 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
311 static int syncWithMaster(void);
312 static robj
*tryObjectSharing(robj
*o
);
313 static int removeExpire(redisDb
*db
, robj
*key
);
314 static int expireIfNeeded(redisDb
*db
, robj
*key
);
315 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
316 static int deleteKey(redisDb
*db
, robj
*key
);
317 static time_t getExpire(redisDb
*db
, robj
*key
);
318 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
319 static void updateSalvesWaitingBgsave(int bgsaveerr
);
320 static void freeMemoryIfNeeded(void);
322 static void authCommand(redisClient
*c
);
323 static void pingCommand(redisClient
*c
);
324 static void echoCommand(redisClient
*c
);
325 static void setCommand(redisClient
*c
);
326 static void setnxCommand(redisClient
*c
);
327 static void getCommand(redisClient
*c
);
328 static void delCommand(redisClient
*c
);
329 static void existsCommand(redisClient
*c
);
330 static void incrCommand(redisClient
*c
);
331 static void decrCommand(redisClient
*c
);
332 static void incrbyCommand(redisClient
*c
);
333 static void decrbyCommand(redisClient
*c
);
334 static void selectCommand(redisClient
*c
);
335 static void randomkeyCommand(redisClient
*c
);
336 static void keysCommand(redisClient
*c
);
337 static void dbsizeCommand(redisClient
*c
);
338 static void lastsaveCommand(redisClient
*c
);
339 static void saveCommand(redisClient
*c
);
340 static void bgsaveCommand(redisClient
*c
);
341 static void shutdownCommand(redisClient
*c
);
342 static void moveCommand(redisClient
*c
);
343 static void renameCommand(redisClient
*c
);
344 static void renamenxCommand(redisClient
*c
);
345 static void lpushCommand(redisClient
*c
);
346 static void rpushCommand(redisClient
*c
);
347 static void lpopCommand(redisClient
*c
);
348 static void rpopCommand(redisClient
*c
);
349 static void llenCommand(redisClient
*c
);
350 static void lindexCommand(redisClient
*c
);
351 static void lrangeCommand(redisClient
*c
);
352 static void ltrimCommand(redisClient
*c
);
353 static void typeCommand(redisClient
*c
);
354 static void lsetCommand(redisClient
*c
);
355 static void saddCommand(redisClient
*c
);
356 static void sremCommand(redisClient
*c
);
357 static void smoveCommand(redisClient
*c
);
358 static void sismemberCommand(redisClient
*c
);
359 static void scardCommand(redisClient
*c
);
360 static void sinterCommand(redisClient
*c
);
361 static void sinterstoreCommand(redisClient
*c
);
362 static void sunionCommand(redisClient
*c
);
363 static void sunionstoreCommand(redisClient
*c
);
364 static void sdiffCommand(redisClient
*c
);
365 static void sdiffstoreCommand(redisClient
*c
);
366 static void syncCommand(redisClient
*c
);
367 static void flushdbCommand(redisClient
*c
);
368 static void flushallCommand(redisClient
*c
);
369 static void sortCommand(redisClient
*c
);
370 static void lremCommand(redisClient
*c
);
371 static void infoCommand(redisClient
*c
);
372 static void mgetCommand(redisClient
*c
);
373 static void monitorCommand(redisClient
*c
);
374 static void expireCommand(redisClient
*c
);
375 static void getSetCommand(redisClient
*c
);
376 static void ttlCommand(redisClient
*c
);
377 static void slaveofCommand(redisClient
*c
);
378 static void debugCommand(redisClient
*c
);
380 /*================================= Globals ================================= */
383 static struct redisServer server
; /* server global state */
384 static struct redisCommand cmdTable
[] = {
385 {"get",getCommand
,2,REDIS_CMD_INLINE
},
386 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
387 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
388 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
389 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
390 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
391 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
392 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
393 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
394 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
395 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
396 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
397 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
398 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
399 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
400 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
401 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
402 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
403 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
404 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
405 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
406 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
407 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
408 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
409 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
410 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
411 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
412 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
413 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
414 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
415 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
416 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
417 {"getset",getSetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
418 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
419 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
420 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
421 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
422 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
423 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
424 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
425 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
426 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
427 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
428 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
429 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
430 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
431 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
432 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
433 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
434 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
435 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
436 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
437 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
438 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
439 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
440 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
441 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
442 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
446 /*============================ Utility functions ============================ */
448 /* Glob-style pattern matching. */
449 int stringmatchlen(const char *pattern
, int patternLen
,
450 const char *string
, int stringLen
, int nocase
)
455 while (pattern
[1] == '*') {
460 return 1; /* match */
462 if (stringmatchlen(pattern
+1, patternLen
-1,
463 string
, stringLen
, nocase
))
464 return 1; /* match */
468 return 0; /* no match */
472 return 0; /* no match */
482 not = pattern
[0] == '^';
489 if (pattern
[0] == '\\') {
492 if (pattern
[0] == string
[0])
494 } else if (pattern
[0] == ']') {
496 } else if (patternLen
== 0) {
500 } else if (pattern
[1] == '-' && patternLen
>= 3) {
501 int start
= pattern
[0];
502 int end
= pattern
[2];
510 start
= tolower(start
);
516 if (c
>= start
&& c
<= end
)
520 if (pattern
[0] == string
[0])
523 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
533 return 0; /* no match */
539 if (patternLen
>= 2) {
546 if (pattern
[0] != string
[0])
547 return 0; /* no match */
549 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
550 return 0; /* no match */
558 if (stringLen
== 0) {
559 while(*pattern
== '*') {
566 if (patternLen
== 0 && stringLen
== 0)
571 void redisLog(int level
, const char *fmt
, ...)
576 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
580 if (level
>= server
.verbosity
) {
586 strftime(buf
,64,"%d %b %H:%M:%S",gmtime(&now
));
587 fprintf(fp
,"%s %c ",buf
,c
[level
]);
588 vfprintf(fp
, fmt
, ap
);
594 if (server
.logfile
) fclose(fp
);
597 /*====================== Hash table type implementation ==================== */
599 /* This is a hash table type that uses the SDS dynamic strings library as
600 * keys and redis objects as values (objects can hold SDS strings,
603 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
607 DICT_NOTUSED(privdata
);
609 l1
= sdslen((sds
)key1
);
610 l2
= sdslen((sds
)key2
);
611 if (l1
!= l2
) return 0;
612 return memcmp(key1
, key2
, l1
) == 0;
615 static void dictRedisObjectDestructor(void *privdata
, void *val
)
617 DICT_NOTUSED(privdata
);
622 static int dictSdsKeyCompare(void *privdata
, const void *key1
,
625 const robj
*o1
= key1
, *o2
= key2
;
626 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
629 static unsigned int dictSdsHash(const void *key
) {
631 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
634 static dictType setDictType
= {
635 dictSdsHash
, /* hash function */
638 dictSdsKeyCompare
, /* key compare */
639 dictRedisObjectDestructor
, /* key destructor */
640 NULL
/* val destructor */
643 static dictType hashDictType
= {
644 dictSdsHash
, /* hash function */
647 dictSdsKeyCompare
, /* key compare */
648 dictRedisObjectDestructor
, /* key destructor */
649 dictRedisObjectDestructor
/* val destructor */
652 /* ========================= Random utility functions ======================= */
654 /* Redis generally does not try to recover from out of memory conditions
655 * when allocating objects or strings, it is not clear if it will be possible
656 * to report this condition to the client since the networking layer itself
657 * is based on heap allocation for send buffers, so we simply abort.
658 * At least the code will be simpler to read... */
659 static void oom(const char *msg
) {
660 fprintf(stderr
, "%s: Out of memory\n",msg
);
666 /* ====================== Redis server networking stuff ===================== */
667 void closeTimedoutClients(void) {
670 time_t now
= time(NULL
);
672 listRewind(server
.clients
);
673 while ((ln
= listYield(server
.clients
)) != NULL
) {
674 c
= listNodeValue(ln
);
675 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
676 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
677 (now
- c
->lastinteraction
> server
.maxidletime
)) {
678 redisLog(REDIS_DEBUG
,"Closing idle client");
684 /* If the percentage of used slots in the HT drops below REDIS_HT_MINFILL
685 * we resize the hash table to save memory */
686 void tryResizeHashTables(void) {
689 for (j
= 0; j
< server
.dbnum
; j
++) {
690 long long size
, used
;
692 size
= dictSlots(server
.db
[j
].dict
);
693 used
= dictSize(server
.db
[j
].dict
);
694 if (size
&& used
&& size
> REDIS_HT_MINSLOTS
&&
695 (used
*100/size
< REDIS_HT_MINFILL
)) {
696 redisLog(REDIS_NOTICE
,"The hash table %d is too sparse, resize it...",j
);
697 dictResize(server
.db
[j
].dict
);
698 redisLog(REDIS_NOTICE
,"Hash table %d resized.",j
);
703 int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
704 int j
, loops
= server
.cronloops
++;
705 REDIS_NOTUSED(eventLoop
);
707 REDIS_NOTUSED(clientData
);
709 /* Update the global state with the amount of used memory */
710 server
.usedmemory
= zmalloc_used_memory();
712 /* Show some info about non-empty databases */
713 for (j
= 0; j
< server
.dbnum
; j
++) {
714 long long size
, used
, vkeys
;
716 size
= dictSlots(server
.db
[j
].dict
);
717 used
= dictSize(server
.db
[j
].dict
);
718 vkeys
= dictSize(server
.db
[j
].expires
);
719 if (!(loops
% 5) && used
> 0) {
720 redisLog(REDIS_DEBUG
,"DB %d: %d keys (%d volatile) in %d slots HT.",j
,used
,vkeys
,size
);
721 /* dictPrintStats(server.dict); */
725 /* We don't want to resize the hash tables while a background saving
726 * is in progress: the saving child is created using fork() that is
727 * implemented with a copy-on-write semantic in most modern systems, so
728 * if we resize the HT while there is the saving child at work actually
729 * a lot of memory movements in the parent will cause a lot of pages
731 if (!server
.bgsaveinprogress
) tryResizeHashTables();
733 /* Show information about connected clients */
735 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use",
736 listLength(server
.clients
)-listLength(server
.slaves
),
737 listLength(server
.slaves
),
739 dictSize(server
.sharingpool
));
742 /* Close connections of timedout clients */
743 if (server
.maxidletime
&& !(loops
% 10))
744 closeTimedoutClients();
746 /* Check if a background saving in progress terminated */
747 if (server
.bgsaveinprogress
) {
749 /* XXX: TODO handle the case of the saving child killed */
750 if (wait4(-1,&statloc
,WNOHANG
,NULL
)) {
751 int exitcode
= WEXITSTATUS(statloc
);
752 int bysignal
= WIFSIGNALED(statloc
);
754 if (!bysignal
&& exitcode
== 0) {
755 redisLog(REDIS_NOTICE
,
756 "Background saving terminated with success");
758 server
.lastsave
= time(NULL
);
759 } else if (!bysignal
&& exitcode
!= 0) {
760 redisLog(REDIS_WARNING
, "Background saving error");
762 redisLog(REDIS_WARNING
,
763 "Background saving terminated by signal");
765 server
.bgsaveinprogress
= 0;
766 server
.bgsavechildpid
= -1;
767 updateSalvesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
770 /* If there is not a background saving in progress check if
771 * we have to save now */
772 time_t now
= time(NULL
);
773 for (j
= 0; j
< server
.saveparamslen
; j
++) {
774 struct saveparam
*sp
= server
.saveparams
+j
;
776 if (server
.dirty
>= sp
->changes
&&
777 now
-server
.lastsave
> sp
->seconds
) {
778 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
779 sp
->changes
, sp
->seconds
);
780 rdbSaveBackground(server
.dbfilename
);
786 /* Try to expire a few timed out keys */
787 for (j
= 0; j
< server
.dbnum
; j
++) {
788 redisDb
*db
= server
.db
+j
;
789 int num
= dictSize(db
->expires
);
792 time_t now
= time(NULL
);
794 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
795 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
800 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
801 t
= (time_t) dictGetEntryVal(de
);
803 deleteKey(db
,dictGetEntryKey(de
));
809 /* Check if we should connect to a MASTER */
810 if (server
.replstate
== REDIS_REPL_CONNECT
) {
811 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
812 if (syncWithMaster() == REDIS_OK
) {
813 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
819 static void createSharedObjects(void) {
820 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
821 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
822 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
823 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
824 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
825 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
826 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
827 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
828 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
830 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
831 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
832 "-ERR Operation against a key holding the wrong kind of value\r\n"));
833 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
834 "-ERR no such key\r\n"));
835 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
836 "-ERR syntax error\r\n"));
837 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
838 "-ERR source and destination objects are the same\r\n"));
839 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
840 "-ERR index out of range\r\n"));
841 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
842 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
843 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
844 shared
.select0
= createStringObject("select 0\r\n",10);
845 shared
.select1
= createStringObject("select 1\r\n",10);
846 shared
.select2
= createStringObject("select 2\r\n",10);
847 shared
.select3
= createStringObject("select 3\r\n",10);
848 shared
.select4
= createStringObject("select 4\r\n",10);
849 shared
.select5
= createStringObject("select 5\r\n",10);
850 shared
.select6
= createStringObject("select 6\r\n",10);
851 shared
.select7
= createStringObject("select 7\r\n",10);
852 shared
.select8
= createStringObject("select 8\r\n",10);
853 shared
.select9
= createStringObject("select 9\r\n",10);
856 static void appendServerSaveParams(time_t seconds
, int changes
) {
857 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
858 if (server
.saveparams
== NULL
) oom("appendServerSaveParams");
859 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
860 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
861 server
.saveparamslen
++;
864 static void ResetServerSaveParams() {
865 zfree(server
.saveparams
);
866 server
.saveparams
= NULL
;
867 server
.saveparamslen
= 0;
870 static void initServerConfig() {
871 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
872 server
.port
= REDIS_SERVERPORT
;
873 server
.verbosity
= REDIS_DEBUG
;
874 server
.maxidletime
= REDIS_MAXIDLETIME
;
875 server
.saveparams
= NULL
;
876 server
.logfile
= NULL
; /* NULL = log on standard output */
877 server
.bindaddr
= NULL
;
878 server
.glueoutputbuf
= 1;
879 server
.daemonize
= 0;
880 server
.pidfile
= "/var/run/redis.pid";
881 server
.dbfilename
= "dump.rdb";
882 server
.requirepass
= NULL
;
883 server
.shareobjects
= 0;
884 server
.maxclients
= 0;
885 server
.maxmemory
= 0;
886 ResetServerSaveParams();
888 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
889 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
890 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
891 /* Replication related */
893 server
.masterhost
= NULL
;
894 server
.masterport
= 6379;
895 server
.master
= NULL
;
896 server
.replstate
= REDIS_REPL_NONE
;
899 static void initServer() {
902 signal(SIGHUP
, SIG_IGN
);
903 signal(SIGPIPE
, SIG_IGN
);
905 server
.clients
= listCreate();
906 server
.slaves
= listCreate();
907 server
.monitors
= listCreate();
908 server
.objfreelist
= listCreate();
909 createSharedObjects();
910 server
.el
= aeCreateEventLoop();
911 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
912 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
913 server
.sharingpoolsize
= 1024;
914 if (!server
.db
|| !server
.clients
|| !server
.slaves
|| !server
.monitors
|| !server
.el
|| !server
.objfreelist
)
915 oom("server initialization"); /* Fatal OOM */
916 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
917 if (server
.fd
== -1) {
918 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
921 for (j
= 0; j
< server
.dbnum
; j
++) {
922 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
923 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
926 server
.cronloops
= 0;
927 server
.bgsaveinprogress
= 0;
928 server
.bgsavechildpid
= -1;
929 server
.lastsave
= time(NULL
);
931 server
.usedmemory
= 0;
932 server
.stat_numcommands
= 0;
933 server
.stat_numconnections
= 0;
934 server
.stat_starttime
= time(NULL
);
935 aeCreateTimeEvent(server
.el
, 1000, serverCron
, NULL
, NULL
);
938 /* Empty the whole database */
939 static long long emptyDb() {
941 long long removed
= 0;
943 for (j
= 0; j
< server
.dbnum
; j
++) {
944 removed
+= dictSize(server
.db
[j
].dict
);
945 dictEmpty(server
.db
[j
].dict
);
946 dictEmpty(server
.db
[j
].expires
);
951 static int yesnotoi(char *s
) {
952 if (!strcasecmp(s
,"yes")) return 1;
953 else if (!strcasecmp(s
,"no")) return 0;
957 /* I agree, this is a very rudimentary way to load a configuration...
958 will improve later if the config gets more complex */
959 static void loadServerConfig(char *filename
) {
960 FILE *fp
= fopen(filename
,"r");
961 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
966 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
969 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
975 line
= sdstrim(line
," \t\r\n");
977 /* Skip comments and blank lines*/
978 if (line
[0] == '#' || line
[0] == '\0') {
983 /* Split into arguments */
984 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
987 /* Execute config directives */
988 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
989 server
.maxidletime
= atoi(argv
[1]);
990 if (server
.maxidletime
< 0) {
991 err
= "Invalid timeout value"; goto loaderr
;
993 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
994 server
.port
= atoi(argv
[1]);
995 if (server
.port
< 1 || server
.port
> 65535) {
996 err
= "Invalid port"; goto loaderr
;
998 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
999 server
.bindaddr
= zstrdup(argv
[1]);
1000 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1001 int seconds
= atoi(argv
[1]);
1002 int changes
= atoi(argv
[2]);
1003 if (seconds
< 1 || changes
< 0) {
1004 err
= "Invalid save parameters"; goto loaderr
;
1006 appendServerSaveParams(seconds
,changes
);
1007 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1008 if (chdir(argv
[1]) == -1) {
1009 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1010 argv
[1], strerror(errno
));
1013 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1014 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1015 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1016 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1018 err
= "Invalid log level. Must be one of debug, notice, warning";
1021 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1024 server
.logfile
= zstrdup(argv
[1]);
1025 if (!strcasecmp(server
.logfile
,"stdout")) {
1026 zfree(server
.logfile
);
1027 server
.logfile
= NULL
;
1029 if (server
.logfile
) {
1030 /* Test if we are able to open the file. The server will not
1031 * be able to abort just for this problem later... */
1032 fp
= fopen(server
.logfile
,"a");
1034 err
= sdscatprintf(sdsempty(),
1035 "Can't open the log file: %s", strerror(errno
));
1040 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1041 server
.dbnum
= atoi(argv
[1]);
1042 if (server
.dbnum
< 1) {
1043 err
= "Invalid number of databases"; goto loaderr
;
1045 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1046 server
.maxclients
= atoi(argv
[1]);
1047 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1048 server
.maxmemory
= atoi(argv
[1]);
1049 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1050 server
.masterhost
= sdsnew(argv
[1]);
1051 server
.masterport
= atoi(argv
[2]);
1052 server
.replstate
= REDIS_REPL_CONNECT
;
1053 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1054 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1055 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1057 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1058 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1059 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1061 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1062 server
.sharingpoolsize
= atoi(argv
[1]);
1063 if (server
.sharingpoolsize
< 1) {
1064 err
= "invalid object sharing pool size"; goto loaderr
;
1066 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1067 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1068 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1070 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1071 server
.requirepass
= zstrdup(argv
[1]);
1072 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1073 server
.pidfile
= zstrdup(argv
[1]);
1074 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1075 server
.dbfilename
= zstrdup(argv
[1]);
1077 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1079 for (j
= 0; j
< argc
; j
++)
1088 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1089 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1090 fprintf(stderr
, ">>> '%s'\n", line
);
1091 fprintf(stderr
, "%s\n", err
);
1095 static void freeClientArgv(redisClient
*c
) {
1098 for (j
= 0; j
< c
->argc
; j
++)
1099 decrRefCount(c
->argv
[j
]);
1103 static void freeClient(redisClient
*c
) {
1106 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1107 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1108 sdsfree(c
->querybuf
);
1109 listRelease(c
->reply
);
1112 ln
= listSearchKey(server
.clients
,c
);
1114 listDelNode(server
.clients
,ln
);
1115 if (c
->flags
& REDIS_SLAVE
) {
1116 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1118 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1119 ln
= listSearchKey(l
,c
);
1123 if (c
->flags
& REDIS_MASTER
) {
1124 server
.master
= NULL
;
1125 server
.replstate
= REDIS_REPL_CONNECT
;
1131 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1136 listRewind(c
->reply
);
1137 while((ln
= listYield(c
->reply
))) {
1139 totlen
+= sdslen(o
->ptr
);
1140 /* This optimization makes more sense if we don't have to copy
1142 if (totlen
> 1024) return;
1148 listRewind(c
->reply
);
1149 while((ln
= listYield(c
->reply
))) {
1151 memcpy(buf
+copylen
,o
->ptr
,sdslen(o
->ptr
));
1152 copylen
+= sdslen(o
->ptr
);
1153 listDelNode(c
->reply
,ln
);
1155 /* Now the output buffer is empty, add the new single element */
1156 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,totlen
));
1157 if (!listAddNodeTail(c
->reply
,o
)) oom("listAddNodeTail");
1161 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1162 redisClient
*c
= privdata
;
1163 int nwritten
= 0, totwritten
= 0, objlen
;
1166 REDIS_NOTUSED(mask
);
1168 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1169 glueReplyBuffersIfNeeded(c
);
1170 while(listLength(c
->reply
)) {
1171 o
= listNodeValue(listFirst(c
->reply
));
1172 objlen
= sdslen(o
->ptr
);
1175 listDelNode(c
->reply
,listFirst(c
->reply
));
1179 if (c
->flags
& REDIS_MASTER
) {
1180 nwritten
= objlen
- c
->sentlen
;
1182 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1183 if (nwritten
<= 0) break;
1185 c
->sentlen
+= nwritten
;
1186 totwritten
+= nwritten
;
1187 /* If we fully sent the object on head go to the next one */
1188 if (c
->sentlen
== objlen
) {
1189 listDelNode(c
->reply
,listFirst(c
->reply
));
1193 if (nwritten
== -1) {
1194 if (errno
== EAGAIN
) {
1197 redisLog(REDIS_DEBUG
,
1198 "Error writing to client: %s", strerror(errno
));
1203 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1204 if (listLength(c
->reply
) == 0) {
1206 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1210 static struct redisCommand
*lookupCommand(char *name
) {
1212 while(cmdTable
[j
].name
!= NULL
) {
1213 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1219 /* resetClient prepare the client to process the next command */
1220 static void resetClient(redisClient
*c
) {
1225 /* If this function gets called we already read a whole
1226 * command, argments are in the client argv/argc fields.
1227 * processCommand() execute the command or prepare the
1228 * server for a bulk read from the client.
1230 * If 1 is returned the client is still alive and valid and
1231 * and other operations can be performed by the caller. Otherwise
1232 * if 0 is returned the client was destroied (i.e. after QUIT). */
1233 static int processCommand(redisClient
*c
) {
1234 struct redisCommand
*cmd
;
1237 /* Free some memory if needed (maxmemory setting) */
1238 if (server
.maxmemory
) freeMemoryIfNeeded();
1240 /* The QUIT command is handled as a special case. Normal command
1241 * procs are unable to close the client connection safely */
1242 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1246 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1248 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1251 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1252 (c
->argc
< -cmd
->arity
)) {
1253 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
1256 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1257 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1260 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1261 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1263 decrRefCount(c
->argv
[c
->argc
-1]);
1264 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1266 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1271 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1272 /* It is possible that the bulk read is already in the
1273 * buffer. Check this condition and handle it accordingly */
1274 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1275 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1277 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1282 /* Let's try to share objects on the command arguments vector */
1283 if (server
.shareobjects
) {
1285 for(j
= 1; j
< c
->argc
; j
++)
1286 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1288 /* Check if the user is authenticated */
1289 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1290 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1295 /* Exec the command */
1296 dirty
= server
.dirty
;
1298 if (server
.dirty
-dirty
!= 0 && listLength(server
.slaves
))
1299 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1300 if (listLength(server
.monitors
))
1301 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1302 server
.stat_numcommands
++;
1304 /* Prepare the client for the next command */
1305 if (c
->flags
& REDIS_CLOSE
) {
1313 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1317 /* (args*2)+1 is enough room for args, spaces, newlines */
1318 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1320 if (argc
<= REDIS_STATIC_ARGS
) {
1323 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1324 if (!outv
) oom("replicationFeedSlaves");
1327 for (j
= 0; j
< argc
; j
++) {
1328 if (j
!= 0) outv
[outc
++] = shared
.space
;
1329 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1332 lenobj
= createObject(REDIS_STRING
,
1333 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv
[j
]->ptr
)));
1334 lenobj
->refcount
= 0;
1335 outv
[outc
++] = lenobj
;
1337 outv
[outc
++] = argv
[j
];
1339 outv
[outc
++] = shared
.crlf
;
1341 /* Increment all the refcounts at start and decrement at end in order to
1342 * be sure to free objects if there is no slave in a replication state
1343 * able to be feed with commands */
1344 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1346 while((ln
= listYield(slaves
))) {
1347 redisClient
*slave
= ln
->value
;
1349 /* Don't feed slaves that are still waiting for BGSAVE to start */
1350 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1352 /* Feed all the other slaves, MONITORs and so on */
1353 if (slave
->slaveseldb
!= dictid
) {
1357 case 0: selectcmd
= shared
.select0
; break;
1358 case 1: selectcmd
= shared
.select1
; break;
1359 case 2: selectcmd
= shared
.select2
; break;
1360 case 3: selectcmd
= shared
.select3
; break;
1361 case 4: selectcmd
= shared
.select4
; break;
1362 case 5: selectcmd
= shared
.select5
; break;
1363 case 6: selectcmd
= shared
.select6
; break;
1364 case 7: selectcmd
= shared
.select7
; break;
1365 case 8: selectcmd
= shared
.select8
; break;
1366 case 9: selectcmd
= shared
.select9
; break;
1368 selectcmd
= createObject(REDIS_STRING
,
1369 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1370 selectcmd
->refcount
= 0;
1373 addReply(slave
,selectcmd
);
1374 slave
->slaveseldb
= dictid
;
1376 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1378 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1379 if (outv
!= static_outv
) zfree(outv
);
1382 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1383 redisClient
*c
= (redisClient
*) privdata
;
1384 char buf
[REDIS_IOBUF_LEN
];
1387 REDIS_NOTUSED(mask
);
1389 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1391 if (errno
== EAGAIN
) {
1394 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1398 } else if (nread
== 0) {
1399 redisLog(REDIS_DEBUG
, "Client closed connection");
1404 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1405 c
->lastinteraction
= time(NULL
);
1411 if (c
->bulklen
== -1) {
1412 /* Read the first line of the query */
1413 char *p
= strchr(c
->querybuf
,'\n');
1419 query
= c
->querybuf
;
1420 c
->querybuf
= sdsempty();
1421 querylen
= 1+(p
-(query
));
1422 if (sdslen(query
) > querylen
) {
1423 /* leave data after the first line of the query in the buffer */
1424 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1426 *p
= '\0'; /* remove "\n" */
1427 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1428 sdsupdatelen(query
);
1430 /* Now we can split the query in arguments */
1431 if (sdslen(query
) == 0) {
1432 /* Ignore empty query */
1436 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1437 if (argv
== NULL
) oom("sdssplitlen");
1440 if (c
->argv
) zfree(c
->argv
);
1441 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1442 if (c
->argv
== NULL
) oom("allocating arguments list for client");
1444 for (j
= 0; j
< argc
; j
++) {
1445 if (sdslen(argv
[j
])) {
1446 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1453 /* Execute the command. If the client is still valid
1454 * after processCommand() return and there is something
1455 * on the query buffer try to process the next command. */
1456 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1458 } else if (sdslen(c
->querybuf
) >= 1024*32) {
1459 redisLog(REDIS_DEBUG
, "Client protocol error");
1464 /* Bulk read handling. Note that if we are at this point
1465 the client already sent a command terminated with a newline,
1466 we are reading the bulk data that is actually the last
1467 argument of the command. */
1468 int qbl
= sdslen(c
->querybuf
);
1470 if (c
->bulklen
<= qbl
) {
1471 /* Copy everything but the final CRLF as final argument */
1472 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1474 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1481 static int selectDb(redisClient
*c
, int id
) {
1482 if (id
< 0 || id
>= server
.dbnum
)
1484 c
->db
= &server
.db
[id
];
1488 static void *dupClientReplyValue(void *o
) {
1489 incrRefCount((robj
*)o
);
1493 static redisClient
*createClient(int fd
) {
1494 redisClient
*c
= zmalloc(sizeof(*c
));
1496 anetNonBlock(NULL
,fd
);
1497 anetTcpNoDelay(NULL
,fd
);
1498 if (!c
) return NULL
;
1501 c
->querybuf
= sdsempty();
1507 c
->lastinteraction
= time(NULL
);
1508 c
->authenticated
= 0;
1509 c
->replstate
= REDIS_REPL_NONE
;
1510 if ((c
->reply
= listCreate()) == NULL
) oom("listCreate");
1511 listSetFreeMethod(c
->reply
,decrRefCount
);
1512 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1513 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1514 readQueryFromClient
, c
, NULL
) == AE_ERR
) {
1518 if (!listAddNodeTail(server
.clients
,c
)) oom("listAddNodeTail");
1522 static void addReply(redisClient
*c
, robj
*obj
) {
1523 if (listLength(c
->reply
) == 0 &&
1524 (c
->replstate
== REDIS_REPL_NONE
||
1525 c
->replstate
== REDIS_REPL_ONLINE
) &&
1526 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
1527 sendReplyToClient
, c
, NULL
) == AE_ERR
) return;
1528 if (!listAddNodeTail(c
->reply
,obj
)) oom("listAddNodeTail");
1532 static void addReplySds(redisClient
*c
, sds s
) {
1533 robj
*o
= createObject(REDIS_STRING
,s
);
1538 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1543 REDIS_NOTUSED(mask
);
1544 REDIS_NOTUSED(privdata
);
1546 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
1547 if (cfd
== AE_ERR
) {
1548 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
1551 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
1552 if ((c
= createClient(cfd
)) == NULL
) {
1553 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
1554 close(cfd
); /* May be already closed, just ingore errors */
1557 /* If maxclient directive is set and this is one client more... close the
1558 * connection. Note that we create the client instead to check before
1559 * for this condition, since now the socket is already set in nonblocking
1560 * mode and we can send an error for free using the Kernel I/O */
1561 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
1562 char *err
= "-ERR max number of clients reached\r\n";
1564 /* That's a best effort error message, don't check write errors */
1565 (void) write(c
->fd
,err
,strlen(err
));
1569 server
.stat_numconnections
++;
1572 /* ======================= Redis objects implementation ===================== */
1574 static robj
*createObject(int type
, void *ptr
) {
1577 if (listLength(server
.objfreelist
)) {
1578 listNode
*head
= listFirst(server
.objfreelist
);
1579 o
= listNodeValue(head
);
1580 listDelNode(server
.objfreelist
,head
);
1582 o
= zmalloc(sizeof(*o
));
1584 if (!o
) oom("createObject");
1591 static robj
*createStringObject(char *ptr
, size_t len
) {
1592 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
1595 static robj
*createListObject(void) {
1596 list
*l
= listCreate();
1598 if (!l
) oom("listCreate");
1599 listSetFreeMethod(l
,decrRefCount
);
1600 return createObject(REDIS_LIST
,l
);
1603 static robj
*createSetObject(void) {
1604 dict
*d
= dictCreate(&setDictType
,NULL
);
1605 if (!d
) oom("dictCreate");
1606 return createObject(REDIS_SET
,d
);
1609 static void freeStringObject(robj
*o
) {
1613 static void freeListObject(robj
*o
) {
1614 listRelease((list
*) o
->ptr
);
1617 static void freeSetObject(robj
*o
) {
1618 dictRelease((dict
*) o
->ptr
);
1621 static void freeHashObject(robj
*o
) {
1622 dictRelease((dict
*) o
->ptr
);
1625 static void incrRefCount(robj
*o
) {
1627 #ifdef DEBUG_REFCOUNT
1628 if (o
->type
== REDIS_STRING
)
1629 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
1633 static void decrRefCount(void *obj
) {
1636 #ifdef DEBUG_REFCOUNT
1637 if (o
->type
== REDIS_STRING
)
1638 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
1640 if (--(o
->refcount
) == 0) {
1642 case REDIS_STRING
: freeStringObject(o
); break;
1643 case REDIS_LIST
: freeListObject(o
); break;
1644 case REDIS_SET
: freeSetObject(o
); break;
1645 case REDIS_HASH
: freeHashObject(o
); break;
1646 default: assert(0 != 0); break;
1648 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
1649 !listAddNodeHead(server
.objfreelist
,o
))
1654 /* Try to share an object against the shared objects pool */
1655 static robj
*tryObjectSharing(robj
*o
) {
1656 struct dictEntry
*de
;
1659 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
1661 assert(o
->type
== REDIS_STRING
);
1662 de
= dictFind(server
.sharingpool
,o
);
1664 robj
*shared
= dictGetEntryKey(de
);
1666 c
= ((unsigned long) dictGetEntryVal(de
))+1;
1667 dictGetEntryVal(de
) = (void*) c
;
1668 incrRefCount(shared
);
1672 /* Here we are using a stream algorihtm: Every time an object is
1673 * shared we increment its count, everytime there is a miss we
1674 * recrement the counter of a random object. If this object reaches
1675 * zero we remove the object and put the current object instead. */
1676 if (dictSize(server
.sharingpool
) >=
1677 server
.sharingpoolsize
) {
1678 de
= dictGetRandomKey(server
.sharingpool
);
1680 c
= ((unsigned long) dictGetEntryVal(de
))-1;
1681 dictGetEntryVal(de
) = (void*) c
;
1683 dictDelete(server
.sharingpool
,de
->key
);
1686 c
= 0; /* If the pool is empty we want to add this object */
1691 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
1692 assert(retval
== DICT_OK
);
1699 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
1700 dictEntry
*de
= dictFind(db
->dict
,key
);
1701 return de
? dictGetEntryVal(de
) : NULL
;
1704 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
1705 expireIfNeeded(db
,key
);
1706 return lookupKey(db
,key
);
1709 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
1710 deleteIfVolatile(db
,key
);
1711 return lookupKey(db
,key
);
1714 static int deleteKey(redisDb
*db
, robj
*key
) {
1717 /* We need to protect key from destruction: after the first dictDelete()
1718 * it may happen that 'key' is no longer valid if we don't increment
1719 * it's count. This may happen when we get the object reference directly
1720 * from the hash table with dictRandomKey() or dict iterators */
1722 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
1723 retval
= dictDelete(db
->dict
,key
);
1726 return retval
== DICT_OK
;
1729 /*============================ DB saving/loading ============================ */
/* Write the one-byte 'type' code to 'fp'. Returns 0 on success, -1 when
 * the byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t items = fwrite(&type,1,1,fp);

    return (items == 0) ? -1 : 0;
}
/* Write 't' to 'fp' as a 4-byte signed integer in host byte order (the
 * value is truncated to 32 bits). Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    if (fwrite(&stamp,4,1,fp) != 0) return 0;
    return -1;
}
1742 /* check rdbLoadLen() comments for more info */
1743 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
1744 unsigned char buf
[2];
1747 /* Save a 6 bit len */
1748 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
1749 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1750 } else if (len
< (1<<14)) {
1751 /* Save a 14 bit len */
1752 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
1754 if (fwrite(buf
,2,1,fp
) == 0) return -1;
1756 /* Save a 32 bit len */
1757 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
1758 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1760 if (fwrite(&len
,4,1,fp
) == 0) return -1;
1765 /* String objects in the form "2391" "-100" without any space and with a
1766 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1767 * encoded as integers to save space */
1768 int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
1770 char *endptr
, buf
[32];
1772 /* Check if it's possible to encode this value as a number */
1773 value
= strtoll(s
, &endptr
, 10);
1774 if (endptr
[0] != '\0') return 0;
1775 snprintf(buf
,32,"%lld",value
);
1777 /* If the number converted back into a string is not identical
1778 * then it's not possible to encode the string as integer */
1779 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
1781 /* Finally check if it fits in our ranges */
1782 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
1783 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
1784 enc
[1] = value
&0xFF;
1786 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
1787 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
1788 enc
[1] = value
&0xFF;
1789 enc
[2] = (value
>>8)&0xFF;
1791 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
1792 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
1793 enc
[1] = value
&0xFF;
1794 enc
[2] = (value
>>8)&0xFF;
1795 enc
[3] = (value
>>16)&0xFF;
1796 enc
[4] = (value
>>24)&0xFF;
1803 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
1804 unsigned int comprlen
, outlen
;
1808 /* We require at least four bytes compression for this to be worth it */
1809 outlen
= sdslen(obj
->ptr
)-4;
1810 if (outlen
<= 0) return 0;
1811 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
1812 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
1813 if (comprlen
== 0) {
1817 /* Data compressed! Let's save it on disk */
1818 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
1819 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
1820 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
1821 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
1822 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
1831 /* Save a string objet as [len][data] on disk. If the object is a string
1832 * representation of an integer value we try to safe it in a special form */
1833 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
1834 size_t len
= sdslen(obj
->ptr
);
1837 /* Try integer encoding */
1839 unsigned char buf
[5];
1840 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
1841 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
1846 /* Try LZF compression - under 20 bytes it's unable to compress even
1847 * aaaaaaaaaaaaaaaaaa so skip it */
1848 if (1 && len
> 20) {
1851 retval
= rdbSaveLzfStringObject(fp
,obj
);
1852 if (retval
== -1) return -1;
1853 if (retval
> 0) return 0;
1854 /* retval == 0 means data can't be compressed, save the old way */
1857 /* Store verbatim */
1858 if (rdbSaveLen(fp
,len
) == -1) return -1;
1859 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
1863 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1864 static int rdbSave(char *filename
) {
1865 dictIterator
*di
= NULL
;
1870 time_t now
= time(NULL
);
1872 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
1873 fp
= fopen(tmpfile
,"w");
1875 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
1878 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
1879 for (j
= 0; j
< server
.dbnum
; j
++) {
1880 redisDb
*db
= server
.db
+j
;
1882 if (dictSize(d
) == 0) continue;
1883 di
= dictGetIterator(d
);
1889 /* Write the SELECT DB opcode */
1890 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
1891 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
1893 /* Iterate this DB writing every entry */
1894 while((de
= dictNext(di
)) != NULL
) {
1895 robj
*key
= dictGetEntryKey(de
);
1896 robj
*o
= dictGetEntryVal(de
);
1897 time_t expiretime
= getExpire(db
,key
);
1899 /* Save the expire time */
1900 if (expiretime
!= -1) {
1901 /* If this key is already expired skip it */
1902 if (expiretime
< now
) continue;
1903 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
1904 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
1906 /* Save the key and associated value */
1907 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
1908 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
1909 if (o
->type
== REDIS_STRING
) {
1910 /* Save a string value */
1911 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
1912 } else if (o
->type
== REDIS_LIST
) {
1913 /* Save a list value */
1914 list
*list
= o
->ptr
;
1918 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
1919 while((ln
= listYield(list
))) {
1920 robj
*eleobj
= listNodeValue(ln
);
1922 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
1924 } else if (o
->type
== REDIS_SET
) {
1925 /* Save a set value */
1927 dictIterator
*di
= dictGetIterator(set
);
1930 if (!set
) oom("dictGetIteraotr");
1931 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
1932 while((de
= dictNext(di
)) != NULL
) {
1933 robj
*eleobj
= dictGetEntryKey(de
);
1935 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
1937 dictReleaseIterator(di
);
1942 dictReleaseIterator(di
);
1945 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
1947 /* Make sure data will not remain on the OS's output buffers */
1952 /* Use RENAME to make sure the DB file is changed atomically only
1953 * if the generate DB file is ok. */
1954 if (rename(tmpfile
,filename
) == -1) {
1955 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destionation: %s", strerror(errno
));
1959 redisLog(REDIS_NOTICE
,"DB saved on disk");
1961 server
.lastsave
= time(NULL
);
1967 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
1968 if (di
) dictReleaseIterator(di
);
1972 static int rdbSaveBackground(char *filename
) {
1975 if (server
.bgsaveinprogress
) return REDIS_ERR
;
1976 if ((childpid
= fork()) == 0) {
1979 if (rdbSave(filename
) == REDIS_OK
) {
1986 if (childpid
== -1) {
1987 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
1991 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
1992 server
.bgsaveinprogress
= 1;
1993 server
.bgsavechildpid
= childpid
;
1996 return REDIS_OK
; /* unreached */
/* Read a one-byte type code from 'fp'. Returns the byte value (0..255), or
 * -1 when nothing could be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char code;

    if (fread(&code,1,1,fp) != 0) return code;
    return -1;
}
/* Read a 4-byte signed integer in host byte order from 'fp' and return it
 * as a time_t. Returns -1 when the four bytes could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,4,1,fp) != 0) return (time_t) stamp;
    return -1;
}
2011 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2012 * of this file for a description of how this are stored on disk.
2014 * isencoded is set to 1 if the readed length is not actually a length but
2015 * an "encoding type", check the above comments for more info */
2016 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2017 unsigned char buf
[2];
2020 if (isencoded
) *isencoded
= 0;
2022 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2027 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2028 type
= (buf
[0]&0xC0)>>6;
2029 if (type
== REDIS_RDB_6BITLEN
) {
2030 /* Read a 6 bit len */
2032 } else if (type
== REDIS_RDB_ENCVAL
) {
2033 /* Read a 6 bit len encoding type */
2034 if (isencoded
) *isencoded
= 1;
2036 } else if (type
== REDIS_RDB_14BITLEN
) {
2037 /* Read a 14 bit len */
2038 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2039 return ((buf
[0]&0x3F)<<8)|buf
[1];
2041 /* Read a 32 bit len */
2042 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2048 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2049 unsigned char enc
[4];
2052 if (enctype
== REDIS_RDB_ENC_INT8
) {
2053 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2054 val
= (signed char)enc
[0];
2055 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2057 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2058 v
= enc
[0]|(enc
[1]<<8);
2060 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2062 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2063 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2066 val
= 0; /* anti-warning */
2069 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2072 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2073 unsigned int len
, clen
;
2074 unsigned char *c
= NULL
;
2077 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2078 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2079 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2080 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2081 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2082 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2084 return createObject(REDIS_STRING
,val
);
2091 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2096 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2099 case REDIS_RDB_ENC_INT8
:
2100 case REDIS_RDB_ENC_INT16
:
2101 case REDIS_RDB_ENC_INT32
:
2102 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2103 case REDIS_RDB_ENC_LZF
:
2104 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2110 if (len
== REDIS_RDB_LENERR
) return NULL
;
2111 val
= sdsnewlen(NULL
,len
);
2112 if (len
&& fread(val
,len
,1,fp
) == 0) {
2116 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2119 static int rdbLoad(char *filename
) {
2121 robj
*keyobj
= NULL
;
2123 int type
, retval
, rdbver
;
2124 dict
*d
= server
.db
[0].dict
;
2125 redisDb
*db
= server
.db
+0;
2127 time_t expiretime
= -1, now
= time(NULL
);
2129 fp
= fopen(filename
,"r");
2130 if (!fp
) return REDIS_ERR
;
2131 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2133 if (memcmp(buf
,"REDIS",5) != 0) {
2135 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2138 rdbver
= atoi(buf
+5);
2141 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2148 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2149 if (type
== REDIS_EXPIRETIME
) {
2150 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2151 /* We read the time so we need to read the object type again */
2152 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2154 if (type
== REDIS_EOF
) break;
2155 /* Handle SELECT DB opcode as a special case */
2156 if (type
== REDIS_SELECTDB
) {
2157 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2159 if (dbid
>= (unsigned)server
.dbnum
) {
2160 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2163 db
= server
.db
+dbid
;
2168 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2170 if (type
== REDIS_STRING
) {
2171 /* Read string value */
2172 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2173 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2174 /* Read list/set value */
2177 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2179 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2180 /* Load every single element of the list/set */
2184 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2185 if (type
== REDIS_LIST
) {
2186 if (!listAddNodeTail((list
*)o
->ptr
,ele
))
2187 oom("listAddNodeTail");
2189 if (dictAdd((dict
*)o
->ptr
,ele
,NULL
) == DICT_ERR
)
2196 /* Add the new object in the hash table */
2197 retval
= dictAdd(d
,keyobj
,o
);
2198 if (retval
== DICT_ERR
) {
2199 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2202 /* Set the expire time if needed */
2203 if (expiretime
!= -1) {
2204 setExpire(db
,keyobj
,expiretime
);
2205 /* Delete this key if already expired */
2206 if (expiretime
< now
) deleteKey(db
,keyobj
);
2214 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2215 if (keyobj
) decrRefCount(keyobj
);
2216 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2218 return REDIS_ERR
; /* Just to avoid warning */
2221 /*================================== Commands =============================== */
2223 static void authCommand(redisClient
*c
) {
2224 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2225 c
->authenticated
= 1;
2226 addReply(c
,shared
.ok
);
2228 c
->authenticated
= 0;
2229 addReply(c
,shared
.err
);
2233 static void pingCommand(redisClient
*c
) {
2234 addReply(c
,shared
.pong
);
2237 static void echoCommand(redisClient
*c
) {
2238 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
2239 (int)sdslen(c
->argv
[1]->ptr
)));
2240 addReply(c
,c
->argv
[1]);
2241 addReply(c
,shared
.crlf
);
2244 /*=================================== Strings =============================== */
2246 static void setGenericCommand(redisClient
*c
, int nx
) {
2249 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2250 if (retval
== DICT_ERR
) {
2252 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2253 incrRefCount(c
->argv
[2]);
2255 addReply(c
,shared
.czero
);
2259 incrRefCount(c
->argv
[1]);
2260 incrRefCount(c
->argv
[2]);
2263 removeExpire(c
->db
,c
->argv
[1]);
2264 addReply(c
, nx
? shared
.cone
: shared
.ok
);
2267 static void setCommand(redisClient
*c
) {
2268 setGenericCommand(c
,0);
2271 static void setnxCommand(redisClient
*c
) {
2272 setGenericCommand(c
,1);
2275 static void getCommand(redisClient
*c
) {
2276 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2279 addReply(c
,shared
.nullbulk
);
2281 if (o
->type
!= REDIS_STRING
) {
2282 addReply(c
,shared
.wrongtypeerr
);
2284 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o
->ptr
)));
2286 addReply(c
,shared
.crlf
);
2291 static void getSetCommand(redisClient
*c
) {
2293 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
2294 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2296 incrRefCount(c
->argv
[1]);
2298 incrRefCount(c
->argv
[2]);
2300 removeExpire(c
->db
,c
->argv
[1]);
2303 static void mgetCommand(redisClient
*c
) {
2306 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
2307 for (j
= 1; j
< c
->argc
; j
++) {
2308 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
2310 addReply(c
,shared
.nullbulk
);
2312 if (o
->type
!= REDIS_STRING
) {
2313 addReply(c
,shared
.nullbulk
);
2315 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o
->ptr
)));
2317 addReply(c
,shared
.crlf
);
2323 static void incrDecrCommand(redisClient
*c
, long long incr
) {
2328 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2332 if (o
->type
!= REDIS_STRING
) {
2337 value
= strtoll(o
->ptr
, &eptr
, 10);
2342 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
2343 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
2344 if (retval
== DICT_ERR
) {
2345 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
2346 removeExpire(c
->db
,c
->argv
[1]);
2348 incrRefCount(c
->argv
[1]);
2351 addReply(c
,shared
.colon
);
2353 addReply(c
,shared
.crlf
);
2356 static void incrCommand(redisClient
*c
) {
2357 incrDecrCommand(c
,1);
2360 static void decrCommand(redisClient
*c
) {
2361 incrDecrCommand(c
,-1);
2364 static void incrbyCommand(redisClient
*c
) {
2365 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2366 incrDecrCommand(c
,incr
);
2369 static void decrbyCommand(redisClient
*c
) {
2370 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2371 incrDecrCommand(c
,-incr
);
2374 /* ========================= Type agnostic commands ========================= */
2376 static void delCommand(redisClient
*c
) {
2379 for (j
= 1; j
< c
->argc
; j
++) {
2380 if (deleteKey(c
->db
,c
->argv
[j
])) {
2387 addReply(c
,shared
.czero
);
2390 addReply(c
,shared
.cone
);
2393 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
2398 static void existsCommand(redisClient
*c
) {
2399 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
2402 static void selectCommand(redisClient
*c
) {
2403 int id
= atoi(c
->argv
[1]->ptr
);
2405 if (selectDb(c
,id
) == REDIS_ERR
) {
2406 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
2408 addReply(c
,shared
.ok
);
2412 static void randomkeyCommand(redisClient
*c
) {
2416 de
= dictGetRandomKey(c
->db
->dict
);
2417 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
2420 addReply(c
,shared
.plus
);
2421 addReply(c
,shared
.crlf
);
2423 addReply(c
,shared
.plus
);
2424 addReply(c
,dictGetEntryKey(de
));
2425 addReply(c
,shared
.crlf
);
/* KEYS <pattern>
 *
 * Walks every entry of the current DB's dict. A key is selected when the
 * pattern is exactly "*" or when stringmatchlen() accepts it; selected keys
 * are additionally passed through expireIfNeeded(). 'lenobj' is created with
 * a NULL payload and its ptr is filled in after the iteration with the
 * "$<len>\r\n" header, where <len> is the summed key length plus one
 * separator between consecutive keys.
 *
 * NOTE(review): several short original lines of this function are not
 * visible in this view; only the visible statements are described. */
2429 static void keysCommand(redisClient
*c
) {
2432 sds pattern
= c
->argv
[1]->ptr
;
2433 int plen
= sdslen(pattern
);
2434 int numkeys
= 0, keyslen
= 0;
2435 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
2437 di
= dictGetIterator(c
->db
->dict
);
2438 if (!di
) oom("dictGetIterator");
2440 decrRefCount(lenobj
);
2441 while((de
= dictNext(di
)) != NULL
) {
2442 robj
*keyobj
= dictGetEntryKey(de
);
2444 sds key
= keyobj
->ptr
;
2445 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
2446 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
2447 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
2449 addReply(c
,shared
.space
);
2452 keyslen
+= sdslen(key
);
2456 dictReleaseIterator(di
);
/* keyslen and numkeys are ints: cast the sum so it matches the %lu
 * conversion instead of passing an int where unsigned long is expected. */
2457 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)(keyslen
+(numkeys
? (numkeys
-1) : 0)));
2458 addReply(c
,shared
.crlf
);
/* DBSIZE: formats dictSize() of the current DB's dict as an integer reply
 * (":<n>\r\n").
 *
 * The value is cast to unsigned long so that it matches the %lu conversion
 * whatever integer type dictSize() evaluates to (other call sites in this
 * file print the same macro with %d).
 *
 * NOTE(review): the line opening the enclosing call (original line 2462) is
 * not visible in this view. */
2461 static void dbsizeCommand(redisClient
*c
) {
2463 sdscatprintf(sdsempty(),":%lu\r\n",(unsigned long)dictSize(c
->db
->dict
)));
/* LASTSAVE: formats server.lastsave as an integer reply (":<n>\r\n").
 *
 * The value is cast to unsigned long so the argument type matches the %lu
 * conversion regardless of how the field is declared (presumably time_t --
 * confirm against the server struct).
 *
 * NOTE(review): the line opening the enclosing call (original line 2467) is
 * not visible in this view. */
2466 static void lastsaveCommand(redisClient
*c
) {
2468 sdscatprintf(sdsempty(),":%lu\r\n",(unsigned long)server
.lastsave
));
/* TYPE <key>
 *
 * Looks the key up with lookupKeyRead() and maps the object's type to a
 * status-reply string, which is sent followed by the shared CRLF.
 *
 * Every reply string starts with '+' (the status-reply marker); the default
 * branch used to send a bare "unknown", which a client cannot parse as any
 * reply type, so it now carries the same prefix.
 *
 * NOTE(review): the declarations and the branch taken when the key does not
 * exist are not visible in this view. */
2471 static void typeCommand(redisClient
*c
) {
2475 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2480 case REDIS_STRING
: type
= "+string"; break;
2481 case REDIS_LIST
: type
= "+list"; break;
2482 case REDIS_SET
: type
= "+set"; break;
2483 default: type
= "+unknown"; break;
2486 addReplySds(c
,sdsnew(type
));
2487 addReply(c
,shared
.crlf
);
2490 static void saveCommand(redisClient
*c
) {
2491 if (server
.bgsaveinprogress
) {
2492 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
2495 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2496 addReply(c
,shared
.ok
);
2498 addReply(c
,shared
.err
);
2502 static void bgsaveCommand(redisClient
*c
) {
2503 if (server
.bgsaveinprogress
) {
2504 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
2507 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
2508 addReply(c
,shared
.ok
);
2510 addReply(c
,shared
.err
);
2514 static void shutdownCommand(redisClient
*c
) {
2515 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
2516 if (server
.bgsaveinprogress
) {
2517 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
2518 kill(server
.bgsavechildpid
,SIGKILL
);
2520 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2521 if (server
.daemonize
)
2522 unlink(server
.pidfile
);
2523 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
2524 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
2527 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
2528 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2532 static void renameGenericCommand(redisClient
*c
, int nx
) {
2535 /* To use the same key as src and dst is probably an error */
2536 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
2537 addReply(c
,shared
.sameobjecterr
);
2541 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2543 addReply(c
,shared
.nokeyerr
);
2547 deleteIfVolatile(c
->db
,c
->argv
[2]);
2548 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
2551 addReply(c
,shared
.czero
);
2554 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
2556 incrRefCount(c
->argv
[2]);
2558 deleteKey(c
->db
,c
->argv
[1]);
2560 addReply(c
,nx
? shared
.cone
: shared
.ok
);
2563 static void renameCommand(redisClient
*c
) {
2564 renameGenericCommand(c
,0);
2567 static void renamenxCommand(redisClient
*c
) {
2568 renameGenericCommand(c
,1);
2571 static void moveCommand(redisClient
*c
) {
2576 /* Obtain source and target DB pointers */
2579 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
2580 addReply(c
,shared
.outofrangeerr
);
2584 selectDb(c
,srcid
); /* Back to the source DB */
2586 /* If the user is moving using as target the same
2587 * DB as the source DB it is probably an error. */
2589 addReply(c
,shared
.sameobjecterr
);
2593 /* Check if the element exists and get a reference */
2594 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2596 addReply(c
,shared
.czero
);
2600 /* Try to add the element to the target DB */
2601 deleteIfVolatile(dst
,c
->argv
[1]);
2602 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
2603 addReply(c
,shared
.czero
);
2606 incrRefCount(c
->argv
[1]);
2609 /* OK! key moved, free the entry in the source DB */
2610 deleteKey(src
,c
->argv
[1]);
2612 addReply(c
,shared
.cone
);
2615 /* =================================== Lists ================================ */
2616 static void pushGenericCommand(redisClient
*c
, int where
) {
2620 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2622 lobj
= createListObject();
2624 if (where
== REDIS_HEAD
) {
2625 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2627 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2629 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
2630 incrRefCount(c
->argv
[1]);
2631 incrRefCount(c
->argv
[2]);
2633 if (lobj
->type
!= REDIS_LIST
) {
2634 addReply(c
,shared
.wrongtypeerr
);
2638 if (where
== REDIS_HEAD
) {
2639 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2641 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2643 incrRefCount(c
->argv
[2]);
2646 addReply(c
,shared
.ok
);
2649 static void lpushCommand(redisClient
*c
) {
2650 pushGenericCommand(c
,REDIS_HEAD
);
2653 static void rpushCommand(redisClient
*c
) {
2654 pushGenericCommand(c
,REDIS_TAIL
);
2657 static void llenCommand(redisClient
*c
) {
2661 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2663 addReply(c
,shared
.czero
);
2666 if (o
->type
!= REDIS_LIST
) {
2667 addReply(c
,shared
.wrongtypeerr
);
2670 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
2675 static void lindexCommand(redisClient
*c
) {
2677 int index
= atoi(c
->argv
[2]->ptr
);
2679 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2681 addReply(c
,shared
.nullbulk
);
2683 if (o
->type
!= REDIS_LIST
) {
2684 addReply(c
,shared
.wrongtypeerr
);
2686 list
*list
= o
->ptr
;
2689 ln
= listIndex(list
, index
);
2691 addReply(c
,shared
.nullbulk
);
2693 robj
*ele
= listNodeValue(ln
);
2694 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2696 addReply(c
,shared
.crlf
);
2702 static void lsetCommand(redisClient
*c
) {
2704 int index
= atoi(c
->argv
[2]->ptr
);
2706 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2708 addReply(c
,shared
.nokeyerr
);
2710 if (o
->type
!= REDIS_LIST
) {
2711 addReply(c
,shared
.wrongtypeerr
);
2713 list
*list
= o
->ptr
;
2716 ln
= listIndex(list
, index
);
2718 addReply(c
,shared
.outofrangeerr
);
2720 robj
*ele
= listNodeValue(ln
);
2723 listNodeValue(ln
) = c
->argv
[3];
2724 incrRefCount(c
->argv
[3]);
2725 addReply(c
,shared
.ok
);
2732 static void popGenericCommand(redisClient
*c
, int where
) {
2735 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2737 addReply(c
,shared
.nullbulk
);
2739 if (o
->type
!= REDIS_LIST
) {
2740 addReply(c
,shared
.wrongtypeerr
);
2742 list
*list
= o
->ptr
;
2745 if (where
== REDIS_HEAD
)
2746 ln
= listFirst(list
);
2748 ln
= listLast(list
);
2751 addReply(c
,shared
.nullbulk
);
2753 robj
*ele
= listNodeValue(ln
);
2754 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2756 addReply(c
,shared
.crlf
);
2757 listDelNode(list
,ln
);
2764 static void lpopCommand(redisClient
*c
) {
2765 popGenericCommand(c
,REDIS_HEAD
);
2768 static void rpopCommand(redisClient
*c
) {
2769 popGenericCommand(c
,REDIS_TAIL
);
2772 static void lrangeCommand(redisClient
*c
) {
2774 int start
= atoi(c
->argv
[2]->ptr
);
2775 int end
= atoi(c
->argv
[3]->ptr
);
2777 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2779 addReply(c
,shared
.nullmultibulk
);
2781 if (o
->type
!= REDIS_LIST
) {
2782 addReply(c
,shared
.wrongtypeerr
);
2784 list
*list
= o
->ptr
;
2786 int llen
= listLength(list
);
2790 /* convert negative indexes */
2791 if (start
< 0) start
= llen
+start
;
2792 if (end
< 0) end
= llen
+end
;
2793 if (start
< 0) start
= 0;
2794 if (end
< 0) end
= 0;
2796 /* indexes sanity checks */
2797 if (start
> end
|| start
>= llen
) {
2798 /* Out of range start or start > end result in empty list */
2799 addReply(c
,shared
.emptymultibulk
);
2802 if (end
>= llen
) end
= llen
-1;
2803 rangelen
= (end
-start
)+1;
2805 /* Return the result in form of a multi-bulk reply */
2806 ln
= listIndex(list
, start
);
2807 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
2808 for (j
= 0; j
< rangelen
; j
++) {
2809 ele
= listNodeValue(ln
);
2810 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2812 addReply(c
,shared
.crlf
);
2819 static void ltrimCommand(redisClient
*c
) {
2821 int start
= atoi(c
->argv
[2]->ptr
);
2822 int end
= atoi(c
->argv
[3]->ptr
);
2824 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2826 addReply(c
,shared
.nokeyerr
);
2828 if (o
->type
!= REDIS_LIST
) {
2829 addReply(c
,shared
.wrongtypeerr
);
2831 list
*list
= o
->ptr
;
2833 int llen
= listLength(list
);
2834 int j
, ltrim
, rtrim
;
2836 /* convert negative indexes */
2837 if (start
< 0) start
= llen
+start
;
2838 if (end
< 0) end
= llen
+end
;
2839 if (start
< 0) start
= 0;
2840 if (end
< 0) end
= 0;
2842 /* indexes sanity checks */
2843 if (start
> end
|| start
>= llen
) {
2844 /* Out of range start or start > end result in empty list */
2848 if (end
>= llen
) end
= llen
-1;
2853 /* Remove list elements to perform the trim */
2854 for (j
= 0; j
< ltrim
; j
++) {
2855 ln
= listFirst(list
);
2856 listDelNode(list
,ln
);
2858 for (j
= 0; j
< rtrim
; j
++) {
2859 ln
= listLast(list
);
2860 listDelNode(list
,ln
);
2862 addReply(c
,shared
.ok
);
2868 static void lremCommand(redisClient
*c
) {
2871 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2873 addReply(c
,shared
.czero
);
2875 if (o
->type
!= REDIS_LIST
) {
2876 addReply(c
,shared
.wrongtypeerr
);
2878 list
*list
= o
->ptr
;
2879 listNode
*ln
, *next
;
2880 int toremove
= atoi(c
->argv
[2]->ptr
);
2885 toremove
= -toremove
;
2888 ln
= fromtail
? list
->tail
: list
->head
;
2890 robj
*ele
= listNodeValue(ln
);
2892 next
= fromtail
? ln
->prev
: ln
->next
;
2893 if (sdscmp(ele
->ptr
,c
->argv
[3]->ptr
) == 0) {
2894 listDelNode(list
,ln
);
2897 if (toremove
&& removed
== toremove
) break;
2901 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
2906 /* ==================================== Sets ================================ */
2908 static void saddCommand(redisClient
*c
) {
2911 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2913 set
= createSetObject();
2914 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
2915 incrRefCount(c
->argv
[1]);
2917 if (set
->type
!= REDIS_SET
) {
2918 addReply(c
,shared
.wrongtypeerr
);
2922 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
2923 incrRefCount(c
->argv
[2]);
2925 addReply(c
,shared
.cone
);
2927 addReply(c
,shared
.czero
);
2931 static void sremCommand(redisClient
*c
) {
2934 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2936 addReply(c
,shared
.czero
);
2938 if (set
->type
!= REDIS_SET
) {
2939 addReply(c
,shared
.wrongtypeerr
);
2942 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
2944 addReply(c
,shared
.cone
);
2946 addReply(c
,shared
.czero
);
2951 static void smoveCommand(redisClient
*c
) {
2952 robj
*srcset
, *dstset
;
2954 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2955 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
2957 /* If the source key does not exist return 0, if it's of the wrong type
2959 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
2960 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
2963 /* Error if the destination key is not a set as well */
2964 if (dstset
&& dstset
->type
!= REDIS_SET
) {
2965 addReply(c
,shared
.wrongtypeerr
);
2968 /* Remove the element from the source set */
2969 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
2970 /* Key not found in the src set! return zero */
2971 addReply(c
,shared
.czero
);
2975 /* Add the element to the destination set */
2977 dstset
= createSetObject();
2978 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
2979 incrRefCount(c
->argv
[2]);
2981 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
2982 incrRefCount(c
->argv
[3]);
2983 addReply(c
,shared
.cone
);
2986 static void sismemberCommand(redisClient
*c
) {
2989 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
2991 addReply(c
,shared
.czero
);
2993 if (set
->type
!= REDIS_SET
) {
2994 addReply(c
,shared
.wrongtypeerr
);
2997 if (dictFind(set
->ptr
,c
->argv
[2]))
2998 addReply(c
,shared
.cone
);
3000 addReply(c
,shared
.czero
);
3004 static void scardCommand(redisClient
*c
) {
3008 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3010 addReply(c
,shared
.czero
);
3013 if (o
->type
!= REDIS_SET
) {
3014 addReply(c
,shared
.wrongtypeerr
);
3017 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3023 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3024 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3026 return dictSize(*d1
)-dictSize(*d2
);
/* Shared implementation of the set-intersection commands.
 *
 * c        - client issuing the command
 * setskeys - array of 'setsnum' key objects naming the input sets
 * dstkey   - destination key, or NULL (sinterCommand passes NULL,
 *            sinterstoreCommand passes argv[1])
 *
 * Visible behaviour: a 'dict *' vector with one slot per key is allocated
 * and filled from the looked-up objects (an object that is not a REDIS_SET
 * produces shared.wrongtypeerr); the vector is sorted by cardinality; the
 * first (smallest) set is iterated and an element is kept only if
 * dictFind() locates it in every other set. Kept elements are emitted as
 * bulk items and/or added to 'dstset', which is stored under dstkey.
 *
 * NOTE(review): many short original lines (declarations, else/return lines,
 * the conditions choosing between reply mode and store mode) are not
 * visible in this view. */
3029 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
) {
3030 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3033 robj
*lenobj
= NULL
, *dstset
= NULL
;
3034 int j
, cardinality
= 0;
3036 if (!dv
) oom("sinterGenericCommand");
3037 for (j
= 0; j
< setsnum
; j
++) {
3041 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3042 lookupKeyRead(c
->db
,setskeys
[j
]);
3046 deleteKey(c
->db
,dstkey
);
3047 addReply(c
,shared
.ok
);
3049 addReply(c
,shared
.nullmultibulk
);
3053 if (setobj
->type
!= REDIS_SET
) {
3055 addReply(c
,shared
.wrongtypeerr
);
3058 dv
[j
] = setobj
->ptr
;
3060 /* Sort sets from the smallest to largest, this will improve our
3061 * algorithm's performance */
3062 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3064 /* The first thing we should output is the total number of elements...
3065 * since this is a multi-bulk write, but at this stage we don't know
3066 * the intersection set size, so we use a trick, append an empty object
3067 * to the output list and save the pointer to later modify it with the
3070 lenobj
= createObject(REDIS_STRING
,NULL
);
3072 decrRefCount(lenobj
);
3074 /* If we have a target key where to store the resulting set
3075 * create this key with an empty set inside */
3076 dstset
= createSetObject();
3079 /* Iterate all the elements of the first (smallest) set, and test
3080 * the element against all the other sets, if at least one set does
3081 * not include the element it is discarded */
3082 di
= dictGetIterator(dv
[0]);
3083 if (!di
) oom("dictGetIterator");
3085 while((de
= dictNext(di
)) != NULL
) {
3088 for (j
= 1; j
< setsnum
; j
++)
3089 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
3091 continue; /* at least one set does not contain the member */
3092 ele
= dictGetEntryKey(de
);
/* Cast the sds length to int so it matches %d, as the other bulk-header
 * call sites in this file do. */
3094 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
3096 addReply(c
,shared
.crlf
);
3099 dictAdd(dstset
->ptr
,ele
,NULL
);
3103 dictReleaseIterator(di
);
3106 /* Store the resulting set into the target */
3107 deleteKey(c
->db
,dstkey
);
3108 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3109 incrRefCount(dstkey
);
3113 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",cardinality
);
/* Same reasoning: make the argument an int for the %d conversion. */
3115 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3116 (int)dictSize((dict
*)dstset
->ptr
)));
3122 static void sinterCommand(redisClient
*c
) {
3123 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
3126 static void sinterstoreCommand(redisClient
*c
) {
3127 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
3130 #define REDIS_OP_UNION 0
3131 #define REDIS_OP_DIFF 1
/* Shared implementation of the set union / difference commands.
 *
 * c        - client issuing the command
 * setskeys - array of 'setsnum' key objects naming the input sets
 * dstkey   - destination key, or NULL when the result is only replied
 * op       - REDIS_OP_UNION or REDIS_OP_DIFF
 *
 * Visible behaviour: a 'dict *' vector is filled from the looked-up keys
 * (a non-set object produces shared.wrongtypeerr; missing keys are treated
 * as empty sets). A temporary set 'dstset' collects the result: for a union,
 * and for the first set of a difference, elements are added with dictAdd();
 * for the later sets of a difference they are removed with dictDelete().
 * The result is then emitted as a multi-bulk reply and/or stored at dstkey.
 *
 * NOTE(review): many short original lines (declarations, counters, else /
 * return lines, the reply-vs-store conditions) are not visible in this
 * view. */
3133 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
3134 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3137 robj
*dstset
= NULL
;
3138 int j
, cardinality
= 0;
3140 if (!dv
) oom("sunionDiffGenericCommand");
3141 for (j
= 0; j
< setsnum
; j
++) {
3145 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3146 lookupKeyRead(c
->db
,setskeys
[j
]);
3151 if (setobj
->type
!= REDIS_SET
) {
3153 addReply(c
,shared
.wrongtypeerr
);
3156 dv
[j
] = setobj
->ptr
;
3159 /* We need a temp set object to store our union. If the dstkey
3160 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3161 * this set object will be the resulting object to set into the target key*/
3162 dstset
= createSetObject();
3164 /* Iterate all the elements of all the sets, add every element a single
3165 * time to the result set */
3166 for (j
= 0; j
< setsnum
; j
++) {
3167 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
3168 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
3170 di
= dictGetIterator(dv
[j
]);
3171 if (!di
) oom("dictGetIterator");
3173 while((de
= dictNext(di
)) != NULL
) {
3176 /* dictAdd will not add the same element multiple times */
3177 ele
= dictGetEntryKey(de
);
3178 if (op
== REDIS_OP_UNION
|| j
== 0) {
3179 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
3183 } else if (op
== REDIS_OP_DIFF
) {
3184 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
3189 dictReleaseIterator(di
);
3191 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
3194 /* Output the content of the resulting set, if not in STORE mode */
3196 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
3197 di
= dictGetIterator(dstset
->ptr
);
3198 if (!di
) oom("dictGetIterator");
3199 while((de
= dictNext(di
)) != NULL
) {
3202 ele
= dictGetEntryKey(de
);
/* Cast the sds length to int so it matches %d, as the other bulk-header
 * call sites in this file do. */
3203 addReplySds(c
,sdscatprintf(sdsempty(),
3204 "$%d\r\n",(int)sdslen(ele
->ptr
)));
3206 addReply(c
,shared
.crlf
);
3208 dictReleaseIterator(di
);
3210 /* If we have a target key where to store the resulting set
3211 * create this key with the result set inside */
3212 deleteKey(c
->db
,dstkey
);
3213 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3214 incrRefCount(dstkey
);
3219 decrRefCount(dstset
);
/* Same reasoning: make the argument an int for the %d conversion. */
3221 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3222 (int)dictSize((dict
*)dstset
->ptr
)));
3228 static void sunionCommand(redisClient
*c
) {
3229 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
3232 static void sunionstoreCommand(redisClient
*c
) {
3233 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
3236 static void sdiffCommand(redisClient
*c
) {
3237 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
3240 static void sdiffstoreCommand(redisClient
*c
) {
3241 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
3244 static void flushdbCommand(redisClient
*c
) {
3245 server
.dirty
+= dictSize(c
->db
->dict
);
3246 dictEmpty(c
->db
->dict
);
3247 dictEmpty(c
->db
->expires
);
3248 addReply(c
,shared
.ok
);
/* FLUSHALL
 *
 * Adds the value returned by emptyDb() to server.dirty, queues the shared.ok
 * reply, then calls rdbSave() on server.dbfilename. The reply is queued
 * before the save runs and the rdbSave() result is not inspected in the
 * visible code.
 *
 * NOTE(review): the original line numbering leaves room for a short
 * statement after rdbSave() that is not visible in this view -- confirm
 * against the full file. */
3251 static void flushallCommand(redisClient
*c
) {
3252 server
.dirty
+= emptyDb();
3253 addReply(c
,shared
.ok
);
3254 rdbSave(server
.dbfilename
);
3258 redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
3259 redisSortOperation
*so
= zmalloc(sizeof(*so
));
3260 if (!so
) oom("createSortOperation");
3262 so
->pattern
= pattern
;
3266 /* Return the value associated to the key with a name obtained
3267 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3268 robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
3272 int prefixlen
, sublen
, postfixlen
;
3273 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3277 char buf
[REDIS_SORTKEY_MAX
+1];
3280 spat
= pattern
->ptr
;
3282 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
3283 p
= strchr(spat
,'*');
3284 if (!p
) return NULL
;
3287 sublen
= sdslen(ssub
);
3288 postfixlen
= sdslen(spat
)-(prefixlen
+1);
3289 memcpy(keyname
.buf
,spat
,prefixlen
);
3290 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
3291 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
3292 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
3293 keyname
.len
= prefixlen
+sublen
+postfixlen
;
3295 keyobj
.refcount
= 1;
3296 keyobj
.type
= REDIS_STRING
;
3297 keyobj
.ptr
= ((char*)&keyname
)+(sizeof(long)*2);
3299 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3300 return lookupKeyRead(db
,&keyobj
);
3303 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3304 * the additional parameter is not standard but a BSD-specific we have to
3305 * pass sorting parameters via the global 'server' structure */
3306 static int sortCompare(const void *s1
, const void *s2
) {
3307 const redisSortObject
*so1
= s1
, *so2
= s2
;
3310 if (!server
.sort_alpha
) {
3311 /* Numeric sorting. Here it's trivial as we precomputed scores */
3312 if (so1
->u
.score
> so2
->u
.score
) {
3314 } else if (so1
->u
.score
< so2
->u
.score
) {
3320 /* Alphanumeric sorting */
3321 if (server
.sort_bypattern
) {
3322 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
3323 /* At least one compare object is NULL */
3324 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
3326 else if (so1
->u
.cmpobj
== NULL
)
3331 /* We have both the objects, use strcoll */
3332 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
3335 /* Compare elements directly */
3336 cmp
= strcoll(so1
->obj
->ptr
,so2
->obj
->ptr
);
3339 return server
.sort_desc
? -cmp
: cmp
;
/* SORT <key> [ASC|DESC] [ALPHA] [LIMIT start count] [BY pattern]
 *            [GET|DEL|INCR|DECR pattern ...]
 *
 * Visible behaviour:
 *  - the key is looked up with lookupKeyRead(); a missing key yields
 *    shared.nokeyerr, a value that is neither a set nor a list yields
 *    shared.wrongtypeerr;
 *  - options are matched case-insensitively; GET/DEL/INCR/DECR each append
 *    a sort operation to 'operations'; an unrecognised option releases
 *    everything and replies shared.syntaxerr;
 *  - every element is copied into 'vector'; unless the BY pattern is
 *    constant (no '*'), weights are loaded through lookupKeyByPattern() or
 *    parsed from the element itself with strtod();
 *  - the LIMIT range is clamped to the vector, the vector is sorted with
 *    qsort()/pqsort() using sortCompare (parameters are passed via the
 *    global 'server' fields), and the selected range is sent as a
 *    multi-bulk reply.
 *
 * Fixes in this revision:
 *  - the branch creating a REDIS_SORT_DECR operation tested for "get" a
 *    second time, so it could never be reached; it now matches "decr";
 *  - the element length passed to a %d conversion is cast to int.
 *
 * NOTE(review): many short original lines (declarations, counters,
 * else/return lines) are not visible in this view. */
3342 /* The SORT command is the most complex command in Redis. Warning: this code
3343 * is optimized for speed and a bit less for readability */
3344 static void sortCommand(redisClient
*c
) {
3347 int desc
= 0, alpha
= 0;
3348 int limit_start
= 0, limit_count
= -1, start
, end
;
3349 int j
, dontsort
= 0, vectorlen
;
3350 int getop
= 0; /* GET operation counter */
3351 robj
*sortval
, *sortby
= NULL
;
3352 redisSortObject
*vector
; /* Resulting vector to sort */
3354 /* Lookup the key to sort. It must be of the right types */
3355 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
3356 if (sortval
== NULL
) {
3357 addReply(c
,shared
.nokeyerr
);
3360 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
) {
3361 addReply(c
,shared
.wrongtypeerr
);
3365 /* Create a list of operations to perform for every sorted element.
3366 * Operations can be GET/DEL/INCR/DECR */
3367 operations
= listCreate();
3368 listSetFreeMethod(operations
,zfree
);
3371 /* Now we need to protect sortval incrementing its count, in the future
3372 * SORT may have options able to overwrite/delete keys during the sorting
3373 * and the sorted key itself may get destroyed */
3374 incrRefCount(sortval
);
3376 /* The SORT command has an SQL-alike syntax, parse it */
3377 while(j
< c
->argc
) {
3378 int leftargs
= c
->argc
-j
-1;
3379 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
3381 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
3383 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
3385 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
3386 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
3387 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
3389 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
3390 sortby
= c
->argv
[j
+1];
3391 /* If the BY pattern does not contain '*', i.e. it is constant,
3392 * we don't need to sort nor to lookup the weight keys. */
3393 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
3395 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3396 listAddNodeTail(operations
,createSortOperation(
3397 REDIS_SORT_GET
,c
->argv
[j
+1]));
3400 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"del") && leftargs
>= 1) {
3401 listAddNodeTail(operations
,createSortOperation(
3402 REDIS_SORT_DEL
,c
->argv
[j
+1]));
3404 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"incr") && leftargs
>= 1) {
3405 listAddNodeTail(operations
,createSortOperation(
3406 REDIS_SORT_INCR
,c
->argv
[j
+1]));
/* This branch registers a DECR operation, so it must match the "decr"
 * keyword; matching "get" again made it dead code behind the GET branch. */
3408 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"decr") && leftargs
>= 1) {
3409 listAddNodeTail(operations
,createSortOperation(
3410 REDIS_SORT_DECR
,c
->argv
[j
+1]));
3413 decrRefCount(sortval
);
3414 listRelease(operations
);
3415 addReply(c
,shared
.syntaxerr
);
3421 /* Load the sorting vector with all the objects to sort */
3422 vectorlen
= (sortval
->type
== REDIS_LIST
) ?
3423 listLength((list
*)sortval
->ptr
) :
3424 dictSize((dict
*)sortval
->ptr
);
3425 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
3426 if (!vector
) oom("allocating objects vector for SORT");
3428 if (sortval
->type
== REDIS_LIST
) {
3429 list
*list
= sortval
->ptr
;
3433 while((ln
= listYield(list
))) {
3434 robj
*ele
= ln
->value
;
3435 vector
[j
].obj
= ele
;
3436 vector
[j
].u
.score
= 0;
3437 vector
[j
].u
.cmpobj
= NULL
;
3441 dict
*set
= sortval
->ptr
;
3445 di
= dictGetIterator(set
);
3446 if (!di
) oom("dictGetIterator");
3447 while((setele
= dictNext(di
)) != NULL
) {
3448 vector
[j
].obj
= dictGetEntryKey(setele
);
3449 vector
[j
].u
.score
= 0;
3450 vector
[j
].u
.cmpobj
= NULL
;
3453 dictReleaseIterator(di
);
3455 assert(j
== vectorlen
);
3457 /* Now it's time to load the right scores in the sorting vector */
3458 if (dontsort
== 0) {
3459 for (j
= 0; j
< vectorlen
; j
++) {
3463 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
3464 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
3466 vector
[j
].u
.cmpobj
= byval
;
3467 incrRefCount(byval
);
3469 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
3472 if (!alpha
) vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
3477 /* We are ready to sort the vector... perform a bit of sanity check
3478 * on the LIMIT option too. We'll use a partial version of quicksort. */
3479 start
= (limit_start
< 0) ? 0 : limit_start
;
3480 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
3481 if (start
>= vectorlen
) {
3482 start
= vectorlen
-1;
3485 if (end
>= vectorlen
) end
= vectorlen
-1;
3487 if (dontsort
== 0) {
3488 server
.sort_desc
= desc
;
3489 server
.sort_alpha
= alpha
;
3490 server
.sort_bypattern
= sortby
? 1 : 0;
3491 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
3492 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
3494 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
3497 /* Send command output to the output buffer, performing the specified
3498 * GET/DEL/INCR/DECR operations if any. */
3499 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
3500 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
3501 for (j
= start
; j
<= end
; j
++) {
/* Cast the sds length to int so it matches %d, as the other bulk-header
 * call sites in this file do. */
3504 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
3505 (int)sdslen(vector
[j
].obj
->ptr
)));
3506 addReply(c
,vector
[j
].obj
);
3507 addReply(c
,shared
.crlf
);
3509 listRewind(operations
);
3510 while((ln
= listYield(operations
))) {
3511 redisSortOperation
*sop
= ln
->value
;
3512 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
3515 if (sop
->type
== REDIS_SORT_GET
) {
3516 if (!val
|| val
->type
!= REDIS_STRING
) {
3517 addReply(c
,shared
.nullbulk
);
3519 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
3522 addReply(c
,shared
.crlf
);
3524 } else if (sop
->type
== REDIS_SORT_DEL
) {
3531 decrRefCount(sortval
);
3532 listRelease(operations
);
3533 for (j
= 0; j
< vectorlen
; j
++) {
3534 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
3535 decrRefCount(vector
[j
].u
.cmpobj
);
/* INFO
 *
 * Builds a "field:value\r\n" report with sdscatprintf() (version, uptime,
 * client/slave counts, memory, save state, connection and command counters,
 * role) and, when server.masterhost is set, appends the master_* fields.
 * The report is sent as a bulk reply: "$<len>\r\n", the text, then CRLF.
 *
 * The report length is cast to int so that it matches the %d conversion, as
 * the other bulk-header call sites in this file do.
 *
 * NOTE(review): several argument lines of the two sdscatprintf() calls are
 * not visible in this view, so the pairing of the remaining conversions
 * with their arguments could not be checked here. */
3540 static void infoCommand(redisClient
*c
) {
3542 time_t uptime
= time(NULL
)-server
.stat_starttime
;
3544 info
= sdscatprintf(sdsempty(),
3545 "redis_version:%s\r\n"
3546 "uptime_in_seconds:%d\r\n"
3547 "uptime_in_days:%d\r\n"
3548 "connected_clients:%d\r\n"
3549 "connected_slaves:%d\r\n"
3550 "used_memory:%zu\r\n"
3551 "changes_since_last_save:%lld\r\n"
3552 "bgsave_in_progress:%d\r\n"
3553 "last_save_time:%d\r\n"
3554 "total_connections_received:%lld\r\n"
3555 "total_commands_processed:%lld\r\n"
3560 listLength(server
.clients
)-listLength(server
.slaves
),
3561 listLength(server
.slaves
),
3564 server
.bgsaveinprogress
,
3566 server
.stat_numconnections
,
3567 server
.stat_numcommands
,
3568 server
.masterhost
== NULL
? "master" : "slave"
3570 if (server
.masterhost
) {
3571 info
= sdscatprintf(info
,
3572 "master_host:%s\r\n"
3573 "master_port:%d\r\n"
3574 "master_link_status:%s\r\n"
3575 "master_last_io_seconds_ago:%d\r\n"
3578 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
3580 (int)(time(NULL
)-server
.master
->lastinteraction
)
3583 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(info
)));
3584 addReplySds(c
,info
);
3585 addReply(c
,shared
.crlf
);
/* MONITOR command handler: flags the client as both REDIS_SLAVE and
 * REDIS_MONITOR, appends it to server.monitors and replies with shared.ok.
 * A client that already has REDIS_SLAVE set is ignored without any reply. */
3588 static void monitorCommand(redisClient
*c
) {
3589 /* ignore MONITOR if already slave or in monitor mode */
3590 if (c
->flags
& REDIS_SLAVE
) return;
3592 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
/* A failed list append is handed to oom(). */
3594 if (!listAddNodeTail(server
.monitors
,c
)) oom("listAddNodeTail");
3595 addReply(c
,shared
.ok
);
3598 /* ================================= Expire ================================= */
/* Remove the entry for 'key' from db->expires.
 * The branch below is taken when dictDelete() reports DICT_OK; the return
 * statements are not visible in this listing. */
3599 static int removeExpire(redisDb
*db
, robj
*key
) {
3600 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
/* Record 'when' as the expire time of 'key' by adding an entry to
 * db->expires. The time_t is stored directly in the entry's value slot
 * (cast to void*), not as a pointer to separately allocated storage.
 * The branch below handles dictAdd() returning DICT_ERR -- presumably the key
 * already has an expire set; confirm against dictAdd(). */
3607 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
3608 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
3616 /* Return the expire time of the specified key, or -1 if no expire
3617 * is associated with this key (i.e. the key is non volatile) */
3618 static time_t getExpire(redisDb
*db
, robj
*key
) {
3621 /* No expire? return ASAP */
/* The size test comes first, so dictFind() is not called when the expires
 * table is empty. */
3622 if (dictSize(db
->expires
) == 0 ||
3623 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
/* The entry's value slot is cast straight back to time_t. */
3625 return (time_t) dictGetEntryVal(de
);
/* Delete 'key' from 'db' when it has an expire entry whose time has passed.
 * Returns 0 when the key has no expire entry or the current time is not yet
 * past it; otherwise returns non-zero only if removing the key from db->dict
 * reported DICT_OK. */
3628 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
3632 /* No expire? return ASAP */
3633 if (dictSize(db
->expires
) == 0 ||
3634 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3636 /* Lookup the expire */
3637 when
= (time_t) dictGetEntryVal(de
);
/* A key whose expire time equals the current second is still kept. */
3638 if (time(NULL
) <= when
) return 0;
3640 /* Delete the key */
/* The expire entry is removed first, then the key itself. */
3641 dictDelete(db
->expires
,key
);
3642 return dictDelete(db
->dict
,key
) == DICT_OK
;
/* Delete 'key' from 'db' when it has any expire entry at all; the expire
 * time itself is never compared with the clock here.
 * Returns 0 when the key has no expire entry; otherwise returns non-zero only
 * if removing the key from db->dict reported DICT_OK. */
3645 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
3648 /* No expire? return ASAP */
3649 if (dictSize(db
->expires
) == 0 ||
3650 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3652 /* Delete the key */
3654 dictDelete(db
->expires
,key
);
3655 return dictDelete(db
->dict
,key
) == DICT_OK
;
/* EXPIRE command handler. argv[1] is the key and argv[2] the timeout in
 * seconds. Replies shared.cone when setExpire() returns non-zero; every other
 * reply path visible here sends shared.czero. */
3658 static void expireCommand(redisClient
*c
) {
/* NOTE(review): atoi() reports no error, so non-numeric input is
 * indistinguishable from 0. */
3660 int seconds
= atoi(c
->argv
[2]->ptr
);
/* Look the key up in the main dictionary of the client's database. */
3662 de
= dictFind(c
->db
->dict
,c
->argv
[1]);
3664 addReply(c
,shared
.czero
);
3668 addReply(c
, shared
.czero
);
/* Absolute expire time: now plus the requested number of seconds. */
3671 time_t when
= time(NULL
)+seconds
;
3672 if (setExpire(c
->db
,c
->argv
[1],when
))
3673 addReply(c
,shared
.cone
);
3675 addReply(c
,shared
.czero
);
/* TTL command handler: replies with an integer (":<n>\r\n") derived from the
 * expire time stored for the key in argv[1]. */
3680 static void ttlCommand(redisClient
*c
) {
3684 expire
= getExpire(c
->db
,c
->argv
[1]);
/* Seconds remaining until the expire time; a negative result is reported
 * as -1. */
3686 ttl
= (int) (expire
-time(NULL
));
3687 if (ttl
< 0) ttl
= -1;
3689 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
3692 /* =============================== Replication ============================= */
/* Write to 'fd' from 'ptr', waiting for the descriptor to become writable
 * with aeWait() before each write().
 * Returns -1 as soon as write() fails. 'timeout' is compared against the
 * number of seconds elapsed since the function was entered.
 * 'ret' starts out as the requested size -- presumably the value returned on
 * success; the return statement is not visible in this listing. */
3694 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3695 ssize_t nwritten
, ret
= size
;
3696 time_t start
= time(NULL
);
/* Each wait is bounded by 1000 (presumably milliseconds -- confirm against
 * aeWait()). */
3700 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
3701 nwritten
= write(fd
,ptr
,size
);
3702 if (nwritten
== -1) return -1;
/* Elapsed wall-clock seconds checked against the caller's limit. */
3706 if ((time(NULL
)-start
) > timeout
) {
/* Read from 'fd' into 'ptr', waiting for the descriptor to become readable
 * with aeWait() before each read().
 * Returns -1 as soon as read() fails. 'timeout' is compared against the
 * number of seconds elapsed since the function was entered. 'totread' starts
 * at 0 -- presumably the running byte count; its updates are not visible in
 * this listing. */
3714 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3715 ssize_t nread
, totread
= 0;
3716 time_t start
= time(NULL
);
/* Each wait is bounded by 1000 (presumably milliseconds -- confirm against
 * aeWait()). */
3720 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
3721 nread
= read(fd
,ptr
,size
);
3722 if (nread
== -1) return -1;
/* Elapsed wall-clock seconds checked against the caller's limit. */
3727 if ((time(NULL
)-start
) > timeout
) {
/* Read a line from 'fd' into 'ptr', fetching one byte at a time through
 * syncRead() with the given timeout. Returns -1 when a read fails. */
3735 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3742 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
/* If something was read and the byte just before the current position is a
 * carriage return, overwrite it with a terminator. The nread test keeps
 * ptr-1 from being inspected before any byte has been read. */
3745 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
/* SYNC command handler, issued by a client that wants to become a slave.
 * Chooses the client's replication state according to whether a background
 * save is running, flags the client REDIS_SLAVE and appends it to
 * server.slaves. */
3756 static void syncCommand(redisClient
*c
) {
3757 /* ignore SYNC if already slave or in monitor mode */
3758 if (c
->flags
& REDIS_SLAVE
) return;
3760 /* SYNC can't be issued when the server has pending data to send to
3761 * the client about already issued commands. We need a fresh reply
3762 * buffer registering the differences between the BGSAVE and the current
3763 * dataset, so that we can copy to other slaves if needed. */
3764 if (listLength(c
->reply
) != 0) {
3765 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3769 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
3770 /* Here we need to check if there is a background saving operation
3771 * in progress, or if it is required to start one */
3772 if (server
.bgsaveinprogress
) {
3773 /* Ok a background save is in progress. Let's check if it is a good
3774 * one for replication, i.e. if there is another slave that is
3775 * registering differences since the server forked to save */
3779 listRewind(server
.slaves
);
3780 while((ln
= listYield(server
.slaves
))) {
/* Stop at the first slave that is waiting for the running BGSAVE to end. */
3782 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
3785 /* Perfect, the server is already registering differences for
3786 * another slave. Set the right state, and copy the buffer. */
/* The client's own reply list is released and replaced by a duplicate of the
 * other slave's list; a failed duplication is handed to oom(). */
3787 listRelease(c
->reply
);
3788 c
->reply
= listDup(slave
->reply
);
3789 if (!c
->reply
) oom("listDup copying slave reply list");
3790 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3791 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
3793 /* No way, we need to wait for the next BGSAVE in order to
3794 * register differences */
3795 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
3796 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
3799 /* Ok we don't have a BGSAVE in progress, let's start one */
3800 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
3801 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
3802 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
/* NOTE(review): the error string below misspells "Unable" as "Unalbe". It is
 * sent to the client, so correcting it is a visible behaviour change. */
3803 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
3806 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
/* From here on the client is treated as a slave and tracked in
 * server.slaves. */
3809 c
->flags
|= REDIS_SLAVE
;
3811 if (!listAddNodeTail(server
.slaves
,c
)) oom("listAddNodeTail");
/* Writable-event handler that streams the saved DB file to a slave.
 * 'privdata' is the slave's redisClient. On the first call (repldboff == 0)
 * a "$<size>\r\n" prefix is written. Every call then reads up to
 * REDIS_IOBUF_LEN bytes from repldbfd at offset repldboff, writes them to
 * 'fd' and advances repldboff by the number of bytes actually written.
 * Once repldboff equals repldbsize the file is closed, the slave becomes
 * REDIS_REPL_ONLINE and this handler is replaced by sendReplyToClient. */
3815 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
3816 redisClient
*slave
= privdata
;
3818 REDIS_NOTUSED(mask
);
3819 char buf
[REDIS_IOBUF_LEN
];
3820 ssize_t nwritten
, buflen
;
3822 if (slave
->repldboff
== 0) {
3823 /* Write the bulk write count before transferring the DB. In theory here
3824 * we don't know how much room there is in the output buffer of the
3825 * socket, but in practice SO_SNDLOWAT (the minimum count for output
3826 * operations) will never be smaller than the few bytes we need. */
/* NOTE(review): "%lld" is paired with a cast to unsigned long long; "%llu"
 * would match the argument type. */
3829 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
/* Anything other than a complete write of the prefix takes the branch
 * below. */
3831 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
/* Seek to the current offset on every call, so a short write on the previous
 * call is resumed from the right place. */
3839 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
3840 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
/* A zero-length read is logged as a premature EOF, anything else through
 * errno. */
3842 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
3843 (buflen
== 0) ? "premature EOF" : strerror(errno
));
3847 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
3848 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
/* Advance by what was really written, which may be less than buflen. */
3853 slave
->repldboff
+= nwritten
;
3854 if (slave
->repldboff
== slave
->repldbsize
) {
/* Transfer complete: close the file and mark the descriptor unused. */
3855 close(slave
->repldbfd
);
3856 slave
->repldbfd
= -1;
/* Swap the writable handler from this function to sendReplyToClient. */
3857 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
3858 slave
->replstate
= REDIS_REPL_ONLINE
;
3859 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
3860 sendReplyToClient
, slave
, NULL
) == AE_ERR
) {
/* An empty reply is queued on the slave -- presumably to get the regular
 * reply path going; confirm against addReplySds(). */
3864 addReplySds(slave
,sdsempty());
3865 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
/* Advance the replication state of every slave after a background save.
 * 'bgsaveerr' is compared with REDIS_OK to tell whether the save succeeded.
 * Slaves in REDIS_REPL_WAIT_BGSAVE_START move to REDIS_REPL_WAIT_BGSAVE_END.
 * Slaves already in REDIS_REPL_WAIT_BGSAVE_END are either reported as failed
 * (save error, or the DB file cannot be opened/stat'ed) or set up for the
 * bulk transfer with sendBulkToSlave as their writable-event handler. */
3869 static void updateSalvesWaitingBgsave(int bgsaveerr
) {
3871 int startbgsave
= 0;
3873 listRewind(server
.slaves
);
3874 while((ln
= listYield(server
.slaves
))) {
3875 redisClient
*slave
= ln
->value
;
3877 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
3879 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3880 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
3881 struct redis_stat buf
;
3883 if (bgsaveerr
!= REDIS_OK
) {
3885 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
/* Open the saved DB read-only and stat it; the size is needed for the bulk
 * transfer. */
3888 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
3889 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
3891 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
/* Start the transfer at offset 0, with the file size as the end marker. */
3894 slave
->repldboff
= 0;
3895 slave
->repldbsize
= buf
.st_size
;
3896 slave
->replstate
= REDIS_REPL_SEND_BULK
;
/* Replace whatever writable handler the slave had with sendBulkToSlave. */
3897 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
3898 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
, NULL
) == AE_ERR
) {
/* A new background save is attempted here -- presumably only when
 * 'startbgsave' was set in the loop above; the guard is not visible in this
 * listing. On failure the slave list is walked again for the slaves still in
 * REDIS_REPL_WAIT_BGSAVE_START. */
3905 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
3906 listRewind(server
.slaves
);
3907 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
3908 while((ln
= listYield(server
.slaves
))) {
3909 redisClient
*slave
= ln
->value
;
3911 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
/* Slave side of the initial synchronization. Connects to
 * server.masterhost:server.masterport, sends SYNC, reads the announced dump
 * size, copies the dump into a temporary file, renames that file onto
 * server.dbfilename, loads it with rdbLoad() and finally registers the
 * connection as server.master with replstate REDIS_REPL_CONNECTED. */
3918 static int syncWithMaster(void) {
3919 char buf
[1024], tmpfile
[256];
3921 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
3925 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
3929 /* Issue the SYNC command */
/* 7 is the byte length of the command string, 5 the timeout passed to
 * syncWrite(). */
3930 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
3932 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
3936 /* Read the bulk write count */
/* 3600 is the timeout passed to syncReadLine(). */
3937 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
3939 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
/* buf+1 skips the first character of the line (the '$' that
 * sendBulkToSlave() puts in front of the size).
 * NOTE(review): atoi() yields an int and reports no errors -- a malformed or
 * very large count goes undetected. */
3943 dumpsize
= atoi(buf
+1);
3944 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
3945 /* Read the bulk write data on a temp file */
/* Temp file name is built from the current time and a random() value. */
3946 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
3947 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
3950 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
3954 int nread
, nwritten
;
/* Never ask for more than what is left of the dump or than buf can hold. */
3956 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
3958 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
/* NOTE(review): only a -1 result is checked below; confirm how a short write
 * to the temp file is handled in the lines not shown here. */
3964 nwritten
= write(dfd
,buf
,nread
);
3965 if (nwritten
== -1) {
/* NOTE(review): the log message below misspells "synchronization". */
3966 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
/* Move the completed temp file onto the configured DB file name. */
3974 if (rename(tmpfile
,server
.dbfilename
) == -1) {
3975 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
3981 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
3982 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
/* The connection to the master is wrapped in a regular client object that
 * is flagged REDIS_MASTER. */
3986 server
.master
= createClient(fd
);
3987 server
.master
->flags
|= REDIS_MASTER
;
3988 server
.replstate
= REDIS_REPL_CONNECTED
;
/* SLAVEOF command handler.
 * "SLAVEOF no one" (compared case-insensitively) switches replication off:
 * the master host string and the master client are freed and replstate
 * becomes REDIS_REPL_NONE. Any other arguments are taken as host (argv[1])
 * and port (argv[2]) of the new master, and replstate becomes
 * REDIS_REPL_CONNECT. The client is answered with shared.ok. */
3992 static void slaveofCommand(redisClient
*c
) {
3993 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
3994 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
/* Nothing to undo unless a master host is currently configured. */
3995 if (server
.masterhost
) {
3996 sdsfree(server
.masterhost
);
3997 server
.masterhost
= NULL
;
3998 if (server
.master
) freeClient(server
.master
);
3999 server
.replstate
= REDIS_REPL_NONE
;
4000 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
/* Replace the stored master address with the one supplied by the client.
 * NOTE(review): the port is parsed with atoi(), so an invalid port string is
 * not rejected here. */
4003 sdsfree(server
.masterhost
);
4004 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
4005 server
.masterport
= atoi(c
->argv
[2]->ptr
);
/* Drop the connection to the previous master, if there is one. */
4006 if (server
.master
) freeClient(server
.master
);
4007 server
.replstate
= REDIS_REPL_CONNECT
;
4008 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
4009 server
.masterhost
, server
.masterport
);
4011 addReply(c
,shared
.ok
);
4014 /* ============================ Maxmemory directive ======================== */
4016 /* This function gets called when 'maxmemory' is set on the config file to limit
4017 * the max memory used by the server, and we are out of memory.
4018 * This function will try to, in order:
4020 * - Free objects from the free list
4021 * - Try to remove keys with an EXPIRE set
4023 * If it is not possible to free enough memory to reach used-memory < maxmemory
4024 * the server will start refusing commands that will enlarge even more the
/* Try to bring memory usage back under server.maxmemory.
 * Loops while a limit is configured and zmalloc_used_memory() exceeds it.
 * Each pass either takes an object off server.objfreelist or, when that list
 * is empty, goes through every database deleting a key that has an expire
 * set. Returns when a pass over the databases frees nothing. */
4027 static void freeMemoryIfNeeded(void) {
/* A maxmemory of 0 makes the loop condition false straight away. */
4028 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
4029 if (listLength(server
.objfreelist
)) {
/* Detach the first node of the free list. */
4032 listNode
*head
= listFirst(server
.objfreelist
);
4033 o
= listNodeValue(head
);
4034 listDelNode(server
.objfreelist
,head
);
4037 int j
, k
, freed
= 0;
4039 for (j
= 0; j
< server
.dbnum
; j
++) {
4041 robj
*minkey
= NULL
;
4042 struct dictEntry
*de
;
/* Only databases that hold at least one key with an expire are considered. */
4044 if (dictSize(server
.db
[j
].expires
)) {
4046 /* From a sample of three keys drop the one nearest to
4047 * the natural expire */
4048 for (k
= 0; k
< 3; k
++) {
/* The three draws are independent, so the same entry can be picked more
 * than once. */
4051 de
= dictGetRandomKey(server
.db
[j
].expires
);
4052 t
= (time_t) dictGetEntryVal(de
);
/* Keep the candidate with the smallest expire time seen so far; -1 marks
 * "no candidate yet". */
4053 if (minttl
== -1 || t
< minttl
) {
4054 minkey
= dictGetEntryKey(de
);
4058 deleteKey(server
.db
+j
,minkey
);
4061 if (!freed
) return; /* nothing to free... */
4066 /* ================================= Debugging ============================== */
/* DEBUG command handler. The subcommand in argv[1] is matched
 * case-insensitively:
 *   "segfault"      - handled by the first branch (body not visible here)
 *   "object" <key>  - only with exactly three arguments; reports addresses
 *                     and reference counts of the key and value objects
 * Anything else gets a syntax error reply. */
4068 static void debugCommand(redisClient
*c
) {
4069 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
4071 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
4072 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
4076 addReply(c
,shared
.nokeyerr
);
4079 key
= dictGetEntryKey(de
);
4080 val
= dictGetEntryVal(de
);
/* Status reply carrying the pointer and refcount of both objects. */
4081 addReplySds(c
,sdscatprintf(sdsempty(),
4082 "+Key at:%p refcount:%d, value at:%p refcount:%d\r\n",
4083 key
, key
->refcount
, val
, val
->refcount
));
4085 addReplySds(c
,sdsnew(
4086 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4090 /* =================================== Main! ================================ */
/* Read the kernel's overcommit setting from
 * /proc/sys/vm/overcommit_memory. One line of at most 63 characters is
 * fetched with fgets(); a NULL result takes the branch below. How the text is
 * turned into the returned int is not visible in this listing. */
4093 int linuxOvercommitMemoryValue(void) {
4094 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
4098 if (fgets(buf
,64,fp
) == NULL
) {
/* Log a warning when linuxOvercommitMemoryValue() returns 0.
 * NOTE(review): the message says "low condition memory"; "low memory
 * condition" is presumably what was meant. */
4107 void linuxOvercommitMemoryWarning(void) {
4108 if (linuxOvercommitMemoryValue() == 0) {
4109 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4112 #endif /* __linux__ */
/* Detach the server from the launching process: fork and let the parent
 * exit, start a new session, point stdin/stdout/stderr at /dev/null and
 * write the process id to server.pidfile. */
4114 static void daemonize(void) {
/* NOTE(review): fork() returning -1 (failure) is also non-zero, so in that
 * case the process exits with status 0 without ever daemonizing. */
4118 if (fork() != 0) exit(0); /* parent exits */
4119 setsid(); /* create a new session */
4121 /* Every output goes to /dev/null. If Redis is daemonized but
4122 * the 'logfile' is set to 'stdout' in the configuration file
4123 * it will not log at all. */
4124 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
4125 dup2(fd
, STDIN_FILENO
);
4126 dup2(fd
, STDOUT_FILENO
);
4127 dup2(fd
, STDERR_FILENO
);
/* The extra descriptor is closed only when it is not itself one of the
 * three standard descriptors. */
4128 if (fd
> STDERR_FILENO
) close(fd
);
4130 /* Try to write the pid file */
4131 fp
= fopen(server
.pidfile
,"w");
/* The pid written is that of the surviving (child) process. */
4133 fprintf(fp
,"%d\n",getpid());
4138 int main(int argc
, char **argv
) {
4140 linuxOvercommitMemoryWarning();
4145 ResetServerSaveParams();
4146 loadServerConfig(argv
[1]);
4147 } else if (argc
> 2) {
4148 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
4151 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4154 if (server
.daemonize
) daemonize();
4155 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
4156 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
4157 redisLog(REDIS_NOTICE
,"DB loaded from disk");
4158 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
4159 acceptHandler
, NULL
, NULL
) == AE_ERR
) oom("creating file event");
4160 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
4162 aeDeleteEventLoop(server
.el
);