2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "0.101"
39 #define __USE_POSIX199309
49 #include <arpa/inet.h>
53 #include <sys/resource.h>
58 #include "ae.h" /* Event driven programming library */
59 #include "sds.h" /* Dynamic safe strings */
60 #include "anet.h" /* Networking the easy way */
61 #include "dict.h" /* Hash tables */
62 #include "adlist.h" /* Linked lists */
63 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
64 #include "lzf.h" /* LZF compression library */
65 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
71 /* Static server configuration */
72 #define REDIS_SERVERPORT 6379 /* TCP port */
73 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
74 #define REDIS_IOBUF_LEN 1024
75 #define REDIS_LOADBUF_LEN 1024
76 #define REDIS_STATIC_ARGS 4
77 #define REDIS_DEFAULT_DBNUM 16
78 #define REDIS_CONFIGLINE_MAX 1024
79 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
80 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
81 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
83 /* Hash table parameters */
84 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
85 #define REDIS_HT_MINSLOTS 16384 /* Never resize the HT under this */
88 #define REDIS_CMD_BULK 1 /* Bulk write command */
89 #define REDIS_CMD_INLINE 2 /* Inline command */
90 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
91 this flag will return an error when the 'maxmemory' option is set in the
92 config file and the server is using more than maxmemory bytes of memory.
93 In short these commands are denied on low memory conditions. */
94 #define REDIS_CMD_DENYOOM 4
97 #define REDIS_STRING 0
102 /* Object types only used for dumping to disk */
103 #define REDIS_EXPIRETIME 253
104 #define REDIS_SELECTDB 254
105 #define REDIS_EOF 255
107 /* Defines related to the dump file format. To store 32 bits lengths for short
108 * keys requires a lot of space, so we check the most significant 2 bits of
109 * the first byte to interpret the length:
111 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
112 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
113 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
114 * 11|000000 this means: specially encoded object will follow. The six bits
115 * number specify the kind of object that follows.
116 * See the REDIS_RDB_ENC_* defines.
118 * Lengths up to 63 are stored using a single byte, most DB keys, and many
119 * values, will fit inside. */
120 #define REDIS_RDB_6BITLEN 0
121 #define REDIS_RDB_14BITLEN 1
122 #define REDIS_RDB_32BITLEN 2
123 #define REDIS_RDB_ENCVAL 3
124 #define REDIS_RDB_LENERR UINT_MAX
126 /* When a length of a string object stored on disk has the first two bits
127 * set, the remaining six bits specify a special encoding for the object
128 * according to the following defines: */
129 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
130 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
131 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
132 #define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
135 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
136 #define REDIS_SLAVE 2 /* This client is a slave server */
137 #define REDIS_MASTER 4 /* This client is a master server */
138 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
140 /* Slave replication state - slave side */
141 #define REDIS_REPL_NONE 0 /* No active replication */
142 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
143 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
145 /* Slave replication state - from the point of view of master
146 * Note that in SEND_BULK and ONLINE state the slave receives new updates
147 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
148 * to start the next background saving in order to send updates to it. */
149 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
150 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
151 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
152 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
154 /* List related stuff */
158 /* Sort operations */
159 #define REDIS_SORT_GET 0
160 #define REDIS_SORT_DEL 1
161 #define REDIS_SORT_INCR 2
162 #define REDIS_SORT_DECR 3
163 #define REDIS_SORT_ASC 4
164 #define REDIS_SORT_DESC 5
165 #define REDIS_SORTKEY_MAX 1024
168 #define REDIS_DEBUG 0
169 #define REDIS_NOTICE 1
170 #define REDIS_WARNING 2
172 /* Anti-warning macro... */
173 #define REDIS_NOTUSED(V) ((void) V)
176 /*================================= Data types ============================== */
178 /* A redis object, that is a type able to hold a string / list / set */
179 typedef struct redisObject
{
185 typedef struct redisDb
{
191 /* With multiplexing we need to keep per-client state.
192 * Clients are kept in a linked list. */
193 typedef struct redisClient
{
200 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
203 time_t lastinteraction
; /* time of the last interaction, used for timeout */
204 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
205 int slaveseldb
; /* slave selected db, if this client is a slave */
206 int authenticated
; /* when requirepass is non-NULL */
207 int replstate
; /* replication state if this is a slave */
208 int repldbfd
; /* replication DB file descriptor */
209 long repldboff
; /* replication DB file offset */
210 off_t repldbsize
; /* replication DB file size */
218 /* Global server state structure */
224 unsigned int sharingpoolsize
;
225 long long dirty
; /* changes to DB from the last save */
227 list
*slaves
, *monitors
;
228 char neterr
[ANET_ERR_LEN
];
230 int cronloops
; /* number of times the cron function run */
231 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
232 time_t lastsave
; /* Unix time of last successful save */
233 size_t usedmemory
; /* Used memory in bytes */
234 /* Fields used only for stats */
235 time_t stat_starttime
; /* server start time */
236 long long stat_numcommands
; /* number of processed commands */
237 long long stat_numconnections
; /* number of connections received */
245 int bgsaveinprogress
;
246 struct saveparam
*saveparams
;
253 /* Replication related */
257 redisClient
*master
; /* client that is master for this slave */
259 unsigned int maxclients
;
260 unsigned int maxmemory
;
261 /* Sort parameters - qsort_r() is only available under BSD so we
262 * have to take this state global, in order to pass it to sortCompare() */
268 typedef void redisCommandProc(redisClient
*c
);
269 struct redisCommand
{
271 redisCommandProc
*proc
;
276 typedef struct _redisSortObject
{
284 typedef struct _redisSortOperation
{
287 } redisSortOperation
;
289 struct sharedObjectsStruct
{
290 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
291 *colon
, *nullbulk
, *nullmultibulk
,
292 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
293 *outofrangeerr
, *plus
,
294 *select0
, *select1
, *select2
, *select3
, *select4
,
295 *select5
, *select6
, *select7
, *select8
, *select9
;
298 /*================================ Prototypes =============================== */
300 static void freeStringObject(robj
*o
);
301 static void freeListObject(robj
*o
);
302 static void freeSetObject(robj
*o
);
303 static void decrRefCount(void *o
);
304 static robj
*createObject(int type
, void *ptr
);
305 static void freeClient(redisClient
*c
);
306 static int rdbLoad(char *filename
);
307 static void addReply(redisClient
*c
, robj
*obj
);
308 static void addReplySds(redisClient
*c
, sds s
);
309 static void incrRefCount(robj
*o
);
310 static int rdbSaveBackground(char *filename
);
311 static robj
*createStringObject(char *ptr
, size_t len
);
312 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
313 static int syncWithMaster(void);
314 static robj
*tryObjectSharing(robj
*o
);
315 static int removeExpire(redisDb
*db
, robj
*key
);
316 static int expireIfNeeded(redisDb
*db
, robj
*key
);
317 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
318 static int deleteKey(redisDb
*db
, robj
*key
);
319 static time_t getExpire(redisDb
*db
, robj
*key
);
320 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
321 static void updateSalvesWaitingBgsave(int bgsaveerr
);
322 static void freeMemoryIfNeeded(void);
324 static void authCommand(redisClient
*c
);
325 static void pingCommand(redisClient
*c
);
326 static void echoCommand(redisClient
*c
);
327 static void setCommand(redisClient
*c
);
328 static void setnxCommand(redisClient
*c
);
329 static void getCommand(redisClient
*c
);
330 static void delCommand(redisClient
*c
);
331 static void existsCommand(redisClient
*c
);
332 static void incrCommand(redisClient
*c
);
333 static void decrCommand(redisClient
*c
);
334 static void incrbyCommand(redisClient
*c
);
335 static void decrbyCommand(redisClient
*c
);
336 static void selectCommand(redisClient
*c
);
337 static void randomkeyCommand(redisClient
*c
);
338 static void keysCommand(redisClient
*c
);
339 static void dbsizeCommand(redisClient
*c
);
340 static void lastsaveCommand(redisClient
*c
);
341 static void saveCommand(redisClient
*c
);
342 static void bgsaveCommand(redisClient
*c
);
343 static void shutdownCommand(redisClient
*c
);
344 static void moveCommand(redisClient
*c
);
345 static void renameCommand(redisClient
*c
);
346 static void renamenxCommand(redisClient
*c
);
347 static void lpushCommand(redisClient
*c
);
348 static void rpushCommand(redisClient
*c
);
349 static void lpopCommand(redisClient
*c
);
350 static void rpopCommand(redisClient
*c
);
351 static void llenCommand(redisClient
*c
);
352 static void lindexCommand(redisClient
*c
);
353 static void lrangeCommand(redisClient
*c
);
354 static void ltrimCommand(redisClient
*c
);
355 static void typeCommand(redisClient
*c
);
356 static void lsetCommand(redisClient
*c
);
357 static void saddCommand(redisClient
*c
);
358 static void sremCommand(redisClient
*c
);
359 static void smoveCommand(redisClient
*c
);
360 static void sismemberCommand(redisClient
*c
);
361 static void scardCommand(redisClient
*c
);
362 static void sinterCommand(redisClient
*c
);
363 static void sinterstoreCommand(redisClient
*c
);
364 static void sunionCommand(redisClient
*c
);
365 static void sunionstoreCommand(redisClient
*c
);
366 static void sdiffCommand(redisClient
*c
);
367 static void sdiffstoreCommand(redisClient
*c
);
368 static void syncCommand(redisClient
*c
);
369 static void flushdbCommand(redisClient
*c
);
370 static void flushallCommand(redisClient
*c
);
371 static void sortCommand(redisClient
*c
);
372 static void lremCommand(redisClient
*c
);
373 static void infoCommand(redisClient
*c
);
374 static void mgetCommand(redisClient
*c
);
375 static void monitorCommand(redisClient
*c
);
376 static void expireCommand(redisClient
*c
);
377 static void getSetCommand(redisClient
*c
);
378 static void ttlCommand(redisClient
*c
);
379 static void slaveofCommand(redisClient
*c
);
380 static void debugCommand(redisClient
*c
);
382 /*================================= Globals ================================= */
385 static struct redisServer server
; /* server global state */
386 static struct redisCommand cmdTable
[] = {
387 {"get",getCommand
,2,REDIS_CMD_INLINE
},
388 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
389 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
390 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
391 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
392 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
393 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
394 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
395 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
396 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
397 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
398 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
399 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
400 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
401 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
402 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
403 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
404 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
405 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
406 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
407 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
408 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
409 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
410 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
411 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
412 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
413 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
414 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
415 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
416 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
417 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
418 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
419 {"getset",getSetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
420 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
421 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
422 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
423 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
424 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
425 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
426 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
427 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
428 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
429 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
430 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
431 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
432 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
433 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
434 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
435 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
436 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
437 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
438 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
439 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
440 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
441 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
442 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
443 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
444 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
448 /*============================ Utility functions ============================ */
450 /* Glob-style pattern matching. */
451 int stringmatchlen(const char *pattern
, int patternLen
,
452 const char *string
, int stringLen
, int nocase
)
457 while (pattern
[1] == '*') {
462 return 1; /* match */
464 if (stringmatchlen(pattern
+1, patternLen
-1,
465 string
, stringLen
, nocase
))
466 return 1; /* match */
470 return 0; /* no match */
474 return 0; /* no match */
484 not = pattern
[0] == '^';
491 if (pattern
[0] == '\\') {
494 if (pattern
[0] == string
[0])
496 } else if (pattern
[0] == ']') {
498 } else if (patternLen
== 0) {
502 } else if (pattern
[1] == '-' && patternLen
>= 3) {
503 int start
= pattern
[0];
504 int end
= pattern
[2];
512 start
= tolower(start
);
518 if (c
>= start
&& c
<= end
)
522 if (pattern
[0] == string
[0])
525 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
535 return 0; /* no match */
541 if (patternLen
>= 2) {
548 if (pattern
[0] != string
[0])
549 return 0; /* no match */
551 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
552 return 0; /* no match */
560 if (stringLen
== 0) {
561 while(*pattern
== '*') {
568 if (patternLen
== 0 && stringLen
== 0)
573 void redisLog(int level
, const char *fmt
, ...)
578 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
582 if (level
>= server
.verbosity
) {
588 strftime(buf
,64,"%d %b %H:%M:%S",gmtime(&now
));
589 fprintf(fp
,"%s %c ",buf
,c
[level
]);
590 vfprintf(fp
, fmt
, ap
);
596 if (server
.logfile
) fclose(fp
);
599 /*====================== Hash table type implementation ==================== */
601 /* This is a hash table type that uses the SDS dynamic strings library as
602 * keys and redis objects as values (objects can hold SDS strings,
605 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
609 DICT_NOTUSED(privdata
);
611 l1
= sdslen((sds
)key1
);
612 l2
= sdslen((sds
)key2
);
613 if (l1
!= l2
) return 0;
614 return memcmp(key1
, key2
, l1
) == 0;
617 static void dictRedisObjectDestructor(void *privdata
, void *val
)
619 DICT_NOTUSED(privdata
);
624 static int dictSdsKeyCompare(void *privdata
, const void *key1
,
627 const robj
*o1
= key1
, *o2
= key2
;
628 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
631 static unsigned int dictSdsHash(const void *key
) {
633 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
636 static dictType setDictType
= {
637 dictSdsHash
, /* hash function */
640 dictSdsKeyCompare
, /* key compare */
641 dictRedisObjectDestructor
, /* key destructor */
642 NULL
/* val destructor */
645 static dictType hashDictType
= {
646 dictSdsHash
, /* hash function */
649 dictSdsKeyCompare
, /* key compare */
650 dictRedisObjectDestructor
, /* key destructor */
651 dictRedisObjectDestructor
/* val destructor */
654 /* ========================= Random utility functions ======================= */
656 /* Redis generally does not try to recover from out of memory conditions
657 * when allocating objects or strings, it is not clear if it will be possible
658 * to report this condition to the client since the networking layer itself
659 * is based on heap allocation for send buffers, so we simply abort.
660 * At least the code will be simpler to read... */
661 static void oom(const char *msg
) {
662 fprintf(stderr
, "%s: Out of memory\n",msg
);
668 /* ====================== Redis server networking stuff ===================== */
669 void closeTimedoutClients(void) {
672 time_t now
= time(NULL
);
674 listRewind(server
.clients
);
675 while ((ln
= listYield(server
.clients
)) != NULL
) {
676 c
= listNodeValue(ln
);
677 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
678 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
679 (now
- c
->lastinteraction
> server
.maxidletime
)) {
680 redisLog(REDIS_DEBUG
,"Closing idle client");
686 /* If the percentage of used slots in the HT falls below REDIS_HT_MINFILL
687 * we resize the hash table to save memory */
688 void tryResizeHashTables(void) {
691 for (j
= 0; j
< server
.dbnum
; j
++) {
692 long long size
, used
;
694 size
= dictSlots(server
.db
[j
].dict
);
695 used
= dictSize(server
.db
[j
].dict
);
696 if (size
&& used
&& size
> REDIS_HT_MINSLOTS
&&
697 (used
*100/size
< REDIS_HT_MINFILL
)) {
698 redisLog(REDIS_NOTICE
,"The hash table %d is too sparse, resize it...",j
);
699 dictResize(server
.db
[j
].dict
);
700 redisLog(REDIS_NOTICE
,"Hash table %d resized.",j
);
705 int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
706 int j
, loops
= server
.cronloops
++;
707 REDIS_NOTUSED(eventLoop
);
709 REDIS_NOTUSED(clientData
);
711 /* Update the global state with the amount of used memory */
712 server
.usedmemory
= zmalloc_used_memory();
714 /* Show some info about non-empty databases */
715 for (j
= 0; j
< server
.dbnum
; j
++) {
716 long long size
, used
, vkeys
;
718 size
= dictSlots(server
.db
[j
].dict
);
719 used
= dictSize(server
.db
[j
].dict
);
720 vkeys
= dictSize(server
.db
[j
].expires
);
721 if (!(loops
% 5) && used
> 0) {
722 redisLog(REDIS_DEBUG
,"DB %d: %d keys (%d volatile) in %d slots HT.",j
,used
,vkeys
,size
);
723 /* dictPrintStats(server.dict); */
727 /* We don't want to resize the hash tables while a background saving
728 * is in progress: the saving child is created using fork() that is
729 * implemented with a copy-on-write semantic in most modern systems, so
730 * if we resize the HT while there is the saving child at work actually
731 * a lot of memory movements in the parent will cause a lot of pages
733 if (!server
.bgsaveinprogress
) tryResizeHashTables();
735 /* Show information about connected clients */
737 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use",
738 listLength(server
.clients
)-listLength(server
.slaves
),
739 listLength(server
.slaves
),
741 dictSize(server
.sharingpool
));
744 /* Close connections of timedout clients */
745 if (server
.maxidletime
&& !(loops
% 10))
746 closeTimedoutClients();
748 /* Check if a background saving in progress terminated */
749 if (server
.bgsaveinprogress
) {
751 /* XXX: TODO handle the case of the saving child killed */
752 if (wait4(-1,&statloc
,WNOHANG
,NULL
)) {
753 int exitcode
= WEXITSTATUS(statloc
);
755 redisLog(REDIS_NOTICE
,
756 "Background saving terminated with success");
758 server
.lastsave
= time(NULL
);
760 redisLog(REDIS_WARNING
,
761 "Background saving error");
763 server
.bgsaveinprogress
= 0;
764 updateSalvesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
767 /* If there is not a background saving in progress check if
768 * we have to save now */
769 time_t now
= time(NULL
);
770 for (j
= 0; j
< server
.saveparamslen
; j
++) {
771 struct saveparam
*sp
= server
.saveparams
+j
;
773 if (server
.dirty
>= sp
->changes
&&
774 now
-server
.lastsave
> sp
->seconds
) {
775 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
776 sp
->changes
, sp
->seconds
);
777 rdbSaveBackground(server
.dbfilename
);
783 /* Try to expire a few timed out keys */
784 for (j
= 0; j
< server
.dbnum
; j
++) {
785 redisDb
*db
= server
.db
+j
;
786 int num
= dictSize(db
->expires
);
789 time_t now
= time(NULL
);
791 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
792 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
797 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
798 t
= (time_t) dictGetEntryVal(de
);
800 deleteKey(db
,dictGetEntryKey(de
));
806 /* Check if we should connect to a MASTER */
807 if (server
.replstate
== REDIS_REPL_CONNECT
) {
808 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
809 if (syncWithMaster() == REDIS_OK
) {
810 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
816 static void createSharedObjects(void) {
817 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
818 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
819 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
820 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
821 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
822 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
823 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
824 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
825 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
827 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
828 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
829 "-ERR Operation against a key holding the wrong kind of value\r\n"));
830 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
831 "-ERR no such key\r\n"));
832 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
833 "-ERR syntax error\r\n"));
834 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
835 "-ERR source and destination objects are the same\r\n"));
836 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
837 "-ERR index out of range\r\n"));
838 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
839 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
840 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
841 shared
.select0
= createStringObject("select 0\r\n",10);
842 shared
.select1
= createStringObject("select 1\r\n",10);
843 shared
.select2
= createStringObject("select 2\r\n",10);
844 shared
.select3
= createStringObject("select 3\r\n",10);
845 shared
.select4
= createStringObject("select 4\r\n",10);
846 shared
.select5
= createStringObject("select 5\r\n",10);
847 shared
.select6
= createStringObject("select 6\r\n",10);
848 shared
.select7
= createStringObject("select 7\r\n",10);
849 shared
.select8
= createStringObject("select 8\r\n",10);
850 shared
.select9
= createStringObject("select 9\r\n",10);
853 static void appendServerSaveParams(time_t seconds
, int changes
) {
854 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
855 if (server
.saveparams
== NULL
) oom("appendServerSaveParams");
856 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
857 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
858 server
.saveparamslen
++;
861 static void ResetServerSaveParams() {
862 zfree(server
.saveparams
);
863 server
.saveparams
= NULL
;
864 server
.saveparamslen
= 0;
867 static void initServerConfig() {
868 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
869 server
.port
= REDIS_SERVERPORT
;
870 server
.verbosity
= REDIS_DEBUG
;
871 server
.maxidletime
= REDIS_MAXIDLETIME
;
872 server
.saveparams
= NULL
;
873 server
.logfile
= NULL
; /* NULL = log on standard output */
874 server
.bindaddr
= NULL
;
875 server
.glueoutputbuf
= 1;
876 server
.daemonize
= 0;
877 server
.pidfile
= "/var/run/redis.pid";
878 server
.dbfilename
= "dump.rdb";
879 server
.requirepass
= NULL
;
880 server
.shareobjects
= 0;
881 server
.maxclients
= 0;
882 server
.maxmemory
= 0;
883 ResetServerSaveParams();
885 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
886 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
887 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
888 /* Replication related */
890 server
.masterhost
= NULL
;
891 server
.masterport
= 6379;
892 server
.master
= NULL
;
893 server
.replstate
= REDIS_REPL_NONE
;
896 static void initServer() {
899 signal(SIGHUP
, SIG_IGN
);
900 signal(SIGPIPE
, SIG_IGN
);
902 server
.clients
= listCreate();
903 server
.slaves
= listCreate();
904 server
.monitors
= listCreate();
905 server
.objfreelist
= listCreate();
906 createSharedObjects();
907 server
.el
= aeCreateEventLoop();
908 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
909 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
910 server
.sharingpoolsize
= 1024;
911 if (!server
.db
|| !server
.clients
|| !server
.slaves
|| !server
.monitors
|| !server
.el
|| !server
.objfreelist
)
912 oom("server initialization"); /* Fatal OOM */
913 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
914 if (server
.fd
== -1) {
915 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
918 for (j
= 0; j
< server
.dbnum
; j
++) {
919 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
920 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
923 server
.cronloops
= 0;
924 server
.bgsaveinprogress
= 0;
925 server
.lastsave
= time(NULL
);
927 server
.usedmemory
= 0;
928 server
.stat_numcommands
= 0;
929 server
.stat_numconnections
= 0;
930 server
.stat_starttime
= time(NULL
);
931 aeCreateTimeEvent(server
.el
, 1000, serverCron
, NULL
, NULL
);
934 /* Empty the whole database */
935 static long long emptyDb() {
937 long long removed
= 0;
939 for (j
= 0; j
< server
.dbnum
; j
++) {
940 removed
+= dictSize(server
.db
[j
].dict
);
941 dictEmpty(server
.db
[j
].dict
);
942 dictEmpty(server
.db
[j
].expires
);
947 static int yesnotoi(char *s
) {
948 if (!strcasecmp(s
,"yes")) return 1;
949 else if (!strcasecmp(s
,"no")) return 0;
953 /* I agree, this is a very rudimentary way to load a configuration...
954 will improve later if the config gets more complex */
955 static void loadServerConfig(char *filename
) {
956 FILE *fp
= fopen(filename
,"r");
957 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
962 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
965 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
971 line
= sdstrim(line
," \t\r\n");
973 /* Skip comments and blank lines*/
974 if (line
[0] == '#' || line
[0] == '\0') {
979 /* Split into arguments */
980 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
983 /* Execute config directives */
984 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
985 server
.maxidletime
= atoi(argv
[1]);
986 if (server
.maxidletime
< 0) {
987 err
= "Invalid timeout value"; goto loaderr
;
989 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
990 server
.port
= atoi(argv
[1]);
991 if (server
.port
< 1 || server
.port
> 65535) {
992 err
= "Invalid port"; goto loaderr
;
994 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
995 server
.bindaddr
= zstrdup(argv
[1]);
996 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
997 int seconds
= atoi(argv
[1]);
998 int changes
= atoi(argv
[2]);
999 if (seconds
< 1 || changes
< 0) {
1000 err
= "Invalid save parameters"; goto loaderr
;
1002 appendServerSaveParams(seconds
,changes
);
1003 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1004 if (chdir(argv
[1]) == -1) {
1005 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1006 argv
[1], strerror(errno
));
1009 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1010 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1011 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1012 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1014 err
= "Invalid log level. Must be one of debug, notice, warning";
1017 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1020 server
.logfile
= zstrdup(argv
[1]);
1021 if (!strcasecmp(server
.logfile
,"stdout")) {
1022 zfree(server
.logfile
);
1023 server
.logfile
= NULL
;
1025 if (server
.logfile
) {
1026 /* Test if we are able to open the file. The server will not
1027 * be able to abort just for this problem later... */
1028 fp
= fopen(server
.logfile
,"a");
1030 err
= sdscatprintf(sdsempty(),
1031 "Can't open the log file: %s", strerror(errno
));
1036 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1037 server
.dbnum
= atoi(argv
[1]);
1038 if (server
.dbnum
< 1) {
1039 err
= "Invalid number of databases"; goto loaderr
;
1041 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1042 server
.maxclients
= atoi(argv
[1]);
1043 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1044 server
.maxmemory
= atoi(argv
[1]);
1045 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1046 server
.masterhost
= sdsnew(argv
[1]);
1047 server
.masterport
= atoi(argv
[2]);
1048 server
.replstate
= REDIS_REPL_CONNECT
;
1049 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1050 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1051 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1053 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1054 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1055 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1057 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1058 server
.sharingpoolsize
= atoi(argv
[1]);
1059 if (server
.sharingpoolsize
< 1) {
1060 err
= "invalid object sharing pool size"; goto loaderr
;
1062 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1063 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1064 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1066 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1067 server
.requirepass
= zstrdup(argv
[1]);
1068 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1069 server
.pidfile
= zstrdup(argv
[1]);
1070 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1071 server
.dbfilename
= zstrdup(argv
[1]);
1073 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1075 for (j
= 0; j
< argc
; j
++)
1084 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1085 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1086 fprintf(stderr
, ">>> '%s'\n", line
);
1087 fprintf(stderr
, "%s\n", err
);
1091 static void freeClientArgv(redisClient
*c
) {
1094 for (j
= 0; j
< c
->argc
; j
++)
1095 decrRefCount(c
->argv
[j
]);
1099 static void freeClient(redisClient
*c
) {
1102 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1103 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1104 sdsfree(c
->querybuf
);
1105 listRelease(c
->reply
);
1108 ln
= listSearchKey(server
.clients
,c
);
1110 listDelNode(server
.clients
,ln
);
1111 if (c
->flags
& REDIS_SLAVE
) {
1112 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1114 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1115 ln
= listSearchKey(l
,c
);
1119 if (c
->flags
& REDIS_MASTER
) {
1120 server
.master
= NULL
;
1121 server
.replstate
= REDIS_REPL_CONNECT
;
1127 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1132 listRewind(c
->reply
);
1133 while((ln
= listYield(c
->reply
))) {
1135 totlen
+= sdslen(o
->ptr
);
1136 /* This optimization makes more sense if we don't have to copy
1138 if (totlen
> 1024) return;
1144 listRewind(c
->reply
);
1145 while((ln
= listYield(c
->reply
))) {
1147 memcpy(buf
+copylen
,o
->ptr
,sdslen(o
->ptr
));
1148 copylen
+= sdslen(o
->ptr
);
1149 listDelNode(c
->reply
,ln
);
1151 /* Now the output buffer is empty, add the new single element */
1152 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,totlen
));
1153 if (!listAddNodeTail(c
->reply
,o
)) oom("listAddNodeTail");
1157 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1158 redisClient
*c
= privdata
;
1159 int nwritten
= 0, totwritten
= 0, objlen
;
1162 REDIS_NOTUSED(mask
);
1164 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1165 glueReplyBuffersIfNeeded(c
);
1166 while(listLength(c
->reply
)) {
1167 o
= listNodeValue(listFirst(c
->reply
));
1168 objlen
= sdslen(o
->ptr
);
1171 listDelNode(c
->reply
,listFirst(c
->reply
));
1175 if (c
->flags
& REDIS_MASTER
) {
1176 nwritten
= objlen
- c
->sentlen
;
1178 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1179 if (nwritten
<= 0) break;
1181 c
->sentlen
+= nwritten
;
1182 totwritten
+= nwritten
;
1183 /* If we fully sent the object on head go to the next one */
1184 if (c
->sentlen
== objlen
) {
1185 listDelNode(c
->reply
,listFirst(c
->reply
));
1189 if (nwritten
== -1) {
1190 if (errno
== EAGAIN
) {
1193 redisLog(REDIS_DEBUG
,
1194 "Error writing to client: %s", strerror(errno
));
1199 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1200 if (listLength(c
->reply
) == 0) {
1202 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1206 static struct redisCommand
*lookupCommand(char *name
) {
1208 while(cmdTable
[j
].name
!= NULL
) {
1209 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1215 /* resetClient prepare the client to process the next command */
1216 static void resetClient(redisClient
*c
) {
1221 /* If this function gets called we already read a whole
1222 * command, argments are in the client argv/argc fields.
1223 * processCommand() execute the command or prepare the
1224 * server for a bulk read from the client.
1226 * If 1 is returned the client is still alive and valid and
1227 * and other operations can be performed by the caller. Otherwise
1228 * if 0 is returned the client was destroied (i.e. after QUIT). */
1229 static int processCommand(redisClient
*c
) {
1230 struct redisCommand
*cmd
;
1233 /* Free some memory if needed (maxmemory setting) */
1234 if (server
.maxmemory
) freeMemoryIfNeeded();
1236 /* The QUIT command is handled as a special case. Normal command
1237 * procs are unable to close the client connection safely */
1238 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1242 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1244 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1247 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1248 (c
->argc
< -cmd
->arity
)) {
1249 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
1252 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1253 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1256 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1257 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1259 decrRefCount(c
->argv
[c
->argc
-1]);
1260 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1262 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1267 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1268 /* It is possible that the bulk read is already in the
1269 * buffer. Check this condition and handle it accordingly */
1270 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1271 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1273 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1278 /* Let's try to share objects on the command arguments vector */
1279 if (server
.shareobjects
) {
1281 for(j
= 1; j
< c
->argc
; j
++)
1282 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1284 /* Check if the user is authenticated */
1285 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1286 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1291 /* Exec the command */
1292 dirty
= server
.dirty
;
1294 if (server
.dirty
-dirty
!= 0 && listLength(server
.slaves
))
1295 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1296 if (listLength(server
.monitors
))
1297 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1298 server
.stat_numcommands
++;
1300 /* Prepare the client for the next command */
1301 if (c
->flags
& REDIS_CLOSE
) {
1309 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1313 /* (args*2)+1 is enough room for args, spaces, newlines */
1314 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1316 if (argc
<= REDIS_STATIC_ARGS
) {
1319 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1320 if (!outv
) oom("replicationFeedSlaves");
1323 for (j
= 0; j
< argc
; j
++) {
1324 if (j
!= 0) outv
[outc
++] = shared
.space
;
1325 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1328 lenobj
= createObject(REDIS_STRING
,
1329 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv
[j
]->ptr
)));
1330 lenobj
->refcount
= 0;
1331 outv
[outc
++] = lenobj
;
1333 outv
[outc
++] = argv
[j
];
1335 outv
[outc
++] = shared
.crlf
;
1337 /* Increment all the refcounts at start and decrement at end in order to
1338 * be sure to free objects if there is no slave in a replication state
1339 * able to be feed with commands */
1340 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1342 while((ln
= listYield(slaves
))) {
1343 redisClient
*slave
= ln
->value
;
1345 /* Don't feed slaves that are still waiting for BGSAVE to start */
1346 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1348 /* Feed all the other slaves, MONITORs and so on */
1349 if (slave
->slaveseldb
!= dictid
) {
1353 case 0: selectcmd
= shared
.select0
; break;
1354 case 1: selectcmd
= shared
.select1
; break;
1355 case 2: selectcmd
= shared
.select2
; break;
1356 case 3: selectcmd
= shared
.select3
; break;
1357 case 4: selectcmd
= shared
.select4
; break;
1358 case 5: selectcmd
= shared
.select5
; break;
1359 case 6: selectcmd
= shared
.select6
; break;
1360 case 7: selectcmd
= shared
.select7
; break;
1361 case 8: selectcmd
= shared
.select8
; break;
1362 case 9: selectcmd
= shared
.select9
; break;
1364 selectcmd
= createObject(REDIS_STRING
,
1365 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1366 selectcmd
->refcount
= 0;
1369 addReply(slave
,selectcmd
);
1370 slave
->slaveseldb
= dictid
;
1372 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1374 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1375 if (outv
!= static_outv
) zfree(outv
);
1378 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1379 redisClient
*c
= (redisClient
*) privdata
;
1380 char buf
[REDIS_IOBUF_LEN
];
1383 REDIS_NOTUSED(mask
);
1385 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1387 if (errno
== EAGAIN
) {
1390 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1394 } else if (nread
== 0) {
1395 redisLog(REDIS_DEBUG
, "Client closed connection");
1400 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1401 c
->lastinteraction
= time(NULL
);
1407 if (c
->bulklen
== -1) {
1408 /* Read the first line of the query */
1409 char *p
= strchr(c
->querybuf
,'\n');
1415 query
= c
->querybuf
;
1416 c
->querybuf
= sdsempty();
1417 querylen
= 1+(p
-(query
));
1418 if (sdslen(query
) > querylen
) {
1419 /* leave data after the first line of the query in the buffer */
1420 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1422 *p
= '\0'; /* remove "\n" */
1423 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1424 sdsupdatelen(query
);
1426 /* Now we can split the query in arguments */
1427 if (sdslen(query
) == 0) {
1428 /* Ignore empty query */
1432 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1433 if (argv
== NULL
) oom("sdssplitlen");
1436 if (c
->argv
) zfree(c
->argv
);
1437 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1438 if (c
->argv
== NULL
) oom("allocating arguments list for client");
1440 for (j
= 0; j
< argc
; j
++) {
1441 if (sdslen(argv
[j
])) {
1442 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1449 /* Execute the command. If the client is still valid
1450 * after processCommand() return and there is something
1451 * on the query buffer try to process the next command. */
1452 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1454 } else if (sdslen(c
->querybuf
) >= 1024*32) {
1455 redisLog(REDIS_DEBUG
, "Client protocol error");
1460 /* Bulk read handling. Note that if we are at this point
1461 the client already sent a command terminated with a newline,
1462 we are reading the bulk data that is actually the last
1463 argument of the command. */
1464 int qbl
= sdslen(c
->querybuf
);
1466 if (c
->bulklen
<= qbl
) {
1467 /* Copy everything but the final CRLF as final argument */
1468 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1470 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1477 static int selectDb(redisClient
*c
, int id
) {
1478 if (id
< 0 || id
>= server
.dbnum
)
1480 c
->db
= &server
.db
[id
];
1484 static void *dupClientReplyValue(void *o
) {
1485 incrRefCount((robj
*)o
);
1489 static redisClient
*createClient(int fd
) {
1490 redisClient
*c
= zmalloc(sizeof(*c
));
1492 anetNonBlock(NULL
,fd
);
1493 anetTcpNoDelay(NULL
,fd
);
1494 if (!c
) return NULL
;
1497 c
->querybuf
= sdsempty();
1503 c
->lastinteraction
= time(NULL
);
1504 c
->authenticated
= 0;
1505 c
->replstate
= REDIS_REPL_NONE
;
1506 if ((c
->reply
= listCreate()) == NULL
) oom("listCreate");
1507 listSetFreeMethod(c
->reply
,decrRefCount
);
1508 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1509 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1510 readQueryFromClient
, c
, NULL
) == AE_ERR
) {
1514 if (!listAddNodeTail(server
.clients
,c
)) oom("listAddNodeTail");
1518 static void addReply(redisClient
*c
, robj
*obj
) {
1519 if (listLength(c
->reply
) == 0 &&
1520 (c
->replstate
== REDIS_REPL_NONE
||
1521 c
->replstate
== REDIS_REPL_ONLINE
) &&
1522 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
1523 sendReplyToClient
, c
, NULL
) == AE_ERR
) return;
1524 if (!listAddNodeTail(c
->reply
,obj
)) oom("listAddNodeTail");
1528 static void addReplySds(redisClient
*c
, sds s
) {
1529 robj
*o
= createObject(REDIS_STRING
,s
);
1534 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1539 REDIS_NOTUSED(mask
);
1540 REDIS_NOTUSED(privdata
);
1542 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
1543 if (cfd
== AE_ERR
) {
1544 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
1547 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
1548 if ((c
= createClient(cfd
)) == NULL
) {
1549 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
1550 close(cfd
); /* May be already closed, just ingore errors */
1553 /* If maxclient directive is set and this is one client more... close the
1554 * connection. Note that we create the client instead to check before
1555 * for this condition, since now the socket is already set in nonblocking
1556 * mode and we can send an error for free using the Kernel I/O */
1557 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
1558 char *err
= "-ERR max number of clients reached\r\n";
1560 /* That's a best effort error message, don't check write errors */
1561 (void) write(c
->fd
,err
,strlen(err
));
1565 server
.stat_numconnections
++;
1568 /* ======================= Redis objects implementation ===================== */
1570 static robj
*createObject(int type
, void *ptr
) {
1573 if (listLength(server
.objfreelist
)) {
1574 listNode
*head
= listFirst(server
.objfreelist
);
1575 o
= listNodeValue(head
);
1576 listDelNode(server
.objfreelist
,head
);
1578 o
= zmalloc(sizeof(*o
));
1580 if (!o
) oom("createObject");
1587 static robj
*createStringObject(char *ptr
, size_t len
) {
1588 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
1591 static robj
*createListObject(void) {
1592 list
*l
= listCreate();
1594 if (!l
) oom("listCreate");
1595 listSetFreeMethod(l
,decrRefCount
);
1596 return createObject(REDIS_LIST
,l
);
1599 static robj
*createSetObject(void) {
1600 dict
*d
= dictCreate(&setDictType
,NULL
);
1601 if (!d
) oom("dictCreate");
1602 return createObject(REDIS_SET
,d
);
1605 static void freeStringObject(robj
*o
) {
1609 static void freeListObject(robj
*o
) {
1610 listRelease((list
*) o
->ptr
);
1613 static void freeSetObject(robj
*o
) {
1614 dictRelease((dict
*) o
->ptr
);
1617 static void freeHashObject(robj
*o
) {
1618 dictRelease((dict
*) o
->ptr
);
1621 static void incrRefCount(robj
*o
) {
1623 #ifdef DEBUG_REFCOUNT
1624 if (o
->type
== REDIS_STRING
)
1625 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
1629 static void decrRefCount(void *obj
) {
1632 #ifdef DEBUG_REFCOUNT
1633 if (o
->type
== REDIS_STRING
)
1634 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
1636 if (--(o
->refcount
) == 0) {
1638 case REDIS_STRING
: freeStringObject(o
); break;
1639 case REDIS_LIST
: freeListObject(o
); break;
1640 case REDIS_SET
: freeSetObject(o
); break;
1641 case REDIS_HASH
: freeHashObject(o
); break;
1642 default: assert(0 != 0); break;
1644 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
1645 !listAddNodeHead(server
.objfreelist
,o
))
1650 /* Try to share an object against the shared objects pool */
1651 static robj
*tryObjectSharing(robj
*o
) {
1652 struct dictEntry
*de
;
1655 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
1657 assert(o
->type
== REDIS_STRING
);
1658 de
= dictFind(server
.sharingpool
,o
);
1660 robj
*shared
= dictGetEntryKey(de
);
1662 c
= ((unsigned long) dictGetEntryVal(de
))+1;
1663 dictGetEntryVal(de
) = (void*) c
;
1664 incrRefCount(shared
);
1668 /* Here we are using a stream algorihtm: Every time an object is
1669 * shared we increment its count, everytime there is a miss we
1670 * recrement the counter of a random object. If this object reaches
1671 * zero we remove the object and put the current object instead. */
1672 if (dictSize(server
.sharingpool
) >=
1673 server
.sharingpoolsize
) {
1674 de
= dictGetRandomKey(server
.sharingpool
);
1676 c
= ((unsigned long) dictGetEntryVal(de
))-1;
1677 dictGetEntryVal(de
) = (void*) c
;
1679 dictDelete(server
.sharingpool
,de
->key
);
1682 c
= 0; /* If the pool is empty we want to add this object */
1687 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
1688 assert(retval
== DICT_OK
);
1695 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
1696 dictEntry
*de
= dictFind(db
->dict
,key
);
1697 return de
? dictGetEntryVal(de
) : NULL
;
1700 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
1701 expireIfNeeded(db
,key
);
1702 return lookupKey(db
,key
);
1705 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
1706 deleteIfVolatile(db
,key
);
1707 return lookupKey(db
,key
);
1710 static int deleteKey(redisDb
*db
, robj
*key
) {
1713 /* We need to protect key from destruction: after the first dictDelete()
1714 * it may happen that 'key' is no longer valid if we don't increment
1715 * it's count. This may happen when we get the object reference directly
1716 * from the hash table with dictRandomKey() or dict iterators */
1718 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
1719 retval
= dictDelete(db
->dict
,key
);
1722 return retval
== DICT_OK
;
1725 /*============================ DB saving/loading ============================ */
/* Write the one byte type/opcode 'type' to 'fp'.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
/* Write the time 't' to 'fp' as a 32 bit signed integer, in the byte order
 * of the host. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    return (fwrite(&stamp,sizeof(stamp),1,fp) == 0) ? -1 : 0;
}
1738 /* check rdbLoadLen() comments for more info */
1739 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
1740 unsigned char buf
[2];
1743 /* Save a 6 bit len */
1744 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
1745 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1746 } else if (len
< (1<<14)) {
1747 /* Save a 14 bit len */
1748 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
1750 if (fwrite(buf
,2,1,fp
) == 0) return -1;
1752 /* Save a 32 bit len */
1753 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
1754 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1756 if (fwrite(&len
,4,1,fp
) == 0) return -1;
1761 /* String objects in the form "2391" "-100" without any space and with a
1762 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1763 * encoded as integers to save space */
1764 int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
1766 char *endptr
, buf
[32];
1768 /* Check if it's possible to encode this value as a number */
1769 value
= strtoll(s
, &endptr
, 10);
1770 if (endptr
[0] != '\0') return 0;
1771 snprintf(buf
,32,"%lld",value
);
1773 /* If the number converted back into a string is not identical
1774 * then it's not possible to encode the string as integer */
1775 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
1777 /* Finally check if it fits in our ranges */
1778 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
1779 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
1780 enc
[1] = value
&0xFF;
1782 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
1783 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
1784 enc
[1] = value
&0xFF;
1785 enc
[2] = (value
>>8)&0xFF;
1787 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
1788 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
1789 enc
[1] = value
&0xFF;
1790 enc
[2] = (value
>>8)&0xFF;
1791 enc
[3] = (value
>>16)&0xFF;
1792 enc
[4] = (value
>>24)&0xFF;
1799 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
1800 unsigned int comprlen
, outlen
;
1804 /* We require at least four bytes compression for this to be worth it */
1805 outlen
= sdslen(obj
->ptr
)-4;
1806 if (outlen
<= 0) return 0;
1807 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
1808 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
1809 if (comprlen
== 0) {
1813 /* Data compressed! Let's save it on disk */
1814 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
1815 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
1816 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
1817 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
1818 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
1827 /* Save a string objet as [len][data] on disk. If the object is a string
1828 * representation of an integer value we try to safe it in a special form */
1829 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
1830 size_t len
= sdslen(obj
->ptr
);
1833 /* Try integer encoding */
1835 unsigned char buf
[5];
1836 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
1837 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
1842 /* Try LZF compression - under 20 bytes it's unable to compress even
1843 * aaaaaaaaaaaaaaaaaa so skip it */
1844 if (1 && len
> 20) {
1847 retval
= rdbSaveLzfStringObject(fp
,obj
);
1848 if (retval
== -1) return -1;
1849 if (retval
> 0) return 0;
1850 /* retval == 0 means data can't be compressed, save the old way */
1853 /* Store verbatim */
1854 if (rdbSaveLen(fp
,len
) == -1) return -1;
1855 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
1859 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1860 static int rdbSave(char *filename
) {
1861 dictIterator
*di
= NULL
;
1866 time_t now
= time(NULL
);
1868 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
1869 fp
= fopen(tmpfile
,"w");
1871 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
1874 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
1875 for (j
= 0; j
< server
.dbnum
; j
++) {
1876 redisDb
*db
= server
.db
+j
;
1878 if (dictSize(d
) == 0) continue;
1879 di
= dictGetIterator(d
);
1885 /* Write the SELECT DB opcode */
1886 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
1887 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
1889 /* Iterate this DB writing every entry */
1890 while((de
= dictNext(di
)) != NULL
) {
1891 robj
*key
= dictGetEntryKey(de
);
1892 robj
*o
= dictGetEntryVal(de
);
1893 time_t expiretime
= getExpire(db
,key
);
1895 /* Save the expire time */
1896 if (expiretime
!= -1) {
1897 /* If this key is already expired skip it */
1898 if (expiretime
< now
) continue;
1899 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
1900 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
1902 /* Save the key and associated value */
1903 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
1904 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
1905 if (o
->type
== REDIS_STRING
) {
1906 /* Save a string value */
1907 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
1908 } else if (o
->type
== REDIS_LIST
) {
1909 /* Save a list value */
1910 list
*list
= o
->ptr
;
1914 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
1915 while((ln
= listYield(list
))) {
1916 robj
*eleobj
= listNodeValue(ln
);
1918 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
1920 } else if (o
->type
== REDIS_SET
) {
1921 /* Save a set value */
1923 dictIterator
*di
= dictGetIterator(set
);
1926 if (!set
) oom("dictGetIteraotr");
1927 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
1928 while((de
= dictNext(di
)) != NULL
) {
1929 robj
*eleobj
= dictGetEntryKey(de
);
1931 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
1933 dictReleaseIterator(di
);
1938 dictReleaseIterator(di
);
1941 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
1943 /* Make sure data will not remain on the OS's output buffers */
1948 /* Use RENAME to make sure the DB file is changed atomically only
1949 * if the generate DB file is ok. */
1950 if (rename(tmpfile
,filename
) == -1) {
1951 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destionation: %s", strerror(errno
));
1955 redisLog(REDIS_NOTICE
,"DB saved on disk");
1957 server
.lastsave
= time(NULL
);
1963 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
1964 if (di
) dictReleaseIterator(di
);
1968 static int rdbSaveBackground(char *filename
) {
1971 if (server
.bgsaveinprogress
) return REDIS_ERR
;
1972 if ((childpid
= fork()) == 0) {
1975 if (rdbSave(filename
) == REDIS_OK
) {
1982 if (childpid
== -1) {
1983 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
1987 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
1988 server
.bgsaveinprogress
= 1;
1991 return REDIS_OK
; /* unreached */
/* Read a one byte type/opcode from 'fp'.
 * Returns the byte value (0-255), or -1 if it can't be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char opcode;

    return (fread(&opcode,1,1,fp) == 0) ? -1 : opcode;
}
/* Read a time stored as a 32 bit signed integer, in the byte order of the
 * host, from 'fp'. Returns -1 if it can't be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,sizeof(stamp),1,fp) == 0) return -1;
    return (time_t) stamp;
}
2006 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2007 * of this file for a description of how this are stored on disk.
2009 * isencoded is set to 1 if the readed length is not actually a length but
2010 * an "encoding type", check the above comments for more info */
2011 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2012 unsigned char buf
[2];
2015 if (isencoded
) *isencoded
= 0;
2017 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2022 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2023 type
= (buf
[0]&0xC0)>>6;
2024 if (type
== REDIS_RDB_6BITLEN
) {
2025 /* Read a 6 bit len */
2027 } else if (type
== REDIS_RDB_ENCVAL
) {
2028 /* Read a 6 bit len encoding type */
2029 if (isencoded
) *isencoded
= 1;
2031 } else if (type
== REDIS_RDB_14BITLEN
) {
2032 /* Read a 14 bit len */
2033 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2034 return ((buf
[0]&0x3F)<<8)|buf
[1];
2036 /* Read a 32 bit len */
2037 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2043 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2044 unsigned char enc
[4];
2047 if (enctype
== REDIS_RDB_ENC_INT8
) {
2048 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2049 val
= (signed char)enc
[0];
2050 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2052 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2053 v
= enc
[0]|(enc
[1]<<8);
2055 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2057 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2058 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2061 val
= 0; /* anti-warning */
2064 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2067 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2068 unsigned int len
, clen
;
2069 unsigned char *c
= NULL
;
2072 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2073 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2074 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2075 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2076 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2077 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2079 return createObject(REDIS_STRING
,val
);
2086 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2091 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2094 case REDIS_RDB_ENC_INT8
:
2095 case REDIS_RDB_ENC_INT16
:
2096 case REDIS_RDB_ENC_INT32
:
2097 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2098 case REDIS_RDB_ENC_LZF
:
2099 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2105 if (len
== REDIS_RDB_LENERR
) return NULL
;
2106 val
= sdsnewlen(NULL
,len
);
2107 if (len
&& fread(val
,len
,1,fp
) == 0) {
2111 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2114 static int rdbLoad(char *filename
) {
2116 robj
*keyobj
= NULL
;
2118 int type
, retval
, rdbver
;
2119 dict
*d
= server
.db
[0].dict
;
2120 redisDb
*db
= server
.db
+0;
2122 time_t expiretime
= -1, now
= time(NULL
);
2124 fp
= fopen(filename
,"r");
2125 if (!fp
) return REDIS_ERR
;
2126 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2128 if (memcmp(buf
,"REDIS",5) != 0) {
2130 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2133 rdbver
= atoi(buf
+5);
2136 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2143 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2144 if (type
== REDIS_EXPIRETIME
) {
2145 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2146 /* We read the time so we need to read the object type again */
2147 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2149 if (type
== REDIS_EOF
) break;
2150 /* Handle SELECT DB opcode as a special case */
2151 if (type
== REDIS_SELECTDB
) {
2152 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2154 if (dbid
>= (unsigned)server
.dbnum
) {
2155 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2158 db
= server
.db
+dbid
;
2163 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2165 if (type
== REDIS_STRING
) {
2166 /* Read string value */
2167 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2168 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2169 /* Read list/set value */
2172 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2174 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2175 /* Load every single element of the list/set */
2179 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2180 if (type
== REDIS_LIST
) {
2181 if (!listAddNodeTail((list
*)o
->ptr
,ele
))
2182 oom("listAddNodeTail");
2184 if (dictAdd((dict
*)o
->ptr
,ele
,NULL
) == DICT_ERR
)
2191 /* Add the new object in the hash table */
2192 retval
= dictAdd(d
,keyobj
,o
);
2193 if (retval
== DICT_ERR
) {
2194 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2197 /* Set the expire time if needed */
2198 if (expiretime
!= -1) {
2199 setExpire(db
,keyobj
,expiretime
);
2200 /* Delete this key if already expired */
2201 if (expiretime
< now
) deleteKey(db
,keyobj
);
2209 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2210 if (keyobj
) decrRefCount(keyobj
);
2211 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2213 return REDIS_ERR
; /* Just to avoid warning */
2216 /*================================== Commands =============================== */
2218 static void authCommand(redisClient
*c
) {
2219 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2220 c
->authenticated
= 1;
2221 addReply(c
,shared
.ok
);
2223 c
->authenticated
= 0;
2224 addReply(c
,shared
.err
);
2228 static void pingCommand(redisClient
*c
) {
2229 addReply(c
,shared
.pong
);
2232 static void echoCommand(redisClient
*c
) {
2233 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
2234 (int)sdslen(c
->argv
[1]->ptr
)));
2235 addReply(c
,c
->argv
[1]);
2236 addReply(c
,shared
.crlf
);
2239 /*=================================== Strings =============================== */
2241 static void setGenericCommand(redisClient
*c
, int nx
) {
2244 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2245 if (retval
== DICT_ERR
) {
2247 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2248 incrRefCount(c
->argv
[2]);
2250 addReply(c
,shared
.czero
);
2254 incrRefCount(c
->argv
[1]);
2255 incrRefCount(c
->argv
[2]);
2258 removeExpire(c
->db
,c
->argv
[1]);
2259 addReply(c
, nx
? shared
.cone
: shared
.ok
);
2262 static void setCommand(redisClient
*c
) {
2263 setGenericCommand(c
,0);
2266 static void setnxCommand(redisClient
*c
) {
2267 setGenericCommand(c
,1);
2270 static void getCommand(redisClient
*c
) {
2271 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2274 addReply(c
,shared
.nullbulk
);
2276 if (o
->type
!= REDIS_STRING
) {
2277 addReply(c
,shared
.wrongtypeerr
);
2279 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o
->ptr
)));
2281 addReply(c
,shared
.crlf
);
2286 static void getSetCommand(redisClient
*c
) {
2288 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
2289 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2291 incrRefCount(c
->argv
[1]);
2293 incrRefCount(c
->argv
[2]);
2295 removeExpire(c
->db
,c
->argv
[1]);
2298 static void mgetCommand(redisClient
*c
) {
2301 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
2302 for (j
= 1; j
< c
->argc
; j
++) {
2303 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
2305 addReply(c
,shared
.nullbulk
);
2307 if (o
->type
!= REDIS_STRING
) {
2308 addReply(c
,shared
.nullbulk
);
2310 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o
->ptr
)));
2312 addReply(c
,shared
.crlf
);
2318 static void incrDecrCommand(redisClient
*c
, long long incr
) {
2323 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2327 if (o
->type
!= REDIS_STRING
) {
2332 value
= strtoll(o
->ptr
, &eptr
, 10);
2337 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
2338 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
2339 if (retval
== DICT_ERR
) {
2340 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
2341 removeExpire(c
->db
,c
->argv
[1]);
2343 incrRefCount(c
->argv
[1]);
2346 addReply(c
,shared
.colon
);
2348 addReply(c
,shared
.crlf
);
2351 static void incrCommand(redisClient
*c
) {
2352 incrDecrCommand(c
,1);
2355 static void decrCommand(redisClient
*c
) {
2356 incrDecrCommand(c
,-1);
2359 static void incrbyCommand(redisClient
*c
) {
2360 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2361 incrDecrCommand(c
,incr
);
2364 static void decrbyCommand(redisClient
*c
) {
2365 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2366 incrDecrCommand(c
,-incr
);
2369 /* ========================= Type agnostic commands ========================= */
2371 static void delCommand(redisClient
*c
) {
2374 for (j
= 1; j
< c
->argc
; j
++) {
2375 if (deleteKey(c
->db
,c
->argv
[j
])) {
2382 addReply(c
,shared
.czero
);
2385 addReply(c
,shared
.cone
);
2388 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
2393 static void existsCommand(redisClient
*c
) {
2394 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
2397 static void selectCommand(redisClient
*c
) {
2398 int id
= atoi(c
->argv
[1]->ptr
);
2400 if (selectDb(c
,id
) == REDIS_ERR
) {
2401 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
2403 addReply(c
,shared
.ok
);
2407 static void randomkeyCommand(redisClient
*c
) {
2411 de
= dictGetRandomKey(c
->db
->dict
);
2412 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
2415 addReply(c
,shared
.plus
);
2416 addReply(c
,shared
.crlf
);
2418 addReply(c
,shared
.plus
);
2419 addReply(c
,dictGetEntryKey(de
));
2420 addReply(c
,shared
.crlf
);
2424 static void keysCommand(redisClient
*c
) {
2427 sds pattern
= c
->argv
[1]->ptr
;
2428 int plen
= sdslen(pattern
);
2429 int numkeys
= 0, keyslen
= 0;
2430 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
2432 di
= dictGetIterator(c
->db
->dict
);
2433 if (!di
) oom("dictGetIterator");
2435 decrRefCount(lenobj
);
2436 while((de
= dictNext(di
)) != NULL
) {
2437 robj
*keyobj
= dictGetEntryKey(de
);
2439 sds key
= keyobj
->ptr
;
2440 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
2441 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
2442 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
2444 addReply(c
,shared
.space
);
2447 keyslen
+= sdslen(key
);
2451 dictReleaseIterator(di
);
2452 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
2453 addReply(c
,shared
.crlf
);
2456 static void dbsizeCommand(redisClient
*c
) {
2458 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
2461 static void lastsaveCommand(redisClient
*c
) {
2463 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
2466 static void typeCommand(redisClient
*c
) {
2470 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2475 case REDIS_STRING
: type
= "+string"; break;
2476 case REDIS_LIST
: type
= "+list"; break;
2477 case REDIS_SET
: type
= "+set"; break;
2478 default: type
= "unknown"; break;
2481 addReplySds(c
,sdsnew(type
));
2482 addReply(c
,shared
.crlf
);
2485 static void saveCommand(redisClient
*c
) {
2486 if (server
.bgsaveinprogress
) {
2487 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
2490 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2491 addReply(c
,shared
.ok
);
2493 addReply(c
,shared
.err
);
2497 static void bgsaveCommand(redisClient
*c
) {
2498 if (server
.bgsaveinprogress
) {
2499 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
2502 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
2503 addReply(c
,shared
.ok
);
2505 addReply(c
,shared
.err
);
2509 static void shutdownCommand(redisClient
*c
) {
2510 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
2511 /* XXX: TODO kill the child if there is a bgsave in progress */
2512 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2513 if (server
.daemonize
) {
2514 unlink(server
.pidfile
);
2516 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
2517 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
2520 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
2521 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2525 static void renameGenericCommand(redisClient
*c
, int nx
) {
2528 /* To use the same key as src and dst is probably an error */
2529 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
2530 addReply(c
,shared
.sameobjecterr
);
2534 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2536 addReply(c
,shared
.nokeyerr
);
2540 deleteIfVolatile(c
->db
,c
->argv
[2]);
2541 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
2544 addReply(c
,shared
.czero
);
2547 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
2549 incrRefCount(c
->argv
[2]);
2551 deleteKey(c
->db
,c
->argv
[1]);
2553 addReply(c
,nx
? shared
.cone
: shared
.ok
);
2556 static void renameCommand(redisClient
*c
) {
2557 renameGenericCommand(c
,0);
2560 static void renamenxCommand(redisClient
*c
) {
2561 renameGenericCommand(c
,1);
2564 static void moveCommand(redisClient
*c
) {
2569 /* Obtain source and target DB pointers */
2572 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
2573 addReply(c
,shared
.outofrangeerr
);
2577 selectDb(c
,srcid
); /* Back to the source DB */
2579 /* If the user is moving using as target the same
2580 * DB as the source DB it is probably an error. */
2582 addReply(c
,shared
.sameobjecterr
);
2586 /* Check if the element exists and get a reference */
2587 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2589 addReply(c
,shared
.czero
);
2593 /* Try to add the element to the target DB */
2594 deleteIfVolatile(dst
,c
->argv
[1]);
2595 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
2596 addReply(c
,shared
.czero
);
2599 incrRefCount(c
->argv
[1]);
2602 /* OK! key moved, free the entry in the source DB */
2603 deleteKey(src
,c
->argv
[1]);
2605 addReply(c
,shared
.cone
);
2608 /* =================================== Lists ================================ */
2609 static void pushGenericCommand(redisClient
*c
, int where
) {
2613 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2615 lobj
= createListObject();
2617 if (where
== REDIS_HEAD
) {
2618 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2620 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2622 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
2623 incrRefCount(c
->argv
[1]);
2624 incrRefCount(c
->argv
[2]);
2626 if (lobj
->type
!= REDIS_LIST
) {
2627 addReply(c
,shared
.wrongtypeerr
);
2631 if (where
== REDIS_HEAD
) {
2632 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2634 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2636 incrRefCount(c
->argv
[2]);
2639 addReply(c
,shared
.ok
);
2642 static void lpushCommand(redisClient
*c
) {
2643 pushGenericCommand(c
,REDIS_HEAD
);
2646 static void rpushCommand(redisClient
*c
) {
2647 pushGenericCommand(c
,REDIS_TAIL
);
2650 static void llenCommand(redisClient
*c
) {
2654 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2656 addReply(c
,shared
.czero
);
2659 if (o
->type
!= REDIS_LIST
) {
2660 addReply(c
,shared
.wrongtypeerr
);
2663 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
2668 static void lindexCommand(redisClient
*c
) {
2670 int index
= atoi(c
->argv
[2]->ptr
);
2672 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2674 addReply(c
,shared
.nullbulk
);
2676 if (o
->type
!= REDIS_LIST
) {
2677 addReply(c
,shared
.wrongtypeerr
);
2679 list
*list
= o
->ptr
;
2682 ln
= listIndex(list
, index
);
2684 addReply(c
,shared
.nullbulk
);
2686 robj
*ele
= listNodeValue(ln
);
2687 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2689 addReply(c
,shared
.crlf
);
2695 static void lsetCommand(redisClient
*c
) {
2697 int index
= atoi(c
->argv
[2]->ptr
);
2699 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2701 addReply(c
,shared
.nokeyerr
);
2703 if (o
->type
!= REDIS_LIST
) {
2704 addReply(c
,shared
.wrongtypeerr
);
2706 list
*list
= o
->ptr
;
2709 ln
= listIndex(list
, index
);
2711 addReply(c
,shared
.outofrangeerr
);
2713 robj
*ele
= listNodeValue(ln
);
2716 listNodeValue(ln
) = c
->argv
[3];
2717 incrRefCount(c
->argv
[3]);
2718 addReply(c
,shared
.ok
);
2725 static void popGenericCommand(redisClient
*c
, int where
) {
2728 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2730 addReply(c
,shared
.nullbulk
);
2732 if (o
->type
!= REDIS_LIST
) {
2733 addReply(c
,shared
.wrongtypeerr
);
2735 list
*list
= o
->ptr
;
2738 if (where
== REDIS_HEAD
)
2739 ln
= listFirst(list
);
2741 ln
= listLast(list
);
2744 addReply(c
,shared
.nullbulk
);
2746 robj
*ele
= listNodeValue(ln
);
2747 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2749 addReply(c
,shared
.crlf
);
2750 listDelNode(list
,ln
);
2757 static void lpopCommand(redisClient
*c
) {
2758 popGenericCommand(c
,REDIS_HEAD
);
2761 static void rpopCommand(redisClient
*c
) {
2762 popGenericCommand(c
,REDIS_TAIL
);
2765 static void lrangeCommand(redisClient
*c
) {
2767 int start
= atoi(c
->argv
[2]->ptr
);
2768 int end
= atoi(c
->argv
[3]->ptr
);
2770 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2772 addReply(c
,shared
.nullmultibulk
);
2774 if (o
->type
!= REDIS_LIST
) {
2775 addReply(c
,shared
.wrongtypeerr
);
2777 list
*list
= o
->ptr
;
2779 int llen
= listLength(list
);
2783 /* convert negative indexes */
2784 if (start
< 0) start
= llen
+start
;
2785 if (end
< 0) end
= llen
+end
;
2786 if (start
< 0) start
= 0;
2787 if (end
< 0) end
= 0;
2789 /* indexes sanity checks */
2790 if (start
> end
|| start
>= llen
) {
2791 /* Out of range start or start > end result in empty list */
2792 addReply(c
,shared
.emptymultibulk
);
2795 if (end
>= llen
) end
= llen
-1;
2796 rangelen
= (end
-start
)+1;
2798 /* Return the result in form of a multi-bulk reply */
2799 ln
= listIndex(list
, start
);
2800 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
2801 for (j
= 0; j
< rangelen
; j
++) {
2802 ele
= listNodeValue(ln
);
2803 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2805 addReply(c
,shared
.crlf
);
2812 static void ltrimCommand(redisClient
*c
) {
2814 int start
= atoi(c
->argv
[2]->ptr
);
2815 int end
= atoi(c
->argv
[3]->ptr
);
2817 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2819 addReply(c
,shared
.nokeyerr
);
2821 if (o
->type
!= REDIS_LIST
) {
2822 addReply(c
,shared
.wrongtypeerr
);
2824 list
*list
= o
->ptr
;
2826 int llen
= listLength(list
);
2827 int j
, ltrim
, rtrim
;
2829 /* convert negative indexes */
2830 if (start
< 0) start
= llen
+start
;
2831 if (end
< 0) end
= llen
+end
;
2832 if (start
< 0) start
= 0;
2833 if (end
< 0) end
= 0;
2835 /* indexes sanity checks */
2836 if (start
> end
|| start
>= llen
) {
2837 /* Out of range start or start > end result in empty list */
2841 if (end
>= llen
) end
= llen
-1;
2846 /* Remove list elements to perform the trim */
2847 for (j
= 0; j
< ltrim
; j
++) {
2848 ln
= listFirst(list
);
2849 listDelNode(list
,ln
);
2851 for (j
= 0; j
< rtrim
; j
++) {
2852 ln
= listLast(list
);
2853 listDelNode(list
,ln
);
2855 addReply(c
,shared
.ok
);
2861 static void lremCommand(redisClient
*c
) {
2864 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2866 addReply(c
,shared
.nokeyerr
);
2868 if (o
->type
!= REDIS_LIST
) {
2869 addReply(c
,shared
.wrongtypeerr
);
2871 list
*list
= o
->ptr
;
2872 listNode
*ln
, *next
;
2873 int toremove
= atoi(c
->argv
[2]->ptr
);
2878 toremove
= -toremove
;
2881 ln
= fromtail
? list
->tail
: list
->head
;
2883 robj
*ele
= listNodeValue(ln
);
2885 next
= fromtail
? ln
->prev
: ln
->next
;
2886 if (sdscmp(ele
->ptr
,c
->argv
[3]->ptr
) == 0) {
2887 listDelNode(list
,ln
);
2890 if (toremove
&& removed
== toremove
) break;
2894 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
2899 /* ==================================== Sets ================================ */
2901 static void saddCommand(redisClient
*c
) {
2904 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2906 set
= createSetObject();
2907 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
2908 incrRefCount(c
->argv
[1]);
2910 if (set
->type
!= REDIS_SET
) {
2911 addReply(c
,shared
.wrongtypeerr
);
2915 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
2916 incrRefCount(c
->argv
[2]);
2918 addReply(c
,shared
.cone
);
2920 addReply(c
,shared
.czero
);
2924 static void sremCommand(redisClient
*c
) {
2927 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2929 addReply(c
,shared
.czero
);
2931 if (set
->type
!= REDIS_SET
) {
2932 addReply(c
,shared
.wrongtypeerr
);
2935 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
2937 addReply(c
,shared
.cone
);
2939 addReply(c
,shared
.czero
);
2944 static void smoveCommand(redisClient
*c
) {
2945 robj
*srcset
, *dstset
;
2947 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2948 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
2950 /* If the source key does not exist return 0, if it's of the wrong type
2952 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
2953 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
2956 /* Error if the destination key is not a set as well */
2957 if (dstset
&& dstset
->type
!= REDIS_SET
) {
2958 addReply(c
,shared
.wrongtypeerr
);
2961 /* Remove the element from the source set */
2962 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
2963 /* Key not found in the src set! return zero */
2964 addReply(c
,shared
.czero
);
2968 /* Add the element to the destination set */
2970 dstset
= createSetObject();
2971 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
2972 incrRefCount(c
->argv
[2]);
2974 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
2975 incrRefCount(c
->argv
[3]);
2976 addReply(c
,shared
.cone
);
2979 static void sismemberCommand(redisClient
*c
) {
2982 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
2984 addReply(c
,shared
.czero
);
2986 if (set
->type
!= REDIS_SET
) {
2987 addReply(c
,shared
.wrongtypeerr
);
2990 if (dictFind(set
->ptr
,c
->argv
[2]))
2991 addReply(c
,shared
.cone
);
2993 addReply(c
,shared
.czero
);
2997 static void scardCommand(redisClient
*c
) {
3001 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3003 addReply(c
,shared
.czero
);
3006 if (o
->type
!= REDIS_SET
) {
3007 addReply(c
,shared
.wrongtypeerr
);
3010 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3016 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3017 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3019 return dictSize(*d1
)-dictSize(*d2
);
3022 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
) {
3023 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3026 robj
*lenobj
= NULL
, *dstset
= NULL
;
3027 int j
, cardinality
= 0;
3029 if (!dv
) oom("sinterGenericCommand");
3030 for (j
= 0; j
< setsnum
; j
++) {
3034 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3035 lookupKeyRead(c
->db
,setskeys
[j
]);
3039 deleteKey(c
->db
,dstkey
);
3040 addReply(c
,shared
.ok
);
3042 addReply(c
,shared
.nullmultibulk
);
3046 if (setobj
->type
!= REDIS_SET
) {
3048 addReply(c
,shared
.wrongtypeerr
);
3051 dv
[j
] = setobj
->ptr
;
3053 /* Sort sets from the smallest to largest, this will improve our
3054 * algorithm's performace */
3055 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3057 /* The first thing we should output is the total number of elements...
3058 * since this is a multi-bulk write, but at this stage we don't know
3059 * the intersection set size, so we use a trick, append an empty object
3060 * to the output list and save the pointer to later modify it with the
3063 lenobj
= createObject(REDIS_STRING
,NULL
);
3065 decrRefCount(lenobj
);
3067 /* If we have a target key where to store the resulting set
3068 * create this key with an empty set inside */
3069 dstset
= createSetObject();
3072 /* Iterate all the elements of the first (smallest) set, and test
3073 * the element against all the other sets, if at least one set does
3074 * not include the element it is discarded */
3075 di
= dictGetIterator(dv
[0]);
3076 if (!di
) oom("dictGetIterator");
3078 while((de
= dictNext(di
)) != NULL
) {
3081 for (j
= 1; j
< setsnum
; j
++)
3082 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
3084 continue; /* at least one set does not contain the member */
3085 ele
= dictGetEntryKey(de
);
3087 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele
->ptr
)));
3089 addReply(c
,shared
.crlf
);
3092 dictAdd(dstset
->ptr
,ele
,NULL
);
3096 dictReleaseIterator(di
);
3099 /* Store the resulting set into the target */
3100 deleteKey(c
->db
,dstkey
);
3101 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3102 incrRefCount(dstkey
);
3106 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",cardinality
);
3108 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3109 dictSize((dict
*)dstset
->ptr
)));
3115 static void sinterCommand(redisClient
*c
) {
3116 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
3119 static void sinterstoreCommand(redisClient
*c
) {
3120 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
3123 #define REDIS_OP_UNION 0
3124 #define REDIS_OP_DIFF 1
3126 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
3127 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3130 robj
*dstset
= NULL
;
3131 int j
, cardinality
= 0;
3133 if (!dv
) oom("sunionDiffGenericCommand");
3134 for (j
= 0; j
< setsnum
; j
++) {
3138 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3139 lookupKeyRead(c
->db
,setskeys
[j
]);
3144 if (setobj
->type
!= REDIS_SET
) {
3146 addReply(c
,shared
.wrongtypeerr
);
3149 dv
[j
] = setobj
->ptr
;
3152 /* We need a temp set object to store our union. If the dstkey
3153 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3154 * this set object will be the resulting object to set into the target key*/
3155 dstset
= createSetObject();
3157 /* Iterate all the elements of all the sets, add every element a single
3158 * time to the result set */
3159 for (j
= 0; j
< setsnum
; j
++) {
3160 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
3161 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
3163 di
= dictGetIterator(dv
[j
]);
3164 if (!di
) oom("dictGetIterator");
3166 while((de
= dictNext(di
)) != NULL
) {
3169 /* dictAdd will not add the same element multiple times */
3170 ele
= dictGetEntryKey(de
);
3171 if (op
== REDIS_OP_UNION
|| j
== 0) {
3172 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
3176 } else if (op
== REDIS_OP_DIFF
) {
3177 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
3182 dictReleaseIterator(di
);
3184 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
3187 /* Output the content of the resulting set, if not in STORE mode */
3189 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
3190 di
= dictGetIterator(dstset
->ptr
);
3191 if (!di
) oom("dictGetIterator");
3192 while((de
= dictNext(di
)) != NULL
) {
3195 ele
= dictGetEntryKey(de
);
3196 addReplySds(c
,sdscatprintf(sdsempty(),
3197 "$%d\r\n",sdslen(ele
->ptr
)));
3199 addReply(c
,shared
.crlf
);
3201 dictReleaseIterator(di
);
3203 /* If we have a target key where to store the resulting set
3204 * create this key with the result set inside */
3205 deleteKey(c
->db
,dstkey
);
3206 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3207 incrRefCount(dstkey
);
3212 decrRefCount(dstset
);
3214 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3215 dictSize((dict
*)dstset
->ptr
)));
3221 static void sunionCommand(redisClient
*c
) {
3222 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
3225 static void sunionstoreCommand(redisClient
*c
) {
3226 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
3229 static void sdiffCommand(redisClient
*c
) {
3230 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
3233 static void sdiffstoreCommand(redisClient
*c
) {
3234 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
3237 static void flushdbCommand(redisClient
*c
) {
3238 server
.dirty
+= dictSize(c
->db
->dict
);
3239 dictEmpty(c
->db
->dict
);
3240 dictEmpty(c
->db
->expires
);
3241 addReply(c
,shared
.ok
);
3244 static void flushallCommand(redisClient
*c
) {
3245 server
.dirty
+= emptyDb();
3246 addReply(c
,shared
.ok
);
3247 rdbSave(server
.dbfilename
);
3251 redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
3252 redisSortOperation
*so
= zmalloc(sizeof(*so
));
3253 if (!so
) oom("createSortOperation");
3255 so
->pattern
= pattern
;
3259 /* Return the value associated to the key with a name obtained
3260 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3261 robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
3265 int prefixlen
, sublen
, postfixlen
;
3266 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3270 char buf
[REDIS_SORTKEY_MAX
+1];
3273 spat
= pattern
->ptr
;
3275 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
3276 p
= strchr(spat
,'*');
3277 if (!p
) return NULL
;
3280 sublen
= sdslen(ssub
);
3281 postfixlen
= sdslen(spat
)-(prefixlen
+1);
3282 memcpy(keyname
.buf
,spat
,prefixlen
);
3283 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
3284 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
3285 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
3286 keyname
.len
= prefixlen
+sublen
+postfixlen
;
3288 keyobj
.refcount
= 1;
3289 keyobj
.type
= REDIS_STRING
;
3290 keyobj
.ptr
= ((char*)&keyname
)+(sizeof(long)*2);
3292 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3293 return lookupKeyRead(db
,&keyobj
);
3296 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3297 * the additional parameter is not standard but BSD-specific, we have to
3298 * pass sorting parameters via the global 'server' structure */
3299 static int sortCompare(const void *s1
, const void *s2
) {
3300 const redisSortObject
*so1
= s1
, *so2
= s2
;
3303 if (!server
.sort_alpha
) {
3304 /* Numeric sorting. Here it's trivial as we precomputed scores */
3305 if (so1
->u
.score
> so2
->u
.score
) {
3307 } else if (so1
->u
.score
< so2
->u
.score
) {
3313 /* Alphanumeric sorting */
3314 if (server
.sort_bypattern
) {
3315 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
3316 /* At least one compare object is NULL */
3317 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
3319 else if (so1
->u
.cmpobj
== NULL
)
3324 /* We have both the objects, use strcoll */
3325 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
3328 /* Compare elements directly */
3329 cmp
= strcoll(so1
->obj
->ptr
,so2
->obj
->ptr
);
3332 return server
.sort_desc
? -cmp
: cmp
;
3335 /* The SORT command is the most complex command in Redis. Warning: this code
3336 * is optimized for speed and a bit less for readability */
3337 static void sortCommand(redisClient
*c
) {
3340 int desc
= 0, alpha
= 0;
3341 int limit_start
= 0, limit_count
= -1, start
, end
;
3342 int j
, dontsort
= 0, vectorlen
;
3343 int getop
= 0; /* GET operation counter */
3344 robj
*sortval
, *sortby
= NULL
;
3345 redisSortObject
*vector
; /* Resulting vector to sort */
3347 /* Lookup the key to sort. It must be of the right types */
3348 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
3349 if (sortval
== NULL
) {
3350 addReply(c
,shared
.nokeyerr
);
3353 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
) {
3354 addReply(c
,shared
.wrongtypeerr
);
3358 /* Create a list of operations to perform for every sorted element.
3359 * Operations can be GET/DEL/INCR/DECR */
3360 operations
= listCreate();
3361 listSetFreeMethod(operations
,zfree
);
3364 /* Now we need to protect sortval incrementing its count, in the future
3365 * SORT may have options able to overwrite/delete keys during the sorting
3366 * and the sorted key itself may get destroied */
3367 incrRefCount(sortval
);
3369 /* The SORT command has an SQL-alike syntax, parse it */
3370 while(j
< c
->argc
) {
3371 int leftargs
= c
->argc
-j
-1;
3372 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
3374 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
3376 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
3378 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
3379 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
3380 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
3382 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
3383 sortby
= c
->argv
[j
+1];
3384 /* If the BY pattern does not contain '*', i.e. it is constant,
3385 * we don't need to sort nor to lookup the weight keys. */
3386 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
3388 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3389 listAddNodeTail(operations
,createSortOperation(
3390 REDIS_SORT_GET
,c
->argv
[j
+1]));
3393 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"del") && leftargs
>= 1) {
3394 listAddNodeTail(operations
,createSortOperation(
3395 REDIS_SORT_DEL
,c
->argv
[j
+1]));
3397 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"incr") && leftargs
>= 1) {
3398 listAddNodeTail(operations
,createSortOperation(
3399 REDIS_SORT_INCR
,c
->argv
[j
+1]));
3401 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3402 listAddNodeTail(operations
,createSortOperation(
3403 REDIS_SORT_DECR
,c
->argv
[j
+1]));
3406 decrRefCount(sortval
);
3407 listRelease(operations
);
3408 addReply(c
,shared
.syntaxerr
);
3414 /* Load the sorting vector with all the objects to sort */
3415 vectorlen
= (sortval
->type
== REDIS_LIST
) ?
3416 listLength((list
*)sortval
->ptr
) :
3417 dictSize((dict
*)sortval
->ptr
);
3418 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
3419 if (!vector
) oom("allocating objects vector for SORT");
3421 if (sortval
->type
== REDIS_LIST
) {
3422 list
*list
= sortval
->ptr
;
3426 while((ln
= listYield(list
))) {
3427 robj
*ele
= ln
->value
;
3428 vector
[j
].obj
= ele
;
3429 vector
[j
].u
.score
= 0;
3430 vector
[j
].u
.cmpobj
= NULL
;
3434 dict
*set
= sortval
->ptr
;
3438 di
= dictGetIterator(set
);
3439 if (!di
) oom("dictGetIterator");
3440 while((setele
= dictNext(di
)) != NULL
) {
3441 vector
[j
].obj
= dictGetEntryKey(setele
);
3442 vector
[j
].u
.score
= 0;
3443 vector
[j
].u
.cmpobj
= NULL
;
3446 dictReleaseIterator(di
);
3448 assert(j
== vectorlen
);
3450 /* Now it's time to load the right scores in the sorting vector */
3451 if (dontsort
== 0) {
3452 for (j
= 0; j
< vectorlen
; j
++) {
3456 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
3457 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
3459 vector
[j
].u
.cmpobj
= byval
;
3460 incrRefCount(byval
);
3462 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
3465 if (!alpha
) vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
3470 /* We are ready to sort the vector... perform a bit of sanity check
3471 * on the LIMIT option too. We'll use a partial version of quicksort. */
3472 start
= (limit_start
< 0) ? 0 : limit_start
;
3473 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
3474 if (start
>= vectorlen
) {
3475 start
= vectorlen
-1;
3478 if (end
>= vectorlen
) end
= vectorlen
-1;
3480 if (dontsort
== 0) {
3481 server
.sort_desc
= desc
;
3482 server
.sort_alpha
= alpha
;
3483 server
.sort_bypattern
= sortby
? 1 : 0;
3484 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
3485 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
3487 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
3490 /* Send command output to the output buffer, performing the specified
3491 * GET/DEL/INCR/DECR operations if any. */
3492 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
3493 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
3494 for (j
= start
; j
<= end
; j
++) {
3497 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
3498 sdslen(vector
[j
].obj
->ptr
)));
3499 addReply(c
,vector
[j
].obj
);
3500 addReply(c
,shared
.crlf
);
3502 listRewind(operations
);
3503 while((ln
= listYield(operations
))) {
3504 redisSortOperation
*sop
= ln
->value
;
3505 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
3508 if (sop
->type
== REDIS_SORT_GET
) {
3509 if (!val
|| val
->type
!= REDIS_STRING
) {
3510 addReply(c
,shared
.nullbulk
);
3512 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
3515 addReply(c
,shared
.crlf
);
3517 } else if (sop
->type
== REDIS_SORT_DEL
) {
3524 decrRefCount(sortval
);
3525 listRelease(operations
);
3526 for (j
= 0; j
< vectorlen
; j
++) {
3527 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
3528 decrRefCount(vector
[j
].u
.cmpobj
);
/* INFO command: build a text report made of "name:value\r\n" lines in an sds
 * string and send it to client 'c' as a bulk reply.
 *
 * c: the client that issued the command and receives the reply.
 *
 * NOTE(review): several argument lines of the two sdscatprintf() calls are
 * not visible here, so the pairing of format fields with arguments could not
 * be checked end to end -- confirm before changing any format specifier. */
3533 static void infoCommand(redisClient
*c
) {
/* Seconds elapsed since server.stat_starttime. */
3535 time_t uptime
= time(NULL
)-server
.stat_starttime
;
/* First part of the report: general server statistics. */
3537 info
= sdscatprintf(sdsempty(),
3538 "redis_version:%s\r\n"
3539 "uptime_in_seconds:%d\r\n"
3540 "uptime_in_days:%d\r\n"
3541 "connected_clients:%d\r\n"
3542 "connected_slaves:%d\r\n"
3543 "used_memory:%zu\r\n"
3544 "changes_since_last_save:%lld\r\n"
3545 "bgsave_in_progress:%d\r\n"
3546 "last_save_time:%d\r\n"
3547 "total_connections_received:%lld\r\n"
3548 "total_commands_processed:%lld\r\n"
/* Length of the clients list minus the length of the slaves list. */
3553 listLength(server
.clients
)-listLength(server
.slaves
),
3554 listLength(server
.slaves
),
3557 server
.bgsaveinprogress
,
3559 server
.stat_numconnections
,
3560 server
.stat_numcommands
,
/* "master" when no master host is configured, "slave" otherwise. */
3561 server
.masterhost
== NULL
? "master" : "slave"
/* A master host is configured: append the replication link fields. */
3563 if (server
.masterhost
) {
3564 info
= sdscatprintf(info
,
3565 "master_host:%s\r\n"
3566 "master_port:%d\r\n"
3567 "master_link_status:%s\r\n"
3568 "master_last_io_seconds_ago:%d\r\n"
3571 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
/* Seconds since server.master->lastinteraction, narrowed to int. */
3573 (int)(time(NULL
)-server
.master
->lastinteraction
)
/* Bulk reply framing: "$<length>\r\n", then the payload, then CRLF.
 * NOTE(review): the sdslen() result is printed with %d; if sdslen() returns
 * size_t this is a specifier mismatch -- confirm against sds.h. */
3576 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info
)));
3577 addReplySds(c
,info
);
3578 addReply(c
,shared
.crlf
);
/* MONITOR command: flag client 'c' as a monitor, append it to
 * server.monitors and reply with shared.ok.
 *
 * c: the client that issued the command.
 *
 * NOTE(review): the statements between the visible lines are not shown here;
 * any extra per-client state they set up is not documented. */
3581 static void monitorCommand(redisClient
*c
) {
3582 /* Ignore MONITOR if the client is already a slave or already in monitor
 * mode. Testing REDIS_SLAVE alone covers both cases because a monitor
 * gets both flags set just below. */
3583 if (c
->flags
& REDIS_SLAVE
) return;
3585 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
/* A failed list append is handed to oom(). */
3587 if (!listAddNodeTail(server
.monitors
,c
)) oom("listAddNodeTail");
3588 addReply(c
,shared
.ok
);
3591 /* ================================= Expire ================================= */
/* Remove 'key' from the db->expires dictionary.
 *
 * db:  database whose 'expires' dictionary is modified.
 * key: key whose expire entry is deleted.
 *
 * NOTE(review): the return statements are not visible here; presumably the
 * result is non-zero when dictDelete() reports DICT_OK and zero otherwise --
 * confirm against the full source. */
3592 static int removeExpire(redisDb
*db
, robj
*key
) {
3593 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
/* Associate an expire time with 'key' by adding an entry to db->expires.
 *
 * db:   database whose 'expires' dictionary is modified.
 * key:  key to make volatile.
 * when: expire time; it is stored directly in the entry's pointer-sized
 *       value slot (cast to void*), not in allocated memory.
 *
 * expireCommand() treats a non-zero return value as success.
 * NOTE(review): both branches of the DICT_ERR test are missing from view, so
 * the exact return values and any reference counting on 'key' must be
 * confirmed against the full source. */
3600 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
3601 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
3609 /* Return the expire time of the specified key, or -1 if no expire
 * is associated with this key (i.e. the key is not volatile).
 *
 * db:  database whose 'expires' dictionary is consulted.
 * key: key to look up.
 *
 * The expire time lives directly in the dictionary entry's value slot and is
 * cast back to time_t on return. */
3611 static time_t getExpire(redisDb
*db
, robj
*key
) {
3614 /* No expire? Return ASAP: an empty dictionary skips the lookup. */
3615 if (dictSize(db
->expires
) == 0 ||
3616 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
3618 return (time_t) dictGetEntryVal(de
);
/* Delete 'key' from the database if its expire time has already passed.
 *
 * db:  database to operate on.
 * key: key to check.
 *
 * Returns 0 when the key has no expire entry or when the expire time is not
 * yet in the past. Otherwise the key is removed from both db->expires and
 * db->dict, and the result is 1 if the deletion from db->dict succeeded,
 * 0 if it did not. */
3621 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
3625 /* No expire? return ASAP */
3626 if (dictSize(db
->expires
) == 0 ||
3627 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3629 /* Look up the expire time; a key whose time equals "now" is kept. */
3630 when
= (time_t) dictGetEntryVal(de
);
3631 if (time(NULL
) <= when
) return 0;
3633 /* Delete the key: first its expire entry, then the key itself. */
3634 dictDelete(db
->expires
,key
);
3635 return dictDelete(db
->dict
,key
) == DICT_OK
;
/* Delete 'key' from the database if it has an expire entry, regardless of
 * whether that expire time has been reached.
 *
 * db:  database to operate on.
 * key: key to check.
 *
 * Returns 0 when the key has no expire entry. Otherwise the key is removed
 * from both db->expires and db->dict, and the result is 1 if the deletion
 * from db->dict succeeded, 0 if it did not. */
3638 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
3641 /* No expire? return ASAP */
3642 if (dictSize(db
->expires
) == 0 ||
3643 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3645 /* Delete the key: first its expire entry, then the key itself. */
3647 dictDelete(db
->expires
,key
);
3648 return dictDelete(db
->dict
,key
) == DICT_OK
;
/* EXPIRE command: argv[1] is the key, argv[2] the timeout in seconds.
 *
 * c: the client that issued the command.
 *
 * Replies shared.cone when setExpire() reports success and shared.czero on
 * every other visible path.
 * NOTE(review): the conditions guarding the two early shared.czero replies
 * are not visible here; presumably "key not found" and "non-positive
 * timeout" -- confirm against the full source. */
3651 static void expireCommand(redisClient
*c
) {
/* atoi() does no error reporting: non-numeric input yields 0. */
3653 int seconds
= atoi(c
->argv
[2]->ptr
);
/* Look the key up in the main dictionary of the client's database. */
3655 de
= dictFind(c
->db
->dict
,c
->argv
[1]);
3657 addReply(c
,shared
.czero
);
3661 addReply(c
, shared
.czero
);
/* Absolute expire time: now plus the requested number of seconds. */
3664 time_t when
= time(NULL
)+seconds
;
3665 if (setExpire(c
->db
,c
->argv
[1],when
))
3666 addReply(c
,shared
.cone
);
3668 addReply(c
,shared
.czero
);
/* TTL command: reply with an integer reply (":<n>\r\n") holding the number
 * of seconds left before the key in argv[1] expires.
 *
 * c: the client that issued the command.
 *
 * Any negative remaining time is reported as -1.
 * NOTE(review): the declarations and any guard around the subtraction are
 * not visible here. */
3673 static void ttlCommand(redisClient
*c
) {
/* getExpire() yields -1 when the key has no expire associated. */
3677 expire
= getExpire(c
->db
,c
->argv
[1]);
3679 ttl
= (int) (expire
-time(NULL
));
3680 if (ttl
< 0) ttl
= -1;
3682 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
3685 /* =============================== Replication ============================= */
/* Blocking write helper used by the replication code.
 *
 * fd:      descriptor to write to.
 * ptr:     data to send.
 * size:    number of bytes to send; its initial value is saved in 'ret'.
 * timeout: limit, in seconds, compared against the time elapsed since entry.
 *
 * Returns -1 as soon as write() fails.
 * NOTE(review): the enclosing loop, the pointer/size bookkeeping, the body of
 * the timeout branch and the final return are not visible here; presumably
 * 'ret' (the original size) is returned on success -- confirm. */
3687 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3688 ssize_t nwritten
, ret
= size
;
3689 time_t start
= time(NULL
);
/* Wait for writability; 1000 is the wait argument handed to aeWait()
 * (presumably milliseconds -- confirm against ae.h). */
3693 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
3694 nwritten
= write(fd
,ptr
,size
);
3695 if (nwritten
== -1) return -1;
/* Elapsed wall-clock seconds exceeded the caller's limit. */
3699 if ((time(NULL
)-start
) > timeout
) {
/* Blocking read helper used by the replication code.
 *
 * fd:      descriptor to read from.
 * ptr:     destination buffer.
 * size:    number of bytes requested.
 * timeout: limit, in seconds, compared against the time elapsed since entry.
 *
 * Returns -1 as soon as read() fails.
 * NOTE(review): the enclosing loop, the handling of a zero-byte read, the
 * body of the timeout branch and the final return are not visible here;
 * presumably 'totread' accumulates and is returned -- confirm. */
3707 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3708 ssize_t nread
, totread
= 0;
3709 time_t start
= time(NULL
);
/* Wait for readability; 1000 is the wait argument handed to aeWait()
 * (presumably milliseconds -- confirm against ae.h). */
3713 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
3714 nread
= read(fd
,ptr
,size
);
3715 if (nread
== -1) return -1;
/* Elapsed wall-clock seconds exceeded the caller's limit. */
3720 if ((time(NULL
)-start
) > timeout
) {
/* Read a line from 'fd' into 'ptr' using syncRead() one byte at a time.
 *
 * fd:      descriptor to read from.
 * ptr:     destination buffer.
 * size:    capacity of the destination buffer.
 * timeout: passed through to syncRead() for every single-byte read.
 *
 * Returns -1 if any syncRead() call fails.
 * NOTE(review): the loop, the newline test, the bounds handling against
 * 'size' and the success return value are not visible here. */
3728 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3735 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
/* Strip a carriage return that precedes the line end; the 'nread' test
 * keeps ptr-1 from being read when nothing has been stored yet. */
3738 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
/* SYNC command: register client 'c' as a slave and arrange for it to receive
 * a database dump produced by a background save.
 *
 * c: the client asking for synchronization.
 *
 * Three situations are visible:
 *  - a BGSAVE is running and another slave is already waiting for its end:
 *    that slave's reply list is duplicated into 'c' and 'c' waits too;
 *  - a BGSAVE is running but no such slave exists: 'c' waits for the next
 *    BGSAVE to start;
 *  - no BGSAVE is running: one is started now.
 * At the end the client gets the REDIS_SLAVE flag and is appended to
 * server.slaves.
 *
 * NOTE(review): the else/return lines separating these cases are not visible
 * here, so the exact control flow must be confirmed against the full
 * source. */
3749 static void syncCommand(redisClient
*c
) {
3750 /* Ignore SYNC if the client is already a slave or in monitor mode */
3751 if (c
->flags
& REDIS_SLAVE
) return;
3753 /* SYNC can't be issued when the server has pending data to send to
 * the client about already issued commands. We need a fresh reply
 * buffer registering the differences between the BGSAVE and the current
 * dataset, so that we can copy to other slaves if needed.
 * NOTE(review): the test is on the pending reply list although the error
 * text says "pending input". */
3757 if (listLength(c
->reply
) != 0) {
3758 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3762 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
3763 /* Here we need to check if there is a background saving operation
 * in progress, or if it is required to start one */
3765 if (server
.bgsaveinprogress
) {
3766 /* Ok a background save is in progress. Let's check if it is a good
 * one for replication, i.e. if there is another slave that is
 * registering differences since the server forked to save */
3772 listRewind(server
.slaves
);
3773 while((ln
= listYield(server
.slaves
))) {
3775 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
3778 /* Perfect, the server is already registering differences for
 * another slave. Set the right state, and copy the buffer. */
3780 listRelease(c
->reply
);
3781 c
->reply
= listDup(slave
->reply
);
3782 if (!c
->reply
) oom("listDup copying slave reply list");
3783 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3784 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
3786 /* No way, we need to wait for the next BGSAVE in order to
 * register differences */
3788 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
3789 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
3792 /* Ok we don't have a BGSAVE in progress, let's start one */
3793 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
3794 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
3795 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
/* NOTE(review): "Unalbe" in the reply below is a typo for "Unable"; it is
 * runtime text, so it is left untouched here. */
3796 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
3799 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
/* From here on the client is treated as a slave. */
3802 c
->flags
|= REDIS_SLAVE
;
3804 if (!listAddNodeTail(server
.slaves
,c
)) oom("listAddNodeTail");
/* Writable-event handler that streams the database dump file to a slave,
 * one REDIS_IOBUF_LEN-sized chunk per invocation.
 *
 * el:       event loop (see NOTE below).
 * fd:       the slave's socket.
 * privdata: the redisClient of the slave being fed.
 * mask:     event mask; explicitly marked unused.
 *
 * State lives in the slave: repldbfd is the open dump file, repldboff the
 * number of bytes already sent and repldbsize the total. When the offset
 * reaches the size, the dump file is closed, the slave is marked
 * REDIS_REPL_ONLINE and its writable handler is switched to
 * sendReplyToClient.
 *
 * NOTE(review): the error-path bodies (what happens to the slave after a
 * failed read or write) and the handling of 'el' are not visible here. */
3808 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
3809 redisClient
*slave
= privdata
;
3811 REDIS_NOTUSED(mask
);
3812 char buf
[REDIS_IOBUF_LEN
];
3813 ssize_t nwritten
, buflen
;
/* Offset 0 means nothing was sent yet: emit the "$<size>\r\n" header. */
3815 if (slave
->repldboff
== 0) {
3816 /* Write the bulk write count before transferring the DB. In theory here
 * we don't know how much room there is in the output buffer of the
 * socket, but in practice SO_SNDLOWAT (the minimum count for output
 * operations) will never be smaller than the few bytes we need. */
/* NOTE(review): the format is %lld while the argument is cast to unsigned
 * long long; %llu would match the cast. */
3822 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
/* A short write of the header is treated like a failure. */
3824 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
/* Read the next chunk of the dump starting at the current offset. */
3832 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
3833 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
3835 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
3836 (buflen
== 0) ? "premature EOF" : strerror(errno
));
3840 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
3841 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
/* Advance by what was actually written; after a partial write the lseek()
 * at the next invocation re-reads the unsent remainder. */
3846 slave
->repldboff
+= nwritten
;
/* Whole dump sent: close the file and go back to normal reply delivery. */
3847 if (slave
->repldboff
== slave
->repldbsize
) {
3848 close(slave
->repldbfd
);
3849 slave
->repldbfd
= -1;
3850 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
3851 slave
->replstate
= REDIS_REPL_ONLINE
;
3852 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
3853 sendReplyToClient
, slave
, NULL
) == AE_ERR
) {
/* Queue an empty reply for the slave. NOTE(review): the reason is not
 * stated in the code; presumably it makes sure the new writable handler has
 * something queued -- confirm. */
3857 addReplySds(slave
,sdsempty());
3858 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
/* Walk server.slaves after a background save has finished and move every
 * waiting slave to its next replication state.
 *
 * bgsaveerr: result of the finished background save; anything other than
 *            REDIS_OK is treated as a failure.
 *
 * Visible behaviour:
 *  - slaves in REDIS_REPL_WAIT_BGSAVE_START are switched to
 *    REDIS_REPL_WAIT_BGSAVE_END;
 *  - slaves in REDIS_REPL_WAIT_BGSAVE_END get the dump file opened and
 *    stat'ed, their transfer offset and size initialised, their state set
 *    to REDIS_REPL_SEND_BULK and sendBulkToSlave installed as writable
 *    handler;
 *  - afterwards a new background save may be started, and its failure is
 *    logged.
 *
 * NOTE(review): the error-path bodies, the place where 'startbgsave' is set
 * and the guard around the second rdbSaveBackground() are not visible here.
 * ("Salves" in the function name is kept as is: callers use this name.) */
3862 static void updateSalvesWaitingBgsave(int bgsaveerr
) {
3864 int startbgsave
= 0;
3866 listRewind(server
.slaves
);
3867 while((ln
= listYield(server
.slaves
))) {
3868 redisClient
*slave
= ln
->value
;
3870 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
3872 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3873 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
/* The save this slave was waiting for failed. */
3876 if (bgsaveerr
!= REDIS_OK
) {
3878 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
/* Open the freshly written dump and fetch its size via fstat().
 * NOTE(review): if open() succeeds and fstat() fails, check that the
 * descriptor is closed on the error path (not visible here). */
3881 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
3882 fstat(slave
->repldbfd
,&buf
) == -1) {
3884 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
/* Start the transfer from the beginning of the file. */
3887 slave
->repldboff
= 0;
3888 slave
->repldbsize
= buf
.st_size
;
3889 slave
->replstate
= REDIS_REPL_SEND_BULK
;
/* Replace the slave's writable handler with the bulk sender. */
3890 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
3891 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
, NULL
) == AE_ERR
) {
3898 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
/* The new background save could not be started: walk the slaves again. */
3899 listRewind(server
.slaves
);
3900 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
3901 while((ln
= listYield(server
.slaves
))) {
3902 redisClient
*slave
= ln
->value
;
3904 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
/* Slave side of the initial synchronization: connect to the configured
 * master, send SYNC, receive the dump into a temporary file, rename it over
 * server.dbfilename, load it, and register the connection as server.master.
 *
 * Returns an int status.
 * NOTE(review): every return statement and every error-path cleanup (closing
 * 'fd' and 'dfd', removing the temp file) is not visible here; confirm the
 * returned codes and the cleanup against the full source. */
3911 static int syncWithMaster(void) {
3912 char buf
[1024], tmpfile
[256];
3914 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
3918 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
3922 /* Issue the SYNC command: 7 bytes, with 5 as the timeout argument */
3923 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
3925 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
3929 /* Read the bulk write count; 3600 is the timeout argument, so the
 * master is given a long time to produce its dump */
3930 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
3932 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
/* buf+1 skips the first byte of the count line (sendBulkToSlave() sends
 * "$<size>"). NOTE(review): atoi() gives an int and reports no errors, so a
 * malformed or very large count goes unnoticed. */
3936 dumpsize
= atoi(buf
+1);
3937 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
3938 /* Read the bulk write data on a temp file */
3939 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
/* NOTE(review): opened without O_TRUNC or O_EXCL, so an existing file with
 * the same name would be reused as is. */
3940 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
3943 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
3947 int nread
, nwritten
;
/* Never ask for more than what is left of the dump or than buf holds. */
3949 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
3951 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
3957 nwritten
= write(dfd
,buf
,nread
);
/* NOTE(review): only -1 is tested on this line; handling of a short write
 * is not visible here. */
3958 if (nwritten
== -1) {
3959 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
/* Replace the current dump file with the received one. */
3967 if (rename(tmpfile
,server
.dbfilename
) == -1) {
3968 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
3974 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
3975 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
/* Keep the connection: wrap it in a client flagged as the master. */
3979 server
.master
= createClient(fd
);
3980 server
.master
->flags
|= REDIS_MASTER
;
3981 server
.replstate
= REDIS_REPL_CONNECTED
;
/* SLAVEOF command.
 *
 * c: the client that issued the command; argv[1] and argv[2] are either the
 *    literal words "no" "one" (case-insensitive) or a host and a port.
 *
 * "no one": if a master host is configured it is released and cleared, the
 * master client (if any) is freed and the replication state becomes
 * REDIS_REPL_NONE.
 * host/port: the previous master host string is freed and replaced, the
 * port is parsed with atoi(), the current master client (if any) is freed
 * and the state becomes REDIS_REPL_CONNECT.
 * In both cases the client gets shared.ok.
 *
 * NOTE(review): the else line separating the two cases is not visible
 * here. */
3985 static void slaveofCommand(redisClient
*c
) {
3986 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
3987 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
/* Nothing to do unless the server is currently configured as a slave. */
3988 if (server
.masterhost
) {
3989 sdsfree(server
.masterhost
);
3990 server
.masterhost
= NULL
;
3991 if (server
.master
) freeClient(server
.master
);
3992 server
.replstate
= REDIS_REPL_NONE
;
3993 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
/* Host/port form: remember the new master. */
3996 sdsfree(server
.masterhost
);
3997 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
/* atoi() does no validation: a non-numeric port becomes 0. */
3998 server
.masterport
= atoi(c
->argv
[2]->ptr
);
3999 if (server
.master
) freeClient(server
.master
);
4000 server
.replstate
= REDIS_REPL_CONNECT
;
4001 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
4002 server
.masterhost
, server
.masterport
);
4004 addReply(c
,shared
.ok
);
4007 /* ============================ Maxmemory directive ======================== */
4009 /* This function gets called when 'maxmemory' is set on the config file to limit
4010 * the max memory used by the server, and we are out of memory.
4011 * This function will try to, in order:
4013 * - Free objects from the free list
4014 * - Try to remove keys with an EXPIRE set
4016 * If it is not possible to free enough memory to reach used-memory < maxmemory,
4017 * the server will start refusing commands that will enlarge even more the
/* Try to bring memory usage back under server.maxmemory.
 *
 * The loop runs while a limit is configured (non-zero) and
 * zmalloc_used_memory() is above it. Each round either takes the first node
 * off server.objfreelist or, when that list is empty, visits every database
 * and deletes one key chosen among those having an expire set. The function
 * returns when a round over the databases frees nothing.
 *
 * NOTE(review): the release of the object taken from the free list, the
 * update of 'minttl' and 'freed', and the declarations of 'o', 't' and
 * 'minttl' are not visible here. */
4020 static void freeMemoryIfNeeded(void) {
4021 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
/* Cheapest option first: drop one entry of the object free list. */
4022 if (listLength(server
.objfreelist
)) {
4025 listNode
*head
= listFirst(server
.objfreelist
);
4026 o
= listNodeValue(head
);
4027 listDelNode(server
.objfreelist
,head
);
4030 int j
, k
, freed
= 0;
4032 for (j
= 0; j
< server
.dbnum
; j
++) {
4034 robj
*minkey
= NULL
;
4035 struct dictEntry
*de
;
/* Only databases holding at least one key with an expire are considered. */
4037 if (dictSize(server
.db
[j
].expires
)) {
4039 /* From a sample of three keys drop the one nearest to
 * the natural expire */
4041 for (k
= 0; k
< 3; k
++) {
4044 de
= dictGetRandomKey(server
.db
[j
].expires
);
4045 t
= (time_t) dictGetEntryVal(de
);
/* Keep the candidate with the smallest expire time seen so far; -1 marks
 * "no candidate yet". */
4046 if (minttl
== -1 || t
< minttl
) {
4047 minkey
= dictGetEntryKey(de
);
4051 deleteKey(server
.db
+j
,minkey
);
4054 if (!freed
) return; /* nothing to free... */
4059 /* ================================= Debugging ============================== */
/* DEBUG command with two subcommands selected by argv[1]
 * (case-insensitive):
 *
 *  - "segfault": NOTE(review): the body of this branch is not visible here;
 *    going by the name it presumably crashes the server on purpose --
 *    confirm.
 *  - "object" with exactly three arguments: looks argv[2] up in the client's
 *    database and replies with the addresses and reference counts of the key
 *    and value objects, or with shared.nokeyerr.
 *
 * Anything else gets a syntax error reply.
 *
 * c: the client that issued the command. */
4061 static void debugCommand(redisClient
*c
) {
4062 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
4064 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
4065 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
/* NOTE(review): the test guarding this reply is not visible; presumably
 * it is "entry not found". */
4069 addReply(c
,shared
.nokeyerr
);
4072 key
= dictGetEntryKey(de
);
4073 val
= dictGetEntryVal(de
);
4074 addReplySds(c
,sdscatprintf(sdsempty(),
4075 "+Key at:%p refcount:%d, value at:%p refcount:%d\r\n",
4076 key
, key
->refcount
, val
, val
->refcount
));
4078 addReplySds(c
,sdsnew(
4079 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4083 /* =================================== Main! ================================ */
/* Read the kernel setting stored in /proc/sys/vm/overcommit_memory.
 *
 * Returns an int that linuxOvercommitMemoryWarning() compares against 0.
 * NOTE(review): the NULL check on 'fp', the conversion of the text that was
 * read, the fclose() and the return statements are not visible here;
 * confirm the value returned on failure against the full source. */
4086 int linuxOvercommitMemoryValue(void) {
4087 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
/* Read at most 63 characters plus the terminator into 'buf'. */
4091 if (fgets(buf
,64,fp
) == NULL
) {
/* Log a warning when linuxOvercommitMemoryValue() reports 0, explaining
 * that background saves may fail and how to change the setting. */
4100 void linuxOvercommitMemoryWarning(void) {
4101 if (linuxOvercommitMemoryValue() == 0) {
4102 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4105 #endif /* __linux__ */
/* Detach the server from the terminal: fork and let the parent exit, start
 * a new session, point the three standard descriptors at /dev/null and
 * write the process id to server.pidfile.
 *
 * NOTE(review): the declarations of 'fd' and 'fp', the NULL check on 'fp'
 * and the fclose() are not visible here. */
4107 static void daemonize(void) {
/* NOTE(review): fork() returns -1 on failure, which also satisfies "!= 0",
 * so a failed fork ends the process with exit status 0. */
4111 if (fork() != 0) exit(0); /* parent exits */
4112 setsid(); /* create a new session */
4114 /* Every output goes to /dev/null. If Redis is daemonized but
 * the 'logfile' is set to 'stdout' in the configuration file
 * it will not log at all. */
4117 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
4118 dup2(fd
, STDIN_FILENO
);
4119 dup2(fd
, STDOUT_FILENO
);
4120 dup2(fd
, STDERR_FILENO
);
/* Close the extra descriptor unless it is itself one of the standard three. */
4121 if (fd
> STDERR_FILENO
) close(fd
);
4123 /* Try to write the pid file */
4124 fp
= fopen(server
.pidfile
,"w");
4126 fprintf(fp
,"%d\n",getpid());
/* Signal handler installed by setupSigAction() for SIGSEGV: prints the
 * signal number and a symbolic backtrace to standard output.
 *
 * sig:    signal number.
 * info:   siginfo_t describing the fault; si_addr is printed.
 * secret: third sa_sigaction argument, used here as a ucontext_t pointer.
 *
 * NOTE(review): two alternative "Got signal" printf calls are visible;
 * presumably they sit in different branches of a preprocessor conditional
 * that is not shown. REG_EIP is the 32-bit x86 instruction pointer index,
 * so the register access is architecture specific. The declaration of
 * 'trace' and the end of the handler are not visible either. printf() and
 * backtrace_symbols() are not async-signal-safe. */
4131 static void segvHandler (int sig
, siginfo_t
*info
, void *secret
) {
4134 char **messages
= (char **)NULL
;
4135 int i
, trace_size
= 0;
4136 ucontext_t
*uc
= (ucontext_t
*)secret
;
4138 /* Do something useful with siginfo_t */
4140 printf("Got signal %d, faulty address is %p, from %p\n", sig
, info
->si_addr
,
4141 (void *)uc
->uc_mcontext
.gregs
[REG_EIP
]);
4143 printf("Got signal %d\n", sig
);
/* Collect at most 16 return addresses. */
4145 trace_size
= backtrace(trace
, 16);
4146 /* overwrite sigaction with caller's address */
4147 trace
[1] = (void *) uc
->uc_mcontext
.gregs
[REG_EIP
];
4149 messages
= backtrace_symbols(trace
, trace_size
);
4150 /* skip first stack frame (points here) */
4151 printf("[bt] Execution path:\n");
4152 for (i
=1; i
<trace_size
; ++i
)
4153 printf("[bt] %s\n", messages
[i
]);
/* Install segvHandler as the SIGSEGV handler.
 *
 * The signal mask is emptied and the handler is registered through
 * sa_sigaction with SA_SIGINFO, together with SA_NODEFER, SA_ONSTACK and
 * SA_RESETHAND. The previous disposition is not saved (third sigaction()
 * argument is NULL) and the sigaction() return value is not checked. */
4158 void setupSigAction(){
4159 struct sigaction act
;
4160 sigemptyset (&act
.sa_mask
);
4161 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used. Otherwise, sa_handler is used */
4162 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
4163 act
.sa_sigaction
= segvHandler
;
4164 sigaction (SIGSEGV
, &act
, NULL
);
4167 int main(int argc
, char **argv
) {
4170 linuxOvercommitMemoryWarning();
4175 ResetServerSaveParams();
4176 loadServerConfig(argv
[1]);
4177 } else if (argc
> 2) {
4178 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
4181 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4184 if (server
.daemonize
) daemonize();
4185 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
4186 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
4187 redisLog(REDIS_NOTICE
,"DB loaded from disk");
4188 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
4189 acceptHandler
, NULL
, NULL
) == AE_ERR
) oom("creating file event");
4190 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
4192 aeDeleteEventLoop(server
.el
);