2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "0.101"
39 #define __USE_POSIX199309
49 #include <arpa/inet.h>
53 #include <sys/resource.h>
58 #include "ae.h" /* Event driven programming library */
59 #include "sds.h" /* Dynamic safe strings */
60 #include "anet.h" /* Networking the easy way */
61 #include "dict.h" /* Hash tables */
62 #include "adlist.h" /* Linked lists */
63 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
64 #include "lzf.h" /* LZF compression library */
65 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
71 /* Static server configuration */
72 #define REDIS_SERVERPORT 6379 /* TCP port */
73 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
74 #define REDIS_IOBUF_LEN 1024
75 #define REDIS_LOADBUF_LEN 1024
76 #define REDIS_STATIC_ARGS 4
77 #define REDIS_DEFAULT_DBNUM 16
78 #define REDIS_CONFIGLINE_MAX 1024
79 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
80 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
81 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
83 /* Hash table parameters */
84 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
85 #define REDIS_HT_MINSLOTS 16384 /* Never resize the HT under this */
88 #define REDIS_CMD_BULK 1 /* Bulk write command */
89 #define REDIS_CMD_INLINE 2 /* Inline command */
90 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
91 this flags will return an error when the 'maxmemory' option is set in the
92 config file and the server is using more than maxmemory bytes of memory.
93 In short this commands are denied on low memory conditions. */
94 #define REDIS_CMD_DENYOOM 4
97 #define REDIS_STRING 0
102 /* Object types only used for dumping to disk */
103 #define REDIS_EXPIRETIME 253
104 #define REDIS_SELECTDB 254
105 #define REDIS_EOF 255
107 /* Defines related to the dump file format. To store 32 bits lengths for short
108 * keys requires a lot of space, so we check the most significant 2 bits of
109 * the first byte to interpreter the length:
111 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
112 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
113 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
114 * 11|000000 this means: specially encoded object will follow. The six bits
115 * number specify the kind of object that follows.
116 * See the REDIS_RDB_ENC_* defines.
118 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
119 * values, will fit inside. */
120 #define REDIS_RDB_6BITLEN 0
121 #define REDIS_RDB_14BITLEN 1
122 #define REDIS_RDB_32BITLEN 2
123 #define REDIS_RDB_ENCVAL 3
124 #define REDIS_RDB_LENERR UINT_MAX
126 /* When a length of a string object stored on disk has the first two bits
127 * set, the remaining two bits specify a special encoding for the object
128 * accordingly to the following defines: */
129 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
130 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
131 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
132 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
135 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
136 #define REDIS_SLAVE 2 /* This client is a slave server */
137 #define REDIS_MASTER 4 /* This client is a master server */
138 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
140 /* Slave replication state - slave side */
141 #define REDIS_REPL_NONE 0 /* No active replication */
142 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
143 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
145 /* Slave replication state - from the point of view of master
146 * Note that in SEND_BULK and ONLINE state the slave receives new updates
147 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
148 * to start the next background saving in order to send updates to it. */
149 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
150 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
151 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
152 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
154 /* List related stuff */
158 /* Sort operations */
159 #define REDIS_SORT_GET 0
160 #define REDIS_SORT_DEL 1
161 #define REDIS_SORT_INCR 2
162 #define REDIS_SORT_DECR 3
163 #define REDIS_SORT_ASC 4
164 #define REDIS_SORT_DESC 5
165 #define REDIS_SORTKEY_MAX 1024
168 #define REDIS_DEBUG 0
169 #define REDIS_NOTICE 1
170 #define REDIS_WARNING 2
172 /* Anti-warning macro... */
173 #define REDIS_NOTUSED(V) ((void) V)
176 /*================================= Data types ============================== */
178 /* A redis object, that is a type able to hold a string / list / set */
179 typedef struct redisObject
{
185 typedef struct redisDb
{
191 /* With multiplexing we need to take per-clinet state.
192 * Clients are taken in a liked list. */
193 typedef struct redisClient
{
200 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
203 time_t lastinteraction
; /* time of the last interaction, used for timeout */
204 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
205 int slaveseldb
; /* slave selected db, if this client is a slave */
206 int authenticated
; /* when requirepass is non-NULL */
207 int replstate
; /* replication state if this is a slave */
208 int repldbfd
; /* replication DB file descriptor */
209 long repldboff
; /* replication DB file offset */
210 off_t repldbsize
; /* replication DB file size */
218 /* Global server state structure */
224 unsigned int sharingpoolsize
;
225 long long dirty
; /* changes to DB from the last save */
227 list
*slaves
, *monitors
;
228 char neterr
[ANET_ERR_LEN
];
230 int cronloops
; /* number of times the cron function run */
231 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
232 time_t lastsave
; /* Unix time of last save succeeede */
233 size_t usedmemory
; /* Used memory in megabytes */
234 /* Fields used only for stats */
235 time_t stat_starttime
; /* server start time */
236 long long stat_numcommands
; /* number of processed commands */
237 long long stat_numconnections
; /* number of connections received */
245 int bgsaveinprogress
;
246 struct saveparam
*saveparams
;
253 /* Replication related */
257 redisClient
*master
; /* client that is master for this slave */
259 unsigned int maxclients
;
260 unsigned int maxmemory
;
261 /* Sort parameters - qsort_r() is only available under BSD so we
262 * have to take this state global, in order to pass it to sortCompare() */
268 typedef void redisCommandProc(redisClient
*c
);
269 struct redisCommand
{
271 redisCommandProc
*proc
;
276 typedef struct _redisSortObject
{
284 typedef struct _redisSortOperation
{
287 } redisSortOperation
;
289 struct sharedObjectsStruct
{
290 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
291 *colon
, *nullbulk
, *nullmultibulk
,
292 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
293 *outofrangeerr
, *plus
,
294 *select0
, *select1
, *select2
, *select3
, *select4
,
295 *select5
, *select6
, *select7
, *select8
, *select9
;
298 /*================================ Prototypes =============================== */
300 static void freeStringObject(robj
*o
);
301 static void freeListObject(robj
*o
);
302 static void freeSetObject(robj
*o
);
303 static void decrRefCount(void *o
);
304 static robj
*createObject(int type
, void *ptr
);
305 static void freeClient(redisClient
*c
);
306 static int rdbLoad(char *filename
);
307 static void addReply(redisClient
*c
, robj
*obj
);
308 static void addReplySds(redisClient
*c
, sds s
);
309 static void incrRefCount(robj
*o
);
310 static int rdbSaveBackground(char *filename
);
311 static robj
*createStringObject(char *ptr
, size_t len
);
312 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
313 static int syncWithMaster(void);
314 static robj
*tryObjectSharing(robj
*o
);
315 static int removeExpire(redisDb
*db
, robj
*key
);
316 static int expireIfNeeded(redisDb
*db
, robj
*key
);
317 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
318 static int deleteKey(redisDb
*db
, robj
*key
);
319 static time_t getExpire(redisDb
*db
, robj
*key
);
320 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
321 static void updateSalvesWaitingBgsave(int bgsaveerr
);
322 static void freeMemoryIfNeeded(void);
324 static void authCommand(redisClient
*c
);
325 static void pingCommand(redisClient
*c
);
326 static void echoCommand(redisClient
*c
);
327 static void setCommand(redisClient
*c
);
328 static void setnxCommand(redisClient
*c
);
329 static void getCommand(redisClient
*c
);
330 static void delCommand(redisClient
*c
);
331 static void existsCommand(redisClient
*c
);
332 static void incrCommand(redisClient
*c
);
333 static void decrCommand(redisClient
*c
);
334 static void incrbyCommand(redisClient
*c
);
335 static void decrbyCommand(redisClient
*c
);
336 static void selectCommand(redisClient
*c
);
337 static void randomkeyCommand(redisClient
*c
);
338 static void keysCommand(redisClient
*c
);
339 static void dbsizeCommand(redisClient
*c
);
340 static void lastsaveCommand(redisClient
*c
);
341 static void saveCommand(redisClient
*c
);
342 static void bgsaveCommand(redisClient
*c
);
343 static void shutdownCommand(redisClient
*c
);
344 static void moveCommand(redisClient
*c
);
345 static void renameCommand(redisClient
*c
);
346 static void renamenxCommand(redisClient
*c
);
347 static void lpushCommand(redisClient
*c
);
348 static void rpushCommand(redisClient
*c
);
349 static void lpopCommand(redisClient
*c
);
350 static void rpopCommand(redisClient
*c
);
351 static void llenCommand(redisClient
*c
);
352 static void lindexCommand(redisClient
*c
);
353 static void lrangeCommand(redisClient
*c
);
354 static void ltrimCommand(redisClient
*c
);
355 static void typeCommand(redisClient
*c
);
356 static void lsetCommand(redisClient
*c
);
357 static void saddCommand(redisClient
*c
);
358 static void sremCommand(redisClient
*c
);
359 static void smoveCommand(redisClient
*c
);
360 static void sismemberCommand(redisClient
*c
);
361 static void scardCommand(redisClient
*c
);
362 static void sinterCommand(redisClient
*c
);
363 static void sinterstoreCommand(redisClient
*c
);
364 static void sunionCommand(redisClient
*c
);
365 static void sunionstoreCommand(redisClient
*c
);
366 static void sdiffCommand(redisClient
*c
);
367 static void sdiffstoreCommand(redisClient
*c
);
368 static void syncCommand(redisClient
*c
);
369 static void flushdbCommand(redisClient
*c
);
370 static void flushallCommand(redisClient
*c
);
371 static void sortCommand(redisClient
*c
);
372 static void lremCommand(redisClient
*c
);
373 static void infoCommand(redisClient
*c
);
374 static void mgetCommand(redisClient
*c
);
375 static void monitorCommand(redisClient
*c
);
376 static void expireCommand(redisClient
*c
);
377 static void getSetCommand(redisClient
*c
);
378 static void ttlCommand(redisClient
*c
);
379 static void slaveofCommand(redisClient
*c
);
380 static void debugCommand(redisClient
*c
);
381 static void setupSigSegvAction();
382 /*================================= Globals ================================= */
385 static struct redisServer server
; /* server global state */
386 static struct redisCommand cmdTable
[] = {
387 {"get",getCommand
,2,REDIS_CMD_INLINE
},
388 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
389 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
390 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
391 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
392 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
393 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
394 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
395 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
396 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
397 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
398 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
399 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
400 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
401 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
402 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
403 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
404 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
405 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
406 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
407 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
408 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
409 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
410 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
411 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
412 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
413 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
414 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
415 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
416 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
417 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
418 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
419 {"getset",getSetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
420 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
421 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
422 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
423 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
424 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
425 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
426 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
427 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
428 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
429 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
430 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
431 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
432 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
433 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
434 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
435 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
436 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
437 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
438 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
439 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
440 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
441 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
442 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
443 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
444 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
448 /*============================ Utility functions ============================ */
450 /* Glob-style pattern matching. */
451 int stringmatchlen(const char *pattern
, int patternLen
,
452 const char *string
, int stringLen
, int nocase
)
457 while (pattern
[1] == '*') {
462 return 1; /* match */
464 if (stringmatchlen(pattern
+1, patternLen
-1,
465 string
, stringLen
, nocase
))
466 return 1; /* match */
470 return 0; /* no match */
474 return 0; /* no match */
484 not = pattern
[0] == '^';
491 if (pattern
[0] == '\\') {
494 if (pattern
[0] == string
[0])
496 } else if (pattern
[0] == ']') {
498 } else if (patternLen
== 0) {
502 } else if (pattern
[1] == '-' && patternLen
>= 3) {
503 int start
= pattern
[0];
504 int end
= pattern
[2];
512 start
= tolower(start
);
518 if (c
>= start
&& c
<= end
)
522 if (pattern
[0] == string
[0])
525 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
535 return 0; /* no match */
541 if (patternLen
>= 2) {
548 if (pattern
[0] != string
[0])
549 return 0; /* no match */
551 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
552 return 0; /* no match */
560 if (stringLen
== 0) {
561 while(*pattern
== '*') {
568 if (patternLen
== 0 && stringLen
== 0)
573 void redisLog(int level
, const char *fmt
, ...)
578 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
582 if (level
>= server
.verbosity
) {
588 strftime(buf
,64,"%d %b %H:%M:%S",gmtime(&now
));
589 fprintf(fp
,"%s %c ",buf
,c
[level
]);
590 vfprintf(fp
, fmt
, ap
);
596 if (server
.logfile
) fclose(fp
);
599 /*====================== Hash table type implementation ==================== */
601 /* This is an hash table type that uses the SDS dynamic strings libary as
602 * keys and radis objects as values (objects can hold SDS strings,
605 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
609 DICT_NOTUSED(privdata
);
611 l1
= sdslen((sds
)key1
);
612 l2
= sdslen((sds
)key2
);
613 if (l1
!= l2
) return 0;
614 return memcmp(key1
, key2
, l1
) == 0;
617 static void dictRedisObjectDestructor(void *privdata
, void *val
)
619 DICT_NOTUSED(privdata
);
624 static int dictSdsKeyCompare(void *privdata
, const void *key1
,
627 const robj
*o1
= key1
, *o2
= key2
;
628 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
631 static unsigned int dictSdsHash(const void *key
) {
633 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
636 static dictType setDictType
= {
637 dictSdsHash
, /* hash function */
640 dictSdsKeyCompare
, /* key compare */
641 dictRedisObjectDestructor
, /* key destructor */
642 NULL
/* val destructor */
645 static dictType hashDictType
= {
646 dictSdsHash
, /* hash function */
649 dictSdsKeyCompare
, /* key compare */
650 dictRedisObjectDestructor
, /* key destructor */
651 dictRedisObjectDestructor
/* val destructor */
654 /* ========================= Random utility functions ======================= */
656 /* Redis generally does not try to recover from out of memory conditions
657 * when allocating objects or strings, it is not clear if it will be possible
658 * to report this condition to the client since the networking layer itself
659 * is based on heap allocation for send buffers, so we simply abort.
660 * At least the code will be simpler to read... */
661 static void oom(const char *msg
) {
662 fprintf(stderr
, "%s: Out of memory\n",msg
);
668 /* ====================== Redis server networking stuff ===================== */
669 void closeTimedoutClients(void) {
672 time_t now
= time(NULL
);
674 listRewind(server
.clients
);
675 while ((ln
= listYield(server
.clients
)) != NULL
) {
676 c
= listNodeValue(ln
);
677 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
678 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
679 (now
- c
->lastinteraction
> server
.maxidletime
)) {
680 redisLog(REDIS_DEBUG
,"Closing idle client");
686 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
687 * we resize the hash table to save memory */
688 void tryResizeHashTables(void) {
691 for (j
= 0; j
< server
.dbnum
; j
++) {
692 long long size
, used
;
694 size
= dictSlots(server
.db
[j
].dict
);
695 used
= dictSize(server
.db
[j
].dict
);
696 if (size
&& used
&& size
> REDIS_HT_MINSLOTS
&&
697 (used
*100/size
< REDIS_HT_MINFILL
)) {
698 redisLog(REDIS_NOTICE
,"The hash table %d is too sparse, resize it...",j
);
699 dictResize(server
.db
[j
].dict
);
700 redisLog(REDIS_NOTICE
,"Hash table %d resized.",j
);
705 int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
706 int j
, loops
= server
.cronloops
++;
707 REDIS_NOTUSED(eventLoop
);
709 REDIS_NOTUSED(clientData
);
711 /* Update the global state with the amount of used memory */
712 server
.usedmemory
= zmalloc_used_memory();
714 /* Show some info about non-empty databases */
715 for (j
= 0; j
< server
.dbnum
; j
++) {
716 long long size
, used
, vkeys
;
718 size
= dictSlots(server
.db
[j
].dict
);
719 used
= dictSize(server
.db
[j
].dict
);
720 vkeys
= dictSize(server
.db
[j
].expires
);
721 if (!(loops
% 5) && used
> 0) {
722 redisLog(REDIS_DEBUG
,"DB %d: %d keys (%d volatile) in %d slots HT.",j
,used
,vkeys
,size
);
723 /* dictPrintStats(server.dict); */
727 /* We don't want to resize the hash tables while a bacground saving
728 * is in progress: the saving child is created using fork() that is
729 * implemented with a copy-on-write semantic in most modern systems, so
730 * if we resize the HT while there is the saving child at work actually
731 * a lot of memory movements in the parent will cause a lot of pages
733 if (!server
.bgsaveinprogress
) tryResizeHashTables();
735 /* Show information about connected clients */
737 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use",
738 listLength(server
.clients
)-listLength(server
.slaves
),
739 listLength(server
.slaves
),
741 dictSize(server
.sharingpool
));
744 /* Close connections of timedout clients */
745 if (server
.maxidletime
&& !(loops
% 10))
746 closeTimedoutClients();
748 /* Check if a background saving in progress terminated */
749 if (server
.bgsaveinprogress
) {
751 /* XXX: TODO handle the case of the saving child killed */
752 if (wait4(-1,&statloc
,WNOHANG
,NULL
)) {
753 int exitcode
= WEXITSTATUS(statloc
);
755 redisLog(REDIS_NOTICE
,
756 "Background saving terminated with success");
758 server
.lastsave
= time(NULL
);
760 redisLog(REDIS_WARNING
,
761 "Background saving error");
763 server
.bgsaveinprogress
= 0;
764 updateSalvesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
767 /* If there is not a background saving in progress check if
768 * we have to save now */
769 time_t now
= time(NULL
);
770 for (j
= 0; j
< server
.saveparamslen
; j
++) {
771 struct saveparam
*sp
= server
.saveparams
+j
;
773 if (server
.dirty
>= sp
->changes
&&
774 now
-server
.lastsave
> sp
->seconds
) {
775 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
776 sp
->changes
, sp
->seconds
);
777 rdbSaveBackground(server
.dbfilename
);
783 /* Try to expire a few timed out keys */
784 for (j
= 0; j
< server
.dbnum
; j
++) {
785 redisDb
*db
= server
.db
+j
;
786 int num
= dictSize(db
->expires
);
789 time_t now
= time(NULL
);
791 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
792 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
797 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
798 t
= (time_t) dictGetEntryVal(de
);
800 deleteKey(db
,dictGetEntryKey(de
));
806 /* Check if we should connect to a MASTER */
807 if (server
.replstate
== REDIS_REPL_CONNECT
) {
808 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
809 if (syncWithMaster() == REDIS_OK
) {
810 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
816 static void createSharedObjects(void) {
817 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
818 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
819 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
820 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
821 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
822 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
823 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
824 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
825 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
827 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
828 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
829 "-ERR Operation against a key holding the wrong kind of value\r\n"));
830 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
831 "-ERR no such key\r\n"));
832 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
833 "-ERR syntax error\r\n"));
834 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
835 "-ERR source and destination objects are the same\r\n"));
836 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
837 "-ERR index out of range\r\n"));
838 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
839 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
840 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
841 shared
.select0
= createStringObject("select 0\r\n",10);
842 shared
.select1
= createStringObject("select 1\r\n",10);
843 shared
.select2
= createStringObject("select 2\r\n",10);
844 shared
.select3
= createStringObject("select 3\r\n",10);
845 shared
.select4
= createStringObject("select 4\r\n",10);
846 shared
.select5
= createStringObject("select 5\r\n",10);
847 shared
.select6
= createStringObject("select 6\r\n",10);
848 shared
.select7
= createStringObject("select 7\r\n",10);
849 shared
.select8
= createStringObject("select 8\r\n",10);
850 shared
.select9
= createStringObject("select 9\r\n",10);
853 static void appendServerSaveParams(time_t seconds
, int changes
) {
854 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
855 if (server
.saveparams
== NULL
) oom("appendServerSaveParams");
856 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
857 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
858 server
.saveparamslen
++;
861 static void ResetServerSaveParams() {
862 zfree(server
.saveparams
);
863 server
.saveparams
= NULL
;
864 server
.saveparamslen
= 0;
867 static void initServerConfig() {
868 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
869 server
.port
= REDIS_SERVERPORT
;
870 server
.verbosity
= REDIS_DEBUG
;
871 server
.maxidletime
= REDIS_MAXIDLETIME
;
872 server
.saveparams
= NULL
;
873 server
.logfile
= NULL
; /* NULL = log on standard output */
874 server
.bindaddr
= NULL
;
875 server
.glueoutputbuf
= 1;
876 server
.daemonize
= 0;
877 server
.pidfile
= "/var/run/redis.pid";
878 server
.dbfilename
= "dump.rdb";
879 server
.requirepass
= NULL
;
880 server
.shareobjects
= 0;
881 server
.maxclients
= 0;
882 server
.maxmemory
= 0;
883 ResetServerSaveParams();
885 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
886 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
887 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
888 /* Replication related */
890 server
.masterhost
= NULL
;
891 server
.masterport
= 6379;
892 server
.master
= NULL
;
893 server
.replstate
= REDIS_REPL_NONE
;
896 static void initServer() {
899 signal(SIGHUP
, SIG_IGN
);
900 signal(SIGPIPE
, SIG_IGN
);
901 setupSigSegvAction();
903 server
.clients
= listCreate();
904 server
.slaves
= listCreate();
905 server
.monitors
= listCreate();
906 server
.objfreelist
= listCreate();
907 createSharedObjects();
908 server
.el
= aeCreateEventLoop();
909 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
910 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
911 server
.sharingpoolsize
= 1024;
912 if (!server
.db
|| !server
.clients
|| !server
.slaves
|| !server
.monitors
|| !server
.el
|| !server
.objfreelist
)
913 oom("server initialization"); /* Fatal OOM */
914 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
915 if (server
.fd
== -1) {
916 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
919 for (j
= 0; j
< server
.dbnum
; j
++) {
920 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
921 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
924 server
.cronloops
= 0;
925 server
.bgsaveinprogress
= 0;
926 server
.lastsave
= time(NULL
);
928 server
.usedmemory
= 0;
929 server
.stat_numcommands
= 0;
930 server
.stat_numconnections
= 0;
931 server
.stat_starttime
= time(NULL
);
932 aeCreateTimeEvent(server
.el
, 1000, serverCron
, NULL
, NULL
);
935 /* Empty the whole database */
936 static long long emptyDb() {
938 long long removed
= 0;
940 for (j
= 0; j
< server
.dbnum
; j
++) {
941 removed
+= dictSize(server
.db
[j
].dict
);
942 dictEmpty(server
.db
[j
].dict
);
943 dictEmpty(server
.db
[j
].expires
);
948 static int yesnotoi(char *s
) {
949 if (!strcasecmp(s
,"yes")) return 1;
950 else if (!strcasecmp(s
,"no")) return 0;
954 /* I agree, this is a very rudimental way to load a configuration...
955 will improve later if the config gets more complex */
956 static void loadServerConfig(char *filename
) {
957 FILE *fp
= fopen(filename
,"r");
958 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
963 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
966 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
972 line
= sdstrim(line
," \t\r\n");
974 /* Skip comments and blank lines*/
975 if (line
[0] == '#' || line
[0] == '\0') {
980 /* Split into arguments */
981 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
984 /* Execute config directives */
985 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
986 server
.maxidletime
= atoi(argv
[1]);
987 if (server
.maxidletime
< 0) {
988 err
= "Invalid timeout value"; goto loaderr
;
990 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
991 server
.port
= atoi(argv
[1]);
992 if (server
.port
< 1 || server
.port
> 65535) {
993 err
= "Invalid port"; goto loaderr
;
995 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
996 server
.bindaddr
= zstrdup(argv
[1]);
997 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
998 int seconds
= atoi(argv
[1]);
999 int changes
= atoi(argv
[2]);
1000 if (seconds
< 1 || changes
< 0) {
1001 err
= "Invalid save parameters"; goto loaderr
;
1003 appendServerSaveParams(seconds
,changes
);
1004 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1005 if (chdir(argv
[1]) == -1) {
1006 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1007 argv
[1], strerror(errno
));
1010 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1011 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1012 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1013 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1015 err
= "Invalid log level. Must be one of debug, notice, warning";
1018 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1021 server
.logfile
= zstrdup(argv
[1]);
1022 if (!strcasecmp(server
.logfile
,"stdout")) {
1023 zfree(server
.logfile
);
1024 server
.logfile
= NULL
;
1026 if (server
.logfile
) {
1027 /* Test if we are able to open the file. The server will not
1028 * be able to abort just for this problem later... */
1029 fp
= fopen(server
.logfile
,"a");
1031 err
= sdscatprintf(sdsempty(),
1032 "Can't open the log file: %s", strerror(errno
));
1037 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1038 server
.dbnum
= atoi(argv
[1]);
1039 if (server
.dbnum
< 1) {
1040 err
= "Invalid number of databases"; goto loaderr
;
1042 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1043 server
.maxclients
= atoi(argv
[1]);
1044 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1045 server
.maxmemory
= atoi(argv
[1]);
1046 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1047 server
.masterhost
= sdsnew(argv
[1]);
1048 server
.masterport
= atoi(argv
[2]);
1049 server
.replstate
= REDIS_REPL_CONNECT
;
1050 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1051 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1052 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1054 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1055 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1056 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1058 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1059 server
.sharingpoolsize
= atoi(argv
[1]);
1060 if (server
.sharingpoolsize
< 1) {
1061 err
= "invalid object sharing pool size"; goto loaderr
;
1063 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1064 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1065 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1067 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1068 server
.requirepass
= zstrdup(argv
[1]);
1069 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1070 server
.pidfile
= zstrdup(argv
[1]);
1071 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1072 server
.dbfilename
= zstrdup(argv
[1]);
1074 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1076 for (j
= 0; j
< argc
; j
++)
1085 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1086 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1087 fprintf(stderr
, ">>> '%s'\n", line
);
1088 fprintf(stderr
, "%s\n", err
);
1092 static void freeClientArgv(redisClient
*c
) {
1095 for (j
= 0; j
< c
->argc
; j
++)
1096 decrRefCount(c
->argv
[j
]);
1100 static void freeClient(redisClient
*c
) {
1103 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1104 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1105 sdsfree(c
->querybuf
);
1106 listRelease(c
->reply
);
1109 ln
= listSearchKey(server
.clients
,c
);
1111 listDelNode(server
.clients
,ln
);
1112 if (c
->flags
& REDIS_SLAVE
) {
1113 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1115 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1116 ln
= listSearchKey(l
,c
);
1120 if (c
->flags
& REDIS_MASTER
) {
1121 server
.master
= NULL
;
1122 server
.replstate
= REDIS_REPL_CONNECT
;
1128 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1133 listRewind(c
->reply
);
1134 while((ln
= listYield(c
->reply
))) {
1136 totlen
+= sdslen(o
->ptr
);
1137 /* This optimization makes more sense if we don't have to copy
1139 if (totlen
> 1024) return;
1145 listRewind(c
->reply
);
1146 while((ln
= listYield(c
->reply
))) {
1148 memcpy(buf
+copylen
,o
->ptr
,sdslen(o
->ptr
));
1149 copylen
+= sdslen(o
->ptr
);
1150 listDelNode(c
->reply
,ln
);
1152 /* Now the output buffer is empty, add the new single element */
1153 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,totlen
));
1154 if (!listAddNodeTail(c
->reply
,o
)) oom("listAddNodeTail");
1158 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1159 redisClient
*c
= privdata
;
1160 int nwritten
= 0, totwritten
= 0, objlen
;
1163 REDIS_NOTUSED(mask
);
1165 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1166 glueReplyBuffersIfNeeded(c
);
1167 while(listLength(c
->reply
)) {
1168 o
= listNodeValue(listFirst(c
->reply
));
1169 objlen
= sdslen(o
->ptr
);
1172 listDelNode(c
->reply
,listFirst(c
->reply
));
1176 if (c
->flags
& REDIS_MASTER
) {
1177 nwritten
= objlen
- c
->sentlen
;
1179 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1180 if (nwritten
<= 0) break;
1182 c
->sentlen
+= nwritten
;
1183 totwritten
+= nwritten
;
1184 /* If we fully sent the object on head go to the next one */
1185 if (c
->sentlen
== objlen
) {
1186 listDelNode(c
->reply
,listFirst(c
->reply
));
1190 if (nwritten
== -1) {
1191 if (errno
== EAGAIN
) {
1194 redisLog(REDIS_DEBUG
,
1195 "Error writing to client: %s", strerror(errno
));
1200 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1201 if (listLength(c
->reply
) == 0) {
1203 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1207 static struct redisCommand
*lookupCommand(char *name
) {
1209 while(cmdTable
[j
].name
!= NULL
) {
1210 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1216 /* resetClient prepare the client to process the next command */
1217 static void resetClient(redisClient
*c
) {
1222 /* If this function gets called we already read a whole
1223 * command, argments are in the client argv/argc fields.
1224 * processCommand() execute the command or prepare the
1225 * server for a bulk read from the client.
1227 * If 1 is returned the client is still alive and valid and
1228 * and other operations can be performed by the caller. Otherwise
1229 * if 0 is returned the client was destroied (i.e. after QUIT). */
1230 static int processCommand(redisClient
*c
) {
1231 struct redisCommand
*cmd
;
1234 /* Free some memory if needed (maxmemory setting) */
1235 if (server
.maxmemory
) freeMemoryIfNeeded();
1237 /* The QUIT command is handled as a special case. Normal command
1238 * procs are unable to close the client connection safely */
1239 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1243 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1245 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1248 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1249 (c
->argc
< -cmd
->arity
)) {
1250 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
1253 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1254 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1257 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1258 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1260 decrRefCount(c
->argv
[c
->argc
-1]);
1261 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1263 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1268 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1269 /* It is possible that the bulk read is already in the
1270 * buffer. Check this condition and handle it accordingly */
1271 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1272 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1274 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1279 /* Let's try to share objects on the command arguments vector */
1280 if (server
.shareobjects
) {
1282 for(j
= 1; j
< c
->argc
; j
++)
1283 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1285 /* Check if the user is authenticated */
1286 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1287 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1292 /* Exec the command */
1293 dirty
= server
.dirty
;
1295 if (server
.dirty
-dirty
!= 0 && listLength(server
.slaves
))
1296 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1297 if (listLength(server
.monitors
))
1298 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1299 server
.stat_numcommands
++;
1301 /* Prepare the client for the next command */
1302 if (c
->flags
& REDIS_CLOSE
) {
1310 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1314 /* (args*2)+1 is enough room for args, spaces, newlines */
1315 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1317 if (argc
<= REDIS_STATIC_ARGS
) {
1320 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1321 if (!outv
) oom("replicationFeedSlaves");
1324 for (j
= 0; j
< argc
; j
++) {
1325 if (j
!= 0) outv
[outc
++] = shared
.space
;
1326 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1329 lenobj
= createObject(REDIS_STRING
,
1330 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv
[j
]->ptr
)));
1331 lenobj
->refcount
= 0;
1332 outv
[outc
++] = lenobj
;
1334 outv
[outc
++] = argv
[j
];
1336 outv
[outc
++] = shared
.crlf
;
1338 /* Increment all the refcounts at start and decrement at end in order to
1339 * be sure to free objects if there is no slave in a replication state
1340 * able to be feed with commands */
1341 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1343 while((ln
= listYield(slaves
))) {
1344 redisClient
*slave
= ln
->value
;
1346 /* Don't feed slaves that are still waiting for BGSAVE to start */
1347 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1349 /* Feed all the other slaves, MONITORs and so on */
1350 if (slave
->slaveseldb
!= dictid
) {
1354 case 0: selectcmd
= shared
.select0
; break;
1355 case 1: selectcmd
= shared
.select1
; break;
1356 case 2: selectcmd
= shared
.select2
; break;
1357 case 3: selectcmd
= shared
.select3
; break;
1358 case 4: selectcmd
= shared
.select4
; break;
1359 case 5: selectcmd
= shared
.select5
; break;
1360 case 6: selectcmd
= shared
.select6
; break;
1361 case 7: selectcmd
= shared
.select7
; break;
1362 case 8: selectcmd
= shared
.select8
; break;
1363 case 9: selectcmd
= shared
.select9
; break;
1365 selectcmd
= createObject(REDIS_STRING
,
1366 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1367 selectcmd
->refcount
= 0;
1370 addReply(slave
,selectcmd
);
1371 slave
->slaveseldb
= dictid
;
1373 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1375 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1376 if (outv
!= static_outv
) zfree(outv
);
1379 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1380 redisClient
*c
= (redisClient
*) privdata
;
1381 char buf
[REDIS_IOBUF_LEN
];
1384 REDIS_NOTUSED(mask
);
1386 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1388 if (errno
== EAGAIN
) {
1391 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1395 } else if (nread
== 0) {
1396 redisLog(REDIS_DEBUG
, "Client closed connection");
1401 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1402 c
->lastinteraction
= time(NULL
);
1408 if (c
->bulklen
== -1) {
1409 /* Read the first line of the query */
1410 char *p
= strchr(c
->querybuf
,'\n');
1416 query
= c
->querybuf
;
1417 c
->querybuf
= sdsempty();
1418 querylen
= 1+(p
-(query
));
1419 if (sdslen(query
) > querylen
) {
1420 /* leave data after the first line of the query in the buffer */
1421 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1423 *p
= '\0'; /* remove "\n" */
1424 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1425 sdsupdatelen(query
);
1427 /* Now we can split the query in arguments */
1428 if (sdslen(query
) == 0) {
1429 /* Ignore empty query */
1433 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1434 if (argv
== NULL
) oom("sdssplitlen");
1437 if (c
->argv
) zfree(c
->argv
);
1438 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1439 if (c
->argv
== NULL
) oom("allocating arguments list for client");
1441 for (j
= 0; j
< argc
; j
++) {
1442 if (sdslen(argv
[j
])) {
1443 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1450 /* Execute the command. If the client is still valid
1451 * after processCommand() return and there is something
1452 * on the query buffer try to process the next command. */
1453 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1455 } else if (sdslen(c
->querybuf
) >= 1024*32) {
1456 redisLog(REDIS_DEBUG
, "Client protocol error");
1461 /* Bulk read handling. Note that if we are at this point
1462 the client already sent a command terminated with a newline,
1463 we are reading the bulk data that is actually the last
1464 argument of the command. */
1465 int qbl
= sdslen(c
->querybuf
);
1467 if (c
->bulklen
<= qbl
) {
1468 /* Copy everything but the final CRLF as final argument */
1469 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1471 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1478 static int selectDb(redisClient
*c
, int id
) {
1479 if (id
< 0 || id
>= server
.dbnum
)
1481 c
->db
= &server
.db
[id
];
1485 static void *dupClientReplyValue(void *o
) {
1486 incrRefCount((robj
*)o
);
1490 static redisClient
*createClient(int fd
) {
1491 redisClient
*c
= zmalloc(sizeof(*c
));
1493 anetNonBlock(NULL
,fd
);
1494 anetTcpNoDelay(NULL
,fd
);
1495 if (!c
) return NULL
;
1498 c
->querybuf
= sdsempty();
1504 c
->lastinteraction
= time(NULL
);
1505 c
->authenticated
= 0;
1506 c
->replstate
= REDIS_REPL_NONE
;
1507 if ((c
->reply
= listCreate()) == NULL
) oom("listCreate");
1508 listSetFreeMethod(c
->reply
,decrRefCount
);
1509 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1510 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1511 readQueryFromClient
, c
, NULL
) == AE_ERR
) {
1515 if (!listAddNodeTail(server
.clients
,c
)) oom("listAddNodeTail");
1519 static void addReply(redisClient
*c
, robj
*obj
) {
1520 if (listLength(c
->reply
) == 0 &&
1521 (c
->replstate
== REDIS_REPL_NONE
||
1522 c
->replstate
== REDIS_REPL_ONLINE
) &&
1523 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
1524 sendReplyToClient
, c
, NULL
) == AE_ERR
) return;
1525 if (!listAddNodeTail(c
->reply
,obj
)) oom("listAddNodeTail");
1529 static void addReplySds(redisClient
*c
, sds s
) {
1530 robj
*o
= createObject(REDIS_STRING
,s
);
1535 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1540 REDIS_NOTUSED(mask
);
1541 REDIS_NOTUSED(privdata
);
1543 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
1544 if (cfd
== AE_ERR
) {
1545 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
1548 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
1549 if ((c
= createClient(cfd
)) == NULL
) {
1550 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
1551 close(cfd
); /* May be already closed, just ingore errors */
1554 /* If maxclient directive is set and this is one client more... close the
1555 * connection. Note that we create the client instead to check before
1556 * for this condition, since now the socket is already set in nonblocking
1557 * mode and we can send an error for free using the Kernel I/O */
1558 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
1559 char *err
= "-ERR max number of clients reached\r\n";
1561 /* That's a best effort error message, don't check write errors */
1562 (void) write(c
->fd
,err
,strlen(err
));
1566 server
.stat_numconnections
++;
1569 /* ======================= Redis objects implementation ===================== */
1571 static robj
*createObject(int type
, void *ptr
) {
1574 if (listLength(server
.objfreelist
)) {
1575 listNode
*head
= listFirst(server
.objfreelist
);
1576 o
= listNodeValue(head
);
1577 listDelNode(server
.objfreelist
,head
);
1579 o
= zmalloc(sizeof(*o
));
1581 if (!o
) oom("createObject");
1588 static robj
*createStringObject(char *ptr
, size_t len
) {
1589 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
1592 static robj
*createListObject(void) {
1593 list
*l
= listCreate();
1595 if (!l
) oom("listCreate");
1596 listSetFreeMethod(l
,decrRefCount
);
1597 return createObject(REDIS_LIST
,l
);
1600 static robj
*createSetObject(void) {
1601 dict
*d
= dictCreate(&setDictType
,NULL
);
1602 if (!d
) oom("dictCreate");
1603 return createObject(REDIS_SET
,d
);
1606 static void freeStringObject(robj
*o
) {
1610 static void freeListObject(robj
*o
) {
1611 listRelease((list
*) o
->ptr
);
1614 static void freeSetObject(robj
*o
) {
1615 dictRelease((dict
*) o
->ptr
);
1618 static void freeHashObject(robj
*o
) {
1619 dictRelease((dict
*) o
->ptr
);
1622 static void incrRefCount(robj
*o
) {
1624 #ifdef DEBUG_REFCOUNT
1625 if (o
->type
== REDIS_STRING
)
1626 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
1630 static void decrRefCount(void *obj
) {
1633 #ifdef DEBUG_REFCOUNT
1634 if (o
->type
== REDIS_STRING
)
1635 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
1637 if (--(o
->refcount
) == 0) {
1639 case REDIS_STRING
: freeStringObject(o
); break;
1640 case REDIS_LIST
: freeListObject(o
); break;
1641 case REDIS_SET
: freeSetObject(o
); break;
1642 case REDIS_HASH
: freeHashObject(o
); break;
1643 default: assert(0 != 0); break;
1645 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
1646 !listAddNodeHead(server
.objfreelist
,o
))
1651 /* Try to share an object against the shared objects pool */
1652 static robj
*tryObjectSharing(robj
*o
) {
1653 struct dictEntry
*de
;
1656 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
1658 assert(o
->type
== REDIS_STRING
);
1659 de
= dictFind(server
.sharingpool
,o
);
1661 robj
*shared
= dictGetEntryKey(de
);
1663 c
= ((unsigned long) dictGetEntryVal(de
))+1;
1664 dictGetEntryVal(de
) = (void*) c
;
1665 incrRefCount(shared
);
1669 /* Here we are using a stream algorihtm: Every time an object is
1670 * shared we increment its count, everytime there is a miss we
1671 * recrement the counter of a random object. If this object reaches
1672 * zero we remove the object and put the current object instead. */
1673 if (dictSize(server
.sharingpool
) >=
1674 server
.sharingpoolsize
) {
1675 de
= dictGetRandomKey(server
.sharingpool
);
1677 c
= ((unsigned long) dictGetEntryVal(de
))-1;
1678 dictGetEntryVal(de
) = (void*) c
;
1680 dictDelete(server
.sharingpool
,de
->key
);
1683 c
= 0; /* If the pool is empty we want to add this object */
1688 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
1689 assert(retval
== DICT_OK
);
1696 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
1697 dictEntry
*de
= dictFind(db
->dict
,key
);
1698 return de
? dictGetEntryVal(de
) : NULL
;
1701 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
1702 expireIfNeeded(db
,key
);
1703 return lookupKey(db
,key
);
1706 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
1707 deleteIfVolatile(db
,key
);
1708 return lookupKey(db
,key
);
1711 static int deleteKey(redisDb
*db
, robj
*key
) {
1714 /* We need to protect key from destruction: after the first dictDelete()
1715 * it may happen that 'key' is no longer valid if we don't increment
1716 * it's count. This may happen when we get the object reference directly
1717 * from the hash table with dictRandomKey() or dict iterators */
1719 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
1720 retval
= dictDelete(db
->dict
,key
);
1723 return retval
== DICT_OK
;
1726 /*============================ DB saving/loading ============================ */
1728 static int rdbSaveType(FILE *fp
, unsigned char type
) {
1729 if (fwrite(&type
,1,1,fp
) == 0) return -1;
1733 static int rdbSaveTime(FILE *fp
, time_t t
) {
1734 int32_t t32
= (int32_t) t
;
1735 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
1739 /* check rdbLoadLen() comments for more info */
1740 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
1741 unsigned char buf
[2];
1744 /* Save a 6 bit len */
1745 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
1746 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1747 } else if (len
< (1<<14)) {
1748 /* Save a 14 bit len */
1749 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
1751 if (fwrite(buf
,2,1,fp
) == 0) return -1;
1753 /* Save a 32 bit len */
1754 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
1755 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1757 if (fwrite(&len
,4,1,fp
) == 0) return -1;
1762 /* String objects in the form "2391" "-100" without any space and with a
1763 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1764 * encoded as integers to save space */
1765 int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
1767 char *endptr
, buf
[32];
1769 /* Check if it's possible to encode this value as a number */
1770 value
= strtoll(s
, &endptr
, 10);
1771 if (endptr
[0] != '\0') return 0;
1772 snprintf(buf
,32,"%lld",value
);
1774 /* If the number converted back into a string is not identical
1775 * then it's not possible to encode the string as integer */
1776 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
1778 /* Finally check if it fits in our ranges */
1779 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
1780 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
1781 enc
[1] = value
&0xFF;
1783 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
1784 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
1785 enc
[1] = value
&0xFF;
1786 enc
[2] = (value
>>8)&0xFF;
1788 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
1789 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
1790 enc
[1] = value
&0xFF;
1791 enc
[2] = (value
>>8)&0xFF;
1792 enc
[3] = (value
>>16)&0xFF;
1793 enc
[4] = (value
>>24)&0xFF;
1800 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
1801 unsigned int comprlen
, outlen
;
1805 /* We require at least four bytes compression for this to be worth it */
1806 outlen
= sdslen(obj
->ptr
)-4;
1807 if (outlen
<= 0) return 0;
1808 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
1809 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
1810 if (comprlen
== 0) {
1814 /* Data compressed! Let's save it on disk */
1815 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
1816 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
1817 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
1818 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
1819 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
1828 /* Save a string objet as [len][data] on disk. If the object is a string
1829 * representation of an integer value we try to safe it in a special form */
1830 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
1831 size_t len
= sdslen(obj
->ptr
);
1834 /* Try integer encoding */
1836 unsigned char buf
[5];
1837 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
1838 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
1843 /* Try LZF compression - under 20 bytes it's unable to compress even
1844 * aaaaaaaaaaaaaaaaaa so skip it */
1845 if (1 && len
> 20) {
1848 retval
= rdbSaveLzfStringObject(fp
,obj
);
1849 if (retval
== -1) return -1;
1850 if (retval
> 0) return 0;
1851 /* retval == 0 means data can't be compressed, save the old way */
1854 /* Store verbatim */
1855 if (rdbSaveLen(fp
,len
) == -1) return -1;
1856 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
1860 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1861 static int rdbSave(char *filename
) {
1862 dictIterator
*di
= NULL
;
1867 time_t now
= time(NULL
);
1869 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
1870 fp
= fopen(tmpfile
,"w");
1872 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
1875 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
1876 for (j
= 0; j
< server
.dbnum
; j
++) {
1877 redisDb
*db
= server
.db
+j
;
1879 if (dictSize(d
) == 0) continue;
1880 di
= dictGetIterator(d
);
1886 /* Write the SELECT DB opcode */
1887 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
1888 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
1890 /* Iterate this DB writing every entry */
1891 while((de
= dictNext(di
)) != NULL
) {
1892 robj
*key
= dictGetEntryKey(de
);
1893 robj
*o
= dictGetEntryVal(de
);
1894 time_t expiretime
= getExpire(db
,key
);
1896 /* Save the expire time */
1897 if (expiretime
!= -1) {
1898 /* If this key is already expired skip it */
1899 if (expiretime
< now
) continue;
1900 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
1901 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
1903 /* Save the key and associated value */
1904 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
1905 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
1906 if (o
->type
== REDIS_STRING
) {
1907 /* Save a string value */
1908 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
1909 } else if (o
->type
== REDIS_LIST
) {
1910 /* Save a list value */
1911 list
*list
= o
->ptr
;
1915 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
1916 while((ln
= listYield(list
))) {
1917 robj
*eleobj
= listNodeValue(ln
);
1919 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
1921 } else if (o
->type
== REDIS_SET
) {
1922 /* Save a set value */
1924 dictIterator
*di
= dictGetIterator(set
);
1927 if (!set
) oom("dictGetIteraotr");
1928 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
1929 while((de
= dictNext(di
)) != NULL
) {
1930 robj
*eleobj
= dictGetEntryKey(de
);
1932 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
1934 dictReleaseIterator(di
);
1939 dictReleaseIterator(di
);
1942 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
1944 /* Make sure data will not remain on the OS's output buffers */
1949 /* Use RENAME to make sure the DB file is changed atomically only
1950 * if the generate DB file is ok. */
1951 if (rename(tmpfile
,filename
) == -1) {
1952 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destionation: %s", strerror(errno
));
1956 redisLog(REDIS_NOTICE
,"DB saved on disk");
1958 server
.lastsave
= time(NULL
);
1964 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
1965 if (di
) dictReleaseIterator(di
);
1969 static int rdbSaveBackground(char *filename
) {
1972 if (server
.bgsaveinprogress
) return REDIS_ERR
;
1973 if ((childpid
= fork()) == 0) {
1976 if (rdbSave(filename
) == REDIS_OK
) {
1983 if (childpid
== -1) {
1984 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
1988 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
1989 server
.bgsaveinprogress
= 1;
1992 return REDIS_OK
; /* unreached */
1995 static int rdbLoadType(FILE *fp
) {
1997 if (fread(&type
,1,1,fp
) == 0) return -1;
2001 static time_t rdbLoadTime(FILE *fp
) {
2003 if (fread(&t32
,4,1,fp
) == 0) return -1;
2004 return (time_t) t32
;
2007 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2008 * of this file for a description of how this are stored on disk.
2010 * isencoded is set to 1 if the readed length is not actually a length but
2011 * an "encoding type", check the above comments for more info */
2012 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2013 unsigned char buf
[2];
2016 if (isencoded
) *isencoded
= 0;
2018 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2023 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2024 type
= (buf
[0]&0xC0)>>6;
2025 if (type
== REDIS_RDB_6BITLEN
) {
2026 /* Read a 6 bit len */
2028 } else if (type
== REDIS_RDB_ENCVAL
) {
2029 /* Read a 6 bit len encoding type */
2030 if (isencoded
) *isencoded
= 1;
2032 } else if (type
== REDIS_RDB_14BITLEN
) {
2033 /* Read a 14 bit len */
2034 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2035 return ((buf
[0]&0x3F)<<8)|buf
[1];
2037 /* Read a 32 bit len */
2038 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2044 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2045 unsigned char enc
[4];
2048 if (enctype
== REDIS_RDB_ENC_INT8
) {
2049 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2050 val
= (signed char)enc
[0];
2051 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2053 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2054 v
= enc
[0]|(enc
[1]<<8);
2056 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2058 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2059 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2062 val
= 0; /* anti-warning */
2065 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2068 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2069 unsigned int len
, clen
;
2070 unsigned char *c
= NULL
;
2073 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2074 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2075 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2076 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2077 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2078 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2080 return createObject(REDIS_STRING
,val
);
2087 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2092 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2095 case REDIS_RDB_ENC_INT8
:
2096 case REDIS_RDB_ENC_INT16
:
2097 case REDIS_RDB_ENC_INT32
:
2098 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2099 case REDIS_RDB_ENC_LZF
:
2100 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2106 if (len
== REDIS_RDB_LENERR
) return NULL
;
2107 val
= sdsnewlen(NULL
,len
);
2108 if (len
&& fread(val
,len
,1,fp
) == 0) {
2112 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2115 static int rdbLoad(char *filename
) {
2117 robj
*keyobj
= NULL
;
2119 int type
, retval
, rdbver
;
2120 dict
*d
= server
.db
[0].dict
;
2121 redisDb
*db
= server
.db
+0;
2123 time_t expiretime
= -1, now
= time(NULL
);
2125 fp
= fopen(filename
,"r");
2126 if (!fp
) return REDIS_ERR
;
2127 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2129 if (memcmp(buf
,"REDIS",5) != 0) {
2131 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2134 rdbver
= atoi(buf
+5);
2137 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2144 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2145 if (type
== REDIS_EXPIRETIME
) {
2146 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2147 /* We read the time so we need to read the object type again */
2148 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2150 if (type
== REDIS_EOF
) break;
2151 /* Handle SELECT DB opcode as a special case */
2152 if (type
== REDIS_SELECTDB
) {
2153 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2155 if (dbid
>= (unsigned)server
.dbnum
) {
2156 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2159 db
= server
.db
+dbid
;
2164 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2166 if (type
== REDIS_STRING
) {
2167 /* Read string value */
2168 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2169 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2170 /* Read list/set value */
2173 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2175 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2176 /* Load every single element of the list/set */
2180 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2181 if (type
== REDIS_LIST
) {
2182 if (!listAddNodeTail((list
*)o
->ptr
,ele
))
2183 oom("listAddNodeTail");
2185 if (dictAdd((dict
*)o
->ptr
,ele
,NULL
) == DICT_ERR
)
2192 /* Add the new object in the hash table */
2193 retval
= dictAdd(d
,keyobj
,o
);
2194 if (retval
== DICT_ERR
) {
2195 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2198 /* Set the expire time if needed */
2199 if (expiretime
!= -1) {
2200 setExpire(db
,keyobj
,expiretime
);
2201 /* Delete this key if already expired */
2202 if (expiretime
< now
) deleteKey(db
,keyobj
);
2210 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2211 if (keyobj
) decrRefCount(keyobj
);
2212 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2214 return REDIS_ERR
; /* Just to avoid warning */
2217 /*================================== Commands =============================== */
2219 static void authCommand(redisClient
*c
) {
2220 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2221 c
->authenticated
= 1;
2222 addReply(c
,shared
.ok
);
2224 c
->authenticated
= 0;
2225 addReply(c
,shared
.err
);
2229 static void pingCommand(redisClient
*c
) {
2230 addReply(c
,shared
.pong
);
2233 static void echoCommand(redisClient
*c
) {
2234 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
2235 (int)sdslen(c
->argv
[1]->ptr
)));
2236 addReply(c
,c
->argv
[1]);
2237 addReply(c
,shared
.crlf
);
2240 /*=================================== Strings =============================== */
2242 static void setGenericCommand(redisClient
*c
, int nx
) {
2245 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2246 if (retval
== DICT_ERR
) {
2248 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2249 incrRefCount(c
->argv
[2]);
2251 addReply(c
,shared
.czero
);
2255 incrRefCount(c
->argv
[1]);
2256 incrRefCount(c
->argv
[2]);
2259 removeExpire(c
->db
,c
->argv
[1]);
2260 addReply(c
, nx
? shared
.cone
: shared
.ok
);
2263 static void setCommand(redisClient
*c
) {
2264 setGenericCommand(c
,0);
2267 static void setnxCommand(redisClient
*c
) {
2268 setGenericCommand(c
,1);
2271 static void getCommand(redisClient
*c
) {
2272 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2275 addReply(c
,shared
.nullbulk
);
2277 if (o
->type
!= REDIS_STRING
) {
2278 addReply(c
,shared
.wrongtypeerr
);
2280 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o
->ptr
)));
2282 addReply(c
,shared
.crlf
);
2287 static void getSetCommand(redisClient
*c
) {
2289 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
2290 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2292 incrRefCount(c
->argv
[1]);
2294 incrRefCount(c
->argv
[2]);
2296 removeExpire(c
->db
,c
->argv
[1]);
2299 static void mgetCommand(redisClient
*c
) {
2302 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
2303 for (j
= 1; j
< c
->argc
; j
++) {
2304 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
2306 addReply(c
,shared
.nullbulk
);
2308 if (o
->type
!= REDIS_STRING
) {
2309 addReply(c
,shared
.nullbulk
);
2311 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o
->ptr
)));
2313 addReply(c
,shared
.crlf
);
2319 static void incrDecrCommand(redisClient
*c
, long long incr
) {
2324 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2328 if (o
->type
!= REDIS_STRING
) {
2333 value
= strtoll(o
->ptr
, &eptr
, 10);
2338 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
2339 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
2340 if (retval
== DICT_ERR
) {
2341 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
2342 removeExpire(c
->db
,c
->argv
[1]);
2344 incrRefCount(c
->argv
[1]);
2347 addReply(c
,shared
.colon
);
2349 addReply(c
,shared
.crlf
);
2352 static void incrCommand(redisClient
*c
) {
2353 incrDecrCommand(c
,1);
2356 static void decrCommand(redisClient
*c
) {
2357 incrDecrCommand(c
,-1);
2360 static void incrbyCommand(redisClient
*c
) {
2361 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2362 incrDecrCommand(c
,incr
);
2365 static void decrbyCommand(redisClient
*c
) {
2366 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2367 incrDecrCommand(c
,-incr
);
2370 /* ========================= Type agnostic commands ========================= */
2372 static void delCommand(redisClient
*c
) {
2375 for (j
= 1; j
< c
->argc
; j
++) {
2376 if (deleteKey(c
->db
,c
->argv
[j
])) {
2383 addReply(c
,shared
.czero
);
2386 addReply(c
,shared
.cone
);
2389 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
2394 static void existsCommand(redisClient
*c
) {
2395 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
2398 static void selectCommand(redisClient
*c
) {
2399 int id
= atoi(c
->argv
[1]->ptr
);
2401 if (selectDb(c
,id
) == REDIS_ERR
) {
2402 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
2404 addReply(c
,shared
.ok
);
2408 static void randomkeyCommand(redisClient
*c
) {
2412 de
= dictGetRandomKey(c
->db
->dict
);
2413 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
2416 addReply(c
,shared
.plus
);
2417 addReply(c
,shared
.crlf
);
2419 addReply(c
,shared
.plus
);
2420 addReply(c
,dictGetEntryKey(de
));
2421 addReply(c
,shared
.crlf
);
2425 static void keysCommand(redisClient
*c
) {
2428 sds pattern
= c
->argv
[1]->ptr
;
2429 int plen
= sdslen(pattern
);
2430 int numkeys
= 0, keyslen
= 0;
2431 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
2433 di
= dictGetIterator(c
->db
->dict
);
2434 if (!di
) oom("dictGetIterator");
2436 decrRefCount(lenobj
);
2437 while((de
= dictNext(di
)) != NULL
) {
2438 robj
*keyobj
= dictGetEntryKey(de
);
2440 sds key
= keyobj
->ptr
;
2441 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
2442 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
2443 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
2445 addReply(c
,shared
.space
);
2448 keyslen
+= sdslen(key
);
2452 dictReleaseIterator(di
);
2453 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
2454 addReply(c
,shared
.crlf
);
2457 static void dbsizeCommand(redisClient
*c
) {
2459 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
2462 static void lastsaveCommand(redisClient
*c
) {
2464 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
2467 static void typeCommand(redisClient
*c
) {
2471 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2476 case REDIS_STRING
: type
= "+string"; break;
2477 case REDIS_LIST
: type
= "+list"; break;
2478 case REDIS_SET
: type
= "+set"; break;
2479 default: type
= "unknown"; break;
2482 addReplySds(c
,sdsnew(type
));
2483 addReply(c
,shared
.crlf
);
2486 static void saveCommand(redisClient
*c
) {
2487 if (server
.bgsaveinprogress
) {
2488 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
2491 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2492 addReply(c
,shared
.ok
);
2494 addReply(c
,shared
.err
);
2498 static void bgsaveCommand(redisClient
*c
) {
2499 if (server
.bgsaveinprogress
) {
2500 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
2503 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
2504 addReply(c
,shared
.ok
);
2506 addReply(c
,shared
.err
);
2510 static void shutdownCommand(redisClient
*c
) {
2511 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
2512 /* XXX: TODO kill the child if there is a bgsave in progress */
2513 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2514 if (server
.daemonize
) {
2515 unlink(server
.pidfile
);
2517 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
2518 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
2521 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
2522 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2526 static void renameGenericCommand(redisClient
*c
, int nx
) {
2529 /* To use the same key as src and dst is probably an error */
2530 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
2531 addReply(c
,shared
.sameobjecterr
);
2535 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2537 addReply(c
,shared
.nokeyerr
);
2541 deleteIfVolatile(c
->db
,c
->argv
[2]);
2542 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
2545 addReply(c
,shared
.czero
);
2548 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
2550 incrRefCount(c
->argv
[2]);
2552 deleteKey(c
->db
,c
->argv
[1]);
2554 addReply(c
,nx
? shared
.cone
: shared
.ok
);
2557 static void renameCommand(redisClient
*c
) {
2558 renameGenericCommand(c
,0);
2561 static void renamenxCommand(redisClient
*c
) {
2562 renameGenericCommand(c
,1);
2565 static void moveCommand(redisClient
*c
) {
2570 /* Obtain source and target DB pointers */
2573 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
2574 addReply(c
,shared
.outofrangeerr
);
2578 selectDb(c
,srcid
); /* Back to the source DB */
2580 /* If the user is moving using as target the same
2581 * DB as the source DB it is probably an error. */
2583 addReply(c
,shared
.sameobjecterr
);
2587 /* Check if the element exists and get a reference */
2588 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2590 addReply(c
,shared
.czero
);
2594 /* Try to add the element to the target DB */
2595 deleteIfVolatile(dst
,c
->argv
[1]);
2596 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
2597 addReply(c
,shared
.czero
);
2600 incrRefCount(c
->argv
[1]);
2603 /* OK! key moved, free the entry in the source DB */
2604 deleteKey(src
,c
->argv
[1]);
2606 addReply(c
,shared
.cone
);
2609 /* =================================== Lists ================================ */
2610 static void pushGenericCommand(redisClient
*c
, int where
) {
2614 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2616 lobj
= createListObject();
2618 if (where
== REDIS_HEAD
) {
2619 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2621 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2623 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
2624 incrRefCount(c
->argv
[1]);
2625 incrRefCount(c
->argv
[2]);
2627 if (lobj
->type
!= REDIS_LIST
) {
2628 addReply(c
,shared
.wrongtypeerr
);
2632 if (where
== REDIS_HEAD
) {
2633 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2635 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2637 incrRefCount(c
->argv
[2]);
2640 addReply(c
,shared
.ok
);
2643 static void lpushCommand(redisClient
*c
) {
2644 pushGenericCommand(c
,REDIS_HEAD
);
2647 static void rpushCommand(redisClient
*c
) {
2648 pushGenericCommand(c
,REDIS_TAIL
);
2651 static void llenCommand(redisClient
*c
) {
2655 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2657 addReply(c
,shared
.czero
);
2660 if (o
->type
!= REDIS_LIST
) {
2661 addReply(c
,shared
.wrongtypeerr
);
2664 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
2669 static void lindexCommand(redisClient
*c
) {
2671 int index
= atoi(c
->argv
[2]->ptr
);
2673 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2675 addReply(c
,shared
.nullbulk
);
2677 if (o
->type
!= REDIS_LIST
) {
2678 addReply(c
,shared
.wrongtypeerr
);
2680 list
*list
= o
->ptr
;
2683 ln
= listIndex(list
, index
);
2685 addReply(c
,shared
.nullbulk
);
2687 robj
*ele
= listNodeValue(ln
);
2688 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2690 addReply(c
,shared
.crlf
);
2696 static void lsetCommand(redisClient
*c
) {
2698 int index
= atoi(c
->argv
[2]->ptr
);
2700 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2702 addReply(c
,shared
.nokeyerr
);
2704 if (o
->type
!= REDIS_LIST
) {
2705 addReply(c
,shared
.wrongtypeerr
);
2707 list
*list
= o
->ptr
;
2710 ln
= listIndex(list
, index
);
2712 addReply(c
,shared
.outofrangeerr
);
2714 robj
*ele
= listNodeValue(ln
);
2717 listNodeValue(ln
) = c
->argv
[3];
2718 incrRefCount(c
->argv
[3]);
2719 addReply(c
,shared
.ok
);
2726 static void popGenericCommand(redisClient
*c
, int where
) {
2729 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2731 addReply(c
,shared
.nullbulk
);
2733 if (o
->type
!= REDIS_LIST
) {
2734 addReply(c
,shared
.wrongtypeerr
);
2736 list
*list
= o
->ptr
;
2739 if (where
== REDIS_HEAD
)
2740 ln
= listFirst(list
);
2742 ln
= listLast(list
);
2745 addReply(c
,shared
.nullbulk
);
2747 robj
*ele
= listNodeValue(ln
);
2748 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2750 addReply(c
,shared
.crlf
);
2751 listDelNode(list
,ln
);
2758 static void lpopCommand(redisClient
*c
) {
2759 popGenericCommand(c
,REDIS_HEAD
);
2762 static void rpopCommand(redisClient
*c
) {
2763 popGenericCommand(c
,REDIS_TAIL
);
2766 static void lrangeCommand(redisClient
*c
) {
2768 int start
= atoi(c
->argv
[2]->ptr
);
2769 int end
= atoi(c
->argv
[3]->ptr
);
2771 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2773 addReply(c
,shared
.nullmultibulk
);
2775 if (o
->type
!= REDIS_LIST
) {
2776 addReply(c
,shared
.wrongtypeerr
);
2778 list
*list
= o
->ptr
;
2780 int llen
= listLength(list
);
2784 /* convert negative indexes */
2785 if (start
< 0) start
= llen
+start
;
2786 if (end
< 0) end
= llen
+end
;
2787 if (start
< 0) start
= 0;
2788 if (end
< 0) end
= 0;
2790 /* indexes sanity checks */
2791 if (start
> end
|| start
>= llen
) {
2792 /* Out of range start or start > end result in empty list */
2793 addReply(c
,shared
.emptymultibulk
);
2796 if (end
>= llen
) end
= llen
-1;
2797 rangelen
= (end
-start
)+1;
2799 /* Return the result in form of a multi-bulk reply */
2800 ln
= listIndex(list
, start
);
2801 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
2802 for (j
= 0; j
< rangelen
; j
++) {
2803 ele
= listNodeValue(ln
);
2804 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2806 addReply(c
,shared
.crlf
);
2813 static void ltrimCommand(redisClient
*c
) {
2815 int start
= atoi(c
->argv
[2]->ptr
);
2816 int end
= atoi(c
->argv
[3]->ptr
);
2818 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2820 addReply(c
,shared
.nokeyerr
);
2822 if (o
->type
!= REDIS_LIST
) {
2823 addReply(c
,shared
.wrongtypeerr
);
2825 list
*list
= o
->ptr
;
2827 int llen
= listLength(list
);
2828 int j
, ltrim
, rtrim
;
2830 /* convert negative indexes */
2831 if (start
< 0) start
= llen
+start
;
2832 if (end
< 0) end
= llen
+end
;
2833 if (start
< 0) start
= 0;
2834 if (end
< 0) end
= 0;
2836 /* indexes sanity checks */
2837 if (start
> end
|| start
>= llen
) {
2838 /* Out of range start or start > end result in empty list */
2842 if (end
>= llen
) end
= llen
-1;
2847 /* Remove list elements to perform the trim */
2848 for (j
= 0; j
< ltrim
; j
++) {
2849 ln
= listFirst(list
);
2850 listDelNode(list
,ln
);
2852 for (j
= 0; j
< rtrim
; j
++) {
2853 ln
= listLast(list
);
2854 listDelNode(list
,ln
);
2856 addReply(c
,shared
.ok
);
2862 static void lremCommand(redisClient
*c
) {
2865 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2867 addReply(c
,shared
.nokeyerr
);
2869 if (o
->type
!= REDIS_LIST
) {
2870 addReply(c
,shared
.wrongtypeerr
);
2872 list
*list
= o
->ptr
;
2873 listNode
*ln
, *next
;
2874 int toremove
= atoi(c
->argv
[2]->ptr
);
2879 toremove
= -toremove
;
2882 ln
= fromtail
? list
->tail
: list
->head
;
2884 robj
*ele
= listNodeValue(ln
);
2886 next
= fromtail
? ln
->prev
: ln
->next
;
2887 if (sdscmp(ele
->ptr
,c
->argv
[3]->ptr
) == 0) {
2888 listDelNode(list
,ln
);
2891 if (toremove
&& removed
== toremove
) break;
2895 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
2900 /* ==================================== Sets ================================ */
2902 static void saddCommand(redisClient
*c
) {
2905 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2907 set
= createSetObject();
2908 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
2909 incrRefCount(c
->argv
[1]);
2911 if (set
->type
!= REDIS_SET
) {
2912 addReply(c
,shared
.wrongtypeerr
);
2916 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
2917 incrRefCount(c
->argv
[2]);
2919 addReply(c
,shared
.cone
);
2921 addReply(c
,shared
.czero
);
2925 static void sremCommand(redisClient
*c
) {
2928 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2930 addReply(c
,shared
.czero
);
2932 if (set
->type
!= REDIS_SET
) {
2933 addReply(c
,shared
.wrongtypeerr
);
2936 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
2938 addReply(c
,shared
.cone
);
2940 addReply(c
,shared
.czero
);
2945 static void smoveCommand(redisClient
*c
) {
2946 robj
*srcset
, *dstset
;
2948 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2949 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
2951 /* If the source key does not exist return 0, if it's of the wrong type
2953 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
2954 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
2957 /* Error if the destination key is not a set as well */
2958 if (dstset
&& dstset
->type
!= REDIS_SET
) {
2959 addReply(c
,shared
.wrongtypeerr
);
2962 /* Remove the element from the source set */
2963 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
2964 /* Key not found in the src set! return zero */
2965 addReply(c
,shared
.czero
);
2969 /* Add the element to the destination set */
2971 dstset
= createSetObject();
2972 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
2973 incrRefCount(c
->argv
[2]);
2975 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
2976 incrRefCount(c
->argv
[3]);
2977 addReply(c
,shared
.cone
);
2980 static void sismemberCommand(redisClient
*c
) {
2983 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
2985 addReply(c
,shared
.czero
);
2987 if (set
->type
!= REDIS_SET
) {
2988 addReply(c
,shared
.wrongtypeerr
);
2991 if (dictFind(set
->ptr
,c
->argv
[2]))
2992 addReply(c
,shared
.cone
);
2994 addReply(c
,shared
.czero
);
2998 static void scardCommand(redisClient
*c
) {
3002 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3004 addReply(c
,shared
.czero
);
3007 if (o
->type
!= REDIS_SET
) {
3008 addReply(c
,shared
.wrongtypeerr
);
3011 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3017 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3018 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3020 return dictSize(*d1
)-dictSize(*d2
);
3023 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
) {
3024 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3027 robj
*lenobj
= NULL
, *dstset
= NULL
;
3028 int j
, cardinality
= 0;
3030 if (!dv
) oom("sinterGenericCommand");
3031 for (j
= 0; j
< setsnum
; j
++) {
3035 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3036 lookupKeyRead(c
->db
,setskeys
[j
]);
3040 deleteKey(c
->db
,dstkey
);
3041 addReply(c
,shared
.ok
);
3043 addReply(c
,shared
.nullmultibulk
);
3047 if (setobj
->type
!= REDIS_SET
) {
3049 addReply(c
,shared
.wrongtypeerr
);
3052 dv
[j
] = setobj
->ptr
;
3054 /* Sort sets from the smallest to largest, this will improve our
3055 * algorithm's performace */
3056 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3058 /* The first thing we should output is the total number of elements...
3059 * since this is a multi-bulk write, but at this stage we don't know
3060 * the intersection set size, so we use a trick, append an empty object
3061 * to the output list and save the pointer to later modify it with the
3064 lenobj
= createObject(REDIS_STRING
,NULL
);
3066 decrRefCount(lenobj
);
3068 /* If we have a target key where to store the resulting set
3069 * create this key with an empty set inside */
3070 dstset
= createSetObject();
3073 /* Iterate all the elements of the first (smallest) set, and test
3074 * the element against all the other sets, if at least one set does
3075 * not include the element it is discarded */
3076 di
= dictGetIterator(dv
[0]);
3077 if (!di
) oom("dictGetIterator");
3079 while((de
= dictNext(di
)) != NULL
) {
3082 for (j
= 1; j
< setsnum
; j
++)
3083 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
3085 continue; /* at least one set does not contain the member */
3086 ele
= dictGetEntryKey(de
);
3088 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele
->ptr
)));
3090 addReply(c
,shared
.crlf
);
3093 dictAdd(dstset
->ptr
,ele
,NULL
);
3097 dictReleaseIterator(di
);
3100 /* Store the resulting set into the target */
3101 deleteKey(c
->db
,dstkey
);
3102 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3103 incrRefCount(dstkey
);
3107 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",cardinality
);
3109 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3110 dictSize((dict
*)dstset
->ptr
)));
3116 static void sinterCommand(redisClient
*c
) {
3117 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
3120 static void sinterstoreCommand(redisClient
*c
) {
3121 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
3124 #define REDIS_OP_UNION 0
3125 #define REDIS_OP_DIFF 1
3127 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
3128 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3131 robj
*dstset
= NULL
;
3132 int j
, cardinality
= 0;
3134 if (!dv
) oom("sunionDiffGenericCommand");
3135 for (j
= 0; j
< setsnum
; j
++) {
3139 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3140 lookupKeyRead(c
->db
,setskeys
[j
]);
3145 if (setobj
->type
!= REDIS_SET
) {
3147 addReply(c
,shared
.wrongtypeerr
);
3150 dv
[j
] = setobj
->ptr
;
3153 /* We need a temp set object to store our union. If the dstkey
3154 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3155 * this set object will be the resulting object to set into the target key*/
3156 dstset
= createSetObject();
3158 /* Iterate all the elements of all the sets, add every element a single
3159 * time to the result set */
3160 for (j
= 0; j
< setsnum
; j
++) {
3161 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
3162 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
3164 di
= dictGetIterator(dv
[j
]);
3165 if (!di
) oom("dictGetIterator");
3167 while((de
= dictNext(di
)) != NULL
) {
3170 /* dictAdd will not add the same element multiple times */
3171 ele
= dictGetEntryKey(de
);
3172 if (op
== REDIS_OP_UNION
|| j
== 0) {
3173 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
3177 } else if (op
== REDIS_OP_DIFF
) {
3178 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
3183 dictReleaseIterator(di
);
3185 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
3188 /* Output the content of the resulting set, if not in STORE mode */
3190 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
3191 di
= dictGetIterator(dstset
->ptr
);
3192 if (!di
) oom("dictGetIterator");
3193 while((de
= dictNext(di
)) != NULL
) {
3196 ele
= dictGetEntryKey(de
);
3197 addReplySds(c
,sdscatprintf(sdsempty(),
3198 "$%d\r\n",sdslen(ele
->ptr
)));
3200 addReply(c
,shared
.crlf
);
3202 dictReleaseIterator(di
);
3204 /* If we have a target key where to store the resulting set
3205 * create this key with the result set inside */
3206 deleteKey(c
->db
,dstkey
);
3207 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3208 incrRefCount(dstkey
);
3213 decrRefCount(dstset
);
3215 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3216 dictSize((dict
*)dstset
->ptr
)));
3222 static void sunionCommand(redisClient
*c
) {
3223 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
3226 static void sunionstoreCommand(redisClient
*c
) {
3227 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
3230 static void sdiffCommand(redisClient
*c
) {
3231 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
3234 static void sdiffstoreCommand(redisClient
*c
) {
3235 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
3238 static void flushdbCommand(redisClient
*c
) {
3239 server
.dirty
+= dictSize(c
->db
->dict
);
3240 dictEmpty(c
->db
->dict
);
3241 dictEmpty(c
->db
->expires
);
3242 addReply(c
,shared
.ok
);
3245 static void flushallCommand(redisClient
*c
) {
3246 server
.dirty
+= emptyDb();
3247 addReply(c
,shared
.ok
);
3248 rdbSave(server
.dbfilename
);
3252 redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
3253 redisSortOperation
*so
= zmalloc(sizeof(*so
));
3254 if (!so
) oom("createSortOperation");
3256 so
->pattern
= pattern
;
3260 /* Return the value associated to the key with a name obtained
3261 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3262 robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
3266 int prefixlen
, sublen
, postfixlen
;
3267 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3271 char buf
[REDIS_SORTKEY_MAX
+1];
3274 spat
= pattern
->ptr
;
3276 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
3277 p
= strchr(spat
,'*');
3278 if (!p
) return NULL
;
3281 sublen
= sdslen(ssub
);
3282 postfixlen
= sdslen(spat
)-(prefixlen
+1);
3283 memcpy(keyname
.buf
,spat
,prefixlen
);
3284 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
3285 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
3286 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
3287 keyname
.len
= prefixlen
+sublen
+postfixlen
;
3289 keyobj
.refcount
= 1;
3290 keyobj
.type
= REDIS_STRING
;
3291 keyobj
.ptr
= ((char*)&keyname
)+(sizeof(long)*2);
3293 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3294 return lookupKeyRead(db
,&keyobj
);
3297 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3298 * the additional parameter is not standard but a BSD-specific we have to
3299 * pass sorting parameters via the global 'server' structure */
3300 static int sortCompare(const void *s1
, const void *s2
) {
3301 const redisSortObject
*so1
= s1
, *so2
= s2
;
3304 if (!server
.sort_alpha
) {
3305 /* Numeric sorting. Here it's trivial as we precomputed scores */
3306 if (so1
->u
.score
> so2
->u
.score
) {
3308 } else if (so1
->u
.score
< so2
->u
.score
) {
3314 /* Alphanumeric sorting */
3315 if (server
.sort_bypattern
) {
3316 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
3317 /* At least one compare object is NULL */
3318 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
3320 else if (so1
->u
.cmpobj
== NULL
)
3325 /* We have both the objects, use strcoll */
3326 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
3329 /* Compare elements directly */
3330 cmp
= strcoll(so1
->obj
->ptr
,so2
->obj
->ptr
);
3333 return server
.sort_desc
? -cmp
: cmp
;
3336 /* The SORT command is the most complex command in Redis. Warning: this code
3337 * is optimized for speed and a bit less for readability */
3338 static void sortCommand(redisClient
*c
) {
3341 int desc
= 0, alpha
= 0;
3342 int limit_start
= 0, limit_count
= -1, start
, end
;
3343 int j
, dontsort
= 0, vectorlen
;
3344 int getop
= 0; /* GET operation counter */
3345 robj
*sortval
, *sortby
= NULL
;
3346 redisSortObject
*vector
; /* Resulting vector to sort */
3348 /* Lookup the key to sort. It must be of the right types */
3349 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
3350 if (sortval
== NULL
) {
3351 addReply(c
,shared
.nokeyerr
);
3354 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
) {
3355 addReply(c
,shared
.wrongtypeerr
);
3359 /* Create a list of operations to perform for every sorted element.
3360 * Operations can be GET/DEL/INCR/DECR */
3361 operations
= listCreate();
3362 listSetFreeMethod(operations
,zfree
);
3365 /* Now we need to protect sortval incrementing its count, in the future
3366 * SORT may have options able to overwrite/delete keys during the sorting
3367 * and the sorted key itself may get destroied */
3368 incrRefCount(sortval
);
3370 /* The SORT command has an SQL-alike syntax, parse it */
3371 while(j
< c
->argc
) {
3372 int leftargs
= c
->argc
-j
-1;
3373 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
3375 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
3377 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
3379 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
3380 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
3381 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
3383 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
3384 sortby
= c
->argv
[j
+1];
3385 /* If the BY pattern does not contain '*', i.e. it is constant,
3386 * we don't need to sort nor to lookup the weight keys. */
3387 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
3389 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3390 listAddNodeTail(operations
,createSortOperation(
3391 REDIS_SORT_GET
,c
->argv
[j
+1]));
3394 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"del") && leftargs
>= 1) {
3395 listAddNodeTail(operations
,createSortOperation(
3396 REDIS_SORT_DEL
,c
->argv
[j
+1]));
3398 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"incr") && leftargs
>= 1) {
3399 listAddNodeTail(operations
,createSortOperation(
3400 REDIS_SORT_INCR
,c
->argv
[j
+1]));
3402 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3403 listAddNodeTail(operations
,createSortOperation(
3404 REDIS_SORT_DECR
,c
->argv
[j
+1]));
3407 decrRefCount(sortval
);
3408 listRelease(operations
);
3409 addReply(c
,shared
.syntaxerr
);
3415 /* Load the sorting vector with all the objects to sort */
3416 vectorlen
= (sortval
->type
== REDIS_LIST
) ?
3417 listLength((list
*)sortval
->ptr
) :
3418 dictSize((dict
*)sortval
->ptr
);
3419 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
3420 if (!vector
) oom("allocating objects vector for SORT");
3422 if (sortval
->type
== REDIS_LIST
) {
3423 list
*list
= sortval
->ptr
;
3427 while((ln
= listYield(list
))) {
3428 robj
*ele
= ln
->value
;
3429 vector
[j
].obj
= ele
;
3430 vector
[j
].u
.score
= 0;
3431 vector
[j
].u
.cmpobj
= NULL
;
3435 dict
*set
= sortval
->ptr
;
3439 di
= dictGetIterator(set
);
3440 if (!di
) oom("dictGetIterator");
3441 while((setele
= dictNext(di
)) != NULL
) {
3442 vector
[j
].obj
= dictGetEntryKey(setele
);
3443 vector
[j
].u
.score
= 0;
3444 vector
[j
].u
.cmpobj
= NULL
;
3447 dictReleaseIterator(di
);
3449 assert(j
== vectorlen
);
3451 /* Now it's time to load the right scores in the sorting vector */
3452 if (dontsort
== 0) {
3453 for (j
= 0; j
< vectorlen
; j
++) {
3457 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
3458 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
3460 vector
[j
].u
.cmpobj
= byval
;
3461 incrRefCount(byval
);
3463 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
3466 if (!alpha
) vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
3471 /* We are ready to sort the vector... perform a bit of sanity check
3472 * on the LIMIT option too. We'll use a partial version of quicksort. */
3473 start
= (limit_start
< 0) ? 0 : limit_start
;
3474 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
3475 if (start
>= vectorlen
) {
3476 start
= vectorlen
-1;
3479 if (end
>= vectorlen
) end
= vectorlen
-1;
3481 if (dontsort
== 0) {
3482 server
.sort_desc
= desc
;
3483 server
.sort_alpha
= alpha
;
3484 server
.sort_bypattern
= sortby
? 1 : 0;
3485 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
3486 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
3488 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
3491 /* Send command output to the output buffer, performing the specified
3492 * GET/DEL/INCR/DECR operations if any. */
3493 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
3494 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
3495 for (j
= start
; j
<= end
; j
++) {
3498 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
3499 sdslen(vector
[j
].obj
->ptr
)));
3500 addReply(c
,vector
[j
].obj
);
3501 addReply(c
,shared
.crlf
);
3503 listRewind(operations
);
3504 while((ln
= listYield(operations
))) {
3505 redisSortOperation
*sop
= ln
->value
;
3506 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
3509 if (sop
->type
== REDIS_SORT_GET
) {
3510 if (!val
|| val
->type
!= REDIS_STRING
) {
3511 addReply(c
,shared
.nullbulk
);
3513 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
3516 addReply(c
,shared
.crlf
);
3518 } else if (sop
->type
== REDIS_SORT_DEL
) {
3525 decrRefCount(sortval
);
3526 listRelease(operations
);
3527 for (j
= 0; j
< vectorlen
; j
++) {
3528 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
3529 decrRefCount(vector
[j
].u
.cmpobj
);
3534 static void infoCommand(redisClient
*c
) {
3536 time_t uptime
= time(NULL
)-server
.stat_starttime
;
3538 info
= sdscatprintf(sdsempty(),
3539 "redis_version:%s\r\n"
3540 "uptime_in_seconds:%d\r\n"
3541 "uptime_in_days:%d\r\n"
3542 "connected_clients:%d\r\n"
3543 "connected_slaves:%d\r\n"
3544 "used_memory:%zu\r\n"
3545 "changes_since_last_save:%lld\r\n"
3546 "bgsave_in_progress:%d\r\n"
3547 "last_save_time:%d\r\n"
3548 "total_connections_received:%lld\r\n"
3549 "total_commands_processed:%lld\r\n"
3554 listLength(server
.clients
)-listLength(server
.slaves
),
3555 listLength(server
.slaves
),
3558 server
.bgsaveinprogress
,
3560 server
.stat_numconnections
,
3561 server
.stat_numcommands
,
3562 server
.masterhost
== NULL
? "master" : "slave"
3564 if (server
.masterhost
) {
3565 info
= sdscatprintf(info
,
3566 "master_host:%s\r\n"
3567 "master_port:%d\r\n"
3568 "master_link_status:%s\r\n"
3569 "master_last_io_seconds_ago:%d\r\n"
3572 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
3574 (int)(time(NULL
)-server
.master
->lastinteraction
)
3577 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info
)));
3578 addReplySds(c
,info
);
3579 addReply(c
,shared
.crlf
);
3582 static void monitorCommand(redisClient
*c
) {
3583 /* ignore MONITOR if aleady slave or in monitor mode */
3584 if (c
->flags
& REDIS_SLAVE
) return;
3586 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
3588 if (!listAddNodeTail(server
.monitors
,c
)) oom("listAddNodeTail");
3589 addReply(c
,shared
.ok
);
3592 /* ================================= Expire ================================= */
3593 static int removeExpire(redisDb
*db
, robj
*key
) {
3594 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
3601 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
3602 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
3610 /* Return the expire time of the specified key, or -1 if no expire
3611 * is associated with this key (i.e. the key is non volatile) */
3612 static time_t getExpire(redisDb
*db
, robj
*key
) {
3615 /* No expire? return ASAP */
3616 if (dictSize(db
->expires
) == 0 ||
3617 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
3619 return (time_t) dictGetEntryVal(de
);
3622 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
3626 /* No expire? return ASAP */
3627 if (dictSize(db
->expires
) == 0 ||
3628 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3630 /* Lookup the expire */
3631 when
= (time_t) dictGetEntryVal(de
);
3632 if (time(NULL
) <= when
) return 0;
3634 /* Delete the key */
3635 dictDelete(db
->expires
,key
);
3636 return dictDelete(db
->dict
,key
) == DICT_OK
;
3639 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
3642 /* No expire? return ASAP */
3643 if (dictSize(db
->expires
) == 0 ||
3644 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3646 /* Delete the key */
3648 dictDelete(db
->expires
,key
);
3649 return dictDelete(db
->dict
,key
) == DICT_OK
;
3652 static void expireCommand(redisClient
*c
) {
3654 int seconds
= atoi(c
->argv
[2]->ptr
);
3656 de
= dictFind(c
->db
->dict
,c
->argv
[1]);
3658 addReply(c
,shared
.czero
);
3662 addReply(c
, shared
.czero
);
3665 time_t when
= time(NULL
)+seconds
;
3666 if (setExpire(c
->db
,c
->argv
[1],when
))
3667 addReply(c
,shared
.cone
);
3669 addReply(c
,shared
.czero
);
3674 static void ttlCommand(redisClient
*c
) {
3678 expire
= getExpire(c
->db
,c
->argv
[1]);
3680 ttl
= (int) (expire
-time(NULL
));
3681 if (ttl
< 0) ttl
= -1;
3683 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
3686 /* =============================== Replication ============================= */
3688 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3689 ssize_t nwritten
, ret
= size
;
3690 time_t start
= time(NULL
);
3694 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
3695 nwritten
= write(fd
,ptr
,size
);
3696 if (nwritten
== -1) return -1;
3700 if ((time(NULL
)-start
) > timeout
) {
3708 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3709 ssize_t nread
, totread
= 0;
3710 time_t start
= time(NULL
);
3714 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
3715 nread
= read(fd
,ptr
,size
);
3716 if (nread
== -1) return -1;
3721 if ((time(NULL
)-start
) > timeout
) {
3729 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3736 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
3739 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
3750 static void syncCommand(redisClient
*c
) {
3751 /* ignore SYNC if aleady slave or in monitor mode */
3752 if (c
->flags
& REDIS_SLAVE
) return;
3754 /* SYNC can't be issued when the server has pending data to send to
3755 * the client about already issued commands. We need a fresh reply
3756 * buffer registering the differences between the BGSAVE and the current
3757 * dataset, so that we can copy to other slaves if needed. */
3758 if (listLength(c
->reply
) != 0) {
3759 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3763 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
3764 /* Here we need to check if there is a background saving operation
3765 * in progress, or if it is required to start one */
3766 if (server
.bgsaveinprogress
) {
3767 /* Ok a background save is in progress. Let's check if it is a good
3768 * one for replication, i.e. if there is another slave that is
3769 * registering differences since the server forked to save */
3773 listRewind(server
.slaves
);
3774 while((ln
= listYield(server
.slaves
))) {
3776 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
3779 /* Perfect, the server is already registering differences for
3780 * another slave. Set the right state, and copy the buffer. */
3781 listRelease(c
->reply
);
3782 c
->reply
= listDup(slave
->reply
);
3783 if (!c
->reply
) oom("listDup copying slave reply list");
3784 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3785 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
3787 /* No way, we need to wait for the next BGSAVE in order to
3788 * register differences */
3789 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
3790 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
3793 /* Ok we don't have a BGSAVE in progress, let's start one */
3794 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
3795 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
3796 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
3797 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
3800 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3803 c
->flags
|= REDIS_SLAVE
;
3805 if (!listAddNodeTail(server
.slaves
,c
)) oom("listAddNodeTail");
3809 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
3810 redisClient
*slave
= privdata
;
3812 REDIS_NOTUSED(mask
);
3813 char buf
[REDIS_IOBUF_LEN
];
3814 ssize_t nwritten
, buflen
;
3816 if (slave
->repldboff
== 0) {
3817 /* Write the bulk write count before to transfer the DB. In theory here
3818 * we don't know how much room there is in the output buffer of the
3819 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
3820 * operations) will never be smaller than the few bytes we need. */
3823 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
3825 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
3833 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
3834 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
3836 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
3837 (buflen
== 0) ? "premature EOF" : strerror(errno
));
3841 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
3842 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
3847 slave
->repldboff
+= nwritten
;
3848 if (slave
->repldboff
== slave
->repldbsize
) {
3849 close(slave
->repldbfd
);
3850 slave
->repldbfd
= -1;
3851 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
3852 slave
->replstate
= REDIS_REPL_ONLINE
;
3853 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
3854 sendReplyToClient
, slave
, NULL
) == AE_ERR
) {
3858 addReplySds(slave
,sdsempty());
3859 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
3863 static void updateSalvesWaitingBgsave(int bgsaveerr
) {
3865 int startbgsave
= 0;
3867 listRewind(server
.slaves
);
3868 while((ln
= listYield(server
.slaves
))) {
3869 redisClient
*slave
= ln
->value
;
3871 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
3873 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3874 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
3877 if (bgsaveerr
!= REDIS_OK
) {
3879 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
3882 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
3883 fstat(slave
->repldbfd
,&buf
) == -1) {
3885 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
3888 slave
->repldboff
= 0;
3889 slave
->repldbsize
= buf
.st_size
;
3890 slave
->replstate
= REDIS_REPL_SEND_BULK
;
3891 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
3892 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
, NULL
) == AE_ERR
) {
3899 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
3900 listRewind(server
.slaves
);
3901 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
3902 while((ln
= listYield(server
.slaves
))) {
3903 redisClient
*slave
= ln
->value
;
3905 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
3912 static int syncWithMaster(void) {
3913 char buf
[1024], tmpfile
[256];
3915 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
3919 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
3923 /* Issue the SYNC command */
3924 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
3926 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
3930 /* Read the bulk write count */
3931 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
3933 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
3937 dumpsize
= atoi(buf
+1);
3938 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
3939 /* Read the bulk write data on a temp file */
3940 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
3941 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
3944 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
3948 int nread
, nwritten
;
3950 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
3952 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
3958 nwritten
= write(dfd
,buf
,nread
);
3959 if (nwritten
== -1) {
3960 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
3968 if (rename(tmpfile
,server
.dbfilename
) == -1) {
3969 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
3975 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
3976 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
3980 server
.master
= createClient(fd
);
3981 server
.master
->flags
|= REDIS_MASTER
;
3982 server
.replstate
= REDIS_REPL_CONNECTED
;
3986 static void slaveofCommand(redisClient
*c
) {
3987 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
3988 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
3989 if (server
.masterhost
) {
3990 sdsfree(server
.masterhost
);
3991 server
.masterhost
= NULL
;
3992 if (server
.master
) freeClient(server
.master
);
3993 server
.replstate
= REDIS_REPL_NONE
;
3994 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
3997 sdsfree(server
.masterhost
);
3998 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
3999 server
.masterport
= atoi(c
->argv
[2]->ptr
);
4000 if (server
.master
) freeClient(server
.master
);
4001 server
.replstate
= REDIS_REPL_CONNECT
;
4002 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
4003 server
.masterhost
, server
.masterport
);
4005 addReply(c
,shared
.ok
);
4008 /* ============================ Maxmemory directive ======================== */
4010 /* This function gets called when 'maxmemory' is set on the config file to limit
4011 * the max memory used by the server, and we are out of memory.
4012 * This function will try to, in order:
4014 * - Free objects from the free list
4015 * - Try to remove keys with an EXPIRE set
4017 * It is not possible to free enough memory to reach used-memory < maxmemory
4018 * the server will start refusing commands that will enlarge even more the
4021 static void freeMemoryIfNeeded(void) {
4022 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
4023 if (listLength(server
.objfreelist
)) {
4026 listNode
*head
= listFirst(server
.objfreelist
);
4027 o
= listNodeValue(head
);
4028 listDelNode(server
.objfreelist
,head
);
4031 int j
, k
, freed
= 0;
4033 for (j
= 0; j
< server
.dbnum
; j
++) {
4035 robj
*minkey
= NULL
;
4036 struct dictEntry
*de
;
4038 if (dictSize(server
.db
[j
].expires
)) {
4040 /* From a sample of three keys drop the one nearest to
4041 * the natural expire */
4042 for (k
= 0; k
< 3; k
++) {
4045 de
= dictGetRandomKey(server
.db
[j
].expires
);
4046 t
= (time_t) dictGetEntryVal(de
);
4047 if (minttl
== -1 || t
< minttl
) {
4048 minkey
= dictGetEntryKey(de
);
4052 deleteKey(server
.db
+j
,minkey
);
4055 if (!freed
) return; /* nothing to free... */
4060 /* ================================= Debugging ============================== */
4062 static void debugCommand(redisClient
*c
) {
4063 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
4065 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
4066 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
4070 addReply(c
,shared
.nokeyerr
);
4073 key
= dictGetEntryKey(de
);
4074 val
= dictGetEntryVal(de
);
4075 addReplySds(c
,sdscatprintf(sdsempty(),
4076 "+Key at:%p refcount:%d, value at:%p refcount:%d\r\n",
4077 key
, key
->refcount
, val
, val
->refcount
));
4079 addReplySds(c
,sdsnew(
4080 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4084 static void segvHandler (int sig
, siginfo_t
*info
, void *secret
) {
4087 char **messages
= (char **)NULL
;
4088 int i
, trace_size
= 0;
4089 ucontext_t
*uc
= (ucontext_t
*)secret
;
4091 redisLog(REDIS_DEBUG
, "Segmentation fault -%d- Redis %s", sig
, REDIS_VERSION
);
4092 redisLog(REDIS_DEBUG
,"EIP %p", (void *)uc
->uc_mcontext
.gregs
[REG_EIP
]);
4093 redisLog(REDIS_DEBUG
,"EAX %p, EBX %p, ECX %p, EDX %p", (void *)uc
->uc_mcontext
.gregs
[REG_EAX
], (void *)uc
->uc_mcontext
.gregs
[REG_EBX
], (void *)uc
->uc_mcontext
.gregs
[REG_ECX
], (void *)uc
->uc_mcontext
.gregs
[REG_EDX
]);
4096 trace_size
= backtrace(trace
, 100);
4097 /* overwrite sigaction with caller's address */
4098 trace
[1] = (void *) uc
->uc_mcontext
.gregs
[REG_EIP
];
4100 messages
= backtrace_symbols(trace
, trace_size
);
4102 for (i
=1; i
<trace_size
; ++i
)
4103 redisLog(REDIS_DEBUG
,"[bt] %s", messages
[i
]);
4109 void setupSigSegvAction(){
4110 struct sigaction act
;
4111 sigemptyset (&act
.sa_mask
);
4112 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used. Otherwise, sa_handler is used */
4113 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
4114 act
.sa_sigaction
= segvHandler
;
4115 sigaction (SIGSEGV
, &act
, NULL
);
4117 /* =================================== Main! ================================ */
4120 int linuxOvercommitMemoryValue(void) {
4121 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
4125 if (fgets(buf
,64,fp
) == NULL
) {
4134 void linuxOvercommitMemoryWarning(void) {
4135 if (linuxOvercommitMemoryValue() == 0) {
4136 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4139 #endif /* __linux__ */
4141 static void daemonize(void) {
4145 if (fork() != 0) exit(0); /* parent exits */
4146 setsid(); /* create a new session */
4148 /* Every output goes to /dev/null. If Redis is daemonized but
4149 * the 'logfile' is set to 'stdout' in the configuration file
4150 * it will not log at all. */
4151 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
4152 dup2(fd
, STDIN_FILENO
);
4153 dup2(fd
, STDOUT_FILENO
);
4154 dup2(fd
, STDERR_FILENO
);
4155 if (fd
> STDERR_FILENO
) close(fd
);
4157 /* Try to write the pid file */
4158 fp
= fopen(server
.pidfile
,"w");
4160 fprintf(fp
,"%d\n",getpid());
4165 int main(int argc
, char **argv
) {
4167 linuxOvercommitMemoryWarning();
4172 ResetServerSaveParams();
4173 loadServerConfig(argv
[1]);
4174 } else if (argc
> 2) {
4175 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
4178 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4181 if (server
.daemonize
) daemonize();
4182 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
4183 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
4184 redisLog(REDIS_NOTICE
,"DB loaded from disk");
4185 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
4186 acceptHandler
, NULL
, NULL
) == AE_ERR
) oom("creating file event");
4187 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
4189 aeDeleteEventLoop(server
.el
);