2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "0.101"
46 #include <arpa/inet.h>
50 #include <sys/resource.h>
54 #include "ae.h" /* Event driven programming library */
55 #include "sds.h" /* Dynamic safe strings */
56 #include "anet.h" /* Networking the easy way */
57 #include "dict.h" /* Hash tables */
58 #include "adlist.h" /* Linked lists */
59 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
60 #include "lzf.h" /* LZF compression library */
61 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
69 /* Static server configuration */
70 #define REDIS_SERVERPORT 6379 /* TCP port */
71 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
72 #define REDIS_IOBUF_LEN 1024
73 #define REDIS_LOADBUF_LEN 1024
74 #define REDIS_STATIC_ARGS 4
75 #define REDIS_DEFAULT_DBNUM 16
76 #define REDIS_CONFIGLINE_MAX 1024
77 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
78 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
79 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
81 /* Hash table parameters */
82 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
83 #define REDIS_HT_MINSLOTS 16384 /* Never resize the HT under this */
86 #define REDIS_CMD_BULK 1 /* Bulk write command */
87 #define REDIS_CMD_INLINE 2 /* Inline command */
88 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
89 this flag will return an error when the 'maxmemory' option is set in the
90 config file and the server is using more than maxmemory bytes of memory.
91 In short these commands are denied on low memory conditions. */
92 #define REDIS_CMD_DENYOOM 4
95 #define REDIS_STRING 0
100 /* Object types only used for dumping to disk */
101 #define REDIS_EXPIRETIME 253
102 #define REDIS_SELECTDB 254
103 #define REDIS_EOF 255
105 /* Defines related to the dump file format. To store 32 bits lengths for short
106 * keys requires a lot of space, so we check the most significant 2 bits of
107 * the first byte to interpret the length:
109 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
110 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
111 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
112 * 11|000000 this means: specially encoded object will follow. The six bits
113 * number specify the kind of object that follows.
114 * See the REDIS_RDB_ENC_* defines.
116 * Lengths up to 63 are stored using a single byte, most DB keys, and many
117 * values, will fit inside. */
118 #define REDIS_RDB_6BITLEN 0
119 #define REDIS_RDB_14BITLEN 1
120 #define REDIS_RDB_32BITLEN 2
121 #define REDIS_RDB_ENCVAL 3
122 #define REDIS_RDB_LENERR UINT_MAX
124 /* When a length of a string object stored on disk has the first two bits
125 * set, the remaining two bits specify a special encoding for the object
126 * according to the following defines: */
127 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
128 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
129 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
130 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
133 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
134 #define REDIS_SLAVE 2 /* This client is a slave server */
135 #define REDIS_MASTER 4 /* This client is a master server */
136 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
138 /* Slave replication state - slave side */
139 #define REDIS_REPL_NONE 0 /* No active replication */
140 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
141 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
143 /* Slave replication state - from the point of view of master
144 * Note that in SEND_BULK and ONLINE state the slave receives new updates
145 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
146 * to start the next background saving in order to send updates to it. */
147 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
148 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
149 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
150 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
152 /* List related stuff */
156 /* Sort operations */
157 #define REDIS_SORT_GET 0
158 #define REDIS_SORT_DEL 1
159 #define REDIS_SORT_INCR 2
160 #define REDIS_SORT_DECR 3
161 #define REDIS_SORT_ASC 4
162 #define REDIS_SORT_DESC 5
163 #define REDIS_SORTKEY_MAX 1024
166 #define REDIS_DEBUG 0
167 #define REDIS_NOTICE 1
168 #define REDIS_WARNING 2
170 /* Anti-warning macro... */
171 #define REDIS_NOTUSED(V) ((void) V)
173 /*================================= Data types ============================== */
175 /* A redis object, that is a type able to hold a string / list / set */
176 typedef struct redisObject
{
182 typedef struct redisDb
{
188 /* With multiplexing we need to take per-client state.
189 * Clients are taken in a linked list. */
190 typedef struct redisClient
{
197 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
200 time_t lastinteraction
; /* time of the last interaction, used for timeout */
201 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
202 int slaveseldb
; /* slave selected db, if this client is a slave */
203 int authenticated
; /* when requirepass is non-NULL */
204 int replstate
; /* replication state if this is a slave */
205 int repldbfd
; /* replication DB file descriptor */
206 long repldboff
; /* replication DB file offset */
207 off_t repldbsize
; /* replication DB file size */
215 /* Global server state structure */
221 unsigned int sharingpoolsize
;
222 long long dirty
; /* changes to DB from the last save */
224 list
*slaves
, *monitors
;
225 char neterr
[ANET_ERR_LEN
];
227 int cronloops
; /* number of times the cron function run */
228 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
229 time_t lastsave
; /* Unix time of the last successful save */
230 size_t usedmemory
; /* Used memory in bytes */
231 /* Fields used only for stats */
232 time_t stat_starttime
; /* server start time */
233 long long stat_numcommands
; /* number of processed commands */
234 long long stat_numconnections
; /* number of connections received */
242 int bgsaveinprogress
;
243 struct saveparam
*saveparams
;
250 /* Replication related */
254 redisClient
*master
; /* client that is master for this slave */
256 unsigned int maxclients
;
257 unsigned int maxmemory
;
258 /* Sort parameters - qsort_r() is only available under BSD so we
259 * have to take this state global, in order to pass it to sortCompare() */
265 typedef void redisCommandProc(redisClient
*c
);
266 struct redisCommand
{
268 redisCommandProc
*proc
;
273 typedef struct _redisSortObject
{
281 typedef struct _redisSortOperation
{
284 } redisSortOperation
;
286 struct sharedObjectsStruct
{
287 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
288 *colon
, *nullbulk
, *nullmultibulk
,
289 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
290 *outofrangeerr
, *plus
,
291 *select0
, *select1
, *select2
, *select3
, *select4
,
292 *select5
, *select6
, *select7
, *select8
, *select9
;
295 /*================================ Prototypes =============================== */
297 static void freeStringObject(robj
*o
);
298 static void freeListObject(robj
*o
);
299 static void freeSetObject(robj
*o
);
300 static void decrRefCount(void *o
);
301 static robj
*createObject(int type
, void *ptr
);
302 static void freeClient(redisClient
*c
);
303 static int rdbLoad(char *filename
);
304 static void addReply(redisClient
*c
, robj
*obj
);
305 static void addReplySds(redisClient
*c
, sds s
);
306 static void incrRefCount(robj
*o
);
307 static int rdbSaveBackground(char *filename
);
308 static robj
*createStringObject(char *ptr
, size_t len
);
309 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
310 static int syncWithMaster(void);
311 static robj
*tryObjectSharing(robj
*o
);
312 static int removeExpire(redisDb
*db
, robj
*key
);
313 static int expireIfNeeded(redisDb
*db
, robj
*key
);
314 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
315 static int deleteKey(redisDb
*db
, robj
*key
);
316 static time_t getExpire(redisDb
*db
, robj
*key
);
317 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
318 static void updateSalvesWaitingBgsave(int bgsaveerr
);
319 static void freeMemoryIfNeeded(void);
321 static void authCommand(redisClient
*c
);
322 static void pingCommand(redisClient
*c
);
323 static void echoCommand(redisClient
*c
);
324 static void setCommand(redisClient
*c
);
325 static void setnxCommand(redisClient
*c
);
326 static void getCommand(redisClient
*c
);
327 static void delCommand(redisClient
*c
);
328 static void existsCommand(redisClient
*c
);
329 static void incrCommand(redisClient
*c
);
330 static void decrCommand(redisClient
*c
);
331 static void incrbyCommand(redisClient
*c
);
332 static void decrbyCommand(redisClient
*c
);
333 static void selectCommand(redisClient
*c
);
334 static void randomkeyCommand(redisClient
*c
);
335 static void keysCommand(redisClient
*c
);
336 static void dbsizeCommand(redisClient
*c
);
337 static void lastsaveCommand(redisClient
*c
);
338 static void saveCommand(redisClient
*c
);
339 static void bgsaveCommand(redisClient
*c
);
340 static void shutdownCommand(redisClient
*c
);
341 static void moveCommand(redisClient
*c
);
342 static void renameCommand(redisClient
*c
);
343 static void renamenxCommand(redisClient
*c
);
344 static void lpushCommand(redisClient
*c
);
345 static void rpushCommand(redisClient
*c
);
346 static void lpopCommand(redisClient
*c
);
347 static void rpopCommand(redisClient
*c
);
348 static void llenCommand(redisClient
*c
);
349 static void lindexCommand(redisClient
*c
);
350 static void lrangeCommand(redisClient
*c
);
351 static void ltrimCommand(redisClient
*c
);
352 static void typeCommand(redisClient
*c
);
353 static void lsetCommand(redisClient
*c
);
354 static void saddCommand(redisClient
*c
);
355 static void sremCommand(redisClient
*c
);
356 static void smoveCommand(redisClient
*c
);
357 static void sismemberCommand(redisClient
*c
);
358 static void scardCommand(redisClient
*c
);
359 static void sinterCommand(redisClient
*c
);
360 static void sinterstoreCommand(redisClient
*c
);
361 static void sunionCommand(redisClient
*c
);
362 static void sunionstoreCommand(redisClient
*c
);
363 static void sdiffCommand(redisClient
*c
);
364 static void sdiffstoreCommand(redisClient
*c
);
365 static void syncCommand(redisClient
*c
);
366 static void flushdbCommand(redisClient
*c
);
367 static void flushallCommand(redisClient
*c
);
368 static void sortCommand(redisClient
*c
);
369 static void lremCommand(redisClient
*c
);
370 static void infoCommand(redisClient
*c
);
371 static void mgetCommand(redisClient
*c
);
372 static void monitorCommand(redisClient
*c
);
373 static void expireCommand(redisClient
*c
);
374 static void getSetCommand(redisClient
*c
);
375 static void ttlCommand(redisClient
*c
);
376 static void slaveofCommand(redisClient
*c
);
377 static void debugCommand(redisClient
*c
);
379 /*================================= Globals ================================= */
382 static struct redisServer server
; /* server global state */
383 static struct redisCommand cmdTable
[] = {
384 {"get",getCommand
,2,REDIS_CMD_INLINE
},
385 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
386 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
387 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
388 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
389 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
390 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
391 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
392 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
393 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
394 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
395 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
396 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
397 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
398 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
399 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
400 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
401 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
402 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
403 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
404 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
405 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
406 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
407 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
408 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
409 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
410 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
411 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
412 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
413 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
414 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
415 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
416 {"getset",getSetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
417 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
418 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
419 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
420 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
421 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
422 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
423 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
424 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
425 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
426 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
427 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
428 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
429 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
430 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
431 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
432 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
433 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
434 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
435 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
436 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
437 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
438 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
439 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
440 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
441 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
445 /*============================ Utility functions ============================ */
447 /* Glob-style pattern matching. */
448 int stringmatchlen(const char *pattern
, int patternLen
,
449 const char *string
, int stringLen
, int nocase
)
454 while (pattern
[1] == '*') {
459 return 1; /* match */
461 if (stringmatchlen(pattern
+1, patternLen
-1,
462 string
, stringLen
, nocase
))
463 return 1; /* match */
467 return 0; /* no match */
471 return 0; /* no match */
481 not = pattern
[0] == '^';
488 if (pattern
[0] == '\\') {
491 if (pattern
[0] == string
[0])
493 } else if (pattern
[0] == ']') {
495 } else if (patternLen
== 0) {
499 } else if (pattern
[1] == '-' && patternLen
>= 3) {
500 int start
= pattern
[0];
501 int end
= pattern
[2];
509 start
= tolower(start
);
515 if (c
>= start
&& c
<= end
)
519 if (pattern
[0] == string
[0])
522 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
532 return 0; /* no match */
538 if (patternLen
>= 2) {
545 if (pattern
[0] != string
[0])
546 return 0; /* no match */
548 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
549 return 0; /* no match */
557 if (stringLen
== 0) {
558 while(*pattern
== '*') {
565 if (patternLen
== 0 && stringLen
== 0)
570 void redisLog(int level
, const char *fmt
, ...)
575 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
579 if (level
>= server
.verbosity
) {
585 strftime(buf
,64,"%d %b %H:%M:%S",gmtime(&now
));
586 fprintf(fp
,"%s %c ",buf
,c
[level
]);
587 vfprintf(fp
, fmt
, ap
);
593 if (server
.logfile
) fclose(fp
);
596 /*====================== Hash table type implementation ==================== */
598 /* This is a hash table type that uses the SDS dynamic strings library as
599 * keys and redis objects as values (objects can hold SDS strings,
602 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
606 DICT_NOTUSED(privdata
);
608 l1
= sdslen((sds
)key1
);
609 l2
= sdslen((sds
)key2
);
610 if (l1
!= l2
) return 0;
611 return memcmp(key1
, key2
, l1
) == 0;
614 static void dictRedisObjectDestructor(void *privdata
, void *val
)
616 DICT_NOTUSED(privdata
);
621 static int dictSdsKeyCompare(void *privdata
, const void *key1
,
624 const robj
*o1
= key1
, *o2
= key2
;
625 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
628 static unsigned int dictSdsHash(const void *key
) {
630 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
633 static dictType setDictType
= {
634 dictSdsHash
, /* hash function */
637 dictSdsKeyCompare
, /* key compare */
638 dictRedisObjectDestructor
, /* key destructor */
639 NULL
/* val destructor */
642 static dictType hashDictType
= {
643 dictSdsHash
, /* hash function */
646 dictSdsKeyCompare
, /* key compare */
647 dictRedisObjectDestructor
, /* key destructor */
648 dictRedisObjectDestructor
/* val destructor */
651 /* ========================= Random utility functions ======================= */
653 /* Redis generally does not try to recover from out of memory conditions
654 * when allocating objects or strings, it is not clear if it will be possible
655 * to report this condition to the client since the networking layer itself
656 * is based on heap allocation for send buffers, so we simply abort.
657 * At least the code will be simpler to read... */
658 static void oom(const char *msg
) {
659 fprintf(stderr
, "%s: Out of memory\n",msg
);
665 /* ====================== Redis server networking stuff ===================== */
666 void closeTimedoutClients(void) {
669 time_t now
= time(NULL
);
671 listRewind(server
.clients
);
672 while ((ln
= listYield(server
.clients
)) != NULL
) {
673 c
= listNodeValue(ln
);
674 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
675 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
676 (now
- c
->lastinteraction
> server
.maxidletime
)) {
677 redisLog(REDIS_DEBUG
,"Closing idle client");
683 /* If the percentage of used slots in the HT drops below REDIS_HT_MINFILL
684 * we resize the hash table to save memory */
685 void tryResizeHashTables(void) {
688 for (j
= 0; j
< server
.dbnum
; j
++) {
689 long long size
, used
;
691 size
= dictSlots(server
.db
[j
].dict
);
692 used
= dictSize(server
.db
[j
].dict
);
693 if (size
&& used
&& size
> REDIS_HT_MINSLOTS
&&
694 (used
*100/size
< REDIS_HT_MINFILL
)) {
695 redisLog(REDIS_NOTICE
,"The hash table %d is too sparse, resize it...",j
);
696 dictResize(server
.db
[j
].dict
);
697 redisLog(REDIS_NOTICE
,"Hash table %d resized.",j
);
702 int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
703 int j
, loops
= server
.cronloops
++;
704 REDIS_NOTUSED(eventLoop
);
706 REDIS_NOTUSED(clientData
);
708 /* Update the global state with the amount of used memory */
709 server
.usedmemory
= zmalloc_used_memory();
711 /* Show some info about non-empty databases */
712 for (j
= 0; j
< server
.dbnum
; j
++) {
713 long long size
, used
, vkeys
;
715 size
= dictSlots(server
.db
[j
].dict
);
716 used
= dictSize(server
.db
[j
].dict
);
717 vkeys
= dictSize(server
.db
[j
].expires
);
718 if (!(loops
% 5) && used
> 0) {
719 redisLog(REDIS_DEBUG
,"DB %d: %d keys (%d volatile) in %d slots HT.",j
,used
,vkeys
,size
);
720 /* dictPrintStats(server.dict); */
724 /* We don't want to resize the hash tables while a background saving
725 * is in progress: the saving child is created using fork() that is
726 * implemented with a copy-on-write semantic in most modern systems, so
727 * if we resize the HT while there is the saving child at work actually
728 * a lot of memory movements in the parent will cause a lot of pages
730 if (!server
.bgsaveinprogress
) tryResizeHashTables();
732 /* Show information about connected clients */
734 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use",
735 listLength(server
.clients
)-listLength(server
.slaves
),
736 listLength(server
.slaves
),
738 dictSize(server
.sharingpool
));
741 /* Close connections of timedout clients */
742 if (server
.maxidletime
&& !(loops
% 10))
743 closeTimedoutClients();
745 /* Check if a background saving in progress terminated */
746 if (server
.bgsaveinprogress
) {
748 /* XXX: TODO handle the case of the saving child killed */
749 if (wait4(-1,&statloc
,WNOHANG
,NULL
)) {
750 int exitcode
= WEXITSTATUS(statloc
);
752 redisLog(REDIS_NOTICE
,
753 "Background saving terminated with success");
755 server
.lastsave
= time(NULL
);
757 redisLog(REDIS_WARNING
,
758 "Background saving error");
760 server
.bgsaveinprogress
= 0;
761 updateSalvesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
764 /* If there is not a background saving in progress check if
765 * we have to save now */
766 time_t now
= time(NULL
);
767 for (j
= 0; j
< server
.saveparamslen
; j
++) {
768 struct saveparam
*sp
= server
.saveparams
+j
;
770 if (server
.dirty
>= sp
->changes
&&
771 now
-server
.lastsave
> sp
->seconds
) {
772 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
773 sp
->changes
, sp
->seconds
);
774 rdbSaveBackground(server
.dbfilename
);
780 /* Try to expire a few timed out keys */
781 for (j
= 0; j
< server
.dbnum
; j
++) {
782 redisDb
*db
= server
.db
+j
;
783 int num
= dictSize(db
->expires
);
786 time_t now
= time(NULL
);
788 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
789 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
794 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
795 t
= (time_t) dictGetEntryVal(de
);
797 deleteKey(db
,dictGetEntryKey(de
));
803 /* Check if we should connect to a MASTER */
804 if (server
.replstate
== REDIS_REPL_CONNECT
) {
805 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
806 if (syncWithMaster() == REDIS_OK
) {
807 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
813 static void createSharedObjects(void) {
814 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
815 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
816 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
817 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
818 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
819 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
820 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
821 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
822 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
824 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
825 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
826 "-ERR Operation against a key holding the wrong kind of value\r\n"));
827 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
828 "-ERR no such key\r\n"));
829 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
830 "-ERR syntax error\r\n"));
831 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
832 "-ERR source and destination objects are the same\r\n"));
833 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
834 "-ERR index out of range\r\n"));
835 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
836 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
837 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
838 shared
.select0
= createStringObject("select 0\r\n",10);
839 shared
.select1
= createStringObject("select 1\r\n",10);
840 shared
.select2
= createStringObject("select 2\r\n",10);
841 shared
.select3
= createStringObject("select 3\r\n",10);
842 shared
.select4
= createStringObject("select 4\r\n",10);
843 shared
.select5
= createStringObject("select 5\r\n",10);
844 shared
.select6
= createStringObject("select 6\r\n",10);
845 shared
.select7
= createStringObject("select 7\r\n",10);
846 shared
.select8
= createStringObject("select 8\r\n",10);
847 shared
.select9
= createStringObject("select 9\r\n",10);
850 static void appendServerSaveParams(time_t seconds
, int changes
) {
851 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
852 if (server
.saveparams
== NULL
) oom("appendServerSaveParams");
853 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
854 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
855 server
.saveparamslen
++;
858 static void ResetServerSaveParams() {
859 zfree(server
.saveparams
);
860 server
.saveparams
= NULL
;
861 server
.saveparamslen
= 0;
864 static void initServerConfig() {
865 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
866 server
.port
= REDIS_SERVERPORT
;
867 server
.verbosity
= REDIS_DEBUG
;
868 server
.maxidletime
= REDIS_MAXIDLETIME
;
869 server
.saveparams
= NULL
;
870 server
.logfile
= NULL
; /* NULL = log on standard output */
871 server
.bindaddr
= NULL
;
872 server
.glueoutputbuf
= 1;
873 server
.daemonize
= 0;
874 server
.pidfile
= "/var/run/redis.pid";
875 server
.dbfilename
= "dump.rdb";
876 server
.requirepass
= NULL
;
877 server
.shareobjects
= 0;
878 server
.maxclients
= 0;
879 server
.maxmemory
= 0;
880 ResetServerSaveParams();
882 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
883 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
884 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
885 /* Replication related */
887 server
.masterhost
= NULL
;
888 server
.masterport
= 6379;
889 server
.master
= NULL
;
890 server
.replstate
= REDIS_REPL_NONE
;
893 static void initServer() {
896 signal(SIGHUP
, SIG_IGN
);
897 signal(SIGPIPE
, SIG_IGN
);
899 server
.clients
= listCreate();
900 server
.slaves
= listCreate();
901 server
.monitors
= listCreate();
902 server
.objfreelist
= listCreate();
903 createSharedObjects();
904 server
.el
= aeCreateEventLoop();
905 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
906 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
907 server
.sharingpoolsize
= 1024;
908 if (!server
.db
|| !server
.clients
|| !server
.slaves
|| !server
.monitors
|| !server
.el
|| !server
.objfreelist
)
909 oom("server initialization"); /* Fatal OOM */
910 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
911 if (server
.fd
== -1) {
912 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
915 for (j
= 0; j
< server
.dbnum
; j
++) {
916 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
917 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
920 server
.cronloops
= 0;
921 server
.bgsaveinprogress
= 0;
922 server
.lastsave
= time(NULL
);
924 server
.usedmemory
= 0;
925 server
.stat_numcommands
= 0;
926 server
.stat_numconnections
= 0;
927 server
.stat_starttime
= time(NULL
);
928 aeCreateTimeEvent(server
.el
, 1000, serverCron
, NULL
, NULL
);
931 /* Empty the whole database */
932 static long long emptyDb() {
934 long long removed
= 0;
936 for (j
= 0; j
< server
.dbnum
; j
++) {
937 removed
+= dictSize(server
.db
[j
].dict
);
938 dictEmpty(server
.db
[j
].dict
);
939 dictEmpty(server
.db
[j
].expires
);
944 static int yesnotoi(char *s
) {
945 if (!strcasecmp(s
,"yes")) return 1;
946 else if (!strcasecmp(s
,"no")) return 0;
950 /* I agree, this is a very rudimentary way to load a configuration...
951 will improve later if the config gets more complex */
952 static void loadServerConfig(char *filename
) {
953 FILE *fp
= fopen(filename
,"r");
954 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
959 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
962 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
968 line
= sdstrim(line
," \t\r\n");
970 /* Skip comments and blank lines*/
971 if (line
[0] == '#' || line
[0] == '\0') {
976 /* Split into arguments */
977 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
980 /* Execute config directives */
981 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
982 server
.maxidletime
= atoi(argv
[1]);
983 if (server
.maxidletime
< 0) {
984 err
= "Invalid timeout value"; goto loaderr
;
986 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
987 server
.port
= atoi(argv
[1]);
988 if (server
.port
< 1 || server
.port
> 65535) {
989 err
= "Invalid port"; goto loaderr
;
991 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
992 server
.bindaddr
= zstrdup(argv
[1]);
993 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
994 int seconds
= atoi(argv
[1]);
995 int changes
= atoi(argv
[2]);
996 if (seconds
< 1 || changes
< 0) {
997 err
= "Invalid save parameters"; goto loaderr
;
999 appendServerSaveParams(seconds
,changes
);
1000 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1001 if (chdir(argv
[1]) == -1) {
1002 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1003 argv
[1], strerror(errno
));
1006 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1007 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1008 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1009 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1011 err
= "Invalid log level. Must be one of debug, notice, warning";
1014 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1017 server
.logfile
= zstrdup(argv
[1]);
1018 if (!strcasecmp(server
.logfile
,"stdout")) {
1019 zfree(server
.logfile
);
1020 server
.logfile
= NULL
;
1022 if (server
.logfile
) {
1023 /* Test if we are able to open the file. The server will not
1024 * be able to abort just for this problem later... */
1025 fp
= fopen(server
.logfile
,"a");
1027 err
= sdscatprintf(sdsempty(),
1028 "Can't open the log file: %s", strerror(errno
));
1033 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1034 server
.dbnum
= atoi(argv
[1]);
1035 if (server
.dbnum
< 1) {
1036 err
= "Invalid number of databases"; goto loaderr
;
1038 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1039 server
.maxclients
= atoi(argv
[1]);
1040 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1041 server
.maxmemory
= atoi(argv
[1]);
1042 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1043 server
.masterhost
= sdsnew(argv
[1]);
1044 server
.masterport
= atoi(argv
[2]);
1045 server
.replstate
= REDIS_REPL_CONNECT
;
1046 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1047 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1048 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1050 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1051 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1052 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1054 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1055 server
.sharingpoolsize
= atoi(argv
[1]);
1056 if (server
.sharingpoolsize
< 1) {
1057 err
= "invalid object sharing pool size"; goto loaderr
;
1059 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1060 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1061 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1063 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1064 server
.requirepass
= zstrdup(argv
[1]);
1065 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1066 server
.pidfile
= zstrdup(argv
[1]);
1067 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1068 server
.dbfilename
= zstrdup(argv
[1]);
1070 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1072 for (j
= 0; j
< argc
; j
++)
1081 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1082 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1083 fprintf(stderr
, ">>> '%s'\n", line
);
1084 fprintf(stderr
, "%s\n", err
);
1088 static void freeClientArgv(redisClient
*c
) {
1091 for (j
= 0; j
< c
->argc
; j
++)
1092 decrRefCount(c
->argv
[j
]);
1096 static void freeClient(redisClient
*c
) {
1099 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1100 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1101 sdsfree(c
->querybuf
);
1102 listRelease(c
->reply
);
1105 ln
= listSearchKey(server
.clients
,c
);
1107 listDelNode(server
.clients
,ln
);
1108 if (c
->flags
& REDIS_SLAVE
) {
1109 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1111 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1112 ln
= listSearchKey(l
,c
);
1116 if (c
->flags
& REDIS_MASTER
) {
1117 server
.master
= NULL
;
1118 server
.replstate
= REDIS_REPL_CONNECT
;
1124 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1129 listRewind(c
->reply
);
1130 while((ln
= listYield(c
->reply
))) {
1132 totlen
+= sdslen(o
->ptr
);
1133 /* This optimization makes more sense if we don't have to copy
1135 if (totlen
> 1024) return;
1141 listRewind(c
->reply
);
1142 while((ln
= listYield(c
->reply
))) {
1144 memcpy(buf
+copylen
,o
->ptr
,sdslen(o
->ptr
));
1145 copylen
+= sdslen(o
->ptr
);
1146 listDelNode(c
->reply
,ln
);
1148 /* Now the output buffer is empty, add the new single element */
1149 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,totlen
));
1150 if (!listAddNodeTail(c
->reply
,o
)) oom("listAddNodeTail");
1154 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1155 redisClient
*c
= privdata
;
1156 int nwritten
= 0, totwritten
= 0, objlen
;
1159 REDIS_NOTUSED(mask
);
1161 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1162 glueReplyBuffersIfNeeded(c
);
1163 while(listLength(c
->reply
)) {
1164 o
= listNodeValue(listFirst(c
->reply
));
1165 objlen
= sdslen(o
->ptr
);
1168 listDelNode(c
->reply
,listFirst(c
->reply
));
1172 if (c
->flags
& REDIS_MASTER
) {
1173 nwritten
= objlen
- c
->sentlen
;
1175 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1176 if (nwritten
<= 0) break;
1178 c
->sentlen
+= nwritten
;
1179 totwritten
+= nwritten
;
1180 /* If we fully sent the object on head go to the next one */
1181 if (c
->sentlen
== objlen
) {
1182 listDelNode(c
->reply
,listFirst(c
->reply
));
1186 if (nwritten
== -1) {
1187 if (errno
== EAGAIN
) {
1190 redisLog(REDIS_DEBUG
,
1191 "Error writing to client: %s", strerror(errno
));
1196 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1197 if (listLength(c
->reply
) == 0) {
1199 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1203 static struct redisCommand
*lookupCommand(char *name
) {
1205 while(cmdTable
[j
].name
!= NULL
) {
1206 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1212 /* resetClient prepares the client to process the next command */
1213 static void resetClient(redisClient
*c
) {
1218 /* If this function gets called we already read a whole
1219 * command, arguments are in the client argv/argc fields.
1220 * processCommand() executes the command or prepares the
1221 * server for a bulk read from the client.
1223 * If 1 is returned the client is still alive and valid and
1224 * other operations can be performed by the caller. Otherwise
1225 * if 0 is returned the client was destroyed (i.e. after QUIT). */
1226 static int processCommand(redisClient
*c
) {
1227 struct redisCommand
*cmd
;
1230 /* Free some memory if needed (maxmemory setting) */
1231 if (server
.maxmemory
) freeMemoryIfNeeded();
1233 /* The QUIT command is handled as a special case. Normal command
1234 * procs are unable to close the client connection safely */
1235 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1239 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1241 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1244 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1245 (c
->argc
< -cmd
->arity
)) {
1246 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
1249 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1250 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1253 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1254 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1256 decrRefCount(c
->argv
[c
->argc
-1]);
1257 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1259 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1264 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1265 /* It is possible that the bulk read is already in the
1266 * buffer. Check this condition and handle it accordingly */
1267 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1268 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1270 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1275 /* Let's try to share objects on the command arguments vector */
1276 if (server
.shareobjects
) {
1278 for(j
= 1; j
< c
->argc
; j
++)
1279 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1281 /* Check if the user is authenticated */
1282 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1283 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1288 /* Exec the command */
1289 dirty
= server
.dirty
;
1291 if (server
.dirty
-dirty
!= 0 && listLength(server
.slaves
))
1292 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1293 if (listLength(server
.monitors
))
1294 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1295 server
.stat_numcommands
++;
1297 /* Prepare the client for the next command */
1298 if (c
->flags
& REDIS_CLOSE
) {
1306 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1310 /* (args*2)+1 is enough room for args, spaces, newlines */
1311 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1313 if (argc
<= REDIS_STATIC_ARGS
) {
1316 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1317 if (!outv
) oom("replicationFeedSlaves");
1320 for (j
= 0; j
< argc
; j
++) {
1321 if (j
!= 0) outv
[outc
++] = shared
.space
;
1322 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1325 lenobj
= createObject(REDIS_STRING
,
1326 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv
[j
]->ptr
)));
1327 lenobj
->refcount
= 0;
1328 outv
[outc
++] = lenobj
;
1330 outv
[outc
++] = argv
[j
];
1332 outv
[outc
++] = shared
.crlf
;
1334 /* Increment all the refcounts at start and decrement at end in order to
1335 * be sure to free objects if there is no slave in a replication state
1336 * able to be feed with commands */
1337 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1339 while((ln
= listYield(slaves
))) {
1340 redisClient
*slave
= ln
->value
;
1342 /* Don't feed slaves that are still waiting for BGSAVE to start */
1343 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1345 /* Feed all the other slaves, MONITORs and so on */
1346 if (slave
->slaveseldb
!= dictid
) {
1350 case 0: selectcmd
= shared
.select0
; break;
1351 case 1: selectcmd
= shared
.select1
; break;
1352 case 2: selectcmd
= shared
.select2
; break;
1353 case 3: selectcmd
= shared
.select3
; break;
1354 case 4: selectcmd
= shared
.select4
; break;
1355 case 5: selectcmd
= shared
.select5
; break;
1356 case 6: selectcmd
= shared
.select6
; break;
1357 case 7: selectcmd
= shared
.select7
; break;
1358 case 8: selectcmd
= shared
.select8
; break;
1359 case 9: selectcmd
= shared
.select9
; break;
1361 selectcmd
= createObject(REDIS_STRING
,
1362 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1363 selectcmd
->refcount
= 0;
1366 addReply(slave
,selectcmd
);
1367 slave
->slaveseldb
= dictid
;
1369 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1371 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1372 if (outv
!= static_outv
) zfree(outv
);
1375 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1376 redisClient
*c
= (redisClient
*) privdata
;
1377 char buf
[REDIS_IOBUF_LEN
];
1380 REDIS_NOTUSED(mask
);
1382 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1384 if (errno
== EAGAIN
) {
1387 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1391 } else if (nread
== 0) {
1392 redisLog(REDIS_DEBUG
, "Client closed connection");
1397 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1398 c
->lastinteraction
= time(NULL
);
1404 if (c
->bulklen
== -1) {
1405 /* Read the first line of the query */
1406 char *p
= strchr(c
->querybuf
,'\n');
1412 query
= c
->querybuf
;
1413 c
->querybuf
= sdsempty();
1414 querylen
= 1+(p
-(query
));
1415 if (sdslen(query
) > querylen
) {
1416 /* leave data after the first line of the query in the buffer */
1417 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1419 *p
= '\0'; /* remove "\n" */
1420 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1421 sdsupdatelen(query
);
1423 /* Now we can split the query in arguments */
1424 if (sdslen(query
) == 0) {
1425 /* Ignore empty query */
1429 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1430 if (argv
== NULL
) oom("sdssplitlen");
1433 if (c
->argv
) zfree(c
->argv
);
1434 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1435 if (c
->argv
== NULL
) oom("allocating arguments list for client");
1437 for (j
= 0; j
< argc
; j
++) {
1438 if (sdslen(argv
[j
])) {
1439 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1446 /* Execute the command. If the client is still valid
1447 * after processCommand() return and there is something
1448 * on the query buffer try to process the next command. */
1449 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1451 } else if (sdslen(c
->querybuf
) >= 1024*32) {
1452 redisLog(REDIS_DEBUG
, "Client protocol error");
1457 /* Bulk read handling. Note that if we are at this point
1458 the client already sent a command terminated with a newline,
1459 we are reading the bulk data that is actually the last
1460 argument of the command. */
1461 int qbl
= sdslen(c
->querybuf
);
1463 if (c
->bulklen
<= qbl
) {
1464 /* Copy everything but the final CRLF as final argument */
1465 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1467 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1474 static int selectDb(redisClient
*c
, int id
) {
1475 if (id
< 0 || id
>= server
.dbnum
)
1477 c
->db
= &server
.db
[id
];
1481 static void *dupClientReplyValue(void *o
) {
1482 incrRefCount((robj
*)o
);
1486 static redisClient
*createClient(int fd
) {
1487 redisClient
*c
= zmalloc(sizeof(*c
));
1489 anetNonBlock(NULL
,fd
);
1490 anetTcpNoDelay(NULL
,fd
);
1491 if (!c
) return NULL
;
1494 c
->querybuf
= sdsempty();
1500 c
->lastinteraction
= time(NULL
);
1501 c
->authenticated
= 0;
1502 c
->replstate
= REDIS_REPL_NONE
;
1503 if ((c
->reply
= listCreate()) == NULL
) oom("listCreate");
1504 listSetFreeMethod(c
->reply
,decrRefCount
);
1505 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1506 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1507 readQueryFromClient
, c
, NULL
) == AE_ERR
) {
1511 if (!listAddNodeTail(server
.clients
,c
)) oom("listAddNodeTail");
1515 static void addReply(redisClient
*c
, robj
*obj
) {
1516 if (listLength(c
->reply
) == 0 &&
1517 (c
->replstate
== REDIS_REPL_NONE
||
1518 c
->replstate
== REDIS_REPL_ONLINE
) &&
1519 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
1520 sendReplyToClient
, c
, NULL
) == AE_ERR
) return;
1521 if (!listAddNodeTail(c
->reply
,obj
)) oom("listAddNodeTail");
1525 static void addReplySds(redisClient
*c
, sds s
) {
1526 robj
*o
= createObject(REDIS_STRING
,s
);
1531 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1536 REDIS_NOTUSED(mask
);
1537 REDIS_NOTUSED(privdata
);
1539 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
1540 if (cfd
== AE_ERR
) {
1541 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
1544 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
1545 if ((c
= createClient(cfd
)) == NULL
) {
1546 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
1547 close(cfd
); /* May be already closed, just ingore errors */
1550 /* If maxclient directive is set and this is one client more... close the
1551 * connection. Note that we create the client instead to check before
1552 * for this condition, since now the socket is already set in nonblocking
1553 * mode and we can send an error for free using the Kernel I/O */
1554 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
1555 char *err
= "-ERR max number of clients reached\r\n";
1557 /* That's a best effort error message, don't check write errors */
1558 (void) write(c
->fd
,err
,strlen(err
));
1562 server
.stat_numconnections
++;
1565 /* ======================= Redis objects implementation ===================== */
1567 static robj
*createObject(int type
, void *ptr
) {
1570 if (listLength(server
.objfreelist
)) {
1571 listNode
*head
= listFirst(server
.objfreelist
);
1572 o
= listNodeValue(head
);
1573 listDelNode(server
.objfreelist
,head
);
1575 o
= zmalloc(sizeof(*o
));
1577 if (!o
) oom("createObject");
1584 static robj
*createStringObject(char *ptr
, size_t len
) {
1585 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
1588 static robj
*createListObject(void) {
1589 list
*l
= listCreate();
1591 if (!l
) oom("listCreate");
1592 listSetFreeMethod(l
,decrRefCount
);
1593 return createObject(REDIS_LIST
,l
);
1596 static robj
*createSetObject(void) {
1597 dict
*d
= dictCreate(&setDictType
,NULL
);
1598 if (!d
) oom("dictCreate");
1599 return createObject(REDIS_SET
,d
);
1602 static void freeStringObject(robj
*o
) {
1606 static void freeListObject(robj
*o
) {
1607 listRelease((list
*) o
->ptr
);
1610 static void freeSetObject(robj
*o
) {
1611 dictRelease((dict
*) o
->ptr
);
1614 static void freeHashObject(robj
*o
) {
1615 dictRelease((dict
*) o
->ptr
);
1618 static void incrRefCount(robj
*o
) {
1620 #ifdef DEBUG_REFCOUNT
1621 if (o
->type
== REDIS_STRING
)
1622 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
1626 static void decrRefCount(void *obj
) {
1629 #ifdef DEBUG_REFCOUNT
1630 if (o
->type
== REDIS_STRING
)
1631 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
1633 if (--(o
->refcount
) == 0) {
1635 case REDIS_STRING
: freeStringObject(o
); break;
1636 case REDIS_LIST
: freeListObject(o
); break;
1637 case REDIS_SET
: freeSetObject(o
); break;
1638 case REDIS_HASH
: freeHashObject(o
); break;
1639 default: assert(0 != 0); break;
1641 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
1642 !listAddNodeHead(server
.objfreelist
,o
))
1647 /* Try to share an object against the shared objects pool */
1648 static robj
*tryObjectSharing(robj
*o
) {
1649 struct dictEntry
*de
;
1652 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
1654 assert(o
->type
== REDIS_STRING
);
1655 de
= dictFind(server
.sharingpool
,o
);
1657 robj
*shared
= dictGetEntryKey(de
);
1659 c
= ((unsigned long) dictGetEntryVal(de
))+1;
1660 dictGetEntryVal(de
) = (void*) c
;
1661 incrRefCount(shared
);
1665 /* Here we are using a stream algorihtm: Every time an object is
1666 * shared we increment its count, everytime there is a miss we
1667 * recrement the counter of a random object. If this object reaches
1668 * zero we remove the object and put the current object instead. */
1669 if (dictSize(server
.sharingpool
) >=
1670 server
.sharingpoolsize
) {
1671 de
= dictGetRandomKey(server
.sharingpool
);
1673 c
= ((unsigned long) dictGetEntryVal(de
))-1;
1674 dictGetEntryVal(de
) = (void*) c
;
1676 dictDelete(server
.sharingpool
,de
->key
);
1679 c
= 0; /* If the pool is empty we want to add this object */
1684 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
1685 assert(retval
== DICT_OK
);
1692 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
1693 dictEntry
*de
= dictFind(db
->dict
,key
);
1694 return de
? dictGetEntryVal(de
) : NULL
;
1697 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
1698 expireIfNeeded(db
,key
);
1699 return lookupKey(db
,key
);
1702 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
1703 deleteIfVolatile(db
,key
);
1704 return lookupKey(db
,key
);
1707 static int deleteKey(redisDb
*db
, robj
*key
) {
1710 /* We need to protect key from destruction: after the first dictDelete()
1711 * it may happen that 'key' is no longer valid if we don't increment
1712 * it's count. This may happen when we get the object reference directly
1713 * from the hash table with dictRandomKey() or dict iterators */
1715 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
1716 retval
= dictDelete(db
->dict
,key
);
1719 return retval
== DICT_OK
;
1722 /*============================ DB saving/loading ============================ */
1724 static int rdbSaveType(FILE *fp
, unsigned char type
) {
1725 if (fwrite(&type
,1,1,fp
) == 0) return -1;
1729 static int rdbSaveTime(FILE *fp
, time_t t
) {
1730 int32_t t32
= (int32_t) t
;
1731 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
1735 /* check rdbLoadLen() comments for more info */
1736 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
1737 unsigned char buf
[2];
1740 /* Save a 6 bit len */
1741 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
1742 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1743 } else if (len
< (1<<14)) {
1744 /* Save a 14 bit len */
1745 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
1747 if (fwrite(buf
,2,1,fp
) == 0) return -1;
1749 /* Save a 32 bit len */
1750 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
1751 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1753 if (fwrite(&len
,4,1,fp
) == 0) return -1;
1758 /* String objects in the form "2391" "-100" without any space and with a
1759 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1760 * encoded as integers to save space */
1761 int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
1763 char *endptr
, buf
[32];
1765 /* Check if it's possible to encode this value as a number */
1766 value
= strtoll(s
, &endptr
, 10);
1767 if (endptr
[0] != '\0') return 0;
1768 snprintf(buf
,32,"%lld",value
);
1770 /* If the number converted back into a string is not identical
1771 * then it's not possible to encode the string as integer */
1772 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
1774 /* Finally check if it fits in our ranges */
1775 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
1776 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
1777 enc
[1] = value
&0xFF;
1779 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
1780 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
1781 enc
[1] = value
&0xFF;
1782 enc
[2] = (value
>>8)&0xFF;
1784 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
1785 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
1786 enc
[1] = value
&0xFF;
1787 enc
[2] = (value
>>8)&0xFF;
1788 enc
[3] = (value
>>16)&0xFF;
1789 enc
[4] = (value
>>24)&0xFF;
1796 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
1797 unsigned int comprlen
, outlen
;
1801 /* We require at least four bytes compression for this to be worth it */
1802 outlen
= sdslen(obj
->ptr
)-4;
1803 if (outlen
<= 0) return 0;
1804 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
1805 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
1806 if (comprlen
== 0) {
1810 /* Data compressed! Let's save it on disk */
1811 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
1812 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
1813 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
1814 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
1815 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
1824 /* Save a string object as [len][data] on disk. If the object is a string
1825 * representation of an integer value we try to save it in a special form */
1826 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
1827 size_t len
= sdslen(obj
->ptr
);
1830 /* Try integer encoding */
1832 unsigned char buf
[5];
1833 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
1834 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
1839 /* Try LZF compression - under 20 bytes it's unable to compress even
1840 * aaaaaaaaaaaaaaaaaa so skip it */
1841 if (1 && len
> 20) {
1844 retval
= rdbSaveLzfStringObject(fp
,obj
);
1845 if (retval
== -1) return -1;
1846 if (retval
> 0) return 0;
1847 /* retval == 0 means data can't be compressed, save the old way */
1850 /* Store verbatim */
1851 if (rdbSaveLen(fp
,len
) == -1) return -1;
1852 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
1856 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1857 static int rdbSave(char *filename
) {
1858 dictIterator
*di
= NULL
;
1863 time_t now
= time(NULL
);
1865 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
1866 fp
= fopen(tmpfile
,"w");
1868 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
1871 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
1872 for (j
= 0; j
< server
.dbnum
; j
++) {
1873 redisDb
*db
= server
.db
+j
;
1875 if (dictSize(d
) == 0) continue;
1876 di
= dictGetIterator(d
);
1882 /* Write the SELECT DB opcode */
1883 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
1884 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
1886 /* Iterate this DB writing every entry */
1887 while((de
= dictNext(di
)) != NULL
) {
1888 robj
*key
= dictGetEntryKey(de
);
1889 robj
*o
= dictGetEntryVal(de
);
1890 time_t expiretime
= getExpire(db
,key
);
1892 /* Save the expire time */
1893 if (expiretime
!= -1) {
1894 /* If this key is already expired skip it */
1895 if (expiretime
< now
) continue;
1896 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
1897 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
1899 /* Save the key and associated value */
1900 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
1901 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
1902 if (o
->type
== REDIS_STRING
) {
1903 /* Save a string value */
1904 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
1905 } else if (o
->type
== REDIS_LIST
) {
1906 /* Save a list value */
1907 list
*list
= o
->ptr
;
1911 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
1912 while((ln
= listYield(list
))) {
1913 robj
*eleobj
= listNodeValue(ln
);
1915 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
1917 } else if (o
->type
== REDIS_SET
) {
1918 /* Save a set value */
1920 dictIterator
*di
= dictGetIterator(set
);
1923 if (!set
) oom("dictGetIteraotr");
1924 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
1925 while((de
= dictNext(di
)) != NULL
) {
1926 robj
*eleobj
= dictGetEntryKey(de
);
1928 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
1930 dictReleaseIterator(di
);
1935 dictReleaseIterator(di
);
1938 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
1940 /* Make sure data will not remain on the OS's output buffers */
1945 /* Use RENAME to make sure the DB file is changed atomically only
1946 * if the generate DB file is ok. */
1947 if (rename(tmpfile
,filename
) == -1) {
1948 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destionation: %s", strerror(errno
));
1952 redisLog(REDIS_NOTICE
,"DB saved on disk");
1954 server
.lastsave
= time(NULL
);
1960 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
1961 if (di
) dictReleaseIterator(di
);
1965 static int rdbSaveBackground(char *filename
) {
1968 if (server
.bgsaveinprogress
) return REDIS_ERR
;
1969 if ((childpid
= fork()) == 0) {
1972 if (rdbSave(filename
) == REDIS_OK
) {
1979 if (childpid
== -1) {
1980 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
1984 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
1985 server
.bgsaveinprogress
= 1;
1988 return REDIS_OK
; /* unreached */
1991 static int rdbLoadType(FILE *fp
) {
1993 if (fread(&type
,1,1,fp
) == 0) return -1;
/* Read a 4-byte time value from 'fp' and return it widened to time_t.
 * Returns -1 when fread() cannot deliver the full 4-byte item. The bytes
 * are used exactly as read; no byte-order conversion is done here. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stored;
    size_t items = fread(&stored, sizeof(stored), 1, fp);

    if (items == 0) return -1;
    return (time_t) stored;
}
2003 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2004 * of this file for a description of how these are stored on disk.
2006 * isencoded is set to 1 if the value read is not actually a length but
2007 * an "encoding type", check the above comments for more info */
2008 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2009 unsigned char buf
[2];
2012 if (isencoded
) *isencoded
= 0;
2014 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2019 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2020 type
= (buf
[0]&0xC0)>>6;
2021 if (type
== REDIS_RDB_6BITLEN
) {
2022 /* Read a 6 bit len */
2024 } else if (type
== REDIS_RDB_ENCVAL
) {
2025 /* Read a 6 bit len encoding type */
2026 if (isencoded
) *isencoded
= 1;
2028 } else if (type
== REDIS_RDB_14BITLEN
) {
2029 /* Read a 14 bit len */
2030 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2031 return ((buf
[0]&0x3F)<<8)|buf
[1];
2033 /* Read a 32 bit len */
2034 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2040 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2041 unsigned char enc
[4];
2044 if (enctype
== REDIS_RDB_ENC_INT8
) {
2045 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2046 val
= (signed char)enc
[0];
2047 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2049 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2050 v
= enc
[0]|(enc
[1]<<8);
2052 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2054 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2055 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2058 val
= 0; /* anti-warning */
2061 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2064 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2065 unsigned int len
, clen
;
2066 unsigned char *c
= NULL
;
2069 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2070 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2071 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2072 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2073 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2074 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2076 return createObject(REDIS_STRING
,val
);
2083 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2088 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2091 case REDIS_RDB_ENC_INT8
:
2092 case REDIS_RDB_ENC_INT16
:
2093 case REDIS_RDB_ENC_INT32
:
2094 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2095 case REDIS_RDB_ENC_LZF
:
2096 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2102 if (len
== REDIS_RDB_LENERR
) return NULL
;
2103 val
= sdsnewlen(NULL
,len
);
2104 if (len
&& fread(val
,len
,1,fp
) == 0) {
2108 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2111 static int rdbLoad(char *filename
) {
2113 robj
*keyobj
= NULL
;
2115 int type
, retval
, rdbver
;
2116 dict
*d
= server
.db
[0].dict
;
2117 redisDb
*db
= server
.db
+0;
2119 time_t expiretime
= -1, now
= time(NULL
);
2121 fp
= fopen(filename
,"r");
2122 if (!fp
) return REDIS_ERR
;
2123 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2125 if (memcmp(buf
,"REDIS",5) != 0) {
2127 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2130 rdbver
= atoi(buf
+5);
2133 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2140 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2141 if (type
== REDIS_EXPIRETIME
) {
2142 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2143 /* We read the time so we need to read the object type again */
2144 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2146 if (type
== REDIS_EOF
) break;
2147 /* Handle SELECT DB opcode as a special case */
2148 if (type
== REDIS_SELECTDB
) {
2149 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2151 if (dbid
>= (unsigned)server
.dbnum
) {
2152 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2155 db
= server
.db
+dbid
;
2160 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2162 if (type
== REDIS_STRING
) {
2163 /* Read string value */
2164 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2165 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2166 /* Read list/set value */
2169 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2171 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2172 /* Load every single element of the list/set */
2176 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2177 if (type
== REDIS_LIST
) {
2178 if (!listAddNodeTail((list
*)o
->ptr
,ele
))
2179 oom("listAddNodeTail");
2181 if (dictAdd((dict
*)o
->ptr
,ele
,NULL
) == DICT_ERR
)
2188 /* Add the new object in the hash table */
2189 retval
= dictAdd(d
,keyobj
,o
);
2190 if (retval
== DICT_ERR
) {
2191 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2194 /* Set the expire time if needed */
2195 if (expiretime
!= -1) {
2196 setExpire(db
,keyobj
,expiretime
);
2197 /* Delete this key if already expired */
2198 if (expiretime
< now
) deleteKey(db
,keyobj
);
2206 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2207 if (keyobj
) decrRefCount(keyobj
);
2208 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2210 return REDIS_ERR
; /* Just to avoid warning */
2213 /*================================== Commands =============================== */
2215 static void authCommand(redisClient
*c
) {
2216 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2217 c
->authenticated
= 1;
2218 addReply(c
,shared
.ok
);
2220 c
->authenticated
= 0;
2221 addReply(c
,shared
.err
);
2225 static void pingCommand(redisClient
*c
) {
2226 addReply(c
,shared
.pong
);
2229 static void echoCommand(redisClient
*c
) {
2230 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
2231 (int)sdslen(c
->argv
[1]->ptr
)));
2232 addReply(c
,c
->argv
[1]);
2233 addReply(c
,shared
.crlf
);
2236 /*=================================== Strings =============================== */
2238 static void setGenericCommand(redisClient
*c
, int nx
) {
2241 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2242 if (retval
== DICT_ERR
) {
2244 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2245 incrRefCount(c
->argv
[2]);
2247 addReply(c
,shared
.czero
);
2251 incrRefCount(c
->argv
[1]);
2252 incrRefCount(c
->argv
[2]);
2255 removeExpire(c
->db
,c
->argv
[1]);
2256 addReply(c
, nx
? shared
.cone
: shared
.ok
);
2259 static void setCommand(redisClient
*c
) {
2260 setGenericCommand(c
,0);
2263 static void setnxCommand(redisClient
*c
) {
2264 setGenericCommand(c
,1);
2267 static void getCommand(redisClient
*c
) {
2268 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2271 addReply(c
,shared
.nullbulk
);
2273 if (o
->type
!= REDIS_STRING
) {
2274 addReply(c
,shared
.wrongtypeerr
);
2276 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o
->ptr
)));
2278 addReply(c
,shared
.crlf
);
2283 static void getSetCommand(redisClient
*c
) {
2285 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
2286 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2288 incrRefCount(c
->argv
[1]);
2290 incrRefCount(c
->argv
[2]);
2292 removeExpire(c
->db
,c
->argv
[1]);
2295 static void mgetCommand(redisClient
*c
) {
2298 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
2299 for (j
= 1; j
< c
->argc
; j
++) {
2300 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
2302 addReply(c
,shared
.nullbulk
);
2304 if (o
->type
!= REDIS_STRING
) {
2305 addReply(c
,shared
.nullbulk
);
2307 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o
->ptr
)));
2309 addReply(c
,shared
.crlf
);
2315 static void incrDecrCommand(redisClient
*c
, long long incr
) {
2320 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2324 if (o
->type
!= REDIS_STRING
) {
2329 value
= strtoll(o
->ptr
, &eptr
, 10);
2334 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
2335 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
2336 if (retval
== DICT_ERR
) {
2337 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
2338 removeExpire(c
->db
,c
->argv
[1]);
2340 incrRefCount(c
->argv
[1]);
2343 addReply(c
,shared
.colon
);
2345 addReply(c
,shared
.crlf
);
2348 static void incrCommand(redisClient
*c
) {
2349 incrDecrCommand(c
,1);
2352 static void decrCommand(redisClient
*c
) {
2353 incrDecrCommand(c
,-1);
2356 static void incrbyCommand(redisClient
*c
) {
2357 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2358 incrDecrCommand(c
,incr
);
2361 static void decrbyCommand(redisClient
*c
) {
2362 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2363 incrDecrCommand(c
,-incr
);
2366 /* ========================= Type agnostic commands ========================= */
2368 static void delCommand(redisClient
*c
) {
2371 for (j
= 1; j
< c
->argc
; j
++) {
2372 if (deleteKey(c
->db
,c
->argv
[j
])) {
2379 addReply(c
,shared
.czero
);
2382 addReply(c
,shared
.cone
);
2385 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
2390 static void existsCommand(redisClient
*c
) {
2391 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
2394 static void selectCommand(redisClient
*c
) {
2395 int id
= atoi(c
->argv
[1]->ptr
);
2397 if (selectDb(c
,id
) == REDIS_ERR
) {
2398 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
2400 addReply(c
,shared
.ok
);
2404 static void randomkeyCommand(redisClient
*c
) {
2408 de
= dictGetRandomKey(c
->db
->dict
);
2409 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
2412 addReply(c
,shared
.plus
);
2413 addReply(c
,shared
.crlf
);
2415 addReply(c
,shared
.plus
);
2416 addReply(c
,dictGetEntryKey(de
));
2417 addReply(c
,shared
.crlf
);
2421 static void keysCommand(redisClient
*c
) {
2424 sds pattern
= c
->argv
[1]->ptr
;
2425 int plen
= sdslen(pattern
);
2426 int numkeys
= 0, keyslen
= 0;
2427 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
2429 di
= dictGetIterator(c
->db
->dict
);
2430 if (!di
) oom("dictGetIterator");
2432 decrRefCount(lenobj
);
2433 while((de
= dictNext(di
)) != NULL
) {
2434 robj
*keyobj
= dictGetEntryKey(de
);
2436 sds key
= keyobj
->ptr
;
2437 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
2438 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
2439 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
2441 addReply(c
,shared
.space
);
2444 keyslen
+= sdslen(key
);
2448 dictReleaseIterator(di
);
2449 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
2450 addReply(c
,shared
.crlf
);
2453 static void dbsizeCommand(redisClient
*c
) {
2455 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
2458 static void lastsaveCommand(redisClient
*c
) {
2460 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
2463 static void typeCommand(redisClient
*c
) {
2467 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2472 case REDIS_STRING
: type
= "+string"; break;
2473 case REDIS_LIST
: type
= "+list"; break;
2474 case REDIS_SET
: type
= "+set"; break;
2475 default: type
= "unknown"; break;
2478 addReplySds(c
,sdsnew(type
));
2479 addReply(c
,shared
.crlf
);
2482 static void saveCommand(redisClient
*c
) {
2483 if (server
.bgsaveinprogress
) {
2484 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
2487 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2488 addReply(c
,shared
.ok
);
2490 addReply(c
,shared
.err
);
2494 static void bgsaveCommand(redisClient
*c
) {
2495 if (server
.bgsaveinprogress
) {
2496 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
2499 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
2500 addReply(c
,shared
.ok
);
2502 addReply(c
,shared
.err
);
2506 static void shutdownCommand(redisClient
*c
) {
2507 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
2508 /* XXX: TODO kill the child if there is a bgsave in progress */
2509 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2510 if (server
.daemonize
) {
2511 unlink(server
.pidfile
);
2513 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
2514 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
2517 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
2518 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2522 static void renameGenericCommand(redisClient
*c
, int nx
) {
2525 /* To use the same key as src and dst is probably an error */
2526 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
2527 addReply(c
,shared
.sameobjecterr
);
2531 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2533 addReply(c
,shared
.nokeyerr
);
2537 deleteIfVolatile(c
->db
,c
->argv
[2]);
2538 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
2541 addReply(c
,shared
.czero
);
2544 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
2546 incrRefCount(c
->argv
[2]);
2548 deleteKey(c
->db
,c
->argv
[1]);
2550 addReply(c
,nx
? shared
.cone
: shared
.ok
);
2553 static void renameCommand(redisClient
*c
) {
2554 renameGenericCommand(c
,0);
2557 static void renamenxCommand(redisClient
*c
) {
2558 renameGenericCommand(c
,1);
2561 static void moveCommand(redisClient
*c
) {
2566 /* Obtain source and target DB pointers */
2569 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
2570 addReply(c
,shared
.outofrangeerr
);
2574 selectDb(c
,srcid
); /* Back to the source DB */
2576 /* If the user is moving using as target the same
2577 * DB as the source DB it is probably an error. */
2579 addReply(c
,shared
.sameobjecterr
);
2583 /* Check if the element exists and get a reference */
2584 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2586 addReply(c
,shared
.czero
);
2590 /* Try to add the element to the target DB */
2591 deleteIfVolatile(dst
,c
->argv
[1]);
2592 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
2593 addReply(c
,shared
.czero
);
2596 incrRefCount(c
->argv
[1]);
2599 /* OK! key moved, free the entry in the source DB */
2600 deleteKey(src
,c
->argv
[1]);
2602 addReply(c
,shared
.cone
);
2605 /* =================================== Lists ================================ */
2606 static void pushGenericCommand(redisClient
*c
, int where
) {
2610 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2612 lobj
= createListObject();
2614 if (where
== REDIS_HEAD
) {
2615 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2617 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2619 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
2620 incrRefCount(c
->argv
[1]);
2621 incrRefCount(c
->argv
[2]);
2623 if (lobj
->type
!= REDIS_LIST
) {
2624 addReply(c
,shared
.wrongtypeerr
);
2628 if (where
== REDIS_HEAD
) {
2629 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2631 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2633 incrRefCount(c
->argv
[2]);
2636 addReply(c
,shared
.ok
);
2639 static void lpushCommand(redisClient
*c
) {
2640 pushGenericCommand(c
,REDIS_HEAD
);
2643 static void rpushCommand(redisClient
*c
) {
2644 pushGenericCommand(c
,REDIS_TAIL
);
2647 static void llenCommand(redisClient
*c
) {
2651 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2653 addReply(c
,shared
.czero
);
2656 if (o
->type
!= REDIS_LIST
) {
2657 addReply(c
,shared
.wrongtypeerr
);
2660 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
2665 static void lindexCommand(redisClient
*c
) {
2667 int index
= atoi(c
->argv
[2]->ptr
);
2669 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2671 addReply(c
,shared
.nullbulk
);
2673 if (o
->type
!= REDIS_LIST
) {
2674 addReply(c
,shared
.wrongtypeerr
);
2676 list
*list
= o
->ptr
;
2679 ln
= listIndex(list
, index
);
2681 addReply(c
,shared
.nullbulk
);
2683 robj
*ele
= listNodeValue(ln
);
2684 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2686 addReply(c
,shared
.crlf
);
2692 static void lsetCommand(redisClient
*c
) {
2694 int index
= atoi(c
->argv
[2]->ptr
);
2696 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2698 addReply(c
,shared
.nokeyerr
);
2700 if (o
->type
!= REDIS_LIST
) {
2701 addReply(c
,shared
.wrongtypeerr
);
2703 list
*list
= o
->ptr
;
2706 ln
= listIndex(list
, index
);
2708 addReply(c
,shared
.outofrangeerr
);
2710 robj
*ele
= listNodeValue(ln
);
2713 listNodeValue(ln
) = c
->argv
[3];
2714 incrRefCount(c
->argv
[3]);
2715 addReply(c
,shared
.ok
);
2722 static void popGenericCommand(redisClient
*c
, int where
) {
2725 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2727 addReply(c
,shared
.nullbulk
);
2729 if (o
->type
!= REDIS_LIST
) {
2730 addReply(c
,shared
.wrongtypeerr
);
2732 list
*list
= o
->ptr
;
2735 if (where
== REDIS_HEAD
)
2736 ln
= listFirst(list
);
2738 ln
= listLast(list
);
2741 addReply(c
,shared
.nullbulk
);
2743 robj
*ele
= listNodeValue(ln
);
2744 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2746 addReply(c
,shared
.crlf
);
2747 listDelNode(list
,ln
);
2754 static void lpopCommand(redisClient
*c
) {
2755 popGenericCommand(c
,REDIS_HEAD
);
2758 static void rpopCommand(redisClient
*c
) {
2759 popGenericCommand(c
,REDIS_TAIL
);
2762 static void lrangeCommand(redisClient
*c
) {
2764 int start
= atoi(c
->argv
[2]->ptr
);
2765 int end
= atoi(c
->argv
[3]->ptr
);
2767 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2769 addReply(c
,shared
.nullmultibulk
);
2771 if (o
->type
!= REDIS_LIST
) {
2772 addReply(c
,shared
.wrongtypeerr
);
2774 list
*list
= o
->ptr
;
2776 int llen
= listLength(list
);
2780 /* convert negative indexes */
2781 if (start
< 0) start
= llen
+start
;
2782 if (end
< 0) end
= llen
+end
;
2783 if (start
< 0) start
= 0;
2784 if (end
< 0) end
= 0;
2786 /* indexes sanity checks */
2787 if (start
> end
|| start
>= llen
) {
2788 /* Out of range start or start > end result in empty list */
2789 addReply(c
,shared
.emptymultibulk
);
2792 if (end
>= llen
) end
= llen
-1;
2793 rangelen
= (end
-start
)+1;
2795 /* Return the result in form of a multi-bulk reply */
2796 ln
= listIndex(list
, start
);
2797 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
2798 for (j
= 0; j
< rangelen
; j
++) {
2799 ele
= listNodeValue(ln
);
2800 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2802 addReply(c
,shared
.crlf
);
2809 static void ltrimCommand(redisClient
*c
) {
2811 int start
= atoi(c
->argv
[2]->ptr
);
2812 int end
= atoi(c
->argv
[3]->ptr
);
2814 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2816 addReply(c
,shared
.nokeyerr
);
2818 if (o
->type
!= REDIS_LIST
) {
2819 addReply(c
,shared
.wrongtypeerr
);
2821 list
*list
= o
->ptr
;
2823 int llen
= listLength(list
);
2824 int j
, ltrim
, rtrim
;
2826 /* convert negative indexes */
2827 if (start
< 0) start
= llen
+start
;
2828 if (end
< 0) end
= llen
+end
;
2829 if (start
< 0) start
= 0;
2830 if (end
< 0) end
= 0;
2832 /* indexes sanity checks */
2833 if (start
> end
|| start
>= llen
) {
2834 /* Out of range start or start > end result in empty list */
2838 if (end
>= llen
) end
= llen
-1;
2843 /* Remove list elements to perform the trim */
2844 for (j
= 0; j
< ltrim
; j
++) {
2845 ln
= listFirst(list
);
2846 listDelNode(list
,ln
);
2848 for (j
= 0; j
< rtrim
; j
++) {
2849 ln
= listLast(list
);
2850 listDelNode(list
,ln
);
2852 addReply(c
,shared
.ok
);
2858 static void lremCommand(redisClient
*c
) {
2861 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2863 addReply(c
,shared
.czero
);
2865 if (o
->type
!= REDIS_LIST
) {
2866 addReply(c
,shared
.wrongtypeerr
);
2868 list
*list
= o
->ptr
;
2869 listNode
*ln
, *next
;
2870 int toremove
= atoi(c
->argv
[2]->ptr
);
2875 toremove
= -toremove
;
2878 ln
= fromtail
? list
->tail
: list
->head
;
2880 robj
*ele
= listNodeValue(ln
);
2882 next
= fromtail
? ln
->prev
: ln
->next
;
2883 if (sdscmp(ele
->ptr
,c
->argv
[3]->ptr
) == 0) {
2884 listDelNode(list
,ln
);
2887 if (toremove
&& removed
== toremove
) break;
2891 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
2896 /* ==================================== Sets ================================ */
2898 static void saddCommand(redisClient
*c
) {
2901 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2903 set
= createSetObject();
2904 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
2905 incrRefCount(c
->argv
[1]);
2907 if (set
->type
!= REDIS_SET
) {
2908 addReply(c
,shared
.wrongtypeerr
);
2912 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
2913 incrRefCount(c
->argv
[2]);
2915 addReply(c
,shared
.cone
);
2917 addReply(c
,shared
.czero
);
2921 static void sremCommand(redisClient
*c
) {
2924 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2926 addReply(c
,shared
.czero
);
2928 if (set
->type
!= REDIS_SET
) {
2929 addReply(c
,shared
.wrongtypeerr
);
2932 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
2934 addReply(c
,shared
.cone
);
2936 addReply(c
,shared
.czero
);
2941 static void smoveCommand(redisClient
*c
) {
2942 robj
*srcset
, *dstset
;
2944 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2945 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
2947 /* If the source key does not exist return 0, if it's of the wrong type
2949 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
2950 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
2953 /* Error if the destination key is not a set as well */
2954 if (dstset
&& dstset
->type
!= REDIS_SET
) {
2955 addReply(c
,shared
.wrongtypeerr
);
2958 /* Remove the element from the source set */
2959 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
2960 /* Key not found in the src set! return zero */
2961 addReply(c
,shared
.czero
);
2965 /* Add the element to the destination set */
2967 dstset
= createSetObject();
2968 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
2969 incrRefCount(c
->argv
[2]);
2971 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
2972 incrRefCount(c
->argv
[3]);
2973 addReply(c
,shared
.cone
);
2976 static void sismemberCommand(redisClient
*c
) {
2979 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
2981 addReply(c
,shared
.czero
);
2983 if (set
->type
!= REDIS_SET
) {
2984 addReply(c
,shared
.wrongtypeerr
);
2987 if (dictFind(set
->ptr
,c
->argv
[2]))
2988 addReply(c
,shared
.cone
);
2990 addReply(c
,shared
.czero
);
2994 static void scardCommand(redisClient
*c
) {
2998 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3000 addReply(c
,shared
.czero
);
3003 if (o
->type
!= REDIS_SET
) {
3004 addReply(c
,shared
.wrongtypeerr
);
3007 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3013 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3014 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3016 return dictSize(*d1
)-dictSize(*d2
);
3019 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
) {
3020 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3023 robj
*lenobj
= NULL
, *dstset
= NULL
;
3024 int j
, cardinality
= 0;
3026 if (!dv
) oom("sinterGenericCommand");
3027 for (j
= 0; j
< setsnum
; j
++) {
3031 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3032 lookupKeyRead(c
->db
,setskeys
[j
]);
3036 deleteKey(c
->db
,dstkey
);
3037 addReply(c
,shared
.ok
);
3039 addReply(c
,shared
.nullmultibulk
);
3043 if (setobj
->type
!= REDIS_SET
) {
3045 addReply(c
,shared
.wrongtypeerr
);
3048 dv
[j
] = setobj
->ptr
;
3050 /* Sort sets from the smallest to largest, this will improve our
3051 * algorithm's performace */
3052 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3054 /* The first thing we should output is the total number of elements...
3055 * since this is a multi-bulk write, but at this stage we don't know
3056 * the intersection set size, so we use a trick, append an empty object
3057 * to the output list and save the pointer to later modify it with the
3060 lenobj
= createObject(REDIS_STRING
,NULL
);
3062 decrRefCount(lenobj
);
3064 /* If we have a target key where to store the resulting set
3065 * create this key with an empty set inside */
3066 dstset
= createSetObject();
3069 /* Iterate all the elements of the first (smallest) set, and test
3070 * the element against all the other sets, if at least one set does
3071 * not include the element it is discarded */
3072 di
= dictGetIterator(dv
[0]);
3073 if (!di
) oom("dictGetIterator");
3075 while((de
= dictNext(di
)) != NULL
) {
3078 for (j
= 1; j
< setsnum
; j
++)
3079 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
3081 continue; /* at least one set does not contain the member */
3082 ele
= dictGetEntryKey(de
);
3084 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele
->ptr
)));
3086 addReply(c
,shared
.crlf
);
3089 dictAdd(dstset
->ptr
,ele
,NULL
);
3093 dictReleaseIterator(di
);
3096 /* Store the resulting set into the target */
3097 deleteKey(c
->db
,dstkey
);
3098 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3099 incrRefCount(dstkey
);
3103 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",cardinality
);
3105 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3106 dictSize((dict
*)dstset
->ptr
)));
3112 static void sinterCommand(redisClient
*c
) {
3113 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
3116 static void sinterstoreCommand(redisClient
*c
) {
3117 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
3120 #define REDIS_OP_UNION 0
3121 #define REDIS_OP_DIFF 1
3123 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
3124 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3127 robj
*dstset
= NULL
;
3128 int j
, cardinality
= 0;
3130 if (!dv
) oom("sunionDiffGenericCommand");
3131 for (j
= 0; j
< setsnum
; j
++) {
3135 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3136 lookupKeyRead(c
->db
,setskeys
[j
]);
3141 if (setobj
->type
!= REDIS_SET
) {
3143 addReply(c
,shared
.wrongtypeerr
);
3146 dv
[j
] = setobj
->ptr
;
3149 /* We need a temp set object to store our union. If the dstkey
3150 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3151 * this set object will be the resulting object to set into the target key*/
3152 dstset
= createSetObject();
3154 /* Iterate all the elements of all the sets, add every element a single
3155 * time to the result set */
3156 for (j
= 0; j
< setsnum
; j
++) {
3157 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
3158 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
3160 di
= dictGetIterator(dv
[j
]);
3161 if (!di
) oom("dictGetIterator");
3163 while((de
= dictNext(di
)) != NULL
) {
3166 /* dictAdd will not add the same element multiple times */
3167 ele
= dictGetEntryKey(de
);
3168 if (op
== REDIS_OP_UNION
|| j
== 0) {
3169 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
3173 } else if (op
== REDIS_OP_DIFF
) {
3174 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
3179 dictReleaseIterator(di
);
3181 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
3184 /* Output the content of the resulting set, if not in STORE mode */
3186 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
3187 di
= dictGetIterator(dstset
->ptr
);
3188 if (!di
) oom("dictGetIterator");
3189 while((de
= dictNext(di
)) != NULL
) {
3192 ele
= dictGetEntryKey(de
);
3193 addReplySds(c
,sdscatprintf(sdsempty(),
3194 "$%d\r\n",sdslen(ele
->ptr
)));
3196 addReply(c
,shared
.crlf
);
3198 dictReleaseIterator(di
);
3200 /* If we have a target key where to store the resulting set
3201 * create this key with the result set inside */
3202 deleteKey(c
->db
,dstkey
);
3203 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3204 incrRefCount(dstkey
);
3209 decrRefCount(dstset
);
3211 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3212 dictSize((dict
*)dstset
->ptr
)));
3218 static void sunionCommand(redisClient
*c
) {
3219 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
3222 static void sunionstoreCommand(redisClient
*c
) {
3223 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
3226 static void sdiffCommand(redisClient
*c
) {
3227 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
3230 static void sdiffstoreCommand(redisClient
*c
) {
3231 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
3234 static void flushdbCommand(redisClient
*c
) {
3235 server
.dirty
+= dictSize(c
->db
->dict
);
3236 dictEmpty(c
->db
->dict
);
3237 dictEmpty(c
->db
->expires
);
3238 addReply(c
,shared
.ok
);
3241 static void flushallCommand(redisClient
*c
) {
3242 server
.dirty
+= emptyDb();
3243 addReply(c
,shared
.ok
);
3244 rdbSave(server
.dbfilename
);
3248 redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
3249 redisSortOperation
*so
= zmalloc(sizeof(*so
));
3250 if (!so
) oom("createSortOperation");
3252 so
->pattern
= pattern
;
3256 /* Return the value associated with the key whose name is obtained by
3257 * substituting the first occurrence of '*' in 'pattern' with 'subst' */
3258 robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
3262 int prefixlen
, sublen
, postfixlen
;
3263 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3267 char buf
[REDIS_SORTKEY_MAX
+1];
3270 spat
= pattern
->ptr
;
3272 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
3273 p
= strchr(spat
,'*');
3274 if (!p
) return NULL
;
3277 sublen
= sdslen(ssub
);
3278 postfixlen
= sdslen(spat
)-(prefixlen
+1);
3279 memcpy(keyname
.buf
,spat
,prefixlen
);
3280 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
3281 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
3282 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
3283 keyname
.len
= prefixlen
+sublen
+postfixlen
;
3285 keyobj
.refcount
= 1;
3286 keyobj
.type
= REDIS_STRING
;
3287 keyobj
.ptr
= ((char*)&keyname
)+(sizeof(long)*2);
3289 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3290 return lookupKeyRead(db
,&keyobj
);
3293 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3294 * the additional parameter is not standard but a BSD-specific extension, we
3295 * have to pass sorting parameters via the global 'server' structure */
3296 static int sortCompare(const void *s1
, const void *s2
) {
3297 const redisSortObject
*so1
= s1
, *so2
= s2
;
3300 if (!server
.sort_alpha
) {
3301 /* Numeric sorting. Here it's trivial as we precomputed scores */
3302 if (so1
->u
.score
> so2
->u
.score
) {
3304 } else if (so1
->u
.score
< so2
->u
.score
) {
3310 /* Alphanumeric sorting */
3311 if (server
.sort_bypattern
) {
3312 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
3313 /* At least one compare object is NULL */
3314 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
3316 else if (so1
->u
.cmpobj
== NULL
)
3321 /* We have both the objects, use strcoll */
3322 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
3325 /* Compare elements directly */
3326 cmp
= strcoll(so1
->obj
->ptr
,so2
->obj
->ptr
);
3329 return server
.sort_desc
? -cmp
: cmp
;
3332 /* The SORT command is the most complex command in Redis. Warning: this code
3333 * is optimized for speed and a bit less for readability */
3334 static void sortCommand(redisClient
*c
) {
3337 int desc
= 0, alpha
= 0;
3338 int limit_start
= 0, limit_count
= -1, start
, end
;
3339 int j
, dontsort
= 0, vectorlen
;
3340 int getop
= 0; /* GET operation counter */
3341 robj
*sortval
, *sortby
= NULL
;
3342 redisSortObject
*vector
; /* Resulting vector to sort */
3344 /* Lookup the key to sort. It must be of the right types */
3345 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
3346 if (sortval
== NULL
) {
3347 addReply(c
,shared
.nokeyerr
);
3350 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
) {
3351 addReply(c
,shared
.wrongtypeerr
);
3355 /* Create a list of operations to perform for every sorted element.
3356 * Operations can be GET/DEL/INCR/DECR */
3357 operations
= listCreate();
3358 listSetFreeMethod(operations
,zfree
);
3361 /* Now we need to protect sortval incrementing its count, in the future
3362 * SORT may have options able to overwrite/delete keys during the sorting
3363 * and the sorted key itself may get destroied */
3364 incrRefCount(sortval
);
3366 /* The SORT command has an SQL-alike syntax, parse it */
3367 while(j
< c
->argc
) {
3368 int leftargs
= c
->argc
-j
-1;
3369 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
3371 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
3373 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
3375 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
3376 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
3377 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
3379 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
3380 sortby
= c
->argv
[j
+1];
3381 /* If the BY pattern does not contain '*', i.e. it is constant,
3382 * we don't need to sort nor to lookup the weight keys. */
3383 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
3385 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3386 listAddNodeTail(operations
,createSortOperation(
3387 REDIS_SORT_GET
,c
->argv
[j
+1]));
3390 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"del") && leftargs
>= 1) {
3391 listAddNodeTail(operations
,createSortOperation(
3392 REDIS_SORT_DEL
,c
->argv
[j
+1]));
3394 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"incr") && leftargs
>= 1) {
3395 listAddNodeTail(operations
,createSortOperation(
3396 REDIS_SORT_INCR
,c
->argv
[j
+1]));
3398 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3399 listAddNodeTail(operations
,createSortOperation(
3400 REDIS_SORT_DECR
,c
->argv
[j
+1]));
3403 decrRefCount(sortval
);
3404 listRelease(operations
);
3405 addReply(c
,shared
.syntaxerr
);
3411 /* Load the sorting vector with all the objects to sort */
3412 vectorlen
= (sortval
->type
== REDIS_LIST
) ?
3413 listLength((list
*)sortval
->ptr
) :
3414 dictSize((dict
*)sortval
->ptr
);
3415 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
3416 if (!vector
) oom("allocating objects vector for SORT");
3418 if (sortval
->type
== REDIS_LIST
) {
3419 list
*list
= sortval
->ptr
;
3423 while((ln
= listYield(list
))) {
3424 robj
*ele
= ln
->value
;
3425 vector
[j
].obj
= ele
;
3426 vector
[j
].u
.score
= 0;
3427 vector
[j
].u
.cmpobj
= NULL
;
3431 dict
*set
= sortval
->ptr
;
3435 di
= dictGetIterator(set
);
3436 if (!di
) oom("dictGetIterator");
3437 while((setele
= dictNext(di
)) != NULL
) {
3438 vector
[j
].obj
= dictGetEntryKey(setele
);
3439 vector
[j
].u
.score
= 0;
3440 vector
[j
].u
.cmpobj
= NULL
;
3443 dictReleaseIterator(di
);
3445 assert(j
== vectorlen
);
3447 /* Now it's time to load the right scores in the sorting vector */
3448 if (dontsort
== 0) {
3449 for (j
= 0; j
< vectorlen
; j
++) {
3453 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
3454 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
3456 vector
[j
].u
.cmpobj
= byval
;
3457 incrRefCount(byval
);
3459 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
3462 if (!alpha
) vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
3467 /* We are ready to sort the vector... perform a bit of sanity check
3468 * on the LIMIT option too. We'll use a partial version of quicksort. */
3469 start
= (limit_start
< 0) ? 0 : limit_start
;
3470 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
3471 if (start
>= vectorlen
) {
3472 start
= vectorlen
-1;
3475 if (end
>= vectorlen
) end
= vectorlen
-1;
3477 if (dontsort
== 0) {
3478 server
.sort_desc
= desc
;
3479 server
.sort_alpha
= alpha
;
3480 server
.sort_bypattern
= sortby
? 1 : 0;
3481 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
3482 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
3484 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
3487 /* Send command output to the output buffer, performing the specified
3488 * GET/DEL/INCR/DECR operations if any. */
3489 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
3490 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
3491 for (j
= start
; j
<= end
; j
++) {
3494 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
3495 sdslen(vector
[j
].obj
->ptr
)));
3496 addReply(c
,vector
[j
].obj
);
3497 addReply(c
,shared
.crlf
);
3499 listRewind(operations
);
3500 while((ln
= listYield(operations
))) {
3501 redisSortOperation
*sop
= ln
->value
;
3502 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
3505 if (sop
->type
== REDIS_SORT_GET
) {
3506 if (!val
|| val
->type
!= REDIS_STRING
) {
3507 addReply(c
,shared
.nullbulk
);
3509 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
3512 addReply(c
,shared
.crlf
);
3514 } else if (sop
->type
== REDIS_SORT_DEL
) {
3521 decrRefCount(sortval
);
3522 listRelease(operations
);
3523 for (j
= 0; j
< vectorlen
; j
++) {
3524 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
3525 decrRefCount(vector
[j
].u
.cmpobj
);
/* INFO command: formats a "name:value\r\n" report of server state (version,
 * uptime, client and slave counts, memory, save and traffic counters, plus a
 * "master" or "slave" string chosen from server.masterhost) and, when
 * server.masterhost is set, appends the master link fields. The text is sent
 * as a bulk reply: "$<len>\r\n", the report itself, then shared.crlf.
 *
 * NOTE(review): several argument lines of both sdscatprintf() calls are not
 * visible in this excerpt, so the pairing of format specifiers and arguments
 * cannot be verified from here. */
3530 static void infoCommand(redisClient
*c
) {
/* Seconds elapsed since server.stat_starttime. */
3532 time_t uptime
= time(NULL
)-server
.stat_starttime
;
/* Base report, emitted for both masters and slaves. */
3534 info
= sdscatprintf(sdsempty(),
3535 "redis_version:%s\r\n"
3536 "uptime_in_seconds:%d\r\n"
3537 "uptime_in_days:%d\r\n"
3538 "connected_clients:%d\r\n"
3539 "connected_slaves:%d\r\n"
3540 "used_memory:%zu\r\n"
3541 "changes_since_last_save:%lld\r\n"
3542 "bgsave_in_progress:%d\r\n"
3543 "last_save_time:%d\r\n"
3544 "total_connections_received:%lld\r\n"
3545 "total_commands_processed:%lld\r\n"
3550 listLength(server
.clients
)-listLength(server
.slaves
),
3551 listLength(server
.slaves
),
3554 server
.bgsaveinprogress
,
3556 server
.stat_numconnections
,
3557 server
.stat_numcommands
,
3558 server
.masterhost
== NULL
? "master" : "slave"
/* Extra fields appended only when this instance has a master configured. */
3560 if (server
.masterhost
) {
3561 info
= sdscatprintf(info
,
3562 "master_host:%s\r\n"
3563 "master_port:%d\r\n"
3564 "master_link_status:%s\r\n"
3565 "master_last_io_seconds_ago:%d\r\n"
3568 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
3570 (int)(time(NULL
)-server
.master
->lastinteraction
)
/* Bulk reply header carrying the report length.
 * NOTE(review): "%d" is paired with the result of sdslen(); confirm that
 * sdslen() returns an int-compatible type (a size_t result would need "%zu"). */
3573 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info
)));
/* The report string itself is handed to addReplySds() as the reply payload. */
3574 addReplySds(c
,info
);
3575 addReply(c
,shared
.crlf
);
/* MONITOR command: sets both REDIS_SLAVE and REDIS_MONITOR on the client,
 * appends it to server.monitors and replies with shared.ok. A client that
 * already has REDIS_SLAVE set returns immediately without any reply.
 * NOTE(review): at least one original line of this body is not visible in
 * this excerpt. */
3578 static void monitorCommand(redisClient
*c
) {
3579 /* Ignore MONITOR if already a slave or in monitor mode. */
3580 if (c
->flags
& REDIS_SLAVE
) return;
3582 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
/* A failed list append is treated as out-of-memory via oom(). */
3584 if (!listAddNodeTail(server
.monitors
,c
)) oom("listAddNodeTail");
3585 addReply(c
,shared
.ok
);
3588 /* ================================= Expire ================================= */
/* Remove the entry for key from the db->expires dictionary.
 * NOTE(review): only the dictDelete() test is visible in this excerpt; the
 * branch bodies and the values returned for DICT_OK and for failure are not
 * shown, so the return contract must be confirmed against the full source. */
3589 static int removeExpire(redisDb
*db
, robj
*key
) {
3590 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
/* Register when as the expire time of key by adding an entry to db->expires.
 * The time_t is stored directly in the entry value slot through a (void*)
 * cast rather than through a pointer to allocated storage; getExpire() and
 * expireIfNeeded() cast it back.
 * NOTE(review): this assumes a time_t fits in a pointer on every target.
 * NOTE(review): the DICT_ERR branch body and the return values are not
 * visible in this excerpt. */
3597 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
3598 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
3606 /* Return the expire time of the specified key, or -1 if no expire
3607 * is associated with this key (i.e. the key is non volatile) */
3608 static time_t getExpire(redisDb
*db
, robj
*key
) {
3611 /* No expire? return ASAP */
/* The dictSize() test short-circuits the lookup when db->expires is empty. */
3612 if (dictSize(db
->expires
) == 0 ||
3613 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
/* The entry value is the time_t itself, cast back from the value slot. */
3615 return (time_t) dictGetEntryVal(de
);
/* Delete key from both db->expires and db->dict when its expire time has
 * passed. Returns 0 when the key has no expire entry or has not expired yet;
 * otherwise returns 1 if the deletion from db->dict reported DICT_OK and 0
 * if it did not. */
3618 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
3622 /* No expire? return ASAP */
3623 if (dictSize(db
->expires
) == 0 ||
3624 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3626 /* Lookup the expire */
3627 when
= (time_t) dictGetEntryVal(de
);
/* The comparison is inclusive: the key is still kept during the second
 * that equals when, and is only removed once the clock is past it. */
3628 if (time(NULL
) <= when
) return 0;
3630 /* Delete the key */
/* The result of the db->expires deletion is ignored; only the db->dict
 * deletion decides the return value. */
3631 dictDelete(db
->expires
,key
);
3632 return dictDelete(db
->dict
,key
) == DICT_OK
;
/* Delete key from db->expires and db->dict whenever it has an expire entry,
 * without looking at whether that expire time has been reached. Returns 0
 * when the key has no expire entry; otherwise returns 1 if the deletion from
 * db->dict reported DICT_OK and 0 if it did not.
 * NOTE(review): one original line between the comment and the first
 * dictDelete() is not visible in this excerpt. */
3635 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
3638 /* No expire? return ASAP */
3639 if (dictSize(db
->expires
) == 0 ||
3640 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3642 /* Delete the key */
3644 dictDelete(db
->expires
,key
);
3645 return dictDelete(db
->dict
,key
) == DICT_OK
;
/* EXPIRE command: argv[1] is the key and argv[2] the number of seconds.
 * The key is looked up in c->db->dict and its expire time is set to
 * now + seconds through setExpire(); shared.cone is sent when setExpire()
 * returns non-zero.
 * NOTE(review): the conditions guarding the two early shared.czero replies
 * and the line introducing the final shared.czero reply (presumably an else)
 * are not visible in this excerpt. */
3648 static void expireCommand(redisClient
*c
) {
/* NOTE(review): atoi() cannot report malformed or out-of-range input, so a
 * non-numeric argument silently becomes 0 here. */
3650 int seconds
= atoi(c
->argv
[2]->ptr
);
3652 de
= dictFind(c
->db
->dict
,c
->argv
[1]);
3654 addReply(c
,shared
.czero
);
3658 addReply(c
, shared
.czero
);
/* Absolute expire time, in the same units as time(). */
3661 time_t when
= time(NULL
)+seconds
;
3662 if (setExpire(c
->db
,c
->argv
[1],when
))
3663 addReply(c
,shared
.cone
);
3665 addReply(c
,shared
.czero
);
/* TTL command: replies ":<ttl>\r\n" for the key in argv[1], where ttl is
 * derived from getExpire() minus the current time and any negative result
 * is reported as -1.
 * NOTE(review): the declarations and any guard around the subtraction (for
 * instance a test for getExpire() returning -1) are not visible in this
 * excerpt. */
3670 static void ttlCommand(redisClient
*c
) {
3674 expire
= getExpire(c
->db
,c
->argv
[1]);
/* Remaining lifetime, narrowed to int. */
3676 ttl
= (int) (expire
-time(NULL
));
3677 if (ttl
< 0) ttl
= -1;
3679 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
3682 /* =============================== Replication ============================= */
/* Synchronous write helper: waits for fd to report AE_WRITABLE through
 * aeWait() and then calls write() with ptr and size. Returns -1 as soon as
 * write() fails. The elapsed time since entry is compared against timeout,
 * which is therefore expressed in the units of time(), i.e. seconds.
 * NOTE(review): the enclosing loop, the pointer and size bookkeeping, the
 * body of the timeout branch and the final return are not visible in this
 * excerpt; ret is initialised to size, presumably as the success result.
 * NOTE(review): the 1000 passed to aeWait() is presumably a millisecond
 * wait; confirm against ae.h. */
3684 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3685 ssize_t nwritten
, ret
= size
;
3686 time_t start
= time(NULL
);
3690 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
3691 nwritten
= write(fd
,ptr
,size
);
3692 if (nwritten
== -1) return -1;
3696 if ((time(NULL
)-start
) > timeout
) {
/* Synchronous read helper: waits for fd to report AE_READABLE through
 * aeWait() and then calls read() into ptr for up to size bytes. Returns -1
 * as soon as read() fails. The elapsed time since entry is compared against
 * timeout, which is therefore expressed in seconds.
 * NOTE(review): the enclosing loop, the handling of a zero-byte read, the
 * accumulation into totread, the body of the timeout branch and the final
 * return are not visible in this excerpt.
 * NOTE(review): the 1000 passed to aeWait() is presumably a millisecond
 * wait; confirm against ae.h. */
3704 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3705 ssize_t nread
, totread
= 0;
3706 time_t start
= time(NULL
);
3710 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
3711 nread
= read(fd
,ptr
,size
);
3712 if (nread
== -1) return -1;
3717 if ((time(NULL
)-start
) > timeout
) {
/* Line-oriented synchronous read built on syncRead(): bytes are fetched one
 * at a time into a local character, and -1 is returned if syncRead() fails.
 * When at least one byte has been stored and the byte just before the
 * current write position is a carriage return, that byte is overwritten
 * with a NUL terminator.
 * NOTE(review): the loop, the newline test, the bound check against size
 * and the return value are not visible in this excerpt. */
3725 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3732 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
3735 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
/* SYNC command: registers the client as a replication slave.
 *
 * Visible flow:
 *  - a client already flagged REDIS_SLAVE is ignored without a reply;
 *  - a client with a non-empty reply list is refused with an error;
 *  - if a background save is running, server.slaves is scanned for a slave
 *    in REDIS_REPL_WAIT_BGSAVE_END; when one is found its reply list is
 *    duplicated into this client, otherwise the client waits for the next
 *    save (REDIS_REPL_WAIT_BGSAVE_START);
 *  - if no save is running, rdbSaveBackground() is started;
 *  - finally the client gets REDIS_SLAVE and is appended to server.slaves.
 *
 * NOTE(review): the else lines, closing braces and early returns joining
 * these steps are not visible in this excerpt. */
3746 static void syncCommand(redisClient
*c
) {
3747 /* Ignore SYNC if already a slave or in monitor mode. */
3748 if (c
->flags
& REDIS_SLAVE
) return;
3750 /* SYNC can't be issued when the server has pending data to send to
3751 * the client about already issued commands. We need a fresh reply
3752 * buffer registering the differences between the BGSAVE and the current
3753 * dataset, so that we can copy to other slaves if needed. */
/* NOTE(review): the test is on the pending reply (output) list, while the
 * error text below says "pending input". */
3754 if (listLength(c
->reply
) != 0) {
3755 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3759 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
3760 /* Here we need to check if there is a background saving operation
3761 * in progress, or if it is required to start one */
3762 if (server
.bgsaveinprogress
) {
3763 /* Ok a background save is in progress. Let's check if it is a good
3764 * one for replication, i.e. if there is another slave that is
3765 * registering differences since the server forked to save */
3769 listRewind(server
.slaves
);
3770 while((ln
= listYield(server
.slaves
))) {
3772 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
3775 /* Perfect, the server is already registering differences for
3776 * another slave. Set the right state, and copy the buffer. */
/* The client's own (empty) reply list is released and replaced by a copy of
 * the other slave's list; a failed copy is treated as out-of-memory. */
3777 listRelease(c
->reply
);
3778 c
->reply
= listDup(slave
->reply
);
3779 if (!c
->reply
) oom("listDup copying slave reply list");
3780 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3781 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
3783 /* No way, we need to wait for the next BGSAVE in order to
3784 * register differences */
3785 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
3786 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
3789 /* Ok we don't have a BGSAVE in progress, let's start one */
3790 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
3791 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
3792 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
/* NOTE(review): the error text below is misspelled ("Unalbe"); correcting it
 * changes the message sent on the wire. */
3793 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
3796 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
/* From here on the client is handled as a slave. */
3799 c
->flags
|= REDIS_SLAVE
;
3801 if (!listAddNodeTail(server
.slaves
,c
)) oom("listAddNodeTail");
/* File-event handler (installed for AE_WRITABLE on the slave socket by
 * updateSalvesWaitingBgsave) that streams the dump file to a slave; the
 * slave client arrives through privdata.
 *
 * Visible flow per invocation:
 *  - when slave->repldboff is 0, a "$<count>\r\n" header is written first;
 *  - the dump descriptor is positioned at repldboff, up to REDIS_IOBUF_LEN
 *    bytes are read and written to fd, and repldboff advances by the number
 *    of bytes actually written (so a short write is resumed on a later
 *    invocation from the right offset);
 *  - when repldboff reaches repldbsize the dump descriptor is closed, the
 *    slave becomes REDIS_REPL_ONLINE and the writable handler is replaced
 *    by sendReplyToClient.
 *
 * NOTE(review): the error branches (what happens to the slave after a failed
 * read, write or handler registration) are not visible in this excerpt. */
3805 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
3806 redisClient
*slave
= privdata
;
3808 REDIS_NOTUSED(mask
);
3809 char buf
[REDIS_IOBUF_LEN
];
3810 ssize_t nwritten
, buflen
;
3812 if (slave
->repldboff
== 0) {
3813 /* Write the bulk write count before transferring the DB. In theory here
3814 * we don't know how much room there is in the output buffer of the
3815 * socket, but in practice SO_SNDLOWAT (the minimum count for output
3816 * operations) will never be smaller than the few bytes we need. */
/* NOTE(review): the format uses "%lld" (signed) while the argument is cast
 * to unsigned long long; "%llu" would match the cast. */
3819 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
3821 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
/* Resume reading the dump file from the offset already sent. */
3829 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
3830 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
/* A zero-length read is reported as a premature EOF, anything else through
 * strerror(errno). */
3832 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
3833 (buflen
== 0) ? "premature EOF" : strerror(errno
));
/* NOTE(review): write failures are logged at REDIS_DEBUG whereas read
 * failures above use REDIS_WARNING. */
3837 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
3838 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
3843 slave
->repldboff
+= nwritten
;
/* Whole file sent: switch the slave to the normal reply path. */
3844 if (slave
->repldboff
== slave
->repldbsize
) {
3845 close(slave
->repldbfd
);
3846 slave
->repldbfd
= -1;
3847 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
3848 slave
->replstate
= REDIS_REPL_ONLINE
;
3849 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
3850 sendReplyToClient
, slave
, NULL
) == AE_ERR
) {
3854 addReplySds(slave
,sdsempty());
3855 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
/* Walk server.slaves and advance the replication state of slaves that are
 * waiting on a background save; bgsaveerr is compared against REDIS_OK to
 * decide whether the save succeeded.
 *
 *  - REDIS_REPL_WAIT_BGSAVE_START slaves move to REDIS_REPL_WAIT_BGSAVE_END.
 *  - REDIS_REPL_WAIT_BGSAVE_END slaves either fail (save error, or the dump
 *    file cannot be opened or stat-ed) or are prepared for the transfer:
 *    offset 0, size taken from the stat result, state REDIS_REPL_SEND_BULK
 *    and sendBulkToSlave installed as the writable handler.
 *
 * Afterwards rdbSaveBackground() is attempted again; if that fails,
 * server.slaves is scanned once more for REDIS_REPL_WAIT_BGSAVE_START
 * slaves.
 *
 * NOTE(review): the statements that set and test startbgsave, and every
 * failure cleanup, are not visible in this excerpt.
 * NOTE(review): the function name is spelled "Salves"; renaming it would
 * require touching its callers. */
3859 static void updateSalvesWaitingBgsave(int bgsaveerr
) {
3861 int startbgsave
= 0;
3863 listRewind(server
.slaves
);
3864 while((ln
= listYield(server
.slaves
))) {
3865 redisClient
*slave
= ln
->value
;
/* This slave asked for SYNC while another save was running. */
3867 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
3869 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3870 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
3871 struct redis_stat buf
;
3873 if (bgsaveerr
!= REDIS_OK
) {
3875 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
/* NOTE(review): if open() succeeds but redis_fstat() fails, the descriptor
 * is already stored in slave->repldbfd; confirm that the (not visible)
 * failure path closes it. */
3878 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
3879 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
3881 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
/* Transfer bookkeeping consumed by sendBulkToSlave. */
3884 slave
->repldboff
= 0;
3885 slave
->repldbsize
= buf
.st_size
;
3886 slave
->replstate
= REDIS_REPL_SEND_BULK
;
/* Replace whatever writable handler is registered with the bulk sender. */
3887 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
3888 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
, NULL
) == AE_ERR
) {
/* Second phase: start a new background save. */
3895 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
3896 listRewind(server
.slaves
);
3897 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
3898 while((ln
= listYield(server
.slaves
))) {
3899 redisClient
*slave
= ln
->value
;
3901 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
/* Slave side of the initial synchronization with the master.
 *
 * Visible flow:
 *  1. connect to server.masterhost:server.masterport;
 *  2. send the 7 bytes "SYNC \r\n" with a 5 second timeout;
 *  3. read the bulk header line (up to 1024 bytes, 3600 second timeout) and
 *     parse the dump size from it;
 *  4. copy the dump from the socket into a temp file, at most 1024 bytes
 *     per read;
 *  5. rename the temp file to server.dbfilename and load it with rdbLoad();
 *  6. wrap the socket in a client flagged REDIS_MASTER and set
 *     server.replstate to REDIS_REPL_CONNECTED.
 *
 * NOTE(review): the error branch bodies (descriptor cleanup and the values
 * returned) and the final return are not visible in this excerpt. */
3908 static int syncWithMaster(void) {
3909 char buf
[1024], tmpfile
[256];
3911 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
3915 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
3919 /* Issue the SYNC command */
3920 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
3922 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
3926 /* Read the bulk write count */
3927 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
3929 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
/* buf+1 skips the first byte of the header (the master side writes the
 * header as "$<count>\r\n" in sendBulkToSlave).
 * NOTE(review): the master formats the count as a long long, but it is
 * parsed here with atoi(), which cannot report errors and cannot represent
 * sizes that do not fit in an int. */
3933 dumpsize
= atoi(buf
+1);
3934 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
3935 /* Read the bulk write data on a temp file */
3936 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
/* NOTE(review): the file is opened with O_CREAT|O_WRONLY only (no O_EXCL or
 * O_TRUNC), so an existing file with the same name would be reused and not
 * truncated. */
3937 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
3940 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
3944 int nread
, nwritten
;
/* Plain read() on the socket, capped at the remaining size or 1024 bytes. */
3946 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
3948 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
/* NOTE(review): only a -1 result is checked on the visible lines; confirm
 * how a short write to the dump file is handled. */
3954 nwritten
= write(dfd
,buf
,nread
);
3955 if (nwritten
== -1) {
3956 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
/* The rename target is server.dbfilename, although the log text below
 * mentions dump.rdb. */
3964 if (rename(tmpfile
,server
.dbfilename
) == -1) {
3965 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
3971 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
3972 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
/* The connection to the master is kept as a regular client object. */
3976 server
.master
= createClient(fd
);
3977 server
.master
->flags
|= REDIS_MASTER
;
3978 server
.replstate
= REDIS_REPL_CONNECTED
;
/* SLAVEOF command.
 *
 * When argv[1] and argv[2] are "no" and "one" (compared case-insensitively)
 * and a master host is configured, the host string is freed, the master
 * client (if any) is released and server.replstate becomes REDIS_REPL_NONE.
 *
 * On the other visible path argv[1] becomes the new master host, argv[2]
 * the new master port, any existing master client is released and
 * server.replstate becomes REDIS_REPL_CONNECT.
 *
 * shared.ok is the only reply visible in this excerpt.
 *
 * NOTE(review): the else line separating the two paths and the closing
 * braces are not visible in this excerpt. */
3982 static void slaveofCommand(redisClient
*c
) {
3983 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
3984 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
3985 if (server
.masterhost
) {
3986 sdsfree(server
.masterhost
);
3987 server
.masterhost
= NULL
;
3988 if (server
.master
) freeClient(server
.master
);
3989 server
.replstate
= REDIS_REPL_NONE
;
3990 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
/* NOTE(review): sdsfree() is called here without testing masterhost for
 * NULL; presumably sdsfree() accepts NULL, confirm in sds.h. */
3993 sdsfree(server
.masterhost
);
3994 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
/* NOTE(review): atoi() cannot report a malformed port argument. */
3995 server
.masterport
= atoi(c
->argv
[2]->ptr
);
3996 if (server
.master
) freeClient(server
.master
);
3997 server
.replstate
= REDIS_REPL_CONNECT
;
3998 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
3999 server
.masterhost
, server
.masterport
);
4001 addReply(c
,shared
.ok
);
4004 /* ============================ Maxmemory directive ======================== */
4006 /* This function gets called when 'maxmemory' is set on the config file to limit
4007 * the max memory used by the server, and we are out of memory.
4008 * This function will try to, in order:
4010 * - Free objects from the free list
4011 * - Try to remove keys with an EXPIRE set
4013 * If it is not possible to free enough memory to reach used-memory < maxmemory
4014 * the server will start refusing commands that will enlarge even more the
/* Try to bring memory usage back under server.maxmemory. Loops while
 * maxmemory is non-zero and zmalloc_used_memory() exceeds it:
 *  - if server.objfreelist is not empty, its first node is taken off the
 *    list;
 *  - otherwise every database with a non-empty expires dictionary is
 *    visited, three random entries are sampled and the key with the
 *    smallest expire time among them is passed to deleteKey().
 * The function returns as soon as a pass over the databases frees nothing.
 *
 * NOTE(review): the release of the object taken from the free list, the
 * updates of minttl and freed, and the else line between the two strategies
 * are not visible in this excerpt. */
4017 static void freeMemoryIfNeeded(void) {
4018 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
/* Cheapest option first: objects parked on the free list. */
4019 if (listLength(server
.objfreelist
)) {
4022 listNode
*head
= listFirst(server
.objfreelist
);
4023 o
= listNodeValue(head
);
4024 listDelNode(server
.objfreelist
,head
);
4027 int j
, k
, freed
= 0;
4029 for (j
= 0; j
< server
.dbnum
; j
++) {
4031 robj
*minkey
= NULL
;
4032 struct dictEntry
*de
;
4034 if (dictSize(server
.db
[j
].expires
)) {
4036 /* From a sample of three keys drop the one nearest to
4037 * the natural expire */
4038 for (k
= 0; k
< 3; k
++) {
4041 de
= dictGetRandomKey(server
.db
[j
].expires
);
/* The entry value is the expire time itself, as stored by setExpire(). */
4042 t
= (time_t) dictGetEntryVal(de
);
/* minttl equal to -1 means no candidate has been recorded yet. */
4043 if (minttl
== -1 || t
< minttl
) {
4044 minkey
= dictGetEntryKey(de
);
4048 deleteKey(server
.db
+j
,minkey
);
4051 if (!freed
) return; /* nothing to free... */
4056 /* ================================= Debugging ============================== */
/* DEBUG command with two subcommands selected by argv[1], compared
 * case-insensitively:
 *  - "segfault": the body of this branch is not visible in this excerpt;
 *  - "object" with exactly three arguments: looks argv[2] up in c->db->dict
 *    and replies with the addresses and refcounts of the key and value
 *    objects; shared.nokeyerr is the other visible reply of this branch,
 *    presumably sent when the lookup fails (the guarding test is not
 *    visible).
 * Any other input gets the syntax error reply. */
4058 static void debugCommand(redisClient
*c
) {
4059 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
4061 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
4062 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
4066 addReply(c
,shared
.nokeyerr
);
4069 key
= dictGetEntryKey(de
);
4070 val
= dictGetEntryVal(de
);
/* NOTE(review): "%p" formally expects a void pointer argument; key and val
 * are passed without a cast. */
4071 addReplySds(c
,sdscatprintf(sdsempty(),
4072 "+Key at:%p refcount:%d, value at:%p refcount:%d\r\n",
4073 key
, key
->refcount
, val
, val
->refcount
));
4075 addReplySds(c
,sdsnew(
4076 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4080 /* =================================== Main! ================================ */
/* Read the overcommit setting from /proc/sys/vm/overcommit_memory: the file
 * is opened for reading and up to 64 bytes of its first line are fetched
 * with fgets().
 * NOTE(review): the handling of a failed fopen(), the conversion of the
 * text to an int, the fclose() and the return values are not visible in
 * this excerpt. */
4083 int linuxOvercommitMemoryValue(void) {
4084 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
4088 if (fgets(buf
,64,fp
) == NULL
) {
/* Log a REDIS_WARNING message when linuxOvercommitMemoryValue() returns 0,
 * telling the administrator how to change the setting. Any other return
 * value produces no output on the visible lines. */
4097 void linuxOvercommitMemoryWarning(void) {
4098 if (linuxOvercommitMemoryValue() == 0) {
4099 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4102 #endif /* __linux__ */
/* Detach the server from the launching terminal: fork and let the parent
 * exit, start a new session, point stdin, stdout and stderr at /dev/null,
 * then write the process id to server.pidfile.
 * NOTE(review): the local declarations, the test of the fopen() result and
 * the fclose() are not visible in this excerpt. */
4104 static void daemonize(void) {
/* NOTE(review): fork() returning -1 also satisfies this test, so a failed
 * fork makes the process exit with status 0 without reporting an error. */
4108 if (fork() != 0) exit(0); /* parent exits */
4109 setsid(); /* create a new session */
4111 /* Every output goes to /dev/null. If Redis is daemonized but
4112 * the 'logfile' is set to 'stdout' in the configuration file
4113 * it will not log at all. */
4114 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
4115 dup2(fd
, STDIN_FILENO
);
4116 dup2(fd
, STDOUT_FILENO
);
4117 dup2(fd
, STDERR_FILENO
);
/* Close the extra descriptor unless it is itself one of the standard three. */
4118 if (fd
> STDERR_FILENO
) close(fd
);
4120 /* Try to write the pid file */
4121 fp
= fopen(server
.pidfile
,"w");
4123 fprintf(fp
,"%d\n",getpid());
4128 int main(int argc
, char **argv
) {
4130 linuxOvercommitMemoryWarning();
4135 ResetServerSaveParams();
4136 loadServerConfig(argv
[1]);
4137 } else if (argc
> 2) {
4138 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
4141 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4144 if (server
.daemonize
) daemonize();
4145 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
4146 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
4147 redisLog(REDIS_NOTICE
,"DB loaded from disk");
4148 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
4149 acceptHandler
, NULL
, NULL
) == AE_ERR
) oom("creating file event");
4150 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
4152 aeDeleteEventLoop(server
.el
);