2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "0.101"
46 #include <arpa/inet.h>
50 #include <sys/resource.h>
53 #include "ae.h" /* Event driven programming library */
54 #include "sds.h" /* Dynamic safe strings */
55 #include "anet.h" /* Networking the easy way */
56 #include "dict.h" /* Hash tables */
57 #include "adlist.h" /* Linked lists */
58 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
59 #include "lzf.h" /* LZF compression library */
60 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
66 /* Static server configuration */
67 #define REDIS_SERVERPORT 6379 /* TCP port */
68 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
69 #define REDIS_IOBUF_LEN 1024
70 #define REDIS_LOADBUF_LEN 1024
71 #define REDIS_STATIC_ARGS 4
72 #define REDIS_DEFAULT_DBNUM 16
73 #define REDIS_CONFIGLINE_MAX 1024
74 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
75 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
76 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
78 /* Hash table parameters */
79 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
80 #define REDIS_HT_MINSLOTS 16384 /* Never resize the HT under this */
83 #define REDIS_CMD_BULK 1 /* Bulk write command */
84 #define REDIS_CMD_INLINE 2 /* Inline command */
85 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
86 this flag will return an error when the 'maxmemory' option is set in the
87 config file and the server is using more than maxmemory bytes of memory.
88 In short, these commands are denied in low memory conditions. */
89 #define REDIS_CMD_DENYOOM 4
92 #define REDIS_STRING 0
97 /* Object types only used for dumping to disk */
98 #define REDIS_EXPIRETIME 253
99 #define REDIS_SELECTDB 254
100 #define REDIS_EOF 255
102 /* Defines related to the dump file format. Storing 32 bit lengths for short
103 * keys requires a lot of space, so we check the most significant 2 bits of
104 * the first byte to interpret the length:
106 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
107 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
108 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
109 * 11|000000 this means: specially encoded object will follow. The six bits
110 * number specify the kind of object that follows.
111 * See the REDIS_RDB_ENC_* defines.
113 * Lengths up to 63 are stored using a single byte; most DB keys, and many
114 * values, will fit inside. */
115 #define REDIS_RDB_6BITLEN 0
116 #define REDIS_RDB_14BITLEN 1
117 #define REDIS_RDB_32BITLEN 2
118 #define REDIS_RDB_ENCVAL 3
119 #define REDIS_RDB_LENERR UINT_MAX
121 /* When a length of a string object stored on disk has the first two bits
122 * set, the remaining six bits specify a special encoding for the object
123 * according to the following defines: */
124 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
125 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
126 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
127 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
130 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
131 #define REDIS_SLAVE 2 /* This client is a slave server */
132 #define REDIS_MASTER 4 /* This client is a master server */
133 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
135 /* Slave replication state - slave side */
136 #define REDIS_REPL_NONE 0 /* No active replication */
137 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
138 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
140 /* Slave replication state - from the point of view of master
141 * Note that in SEND_BULK and ONLINE state the slave receives new updates
142 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
143 * to start the next background saving in order to send updates to it. */
144 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
145 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
146 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
147 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
149 /* List related stuff */
153 /* Sort operations */
154 #define REDIS_SORT_GET 0
155 #define REDIS_SORT_DEL 1
156 #define REDIS_SORT_INCR 2
157 #define REDIS_SORT_DECR 3
158 #define REDIS_SORT_ASC 4
159 #define REDIS_SORT_DESC 5
160 #define REDIS_SORTKEY_MAX 1024
163 #define REDIS_DEBUG 0
164 #define REDIS_NOTICE 1
165 #define REDIS_WARNING 2
167 /* Anti-warning macro... */
168 #define REDIS_NOTUSED(V) ((void) V)
170 /*================================= Data types ============================== */
172 /* A redis object, that is a type able to hold a string / list / set */
173 typedef struct redisObject
{
179 typedef struct redisDb
{
185 /* With multiplexing we need to keep per-client state.
186 * Clients are kept in a linked list. */
187 typedef struct redisClient
{
194 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
197 time_t lastinteraction
; /* time of the last interaction, used for timeout */
198 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
199 int slaveseldb
; /* slave selected db, if this client is a slave */
200 int authenticated
; /* when requirepass is non-NULL */
201 int replstate
; /* replication state if this is a slave */
202 int repldbfd
; /* replication DB file descriptor */
203 long repldboff
; /* replication DB file offset */
204 off_t repldbsize
; /* replication DB file size */
212 /* Global server state structure */
218 unsigned int sharingpoolsize
;
219 long long dirty
; /* changes to DB from the last save */
221 list
*slaves
, *monitors
;
222 char neterr
[ANET_ERR_LEN
];
224 int cronloops
; /* number of times the cron function run */
225 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
226 time_t lastsave
; /* Unix time of the last successful save */
227 size_t usedmemory
; /* Used memory in megabytes */
228 /* Fields used only for stats */
229 time_t stat_starttime
; /* server start time */
230 long long stat_numcommands
; /* number of processed commands */
231 long long stat_numconnections
; /* number of connections received */
239 int bgsaveinprogress
;
240 struct saveparam
*saveparams
;
247 /* Replication related */
251 redisClient
*master
; /* client that is master for this slave */
253 unsigned int maxclients
;
254 unsigned int maxmemory
;
255 /* Sort parameters - qsort_r() is only available under BSD so we
256 * have to take this state global, in order to pass it to sortCompare() */
262 typedef void redisCommandProc(redisClient
*c
);
263 struct redisCommand
{
265 redisCommandProc
*proc
;
270 typedef struct _redisSortObject
{
278 typedef struct _redisSortOperation
{
281 } redisSortOperation
;
283 struct sharedObjectsStruct
{
284 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
285 *colon
, *nullbulk
, *nullmultibulk
,
286 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
287 *outofrangeerr
, *plus
,
288 *select0
, *select1
, *select2
, *select3
, *select4
,
289 *select5
, *select6
, *select7
, *select8
, *select9
;
292 /*================================ Prototypes =============================== */
294 static void freeStringObject(robj
*o
);
295 static void freeListObject(robj
*o
);
296 static void freeSetObject(robj
*o
);
297 static void decrRefCount(void *o
);
298 static robj
*createObject(int type
, void *ptr
);
299 static void freeClient(redisClient
*c
);
300 static int rdbLoad(char *filename
);
301 static void addReply(redisClient
*c
, robj
*obj
);
302 static void addReplySds(redisClient
*c
, sds s
);
303 static void incrRefCount(robj
*o
);
304 static int rdbSaveBackground(char *filename
);
305 static robj
*createStringObject(char *ptr
, size_t len
);
306 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
307 static int syncWithMaster(void);
308 static robj
*tryObjectSharing(robj
*o
);
309 static int removeExpire(redisDb
*db
, robj
*key
);
310 static int expireIfNeeded(redisDb
*db
, robj
*key
);
311 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
312 static int deleteKey(redisDb
*db
, robj
*key
);
313 static time_t getExpire(redisDb
*db
, robj
*key
);
314 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
315 static void updateSalvesWaitingBgsave(int bgsaveerr
);
316 static void freeMemoryIfNeeded(void);
318 static void authCommand(redisClient
*c
);
319 static void pingCommand(redisClient
*c
);
320 static void echoCommand(redisClient
*c
);
321 static void setCommand(redisClient
*c
);
322 static void setnxCommand(redisClient
*c
);
323 static void getCommand(redisClient
*c
);
324 static void delCommand(redisClient
*c
);
325 static void existsCommand(redisClient
*c
);
326 static void incrCommand(redisClient
*c
);
327 static void decrCommand(redisClient
*c
);
328 static void incrbyCommand(redisClient
*c
);
329 static void decrbyCommand(redisClient
*c
);
330 static void selectCommand(redisClient
*c
);
331 static void randomkeyCommand(redisClient
*c
);
332 static void keysCommand(redisClient
*c
);
333 static void dbsizeCommand(redisClient
*c
);
334 static void lastsaveCommand(redisClient
*c
);
335 static void saveCommand(redisClient
*c
);
336 static void bgsaveCommand(redisClient
*c
);
337 static void shutdownCommand(redisClient
*c
);
338 static void moveCommand(redisClient
*c
);
339 static void renameCommand(redisClient
*c
);
340 static void renamenxCommand(redisClient
*c
);
341 static void lpushCommand(redisClient
*c
);
342 static void rpushCommand(redisClient
*c
);
343 static void lpopCommand(redisClient
*c
);
344 static void rpopCommand(redisClient
*c
);
345 static void llenCommand(redisClient
*c
);
346 static void lindexCommand(redisClient
*c
);
347 static void lrangeCommand(redisClient
*c
);
348 static void ltrimCommand(redisClient
*c
);
349 static void typeCommand(redisClient
*c
);
350 static void lsetCommand(redisClient
*c
);
351 static void saddCommand(redisClient
*c
);
352 static void sremCommand(redisClient
*c
);
353 static void smoveCommand(redisClient
*c
);
354 static void sismemberCommand(redisClient
*c
);
355 static void scardCommand(redisClient
*c
);
356 static void sinterCommand(redisClient
*c
);
357 static void sinterstoreCommand(redisClient
*c
);
358 static void sunionCommand(redisClient
*c
);
359 static void sunionstoreCommand(redisClient
*c
);
360 static void sdiffCommand(redisClient
*c
);
361 static void sdiffstoreCommand(redisClient
*c
);
362 static void syncCommand(redisClient
*c
);
363 static void flushdbCommand(redisClient
*c
);
364 static void flushallCommand(redisClient
*c
);
365 static void sortCommand(redisClient
*c
);
366 static void lremCommand(redisClient
*c
);
367 static void infoCommand(redisClient
*c
);
368 static void mgetCommand(redisClient
*c
);
369 static void monitorCommand(redisClient
*c
);
370 static void expireCommand(redisClient
*c
);
371 static void getSetCommand(redisClient
*c
);
372 static void ttlCommand(redisClient
*c
);
373 static void slaveofCommand(redisClient
*c
);
375 /*================================= Globals ================================= */
378 static struct redisServer server
; /* server global state */
379 static struct redisCommand cmdTable
[] = {
380 {"get",getCommand
,2,REDIS_CMD_INLINE
},
381 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
382 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
383 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
384 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
385 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
386 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
387 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
388 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
389 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
390 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
391 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
392 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
393 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
394 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
395 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
396 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
397 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
398 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
399 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
400 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
401 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
402 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
403 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
404 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
405 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
406 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
407 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
408 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
409 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
410 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
411 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
412 {"getset",getSetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
413 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
414 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
415 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
416 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
417 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
418 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
419 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
420 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
421 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
422 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
423 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
424 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
425 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
426 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
427 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
428 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
429 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
430 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
431 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
432 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
433 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
434 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
435 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
436 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
440 /*============================ Utility functions ============================ */
442 /* Glob-style pattern matching. */
443 int stringmatchlen(const char *pattern
, int patternLen
,
444 const char *string
, int stringLen
, int nocase
)
449 while (pattern
[1] == '*') {
454 return 1; /* match */
456 if (stringmatchlen(pattern
+1, patternLen
-1,
457 string
, stringLen
, nocase
))
458 return 1; /* match */
462 return 0; /* no match */
466 return 0; /* no match */
476 not = pattern
[0] == '^';
483 if (pattern
[0] == '\\') {
486 if (pattern
[0] == string
[0])
488 } else if (pattern
[0] == ']') {
490 } else if (patternLen
== 0) {
494 } else if (pattern
[1] == '-' && patternLen
>= 3) {
495 int start
= pattern
[0];
496 int end
= pattern
[2];
504 start
= tolower(start
);
510 if (c
>= start
&& c
<= end
)
514 if (pattern
[0] == string
[0])
517 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
527 return 0; /* no match */
533 if (patternLen
>= 2) {
540 if (pattern
[0] != string
[0])
541 return 0; /* no match */
543 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
544 return 0; /* no match */
552 if (stringLen
== 0) {
553 while(*pattern
== '*') {
560 if (patternLen
== 0 && stringLen
== 0)
565 void redisLog(int level
, const char *fmt
, ...)
570 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
574 if (level
>= server
.verbosity
) {
580 strftime(buf
,64,"%d %b %H:%M:%S",gmtime(&now
));
581 fprintf(fp
,"%s %c ",buf
,c
[level
]);
582 vfprintf(fp
, fmt
, ap
);
588 if (server
.logfile
) fclose(fp
);
591 /*====================== Hash table type implementation ==================== */
593 /* This is a hash table type that uses the SDS dynamic strings library as
594 * keys and redis objects as values (objects can hold SDS strings,
597 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
601 DICT_NOTUSED(privdata
);
603 l1
= sdslen((sds
)key1
);
604 l2
= sdslen((sds
)key2
);
605 if (l1
!= l2
) return 0;
606 return memcmp(key1
, key2
, l1
) == 0;
609 static void dictRedisObjectDestructor(void *privdata
, void *val
)
611 DICT_NOTUSED(privdata
);
616 static int dictSdsKeyCompare(void *privdata
, const void *key1
,
619 const robj
*o1
= key1
, *o2
= key2
;
620 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
623 static unsigned int dictSdsHash(const void *key
) {
625 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
628 static dictType setDictType
= {
629 dictSdsHash
, /* hash function */
632 dictSdsKeyCompare
, /* key compare */
633 dictRedisObjectDestructor
, /* key destructor */
634 NULL
/* val destructor */
637 static dictType hashDictType
= {
638 dictSdsHash
, /* hash function */
641 dictSdsKeyCompare
, /* key compare */
642 dictRedisObjectDestructor
, /* key destructor */
643 dictRedisObjectDestructor
/* val destructor */
646 /* ========================= Random utility functions ======================= */
648 /* Redis generally does not try to recover from out of memory conditions
649 * when allocating objects or strings, it is not clear if it will be possible
650 * to report this condition to the client since the networking layer itself
651 * is based on heap allocation for send buffers, so we simply abort.
652 * At least the code will be simpler to read... */
653 static void oom(const char *msg
) {
654 fprintf(stderr
, "%s: Out of memory\n",msg
);
660 /* ====================== Redis server networking stuff ===================== */
661 void closeTimedoutClients(void) {
664 time_t now
= time(NULL
);
666 listRewind(server
.clients
);
667 while ((ln
= listYield(server
.clients
)) != NULL
) {
668 c
= listNodeValue(ln
);
669 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
670 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
671 (now
- c
->lastinteraction
> server
.maxidletime
)) {
672 redisLog(REDIS_DEBUG
,"Closing idle client");
678 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
679 * we resize the hash table to save memory */
680 void tryResizeHashTables(void) {
683 for (j
= 0; j
< server
.dbnum
; j
++) {
684 long long size
, used
;
686 size
= dictSlots(server
.db
[j
].dict
);
687 used
= dictSize(server
.db
[j
].dict
);
688 if (size
&& used
&& size
> REDIS_HT_MINSLOTS
&&
689 (used
*100/size
< REDIS_HT_MINFILL
)) {
690 redisLog(REDIS_NOTICE
,"The hash table %d is too sparse, resize it...",j
);
691 dictResize(server
.db
[j
].dict
);
692 redisLog(REDIS_NOTICE
,"Hash table %d resized.",j
);
697 int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
698 int j
, loops
= server
.cronloops
++;
699 REDIS_NOTUSED(eventLoop
);
701 REDIS_NOTUSED(clientData
);
703 /* Update the global state with the amount of used memory */
704 server
.usedmemory
= zmalloc_used_memory();
706 /* Show some info about non-empty databases */
707 for (j
= 0; j
< server
.dbnum
; j
++) {
708 long long size
, used
, vkeys
;
710 size
= dictSlots(server
.db
[j
].dict
);
711 used
= dictSize(server
.db
[j
].dict
);
712 vkeys
= dictSize(server
.db
[j
].expires
);
713 if (!(loops
% 5) && used
> 0) {
714 redisLog(REDIS_DEBUG
,"DB %d: %d keys (%d volatile) in %d slots HT.",j
,used
,vkeys
,size
);
715 /* dictPrintStats(server.dict); */
719 /* We don't want to resize the hash tables while a background saving
720 * is in progress: the saving child is created using fork() that is
721 * implemented with a copy-on-write semantic in most modern systems, so
722 * if we resize the HT while there is the saving child at work actually
723 * a lot of memory movements in the parent will cause a lot of pages
725 if (!server
.bgsaveinprogress
) tryResizeHashTables();
727 /* Show information about connected clients */
729 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use",
730 listLength(server
.clients
)-listLength(server
.slaves
),
731 listLength(server
.slaves
),
733 dictSize(server
.sharingpool
));
736 /* Close connections of timedout clients */
737 if (server
.maxidletime
&& !(loops
% 10))
738 closeTimedoutClients();
740 /* Check if a background saving in progress terminated */
741 if (server
.bgsaveinprogress
) {
743 /* XXX: TODO handle the case of the saving child killed */
744 if (wait4(-1,&statloc
,WNOHANG
,NULL
)) {
745 int exitcode
= WEXITSTATUS(statloc
);
747 redisLog(REDIS_NOTICE
,
748 "Background saving terminated with success");
750 server
.lastsave
= time(NULL
);
752 redisLog(REDIS_WARNING
,
753 "Background saving error");
755 server
.bgsaveinprogress
= 0;
756 updateSalvesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
759 /* If there is not a background saving in progress check if
760 * we have to save now */
761 time_t now
= time(NULL
);
762 for (j
= 0; j
< server
.saveparamslen
; j
++) {
763 struct saveparam
*sp
= server
.saveparams
+j
;
765 if (server
.dirty
>= sp
->changes
&&
766 now
-server
.lastsave
> sp
->seconds
) {
767 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
768 sp
->changes
, sp
->seconds
);
769 rdbSaveBackground(server
.dbfilename
);
775 /* Try to expire a few timed out keys */
776 for (j
= 0; j
< server
.dbnum
; j
++) {
777 redisDb
*db
= server
.db
+j
;
778 int num
= dictSize(db
->expires
);
781 time_t now
= time(NULL
);
783 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
784 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
789 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
790 t
= (time_t) dictGetEntryVal(de
);
792 deleteKey(db
,dictGetEntryKey(de
));
798 /* Check if we should connect to a MASTER */
799 if (server
.replstate
== REDIS_REPL_CONNECT
) {
800 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
801 if (syncWithMaster() == REDIS_OK
) {
802 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
808 static void createSharedObjects(void) {
809 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
810 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
811 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
812 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
813 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
814 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
815 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
816 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
817 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
819 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
820 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
821 "-ERR Operation against a key holding the wrong kind of value\r\n"));
822 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
823 "-ERR no such key\r\n"));
824 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
825 "-ERR syntax error\r\n"));
826 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
827 "-ERR source and destination objects are the same\r\n"));
828 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
829 "-ERR index out of range\r\n"));
830 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
831 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
832 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
833 shared
.select0
= createStringObject("select 0\r\n",10);
834 shared
.select1
= createStringObject("select 1\r\n",10);
835 shared
.select2
= createStringObject("select 2\r\n",10);
836 shared
.select3
= createStringObject("select 3\r\n",10);
837 shared
.select4
= createStringObject("select 4\r\n",10);
838 shared
.select5
= createStringObject("select 5\r\n",10);
839 shared
.select6
= createStringObject("select 6\r\n",10);
840 shared
.select7
= createStringObject("select 7\r\n",10);
841 shared
.select8
= createStringObject("select 8\r\n",10);
842 shared
.select9
= createStringObject("select 9\r\n",10);
845 static void appendServerSaveParams(time_t seconds
, int changes
) {
846 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
847 if (server
.saveparams
== NULL
) oom("appendServerSaveParams");
848 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
849 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
850 server
.saveparamslen
++;
853 static void ResetServerSaveParams() {
854 zfree(server
.saveparams
);
855 server
.saveparams
= NULL
;
856 server
.saveparamslen
= 0;
859 static void initServerConfig() {
860 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
861 server
.port
= REDIS_SERVERPORT
;
862 server
.verbosity
= REDIS_DEBUG
;
863 server
.maxidletime
= REDIS_MAXIDLETIME
;
864 server
.saveparams
= NULL
;
865 server
.logfile
= NULL
; /* NULL = log on standard output */
866 server
.bindaddr
= NULL
;
867 server
.glueoutputbuf
= 1;
868 server
.daemonize
= 0;
869 server
.pidfile
= "/var/run/redis.pid";
870 server
.dbfilename
= "dump.rdb";
871 server
.requirepass
= NULL
;
872 server
.shareobjects
= 0;
873 server
.maxclients
= 0;
874 server
.maxmemory
= 0;
875 ResetServerSaveParams();
877 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
878 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
879 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
880 /* Replication related */
882 server
.masterhost
= NULL
;
883 server
.masterport
= 6379;
884 server
.master
= NULL
;
885 server
.replstate
= REDIS_REPL_NONE
;
888 static void initServer() {
891 signal(SIGHUP
, SIG_IGN
);
892 signal(SIGPIPE
, SIG_IGN
);
894 server
.clients
= listCreate();
895 server
.slaves
= listCreate();
896 server
.monitors
= listCreate();
897 server
.objfreelist
= listCreate();
898 createSharedObjects();
899 server
.el
= aeCreateEventLoop();
900 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
901 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
902 server
.sharingpoolsize
= 1024;
903 if (!server
.db
|| !server
.clients
|| !server
.slaves
|| !server
.monitors
|| !server
.el
|| !server
.objfreelist
)
904 oom("server initialization"); /* Fatal OOM */
905 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
906 if (server
.fd
== -1) {
907 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
910 for (j
= 0; j
< server
.dbnum
; j
++) {
911 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
912 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
915 server
.cronloops
= 0;
916 server
.bgsaveinprogress
= 0;
917 server
.lastsave
= time(NULL
);
919 server
.usedmemory
= 0;
920 server
.stat_numcommands
= 0;
921 server
.stat_numconnections
= 0;
922 server
.stat_starttime
= time(NULL
);
923 aeCreateTimeEvent(server
.el
, 1000, serverCron
, NULL
, NULL
);
926 /* Empty the whole database */
927 static long long emptyDb() {
929 long long removed
= 0;
931 for (j
= 0; j
< server
.dbnum
; j
++) {
932 removed
+= dictSize(server
.db
[j
].dict
);
933 dictEmpty(server
.db
[j
].dict
);
934 dictEmpty(server
.db
[j
].expires
);
939 static int yesnotoi(char *s
) {
940 if (!strcasecmp(s
,"yes")) return 1;
941 else if (!strcasecmp(s
,"no")) return 0;
945 /* I agree, this is a very rudimentary way to load a configuration...
946 will improve later if the config gets more complex */
947 static void loadServerConfig(char *filename
) {
948 FILE *fp
= fopen(filename
,"r");
949 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
954 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
957 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
963 line
= sdstrim(line
," \t\r\n");
965 /* Skip comments and blank lines*/
966 if (line
[0] == '#' || line
[0] == '\0') {
971 /* Split into arguments */
972 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
975 /* Execute config directives */
976 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
977 server
.maxidletime
= atoi(argv
[1]);
978 if (server
.maxidletime
< 0) {
979 err
= "Invalid timeout value"; goto loaderr
;
981 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
982 server
.port
= atoi(argv
[1]);
983 if (server
.port
< 1 || server
.port
> 65535) {
984 err
= "Invalid port"; goto loaderr
;
986 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
987 server
.bindaddr
= zstrdup(argv
[1]);
988 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
989 int seconds
= atoi(argv
[1]);
990 int changes
= atoi(argv
[2]);
991 if (seconds
< 1 || changes
< 0) {
992 err
= "Invalid save parameters"; goto loaderr
;
994 appendServerSaveParams(seconds
,changes
);
995 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
996 if (chdir(argv
[1]) == -1) {
997 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
998 argv
[1], strerror(errno
));
1001 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1002 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1003 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1004 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1006 err
= "Invalid log level. Must be one of debug, notice, warning";
1009 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1012 server
.logfile
= zstrdup(argv
[1]);
1013 if (!strcasecmp(server
.logfile
,"stdout")) {
1014 zfree(server
.logfile
);
1015 server
.logfile
= NULL
;
1017 if (server
.logfile
) {
1018 /* Test if we are able to open the file. The server will not
1019 * be able to abort just for this problem later... */
1020 fp
= fopen(server
.logfile
,"a");
1022 err
= sdscatprintf(sdsempty(),
1023 "Can't open the log file: %s", strerror(errno
));
1028 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1029 server
.dbnum
= atoi(argv
[1]);
1030 if (server
.dbnum
< 1) {
1031 err
= "Invalid number of databases"; goto loaderr
;
1033 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1034 server
.maxclients
= atoi(argv
[1]);
1035 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1036 server
.maxmemory
= atoi(argv
[1]);
1037 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1038 server
.masterhost
= sdsnew(argv
[1]);
1039 server
.masterport
= atoi(argv
[2]);
1040 server
.replstate
= REDIS_REPL_CONNECT
;
1041 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1042 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1043 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1045 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1046 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1047 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1049 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1050 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1051 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1053 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1054 server
.requirepass
= zstrdup(argv
[1]);
1055 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1056 server
.pidfile
= zstrdup(argv
[1]);
1057 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1058 server
.dbfilename
= zstrdup(argv
[1]);
1060 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1062 for (j
= 0; j
< argc
; j
++)
1071 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1072 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1073 fprintf(stderr
, ">>> '%s'\n", line
);
1074 fprintf(stderr
, "%s\n", err
);
1078 static void freeClientArgv(redisClient
*c
) {
1081 for (j
= 0; j
< c
->argc
; j
++)
1082 decrRefCount(c
->argv
[j
]);
1086 static void freeClient(redisClient
*c
) {
1089 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1090 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1091 sdsfree(c
->querybuf
);
1092 listRelease(c
->reply
);
1095 ln
= listSearchKey(server
.clients
,c
);
1097 listDelNode(server
.clients
,ln
);
1098 if (c
->flags
& REDIS_SLAVE
) {
1099 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1101 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1102 ln
= listSearchKey(l
,c
);
1106 if (c
->flags
& REDIS_MASTER
) {
1107 server
.master
= NULL
;
1108 server
.replstate
= REDIS_REPL_CONNECT
;
1114 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1119 listRewind(c
->reply
);
1120 while((ln
= listYield(c
->reply
))) {
1122 totlen
+= sdslen(o
->ptr
);
1123 /* This optimization makes more sense if we don't have to copy
1125 if (totlen
> 1024) return;
1131 listRewind(c
->reply
);
1132 while((ln
= listYield(c
->reply
))) {
1134 memcpy(buf
+copylen
,o
->ptr
,sdslen(o
->ptr
));
1135 copylen
+= sdslen(o
->ptr
);
1136 listDelNode(c
->reply
,ln
);
1138 /* Now the output buffer is empty, add the new single element */
1139 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,totlen
));
1140 if (!listAddNodeTail(c
->reply
,o
)) oom("listAddNodeTail");
1144 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1145 redisClient
*c
= privdata
;
1146 int nwritten
= 0, totwritten
= 0, objlen
;
1149 REDIS_NOTUSED(mask
);
1151 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1152 glueReplyBuffersIfNeeded(c
);
1153 while(listLength(c
->reply
)) {
1154 o
= listNodeValue(listFirst(c
->reply
));
1155 objlen
= sdslen(o
->ptr
);
1158 listDelNode(c
->reply
,listFirst(c
->reply
));
1162 if (c
->flags
& REDIS_MASTER
) {
1163 nwritten
= objlen
- c
->sentlen
;
1165 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1166 if (nwritten
<= 0) break;
1168 c
->sentlen
+= nwritten
;
1169 totwritten
+= nwritten
;
1170 /* If we fully sent the object on head go to the next one */
1171 if (c
->sentlen
== objlen
) {
1172 listDelNode(c
->reply
,listFirst(c
->reply
));
1176 if (nwritten
== -1) {
1177 if (errno
== EAGAIN
) {
1180 redisLog(REDIS_DEBUG
,
1181 "Error writing to client: %s", strerror(errno
));
1186 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1187 if (listLength(c
->reply
) == 0) {
1189 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1193 static struct redisCommand
*lookupCommand(char *name
) {
1195 while(cmdTable
[j
].name
!= NULL
) {
1196 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1202 /* resetClient prepares the client to process the next command */
1203 static void resetClient(redisClient
*c
) {
1208 /* If this function gets called we already read a whole
1209 * command, arguments are in the client argv/argc fields.
1210 * processCommand() executes the command or prepares the
1211 * server for a bulk read from the client.
1213 * If 1 is returned the client is still alive and valid and
1214 * other operations can be performed by the caller. Otherwise
1215 * if 0 is returned the client was destroyed (i.e. after QUIT). */
1216 static int processCommand(redisClient
*c
) {
1217 struct redisCommand
*cmd
;
1220 /* Free some memory if needed (maxmemory setting) */
1221 if (server
.maxmemory
) freeMemoryIfNeeded();
1223 /* The QUIT command is handled as a special case. Normal command
1224 * procs are unable to close the client connection safely */
1225 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1229 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1231 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1234 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1235 (c
->argc
< -cmd
->arity
)) {
1236 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
1239 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1240 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1243 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1244 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1246 decrRefCount(c
->argv
[c
->argc
-1]);
1247 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1249 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1254 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1255 /* It is possible that the bulk read is already in the
1256 * buffer. Check this condition and handle it accordingly */
1257 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1258 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1260 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1265 /* Let's try to share objects on the command arguments vector */
1266 if (server
.shareobjects
) {
1268 for(j
= 1; j
< c
->argc
; j
++)
1269 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1271 /* Check if the user is authenticated */
1272 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1273 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1278 /* Exec the command */
1279 dirty
= server
.dirty
;
1281 if (server
.dirty
-dirty
!= 0 && listLength(server
.slaves
))
1282 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1283 if (listLength(server
.monitors
))
1284 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1285 server
.stat_numcommands
++;
1287 /* Prepare the client for the next command */
1288 if (c
->flags
& REDIS_CLOSE
) {
1296 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1300 /* (args*2)+1 is enough room for args, spaces, newlines */
1301 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1303 if (argc
<= REDIS_STATIC_ARGS
) {
1306 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1307 if (!outv
) oom("replicationFeedSlaves");
1310 for (j
= 0; j
< argc
; j
++) {
1311 if (j
!= 0) outv
[outc
++] = shared
.space
;
1312 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1315 lenobj
= createObject(REDIS_STRING
,
1316 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv
[j
]->ptr
)));
1317 lenobj
->refcount
= 0;
1318 outv
[outc
++] = lenobj
;
1320 outv
[outc
++] = argv
[j
];
1322 outv
[outc
++] = shared
.crlf
;
1324 /* Increment all the refcounts at start and decrement at end in order to
1325 * be sure to free objects if there is no slave in a replication state
1326 * able to be fed with commands */
1327 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1329 while((ln
= listYield(slaves
))) {
1330 redisClient
*slave
= ln
->value
;
1332 /* Don't feed slaves that are still waiting for BGSAVE to start */
1333 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1335 /* Feed all the other slaves, MONITORs and so on */
1336 if (slave
->slaveseldb
!= dictid
) {
1340 case 0: selectcmd
= shared
.select0
; break;
1341 case 1: selectcmd
= shared
.select1
; break;
1342 case 2: selectcmd
= shared
.select2
; break;
1343 case 3: selectcmd
= shared
.select3
; break;
1344 case 4: selectcmd
= shared
.select4
; break;
1345 case 5: selectcmd
= shared
.select5
; break;
1346 case 6: selectcmd
= shared
.select6
; break;
1347 case 7: selectcmd
= shared
.select7
; break;
1348 case 8: selectcmd
= shared
.select8
; break;
1349 case 9: selectcmd
= shared
.select9
; break;
1351 selectcmd
= createObject(REDIS_STRING
,
1352 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1353 selectcmd
->refcount
= 0;
1356 addReply(slave
,selectcmd
);
1357 slave
->slaveseldb
= dictid
;
1359 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1361 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1362 if (outv
!= static_outv
) zfree(outv
);
1365 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1366 redisClient
*c
= (redisClient
*) privdata
;
1367 char buf
[REDIS_IOBUF_LEN
];
1370 REDIS_NOTUSED(mask
);
1372 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1374 if (errno
== EAGAIN
) {
1377 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1381 } else if (nread
== 0) {
1382 redisLog(REDIS_DEBUG
, "Client closed connection");
1387 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1388 c
->lastinteraction
= time(NULL
);
1394 if (c
->bulklen
== -1) {
1395 /* Read the first line of the query */
1396 char *p
= strchr(c
->querybuf
,'\n');
1402 query
= c
->querybuf
;
1403 c
->querybuf
= sdsempty();
1404 querylen
= 1+(p
-(query
));
1405 if (sdslen(query
) > querylen
) {
1406 /* leave data after the first line of the query in the buffer */
1407 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1409 *p
= '\0'; /* remove "\n" */
1410 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1411 sdsupdatelen(query
);
1413 /* Now we can split the query in arguments */
1414 if (sdslen(query
) == 0) {
1415 /* Ignore empty query */
1419 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1420 if (argv
== NULL
) oom("sdssplitlen");
1423 if (c
->argv
) zfree(c
->argv
);
1424 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1425 if (c
->argv
== NULL
) oom("allocating arguments list for client");
1427 for (j
= 0; j
< argc
; j
++) {
1428 if (sdslen(argv
[j
])) {
1429 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1436 /* Execute the command. If the client is still valid
1437 * after processCommand() return and there is something
1438 * on the query buffer try to process the next command. */
1439 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1441 } else if (sdslen(c
->querybuf
) >= 1024*32) {
1442 redisLog(REDIS_DEBUG
, "Client protocol error");
1447 /* Bulk read handling. Note that if we are at this point
1448 the client already sent a command terminated with a newline,
1449 we are reading the bulk data that is actually the last
1450 argument of the command. */
1451 int qbl
= sdslen(c
->querybuf
);
1453 if (c
->bulklen
<= qbl
) {
1454 /* Copy everything but the final CRLF as final argument */
1455 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1457 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1464 static int selectDb(redisClient
*c
, int id
) {
1465 if (id
< 0 || id
>= server
.dbnum
)
1467 c
->db
= &server
.db
[id
];
1471 static void *dupClientReplyValue(void *o
) {
1472 incrRefCount((robj
*)o
);
1476 static redisClient
*createClient(int fd
) {
1477 redisClient
*c
= zmalloc(sizeof(*c
));
1479 anetNonBlock(NULL
,fd
);
1480 anetTcpNoDelay(NULL
,fd
);
1481 if (!c
) return NULL
;
1484 c
->querybuf
= sdsempty();
1490 c
->lastinteraction
= time(NULL
);
1491 c
->authenticated
= 0;
1492 c
->replstate
= REDIS_REPL_NONE
;
1493 if ((c
->reply
= listCreate()) == NULL
) oom("listCreate");
1494 listSetFreeMethod(c
->reply
,decrRefCount
);
1495 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1496 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1497 readQueryFromClient
, c
, NULL
) == AE_ERR
) {
1501 if (!listAddNodeTail(server
.clients
,c
)) oom("listAddNodeTail");
1505 static void addReply(redisClient
*c
, robj
*obj
) {
1506 if (listLength(c
->reply
) == 0 &&
1507 (c
->replstate
== REDIS_REPL_NONE
||
1508 c
->replstate
== REDIS_REPL_ONLINE
) &&
1509 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
1510 sendReplyToClient
, c
, NULL
) == AE_ERR
) return;
1511 if (!listAddNodeTail(c
->reply
,obj
)) oom("listAddNodeTail");
1515 static void addReplySds(redisClient
*c
, sds s
) {
1516 robj
*o
= createObject(REDIS_STRING
,s
);
1521 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1526 REDIS_NOTUSED(mask
);
1527 REDIS_NOTUSED(privdata
);
1529 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
1530 if (cfd
== AE_ERR
) {
1531 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
1534 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
1535 if ((c
= createClient(cfd
)) == NULL
) {
1536 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
1537 close(cfd
); /* May be already closed, just ingore errors */
1540 /* If maxclient directive is set and this is one client more... close the
1541 * connection. Note that we create the client instead of checking for this
1542 * condition beforehand, since now the socket is already set in nonblocking
1543 * mode and we can send an error for free using the Kernel I/O */
1544 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
1545 char *err
= "-ERR max number of clients reached\r\n";
1547 /* That's a best effort error message, don't check write errors */
1548 (void) write(c
->fd
,err
,strlen(err
));
1552 server
.stat_numconnections
++;
1555 /* ======================= Redis objects implementation ===================== */
1557 static robj
*createObject(int type
, void *ptr
) {
1560 if (listLength(server
.objfreelist
)) {
1561 listNode
*head
= listFirst(server
.objfreelist
);
1562 o
= listNodeValue(head
);
1563 listDelNode(server
.objfreelist
,head
);
1565 o
= zmalloc(sizeof(*o
));
1567 if (!o
) oom("createObject");
/* Build a REDIS_STRING object: the 'len' bytes at 'ptr' are passed to
 * sdsnewlen() and the resulting sds string is handed to createObject(). */
1574 static robj
*createStringObject(char *ptr
, size_t len
) {
1575 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
/* Build a REDIS_LIST object around a list obtained from listCreate().
 * oom("listCreate") is called when listCreate() returns NULL. decrRefCount
 * is installed as the list's free method before the list is handed to
 * createObject(). */
1578 static robj
*createListObject(void) {
1579 list
*l
= listCreate();
1581 if (!l
) oom("listCreate");
1582 listSetFreeMethod(l
,decrRefCount
);
1583 return createObject(REDIS_LIST
,l
);
/* Build a REDIS_SET object around a dict created with setDictType.
 * oom("dictCreate") is called when dictCreate() returns NULL; otherwise the
 * dict is handed to createObject(). */
1586 static robj
*createSetObject(void) {
1587 dict
*d
= dictCreate(&setDictType
,NULL
);
1588 if (!d
) oom("dictCreate");
1589 return createObject(REDIS_SET
,d
);
1592 static void freeStringObject(robj
*o
) {
/* Release the payload of a list object: o->ptr is cast to list* and passed
 * to listRelease(). This function does not touch the robj itself. */
1596 static void freeListObject(robj
*o
) {
1597 listRelease((list
*) o
->ptr
);
/* Release the payload of a set object: o->ptr is cast to dict* and passed
 * to dictRelease(). This function does not touch the robj itself. */
1600 static void freeSetObject(robj
*o
) {
1601 dictRelease((dict
*) o
->ptr
);
/* Release the payload of a hash object: o->ptr is cast to dict* and passed
 * to dictRelease(). This function does not touch the robj itself. */
1604 static void freeHashObject(robj
*o
) {
1605 dictRelease((dict
*) o
->ptr
);
1608 static void incrRefCount(robj
*o
) {
1610 #ifdef DEBUG_REFCOUNT
1611 if (o
->type
== REDIS_STRING
)
1612 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
1616 static void decrRefCount(void *obj
) {
1619 #ifdef DEBUG_REFCOUNT
1620 if (o
->type
== REDIS_STRING
)
1621 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
1623 if (--(o
->refcount
) == 0) {
1625 case REDIS_STRING
: freeStringObject(o
); break;
1626 case REDIS_LIST
: freeListObject(o
); break;
1627 case REDIS_SET
: freeSetObject(o
); break;
1628 case REDIS_HASH
: freeHashObject(o
); break;
1629 default: assert(0 != 0); break;
1631 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
1632 !listAddNodeHead(server
.objfreelist
,o
))
1637 /* Try to share an object against the shared objects pool */
1638 static robj
*tryObjectSharing(robj
*o
) {
1639 struct dictEntry
*de
;
1642 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
1644 assert(o
->type
== REDIS_STRING
);
1645 de
= dictFind(server
.sharingpool
,o
);
1647 robj
*shared
= dictGetEntryKey(de
);
1649 c
= ((unsigned long) dictGetEntryVal(de
))+1;
1650 dictGetEntryVal(de
) = (void*) c
;
1651 incrRefCount(shared
);
1655 /* Here we are using a stream algorithm: Every time an object is
1656 * shared we increment its count, every time there is a miss we
1657 * decrement the counter of a random object. If this object reaches
1658 * zero we remove the object and put the current object instead. */
1659 if (dictSize(server
.sharingpool
) >=
1660 server
.sharingpoolsize
) {
1661 de
= dictGetRandomKey(server
.sharingpool
);
1663 c
= ((unsigned long) dictGetEntryVal(de
))-1;
1664 dictGetEntryVal(de
) = (void*) c
;
1666 dictDelete(server
.sharingpool
,de
->key
);
1669 c
= 0; /* If the pool is empty we want to add this object */
1674 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
1675 assert(retval
== DICT_OK
);
/* Look up 'key' in db->dict with dictFind(). Returns the value stored in the
 * matching entry, or NULL when dictFind() finds no entry. */
1682 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
1683 dictEntry
*de
= dictFind(db
->dict
,key
);
1684 return de
? dictGetEntryVal(de
) : NULL
;
/* Lookup used on read paths: expireIfNeeded() is called on the key first,
 * then the result of lookupKey() is returned (NULL when the key is absent). */
1687 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
1688 expireIfNeeded(db
,key
);
1689 return lookupKey(db
,key
);
/* Lookup used on write paths: deleteIfVolatile() is called on the key first,
 * then the result of lookupKey() is returned (NULL when the key is absent). */
1692 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
1693 deleteIfVolatile(db
,key
);
1694 return lookupKey(db
,key
);
1697 static int deleteKey(redisDb
*db
, robj
*key
) {
1700 /* We need to protect key from destruction: after the first dictDelete()
1701 * it may happen that 'key' is no longer valid if we don't increment
1702 * its count. This may happen when we get the object reference directly
1703 * from the hash table with dictRandomKey() or dict iterators */
1705 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
1706 retval
= dictDelete(db
->dict
,key
);
1709 return retval
== DICT_OK
;
1712 /*============================ DB saving/loading ============================ */
1714 static int rdbSaveType(FILE *fp
, unsigned char type
) {
1715 if (fwrite(&type
,1,1,fp
) == 0) return -1;
1719 static int rdbSaveTime(FILE *fp
, time_t t
) {
1720 int32_t t32
= (int32_t) t
;
1721 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
1725 /* check rdbLoadLen() comments for more info */
1726 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
1727 unsigned char buf
[2];
1730 /* Save a 6 bit len */
1731 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
1732 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1733 } else if (len
< (1<<14)) {
1734 /* Save a 14 bit len */
1735 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
1737 if (fwrite(buf
,2,1,fp
) == 0) return -1;
1739 /* Save a 32 bit len */
1740 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
1741 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1743 if (fwrite(&len
,4,1,fp
) == 0) return -1;
1748 /* String objects in the form "2391" "-100" without any space and with a
1749 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1750 * encoded as integers to save space */
1751 int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
1753 char *endptr
, buf
[32];
1755 /* Check if it's possible to encode this value as a number */
1756 value
= strtoll(s
, &endptr
, 10);
1757 if (endptr
[0] != '\0') return 0;
1758 snprintf(buf
,32,"%lld",value
);
1760 /* If the number converted back into a string is not identical
1761 * then it's not possible to encode the string as integer */
1762 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
1764 /* Finally check if it fits in our ranges */
1765 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
1766 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
1767 enc
[1] = value
&0xFF;
1769 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
1770 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
1771 enc
[1] = value
&0xFF;
1772 enc
[2] = (value
>>8)&0xFF;
1774 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
1775 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
1776 enc
[1] = value
&0xFF;
1777 enc
[2] = (value
>>8)&0xFF;
1778 enc
[3] = (value
>>16)&0xFF;
1779 enc
[4] = (value
>>24)&0xFF;
1786 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
1787 unsigned int comprlen
, outlen
;
1791 /* We require at least four bytes compression for this to be worth it */
1792 outlen
= sdslen(obj
->ptr
)-4;
1793 if (outlen
<= 0) return 0;
1794 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
1795 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
1796 if (comprlen
== 0) {
1800 /* Data compressed! Let's save it on disk */
1801 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
1802 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
1803 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
1804 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
1805 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
1814 /* Save a string object as [len][data] on disk. If the object is a string
1815 * representation of an integer value we try to save it in a special form */
1816 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
1817 size_t len
= sdslen(obj
->ptr
);
1820 /* Try integer encoding */
1822 unsigned char buf
[5];
1823 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
1824 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
1829 /* Try LZF compression - under 20 bytes it's unable to compress even
1830 * aaaaaaaaaaaaaaaaaa so skip it */
1831 if (1 && len
> 20) {
1834 retval
= rdbSaveLzfStringObject(fp
,obj
);
1835 if (retval
== -1) return -1;
1836 if (retval
> 0) return 0;
1837 /* retval == 0 means data can't be compressed, save the old way */
1840 /* Store verbatim */
1841 if (rdbSaveLen(fp
,len
) == -1) return -1;
1842 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
1846 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1847 static int rdbSave(char *filename
) {
1848 dictIterator
*di
= NULL
;
1853 time_t now
= time(NULL
);
1855 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
1856 fp
= fopen(tmpfile
,"w");
1858 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
1861 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
1862 for (j
= 0; j
< server
.dbnum
; j
++) {
1863 redisDb
*db
= server
.db
+j
;
1865 if (dictSize(d
) == 0) continue;
1866 di
= dictGetIterator(d
);
1872 /* Write the SELECT DB opcode */
1873 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
1874 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
1876 /* Iterate this DB writing every entry */
1877 while((de
= dictNext(di
)) != NULL
) {
1878 robj
*key
= dictGetEntryKey(de
);
1879 robj
*o
= dictGetEntryVal(de
);
1880 time_t expiretime
= getExpire(db
,key
);
1882 /* Save the expire time */
1883 if (expiretime
!= -1) {
1884 /* If this key is already expired skip it */
1885 if (expiretime
< now
) continue;
1886 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
1887 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
1889 /* Save the key and associated value */
1890 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
1891 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
1892 if (o
->type
== REDIS_STRING
) {
1893 /* Save a string value */
1894 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
1895 } else if (o
->type
== REDIS_LIST
) {
1896 /* Save a list value */
1897 list
*list
= o
->ptr
;
1901 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
1902 while((ln
= listYield(list
))) {
1903 robj
*eleobj
= listNodeValue(ln
);
1905 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
1907 } else if (o
->type
== REDIS_SET
) {
1908 /* Save a set value */
1910 dictIterator
*di
= dictGetIterator(set
);
1913 if (!set
) oom("dictGetIteraotr");
1914 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
1915 while((de
= dictNext(di
)) != NULL
) {
1916 robj
*eleobj
= dictGetEntryKey(de
);
1918 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
1920 dictReleaseIterator(di
);
1925 dictReleaseIterator(di
);
1928 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
1930 /* Make sure data will not remain on the OS's output buffers */
1935 /* Use RENAME to make sure the DB file is changed atomically only
1936 * if the generated DB file is ok. */
1937 if (rename(tmpfile
,filename
) == -1) {
1938 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destionation: %s", strerror(errno
));
1942 redisLog(REDIS_NOTICE
,"DB saved on disk");
1944 server
.lastsave
= time(NULL
);
1950 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
1951 if (di
) dictReleaseIterator(di
);
1955 static int rdbSaveBackground(char *filename
) {
1958 if (server
.bgsaveinprogress
) return REDIS_ERR
;
1959 if ((childpid
= fork()) == 0) {
1962 if (rdbSave(filename
) == REDIS_OK
) {
1969 if (childpid
== -1) {
1970 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
1974 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
1975 server
.bgsaveinprogress
= 1;
1978 return REDIS_OK
; /* unreached */
1981 static int rdbLoadType(FILE *fp
) {
1983 if (fread(&type
,1,1,fp
) == 0) return -1;
1987 static time_t rdbLoadTime(FILE *fp
) {
1989 if (fread(&t32
,4,1,fp
) == 0) return -1;
1990 return (time_t) t32
;
1993 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
1994 * of this file for a description of how these are stored on disk.
1996 * isencoded is set to 1 if the value read is not actually a length but
1997 * an "encoding type", check the above comments for more info */
1998 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
1999 unsigned char buf
[2];
2002 if (isencoded
) *isencoded
= 0;
2004 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2009 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2010 type
= (buf
[0]&0xC0)>>6;
2011 if (type
== REDIS_RDB_6BITLEN
) {
2012 /* Read a 6 bit len */
2014 } else if (type
== REDIS_RDB_ENCVAL
) {
2015 /* Read a 6 bit len encoding type */
2016 if (isencoded
) *isencoded
= 1;
2018 } else if (type
== REDIS_RDB_14BITLEN
) {
2019 /* Read a 14 bit len */
2020 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2021 return ((buf
[0]&0x3F)<<8)|buf
[1];
2023 /* Read a 32 bit len */
2024 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2030 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2031 unsigned char enc
[4];
2034 if (enctype
== REDIS_RDB_ENC_INT8
) {
2035 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2036 val
= (signed char)enc
[0];
2037 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2039 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2040 v
= enc
[0]|(enc
[1]<<8);
2042 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2044 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2045 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2048 val
= 0; /* anti-warning */
2051 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2054 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2055 unsigned int len
, clen
;
2056 unsigned char *c
= NULL
;
2059 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2060 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2061 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2062 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2063 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2064 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2066 return createObject(REDIS_STRING
,val
);
2073 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2078 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2081 case REDIS_RDB_ENC_INT8
:
2082 case REDIS_RDB_ENC_INT16
:
2083 case REDIS_RDB_ENC_INT32
:
2084 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2085 case REDIS_RDB_ENC_LZF
:
2086 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2092 if (len
== REDIS_RDB_LENERR
) return NULL
;
2093 val
= sdsnewlen(NULL
,len
);
2094 if (len
&& fread(val
,len
,1,fp
) == 0) {
2098 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2101 static int rdbLoad(char *filename
) {
2103 robj
*keyobj
= NULL
;
2105 int type
, retval
, rdbver
;
2106 dict
*d
= server
.db
[0].dict
;
2107 redisDb
*db
= server
.db
+0;
2109 time_t expiretime
= -1, now
= time(NULL
);
2111 fp
= fopen(filename
,"r");
2112 if (!fp
) return REDIS_ERR
;
2113 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2115 if (memcmp(buf
,"REDIS",5) != 0) {
2117 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2120 rdbver
= atoi(buf
+5);
2123 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2130 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2131 if (type
== REDIS_EXPIRETIME
) {
2132 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2133 /* We read the time so we need to read the object type again */
2134 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2136 if (type
== REDIS_EOF
) break;
2137 /* Handle SELECT DB opcode as a special case */
2138 if (type
== REDIS_SELECTDB
) {
2139 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2141 if (dbid
>= (unsigned)server
.dbnum
) {
2142 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2145 db
= server
.db
+dbid
;
2150 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2152 if (type
== REDIS_STRING
) {
2153 /* Read string value */
2154 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2155 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2156 /* Read list/set value */
2159 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2161 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2162 /* Load every single element of the list/set */
2166 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2167 if (type
== REDIS_LIST
) {
2168 if (!listAddNodeTail((list
*)o
->ptr
,ele
))
2169 oom("listAddNodeTail");
2171 if (dictAdd((dict
*)o
->ptr
,ele
,NULL
) == DICT_ERR
)
2178 /* Add the new object in the hash table */
2179 retval
= dictAdd(d
,keyobj
,o
);
2180 if (retval
== DICT_ERR
) {
2181 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2184 /* Set the expire time if needed */
2185 if (expiretime
!= -1) {
2186 setExpire(db
,keyobj
,expiretime
);
2187 /* Delete this key if already expired */
2188 if (expiretime
< now
) deleteKey(db
,keyobj
);
2196 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2197 if (keyobj
) decrRefCount(keyobj
);
2198 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2200 return REDIS_ERR
; /* Just to avoid warning */
2203 /*================================== Commands =============================== */
2205 static void authCommand(redisClient
*c
) {
2206 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2207 c
->authenticated
= 1;
2208 addReply(c
,shared
.ok
);
2210 c
->authenticated
= 0;
2211 addReply(c
,shared
.err
);
/* PING handler: queues the shared.pong object as the reply for client 'c'. */
2215 static void pingCommand(redisClient
*c
) {
2216 addReply(c
,shared
.pong
);
/* ECHO handler: replies with c->argv[1] in three parts, a "$<len>\r\n"
 * header built from sdslen() of the argument, the argument object itself,
 * and finally shared.crlf. */
2219 static void echoCommand(redisClient
*c
) {
2220 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
2221 (int)sdslen(c
->argv
[1]->ptr
)));
2222 addReply(c
,c
->argv
[1]);
2223 addReply(c
,shared
.crlf
);
2226 /*=================================== Strings =============================== */
/* Shared implementation of SET / SETNX ('nx' non-zero for SETNX).
 *
 * Visible behaviour: tries dictAdd(argv[1] -> argv[2]) on the client's DB.
 * On DICT_ERR the listing shows both a dictReplace() of the value (with a
 * refcount bump on argv[2]) and a shared.czero reply; on success both key and
 * value get their refcount incremented. Any expire on the key is then removed
 * and the reply is shared.cone when 'nx' is set, shared.ok otherwise.
 *
 * NOTE(review): several original lines (declaration of 'retval', the
 * condition choosing between the replace and czero paths, braces/returns) are
 * not present in this listing -- presumably the choice is made on 'nx';
 * confirm against the full file. */
2228 static void setGenericCommand(redisClient
*c
, int nx
) {
2231 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2232 if (retval
== DICT_ERR
) {
2234 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2235 incrRefCount(c
->argv
[2]);
2237 addReply(c
,shared
.czero
);
2241 incrRefCount(c
->argv
[1]);
2242 incrRefCount(c
->argv
[2]);
2245 removeExpire(c
->db
,c
->argv
[1]);
2246 addReply(c
, nx
? shared
.cone
: shared
.ok
);
2249 static void setCommand(redisClient
*c
) {
2250 setGenericCommand(c
,0);
2253 static void setnxCommand(redisClient
*c
) {
2254 setGenericCommand(c
,1);
/* GET key
 *
 * Looks up argv[1] with lookupKeyRead(). Replies visible here:
 *   - shared.nullbulk
 *   - shared.wrongtypeerr when the object is not a REDIS_STRING
 *   - a "$<len>\r\n" header computed from sdslen(o->ptr), then shared.crlf
 *
 * NOTE(review): the NULL check guarding the nullbulk reply, the else
 * branches and the line that sends the value itself are not present in this
 * listing; confirm against the full file. */
2257 static void getCommand(redisClient
*c
) {
2258 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2261 addReply(c
,shared
.nullbulk
);
2263 if (o
->type
!= REDIS_STRING
) {
2264 addReply(c
,shared
.wrongtypeerr
);
2266 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o
->ptr
)));
2268 addReply(c
,shared
.crlf
);
/* GETSET key value
 *
 * Visible behaviour: stores argv[2] under argv[1] with dictAdd(), falling
 * back to dictReplace() when the key already exists (DICT_ERR), bumps the
 * refcount of argv[1] and argv[2], and removes any expire set on the key.
 *
 * NOTE(review): the lines that reply with the previous value, and the braces
 * showing which incrRefCount() belongs to which branch, are not present in
 * this listing; confirm against the full file. */
2273 static void getSetCommand(redisClient
*c
) {
2275 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
2276 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2278 incrRefCount(c
->argv
[1]);
2280 incrRefCount(c
->argv
[2]);
2282 removeExpire(c
->db
,c
->argv
[1]);
/* MGET key [key ...]
 *
 * Emits a multi-bulk header "*<argc-1>\r\n" and then walks argv[1..argc-1],
 * looking each key up with lookupKeyRead(). Per key the visible replies are
 * shared.nullbulk (also used when the object is not a REDIS_STRING, so a
 * wrong-typed key is reported like a missing one) or a "$<len>\r\n" header
 * followed by shared.crlf.
 *
 * NOTE(review): the NULL test, else branches and the line sending the value
 * object are not present in this listing. */
2285 static void mgetCommand(redisClient
*c
) {
2288 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
2289 for (j
= 1; j
< c
->argc
; j
++) {
2290 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
2292 addReply(c
,shared
.nullbulk
);
2294 if (o
->type
!= REDIS_STRING
) {
2295 addReply(c
,shared
.nullbulk
);
2297 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o
->ptr
)));
2299 addReply(c
,shared
.crlf
);
/* Shared implementation of INCR / DECR / INCRBY / DECRBY; 'incr' is the
 * signed delta to apply.
 *
 * Visible behaviour: looks the key up for writing, parses an existing
 * REDIS_STRING value with strtoll() in base 10, builds a fresh string object
 * from the resulting value ("%lld"), stores it with dictAdd() -- or
 * dictReplace() plus removeExpire() when the key already exists -- and
 * replies with shared.colon ... shared.crlf.
 *
 * NOTE(review): the lines handling a missing or wrong-typed key, the
 * arithmetic itself and the reply of the new value are not present in this
 * listing; 'eptr' is written by strtoll() but no visible line reads it. */
2305 static void incrDecrCommand(redisClient
*c
, long long incr
) {
2310 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2314 if (o
->type
!= REDIS_STRING
) {
2319 value
= strtoll(o
->ptr
, &eptr
, 10);
2324 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
2325 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
2326 if (retval
== DICT_ERR
) {
2327 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
2328 removeExpire(c
->db
,c
->argv
[1]);
2330 incrRefCount(c
->argv
[1]);
2333 addReply(c
,shared
.colon
);
2335 addReply(c
,shared
.crlf
);
2338 static void incrCommand(redisClient
*c
) {
2339 incrDecrCommand(c
,1);
2342 static void decrCommand(redisClient
*c
) {
2343 incrDecrCommand(c
,-1);
2346 static void incrbyCommand(redisClient
*c
) {
2347 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2348 incrDecrCommand(c
,incr
);
2351 static void decrbyCommand(redisClient
*c
) {
2352 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2353 incrDecrCommand(c
,-incr
);
2356 /* ========================= Type agnostic commands ========================= */
/* DEL key [key ...]
 *
 * Calls deleteKey() on argv[1..argc-1]. Three replies are visible:
 * shared.czero, shared.cone, and an integer reply ":<deleted>\r\n".
 *
 * NOTE(review): the counting of 'deleted' and the switch choosing between
 * the three replies are not present in this listing. */
2358 static void delCommand(redisClient
*c
) {
2361 for (j
= 1; j
< c
->argc
; j
++) {
2362 if (deleteKey(c
->db
,c
->argv
[j
])) {
2369 addReply(c
,shared
.czero
);
2372 addReply(c
,shared
.cone
);
2375 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
2380 static void existsCommand(redisClient
*c
) {
2381 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
2384 static void selectCommand(redisClient
*c
) {
2385 int id
= atoi(c
->argv
[1]->ptr
);
2387 if (selectDb(c
,id
) == REDIS_ERR
) {
2388 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
2390 addReply(c
,shared
.ok
);
/* RANDOMKEY
 *
 * Picks an entry with dictGetRandomKey() and stops searching ('break') as
 * soon as the dictionary yields nothing or the picked key is not expired
 * (expireIfNeeded() returned 0). Two replies are visible: "+" followed by
 * CRLF, and "+" <key> CRLF.
 *
 * NOTE(review): the enclosing loop and the condition choosing between the
 * two replies are not present in this listing -- presumably the first reply
 * is used when no entry was found. */
2394 static void randomkeyCommand(redisClient
*c
) {
2398 de
= dictGetRandomKey(c
->db
->dict
);
2399 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
2402 addReply(c
,shared
.plus
);
2403 addReply(c
,shared
.crlf
);
2405 addReply(c
,shared
.plus
);
2406 addReply(c
,dictGetEntryKey(de
));
2407 addReply(c
,shared
.crlf
);
/* KEYS pattern
 *
 * Iterates every key of the client's DB and matches it against argv[1]
 * ("*" alone is short-circuited, everything else goes through
 * stringmatchlen()). Matching keys that are not expired are accumulated into
 * a single bulk reply separated by shared.space; 'keyslen' sums their
 * lengths. 'lenobj' is a placeholder object whose payload, the "$<len>\r\n"
 * header, is filled in once the total length is known (keys' bytes plus
 * numkeys-1 separators).
 *
 * Fix: the header was formatted with "%lu" while the argument is an int
 * expression (keyslen and numkeys are both int), which is undefined
 * behaviour; the conversion is now "%d".
 *
 * NOTE(review): several original lines (declarations of di/de, the reply of
 * lenobj and of each key, the numkeys increment, closing braces) are not
 * present in this listing. */
2411 static void keysCommand(redisClient
*c
) {
2414 sds pattern
= c
->argv
[1]->ptr
;
2415 int plen
= sdslen(pattern
);
2416 int numkeys
= 0, keyslen
= 0;
2417 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
2419 di
= dictGetIterator(c
->db
->dict
);
2420 if (!di
) oom("dictGetIterator");
2422 decrRefCount(lenobj
);
2423 while((de
= dictNext(di
)) != NULL
) {
2424 robj
*keyobj
= dictGetEntryKey(de
);
2426 sds key
= keyobj
->ptr
;
2427 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
2428 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
2429 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
2431 addReply(c
,shared
.space
);
2434 keyslen
+= sdslen(key
);
2438 dictReleaseIterator(di
);
/* int argument, so the conversion must be %d (was %lu) */
2439 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%d\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
2440 addReply(c
,shared
.crlf
);
/* DBSIZE: formats the number of keys in the client's DB, dictSize() of
 * c->db->dict, as an integer reply ":<n>\r\n".
 * NOTE(review): the call that queues the formatted string is on a line not
 * present in this listing; also confirm dictSize() really yields an
 * unsigned long, as "%lu" requires. */
2443 static void dbsizeCommand(redisClient
*c
) {
2445 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
/* LASTSAVE: formats server.lastsave as an integer reply ":<n>\r\n".
 * NOTE(review): the call that queues the formatted string is on a line not
 * present in this listing; confirm the type of server.lastsave matches
 * "%lu". */
2448 static void lastsaveCommand(redisClient
*c
) {
2450 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
/* TYPE key
 *
 * Looks argv[1] up for reading and replies with a single status line naming
 * the value's type: "+string", "+list" or "+set", followed by CRLF.
 *
 * Fix: the fallback for an unrecognised object type was the bare text
 * "unknown", i.e. a line without the leading '+' that marks a status reply
 * in every other case; it is now "+unknown" so the reply stays well-formed.
 *
 * NOTE(review): the declarations, the handling of a missing key and the
 * switch header are on lines not present in this listing. */
2453 static void typeCommand(redisClient
*c
) {
2457 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2462 case REDIS_STRING
: type
= "+string"; break;
2463 case REDIS_LIST
: type
= "+list"; break;
2464 case REDIS_SET
: type
= "+set"; break;
2465 default: type
= "+unknown"; break;
2468 addReplySds(c
,sdsnew(type
));
2469 addReply(c
,shared
.crlf
);
2472 static void saveCommand(redisClient
*c
) {
2473 if (server
.bgsaveinprogress
) {
2474 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
2477 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2478 addReply(c
,shared
.ok
);
2480 addReply(c
,shared
.err
);
2484 static void bgsaveCommand(redisClient
*c
) {
2485 if (server
.bgsaveinprogress
) {
2486 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
2489 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
2490 addReply(c
,shared
.ok
);
2492 addReply(c
,shared
.err
);
/* SHUTDOWN
 *
 * Logs the request and saves the dataset synchronously with rdbSave().
 * On success: removes the pid file when running daemonized, then logs the
 * memory in use and a goodbye message. On failure: logs the problem and
 * replies "-ERR can't quit, problems saving the DB".
 *
 * NOTE(review): the line that actually terminates the process and the
 * else/brace lines are not present in this listing. A background save in
 * progress is not handled (see the TODO below). */
2496 static void shutdownCommand(redisClient
*c
) {
2497 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
2498 /* XXX: TODO kill the child if there is a bgsave in progress */
2499 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2500 if (server
.daemonize
) {
2501 unlink(server
.pidfile
);
2503 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
2504 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
2507 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
2508 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
/* Shared implementation of RENAME / RENAMENX ('nx' non-zero for RENAMENX).
 *
 * Visible behaviour:
 *   - identical source and destination names -> shared.sameobjecterr
 *   - source looked up for writing; shared.nokeyerr is one visible reply
 *   - an already-expired destination is purged with deleteIfVolatile()
 *   - the value is added under argv[2]; on DICT_ERR the listing shows both a
 *     shared.czero reply and a dictReplace() of the destination
 *   - argv[2] gets a refcount bump, the source key is deleted, and the reply
 *     is shared.cone for nx, shared.ok otherwise
 *
 * NOTE(review): the NULL check on the source, the test on 'nx' choosing
 * between czero and replace, refcount handling of the moved value and the
 * returns are on lines not present in this listing. */
2512 static void renameGenericCommand(redisClient
*c
, int nx
) {
2515 /* To use the same key as src and dst is probably an error */
2516 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
2517 addReply(c
,shared
.sameobjecterr
);
2521 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2523 addReply(c
,shared
.nokeyerr
);
2527 deleteIfVolatile(c
->db
,c
->argv
[2]);
2528 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
2531 addReply(c
,shared
.czero
);
2534 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
2536 incrRefCount(c
->argv
[2]);
2538 deleteKey(c
->db
,c
->argv
[1]);
2540 addReply(c
,nx
? shared
.cone
: shared
.ok
);
2543 static void renameCommand(redisClient
*c
) {
2544 renameGenericCommand(c
,0);
2547 static void renamenxCommand(redisClient
*c
) {
2548 renameGenericCommand(c
,1);
/* MOVE key dbindex
 *
 * Visible behaviour:
 *   - the target DB is resolved by temporarily selecting atoi(argv[2]);
 *     failure replies shared.outofrangeerr; the client is then switched back
 *     to its original DB ('srcid')
 *   - shared.sameobjecterr is replied for a same-DB move
 *   - the key is looked up for writing; shared.czero is a visible reply
 *   - an expired copy in the target is purged with deleteIfVolatile(), then
 *     the value is added to the target dict; DICT_ERR replies shared.czero
 *   - on success argv[1] gets a refcount bump, the key is deleted from the
 *     source DB and shared.cone is replied
 *
 * NOTE(review): the src/dst/srcid assignments, the comparisons guarding the
 * error replies, refcount handling of the moved value and the returns are on
 * lines not present in this listing. */
2551 static void moveCommand(redisClient
*c
) {
2556 /* Obtain source and target DB pointers */
2559 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
2560 addReply(c
,shared
.outofrangeerr
);
2564 selectDb(c
,srcid
); /* Back to the source DB */
2566 /* If the user is moving using as target the same
2567 * DB as the source DB it is probably an error. */
2569 addReply(c
,shared
.sameobjecterr
);
2573 /* Check if the element exists and get a reference */
2574 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2576 addReply(c
,shared
.czero
);
2580 /* Try to add the element to the target DB */
2581 deleteIfVolatile(dst
,c
->argv
[1]);
2582 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
2583 addReply(c
,shared
.czero
);
2586 incrRefCount(c
->argv
[1]);
2589 /* OK! key moved, free the entry in the source DB */
2590 deleteKey(src
,c
->argv
[1]);
2592 addReply(c
,shared
.cone
);
2595 /* =================================== Lists ================================ */
/* Shared implementation of LPUSH / RPUSH; 'where' is REDIS_HEAD or
 * REDIS_TAIL.
 *
 * Visible behaviour: the key is looked up for writing. One path creates a
 * new list object, inserts argv[2] at the requested end, registers the list
 * under argv[1] and bumps the refcount of both key and value. The other path
 * rejects a non-list value with shared.wrongtypeerr, otherwise inserts
 * argv[2] at the requested end and bumps its refcount. Allocation failures in
 * listAddNodeHead/Tail go through oom(). The final reply is shared.ok.
 *
 * NOTE(review): the NULL test separating the two paths, the assignment of
 * the local 'list' and the else/return lines are not present in this
 * listing. */
2596 static void pushGenericCommand(redisClient
*c
, int where
) {
2600 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2602 lobj
= createListObject();
2604 if (where
== REDIS_HEAD
) {
2605 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2607 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2609 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
2610 incrRefCount(c
->argv
[1]);
2611 incrRefCount(c
->argv
[2]);
2613 if (lobj
->type
!= REDIS_LIST
) {
2614 addReply(c
,shared
.wrongtypeerr
);
2618 if (where
== REDIS_HEAD
) {
2619 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2621 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2623 incrRefCount(c
->argv
[2]);
2626 addReply(c
,shared
.ok
);
2629 static void lpushCommand(redisClient
*c
) {
2630 pushGenericCommand(c
,REDIS_HEAD
);
2633 static void rpushCommand(redisClient
*c
) {
2634 pushGenericCommand(c
,REDIS_TAIL
);
/* LLEN key
 *
 * Looks argv[1] up for reading. Visible replies: shared.czero,
 * shared.wrongtypeerr for a non-list value, and ":<n>\r\n" with
 * listLength() of the list.
 *
 * NOTE(review): the NULL test, the assignment of 'l' and the else lines are
 * not present in this listing. */
2637 static void llenCommand(redisClient
*c
) {
2641 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2643 addReply(c
,shared
.czero
);
2646 if (o
->type
!= REDIS_LIST
) {
2647 addReply(c
,shared
.wrongtypeerr
);
2650 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
/* LINDEX key index
 *
 * The index is parsed with atoi() and resolved with listIndex(). Visible
 * replies: shared.nullbulk (twice -- presumably for a missing key and for an
 * index with no node), shared.wrongtypeerr for a non-list value, and a
 * "$<len>\r\n" header for the element followed by shared.crlf.
 *
 * NOTE(review): the NULL tests, else lines and the line sending the element
 * itself are not present in this listing. */
2655 static void lindexCommand(redisClient
*c
) {
2657 int index
= atoi(c
->argv
[2]->ptr
);
2659 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2661 addReply(c
,shared
.nullbulk
);
2663 if (o
->type
!= REDIS_LIST
) {
2664 addReply(c
,shared
.wrongtypeerr
);
2666 list
*list
= o
->ptr
;
2669 ln
= listIndex(list
, index
);
2671 addReply(c
,shared
.nullbulk
);
2673 robj
*ele
= listNodeValue(ln
);
2674 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2676 addReply(c
,shared
.crlf
);
/* LSET key index value
 *
 * The index is parsed with atoi() and resolved with listIndex(). Visible
 * replies: shared.nokeyerr, shared.wrongtypeerr for a non-list value,
 * shared.outofrangeerr, and shared.ok after the node's value has been
 * replaced by argv[3] (whose refcount is incremented).
 *
 * NOTE(review): the NULL tests, the release of the previous element ('ele')
 * and the else lines are not present in this listing. */
2682 static void lsetCommand(redisClient
*c
) {
2684 int index
= atoi(c
->argv
[2]->ptr
);
2686 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2688 addReply(c
,shared
.nokeyerr
);
2690 if (o
->type
!= REDIS_LIST
) {
2691 addReply(c
,shared
.wrongtypeerr
);
2693 list
*list
= o
->ptr
;
2696 ln
= listIndex(list
, index
);
2698 addReply(c
,shared
.outofrangeerr
);
2700 robj
*ele
= listNodeValue(ln
);
2703 listNodeValue(ln
) = c
->argv
[3];
2704 incrRefCount(c
->argv
[3]);
2705 addReply(c
,shared
.ok
);
/* Shared implementation of LPOP / RPOP; 'where' is REDIS_HEAD or
 * REDIS_TAIL.
 *
 * Visible behaviour: the key is looked up for writing; the node is taken
 * with listFirst() when popping from the head, listLast() otherwise. Visible
 * replies: shared.nullbulk (twice -- presumably missing key and empty list),
 * shared.wrongtypeerr for a non-list value, and a "$<len>\r\n" header plus
 * shared.crlf for the popped element, after which the node is removed with
 * listDelNode().
 *
 * NOTE(review): the NULL tests, else lines and the line sending the element
 * itself are not present in this listing. */
2712 static void popGenericCommand(redisClient
*c
, int where
) {
2715 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2717 addReply(c
,shared
.nullbulk
);
2719 if (o
->type
!= REDIS_LIST
) {
2720 addReply(c
,shared
.wrongtypeerr
);
2722 list
*list
= o
->ptr
;
2725 if (where
== REDIS_HEAD
)
2726 ln
= listFirst(list
);
2728 ln
= listLast(list
);
2731 addReply(c
,shared
.nullbulk
);
2733 robj
*ele
= listNodeValue(ln
);
2734 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2736 addReply(c
,shared
.crlf
);
2737 listDelNode(list
,ln
);
2744 static void lpopCommand(redisClient
*c
) {
2745 popGenericCommand(c
,REDIS_HEAD
);
2748 static void rpopCommand(redisClient
*c
) {
2749 popGenericCommand(c
,REDIS_TAIL
);
/* LRANGE key start end
 *
 * start/end are parsed with atoi(). Negative indexes count from the end of
 * the list (llen is added) and are then clamped to 0; 'end' is clamped to
 * llen-1. When start > end or start >= llen the reply is
 * shared.emptymultibulk. Otherwise a "*<rangelen>\r\n" header is sent and,
 * starting from listIndex(list,start), each of the rangelen elements gets a
 * "$<len>\r\n" header followed by shared.crlf.
 * Other visible replies: shared.nullmultibulk and shared.wrongtypeerr.
 *
 * NOTE(review): the NULL test on the key, the line sending each element and
 * the advance to the next node are not present in this listing. */
2752 static void lrangeCommand(redisClient
*c
) {
2754 int start
= atoi(c
->argv
[2]->ptr
);
2755 int end
= atoi(c
->argv
[3]->ptr
);
2757 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2759 addReply(c
,shared
.nullmultibulk
);
2761 if (o
->type
!= REDIS_LIST
) {
2762 addReply(c
,shared
.wrongtypeerr
);
2764 list
*list
= o
->ptr
;
2766 int llen
= listLength(list
);
2770 /* convert negative indexes */
2771 if (start
< 0) start
= llen
+start
;
2772 if (end
< 0) end
= llen
+end
;
2773 if (start
< 0) start
= 0;
2774 if (end
< 0) end
= 0;
2776 /* indexes sanity checks */
2777 if (start
> end
|| start
>= llen
) {
2778 /* Out of range start or start > end result in empty list */
2779 addReply(c
,shared
.emptymultibulk
);
2782 if (end
>= llen
) end
= llen
-1;
2783 rangelen
= (end
-start
)+1;
2785 /* Return the result in form of a multi-bulk reply */
2786 ln
= listIndex(list
, start
);
2787 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
2788 for (j
= 0; j
< rangelen
; j
++) {
2789 ele
= listNodeValue(ln
);
2790 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2792 addReply(c
,shared
.crlf
);
/* LTRIM key start end
 *
 * start/end are parsed with atoi() and normalised like in LRANGE: negative
 * indexes count from the end, then are clamped to 0; 'end' is clamped to
 * llen-1. The list is then trimmed by deleting 'ltrim' nodes from the head
 * and 'rtrim' nodes from the tail, and shared.ok is replied.
 * Other visible replies: shared.nokeyerr and shared.wrongtypeerr.
 *
 * NOTE(review): the computation of ltrim/rtrim (including the out-of-range
 * case that empties the list) and the NULL test on the key are on lines not
 * present in this listing. */
2799 static void ltrimCommand(redisClient
*c
) {
2801 int start
= atoi(c
->argv
[2]->ptr
);
2802 int end
= atoi(c
->argv
[3]->ptr
);
2804 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2806 addReply(c
,shared
.nokeyerr
);
2808 if (o
->type
!= REDIS_LIST
) {
2809 addReply(c
,shared
.wrongtypeerr
);
2811 list
*list
= o
->ptr
;
2813 int llen
= listLength(list
);
2814 int j
, ltrim
, rtrim
;
2816 /* convert negative indexes */
2817 if (start
< 0) start
= llen
+start
;
2818 if (end
< 0) end
= llen
+end
;
2819 if (start
< 0) start
= 0;
2820 if (end
< 0) end
= 0;
2822 /* indexes sanity checks */
2823 if (start
> end
|| start
>= llen
) {
2824 /* Out of range start or start > end result in empty list */
2828 if (end
>= llen
) end
= llen
-1;
2833 /* Remove list elements to perform the trim */
2834 for (j
= 0; j
< ltrim
; j
++) {
2835 ln
= listFirst(list
);
2836 listDelNode(list
,ln
);
2838 for (j
= 0; j
< rtrim
; j
++) {
2839 ln
= listLast(list
);
2840 listDelNode(list
,ln
);
2842 addReply(c
,shared
.ok
);
/* LREM key count value
 *
 * 'toremove' is atoi(argv[2]); a visible line negates it, and the walk
 * starts from the tail when 'fromtail' is set, from the head otherwise. The
 * next node is saved before a possible deletion so the walk survives
 * listDelNode(). Nodes whose value equals argv[3] (sdscmp) are deleted; the
 * walk stops early once 'removed' reaches a non-zero 'toremove'. The reply is
 * ":<removed>\r\n". Other visible replies: shared.nokeyerr and
 * shared.wrongtypeerr.
 *
 * NOTE(review): the test that sets 'fromtail', the loop header, the
 * 'removed' counter update and the NULL test on the key are on lines not
 * present in this listing -- presumably a negative count means "from the
 * tail". */
2848 static void lremCommand(redisClient
*c
) {
2851 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2853 addReply(c
,shared
.nokeyerr
);
2855 if (o
->type
!= REDIS_LIST
) {
2856 addReply(c
,shared
.wrongtypeerr
);
2858 list
*list
= o
->ptr
;
2859 listNode
*ln
, *next
;
2860 int toremove
= atoi(c
->argv
[2]->ptr
);
2865 toremove
= -toremove
;
2868 ln
= fromtail
? list
->tail
: list
->head
;
2870 robj
*ele
= listNodeValue(ln
);
2872 next
= fromtail
? ln
->prev
: ln
->next
;
2873 if (sdscmp(ele
->ptr
,c
->argv
[3]->ptr
) == 0) {
2874 listDelNode(list
,ln
);
2877 if (toremove
&& removed
== toremove
) break;
2881 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
2886 /* ==================================== Sets ================================ */
/* SADD key member
 *
 * Looks the key up for writing. One visible path creates a new set object
 * and registers it under argv[1] (bumping the key's refcount); a non-set
 * value is rejected with shared.wrongtypeerr. The member is then added with
 * dictAdd(): DICT_OK bumps the member's refcount and replies shared.cone,
 * and shared.czero is the other visible reply (member already present).
 *
 * NOTE(review): the NULL test on the lookup and the else/return lines are
 * not present in this listing. */
2888 static void saddCommand(redisClient
*c
) {
2891 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2893 set
= createSetObject();
2894 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
2895 incrRefCount(c
->argv
[1]);
2897 if (set
->type
!= REDIS_SET
) {
2898 addReply(c
,shared
.wrongtypeerr
);
2902 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
2903 incrRefCount(c
->argv
[2]);
2905 addReply(c
,shared
.cone
);
2907 addReply(c
,shared
.czero
);
/* SREM key member
 *
 * Looks the key up for writing. Visible replies: shared.czero,
 * shared.wrongtypeerr for a non-set value, shared.cone when dictDelete()
 * removed the member (DICT_OK), and shared.czero again otherwise.
 *
 * NOTE(review): the NULL test on the lookup and the else lines are not
 * present in this listing. */
2911 static void sremCommand(redisClient
*c
) {
2914 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2916 addReply(c
,shared
.czero
);
2918 if (set
->type
!= REDIS_SET
) {
2919 addReply(c
,shared
.wrongtypeerr
);
2922 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
2924 addReply(c
,shared
.cone
);
2926 addReply(c
,shared
.czero
);
/* SMOVE srckey dstkey member
 *
 * Both keys are looked up for writing.
 *   - missing source -> shared.czero; source of another type ->
 *     shared.wrongtypeerr
 *   - existing destination of another type -> shared.wrongtypeerr
 *   - member not found in the source set (dictDelete() == DICT_ERR) ->
 *     shared.czero
 *   - otherwise the member is added to the destination set (a new set object
 *     is created and registered under argv[2] on one visible path) and
 *     shared.cone is replied; the member's refcount is bumped only when it
 *     was really inserted.
 *
 * NOTE(review): the returns after the error replies and the test guarding
 * the creation of the destination set are on lines not present in this
 * listing. */
2931 static void smoveCommand(redisClient
*c
) {
2932 robj
*srcset
, *dstset
;
2934 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2935 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
2937 /* If the source key does not exist return 0, if it's of the wrong type
2939 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
2940 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
2943 /* Error if the destination key is not a set as well */
2944 if (dstset
&& dstset
->type
!= REDIS_SET
) {
2945 addReply(c
,shared
.wrongtypeerr
);
2948 /* Remove the element from the source set */
2949 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
2950 /* Key not found in the src set! return zero */
2951 addReply(c
,shared
.czero
);
2955 /* Add the element to the destination set */
2957 dstset
= createSetObject();
2958 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
2959 incrRefCount(c
->argv
[2]);
2961 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
2962 incrRefCount(c
->argv
[3]);
2963 addReply(c
,shared
.cone
);
/* SISMEMBER key member
 *
 * Looks the key up for reading. Visible replies: shared.czero,
 * shared.wrongtypeerr for a non-set value, shared.cone when dictFind()
 * locates the member, shared.czero otherwise.
 *
 * NOTE(review): the NULL test on the lookup and the else lines are not
 * present in this listing. */
2966 static void sismemberCommand(redisClient
*c
) {
2969 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
2971 addReply(c
,shared
.czero
);
2973 if (set
->type
!= REDIS_SET
) {
2974 addReply(c
,shared
.wrongtypeerr
);
2977 if (dictFind(set
->ptr
,c
->argv
[2]))
2978 addReply(c
,shared
.cone
);
2980 addReply(c
,shared
.czero
);
/* SCARD key
 *
 * Looks the key up for reading. Visible replies: shared.czero,
 * shared.wrongtypeerr for a non-set value, and an integer reply
 * ":<n>\r\n".
 *
 * NOTE(review): the argument of the integer reply (presumably the set's
 * dictSize()), the NULL test and the else lines are not present in this
 * listing. */
2984 static void scardCommand(redisClient
*c
) {
2988 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2990 addReply(c
,shared
.czero
);
2993 if (o
->type
!= REDIS_SET
) {
2994 addReply(c
,shared
.wrongtypeerr
);
2997 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3003 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3004 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3006 return dictSize(*d1
)-dictSize(*d2
);
/* Shared implementation of SINTER / SINTERSTORE.
 *
 * setskeys/setsnum: the keys of the sets to intersect.
 * dstkey: NULL for SINTER (members are streamed to the client as a
 *         multi-bulk reply), the target key for SINTERSTORE.
 *
 * Visible behaviour: every key is resolved to its dict and stored in 'dv';
 * a non-set value replies shared.wrongtypeerr. The dicts are sorted by
 * cardinality so the smallest one drives the iteration, and each of its
 * members is kept only if dictFind() locates it in all the other sets. Kept
 * members are either emitted as bulk items or added to 'dstset'; in store
 * mode the target key is replaced by 'dstset' and its size is replied as an
 * integer. 'lenobj' is a placeholder whose payload, the "*<count>\r\n"
 * header, is filled in once the cardinality is known.
 *
 * Fix: the bulk-length header passed sdslen() (a size_t) straight to "%d";
 * it now carries the same (int) cast the other commands in this file use.
 *
 * NOTE(review): many original lines (declarations, handling of a missing
 * set, the tests on 'dstkey', cleanup of 'dv') are not present in this
 * listing. */
3009 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
) {
3010 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3013 robj
*lenobj
= NULL
, *dstset
= NULL
;
3014 int j
, cardinality
= 0;
3016 if (!dv
) oom("sinterGenericCommand");
3017 for (j
= 0; j
< setsnum
; j
++) {
3021 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3022 lookupKeyRead(c
->db
,setskeys
[j
]);
3026 deleteKey(c
->db
,dstkey
);
3027 addReply(c
,shared
.ok
);
3029 addReply(c
,shared
.nullmultibulk
);
3033 if (setobj
->type
!= REDIS_SET
) {
3035 addReply(c
,shared
.wrongtypeerr
);
3038 dv
[j
] = setobj
->ptr
;
3040 /* Sort sets from the smallest to largest, this will improve our
3041 * algorithm's performance */
3042 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3044 /* The first thing we should output is the total number of elements...
3045 * since this is a multi-bulk write, but at this stage we don't know
3046 * the intersection set size, so we use a trick, append an empty object
3047 * to the output list and save the pointer to later modify it with the
3050 lenobj
= createObject(REDIS_STRING
,NULL
);
3052 decrRefCount(lenobj
);
3054 /* If we have a target key where to store the resulting set
3055 * create this key with an empty set inside */
3056 dstset
= createSetObject();
3059 /* Iterate all the elements of the first (smallest) set, and test
3060 * the element against all the other sets, if at least one set does
3061 * not include the element it is discarded */
3062 di
= dictGetIterator(dv
[0]);
3063 if (!di
) oom("dictGetIterator");
3065 while((de
= dictNext(di
)) != NULL
) {
3068 for (j
= 1; j
< setsnum
; j
++)
3069 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
3071 continue; /* at least one set does not contain the member */
3072 ele
= dictGetEntryKey(de
);
/* sdslen() returns size_t: cast to match the %d conversion */
3074 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
3076 addReply(c
,shared
.crlf
);
3079 dictAdd(dstset
->ptr
,ele
,NULL
);
3083 dictReleaseIterator(di
);
3086 /* Store the resulting set into the target */
3087 deleteKey(c
->db
,dstkey
);
3088 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3089 incrRefCount(dstkey
);
3093 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",cardinality
);
3095 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3096 dictSize((dict
*)dstset
->ptr
)));
3102 static void sinterCommand(redisClient
*c
) {
3103 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
3106 static void sinterstoreCommand(redisClient
*c
) {
3107 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
3110 #define REDIS_OP_UNION 0
3111 #define REDIS_OP_DIFF 1
/* Shared implementation of SUNION / SUNIONSTORE / SDIFF / SDIFFSTORE.
 *
 * setskeys/setsnum: the keys of the input sets.
 * dstkey: NULL to stream the result to the client, otherwise the key that
 *         receives the resulting set.
 * op: REDIS_OP_UNION or REDIS_OP_DIFF.
 *
 * Visible behaviour: every key is resolved to its dict in 'dv' (a missing
 * key leaves a NULL slot and is treated as an empty set; a non-set value
 * replies shared.wrongtypeerr). A temporary set 'dstset' accumulates the
 * result: for a union, and for the first set of a difference, members are
 * added; for the following sets of a difference they are deleted. The
 * difference stops early when the first set is missing or the running
 * cardinality drops to zero. The result is then either emitted as a
 * multi-bulk reply, or stored under 'dstkey' with its size replied as an
 * integer.
 *
 * Fix: the bulk-length header passed sdslen() (a size_t) straight to "%d";
 * it now carries the same (int) cast the other commands in this file use.
 *
 * NOTE(review): many original lines (declarations, cardinality updates, the
 * tests on 'dstkey', cleanup of 'dv') are not present in this listing. */
3113 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
3114 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3117 robj
*dstset
= NULL
;
3118 int j
, cardinality
= 0;
3120 if (!dv
) oom("sunionDiffGenericCommand");
3121 for (j
= 0; j
< setsnum
; j
++) {
3125 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3126 lookupKeyRead(c
->db
,setskeys
[j
]);
3131 if (setobj
->type
!= REDIS_SET
) {
3133 addReply(c
,shared
.wrongtypeerr
);
3136 dv
[j
] = setobj
->ptr
;
3139 /* We need a temp set object to store our union. If the dstkey
3140 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3141 * this set object will be the resulting object to set into the target key*/
3142 dstset
= createSetObject();
3144 /* Iterate all the elements of all the sets, add every element a single
3145 * time to the result set */
3146 for (j
= 0; j
< setsnum
; j
++) {
3147 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
3148 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
3150 di
= dictGetIterator(dv
[j
]);
3151 if (!di
) oom("dictGetIterator");
3153 while((de
= dictNext(di
)) != NULL
) {
3156 /* dictAdd will not add the same element multiple times */
3157 ele
= dictGetEntryKey(de
);
3158 if (op
== REDIS_OP_UNION
|| j
== 0) {
3159 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
3163 } else if (op
== REDIS_OP_DIFF
) {
3164 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
3169 dictReleaseIterator(di
);
3171 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
3174 /* Output the content of the resulting set, if not in STORE mode */
3176 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
3177 di
= dictGetIterator(dstset
->ptr
);
3178 if (!di
) oom("dictGetIterator");
3179 while((de
= dictNext(di
)) != NULL
) {
3182 ele
= dictGetEntryKey(de
);
3183 addReplySds(c
,sdscatprintf(sdsempty(),
/* sdslen() returns size_t: cast to match the %d conversion */
3184 "$%d\r\n",(int)sdslen(ele
->ptr
)));
3186 addReply(c
,shared
.crlf
);
3188 dictReleaseIterator(di
);
3190 /* If we have a target key where to store the resulting set
3191 * create this key with the result set inside */
3192 deleteKey(c
->db
,dstkey
);
3193 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3194 incrRefCount(dstkey
);
3199 decrRefCount(dstset
);
3201 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3202 dictSize((dict
*)dstset
->ptr
)));
3208 static void sunionCommand(redisClient
*c
) {
3209 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
3212 static void sunionstoreCommand(redisClient
*c
) {
3213 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
3216 static void sdiffCommand(redisClient
*c
) {
3217 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
3220 static void sdiffstoreCommand(redisClient
*c
) {
3221 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
3224 static void flushdbCommand(redisClient
*c
) {
3225 server
.dirty
+= dictSize(c
->db
->dict
);
3226 dictEmpty(c
->db
->dict
);
3227 dictEmpty(c
->db
->expires
);
3228 addReply(c
,shared
.ok
);
3231 static void flushallCommand(redisClient
*c
) {
3232 server
.dirty
+= emptyDb();
3233 addReply(c
,shared
.ok
);
3234 rdbSave(server
.dbfilename
);
/* Allocates a redisSortOperation with zmalloc() (aborting through oom() on
 * failure) and records 'pattern' in it. The pattern object is stored by
 * pointer, not copied.
 * NOTE(review): the lines that store 'type' and return the new object are
 * not present in this listing. */
3238 redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
3239 redisSortOperation
*so
= zmalloc(sizeof(*so
));
3240 if (!so
) oom("createSortOperation");
3242 so
->pattern
= pattern
;
3246 /* Return the value associated to the key with a name obtained
3247 * substituting the first occurrence of '*' in 'pattern' with 'subst' */
/* Builds the key name obtained by replacing the first '*' of 'pattern' with
 * the string in 'subst' and looks it up (read access) in 'db'.
 *
 * Returns NULL when the combined length would exceed REDIS_SORTKEY_MAX or
 * when the pattern contains no '*'; otherwise whatever lookupKeyRead()
 * returns for the composed name.
 *
 * The name is assembled in a stack structure ('keyname') laid out like an
 * sds string -- prefix, substitution, postfix, NUL terminator, then the
 * length field -- and wrapped in a stack robj whose ptr is set two longs
 * past the start of that structure. No heap allocation is involved, so the
 * temporary key must not outlive this call.
 *
 * NOTE(review): the declarations of keyname/keyobj/spat/ssub/p, the
 * assignment of 'ssub' and the computation of 'prefixlen' are on lines not
 * present in this listing. */
3248 robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
3252 int prefixlen
, sublen
, postfixlen
;
3253 /* Exploit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3257 char buf
[REDIS_SORTKEY_MAX
+1];
3260 spat
= pattern
->ptr
;
3262 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
3263 p
= strchr(spat
,'*');
3264 if (!p
) return NULL
;
3267 sublen
= sdslen(ssub
);
3268 postfixlen
= sdslen(spat
)-(prefixlen
+1);
3269 memcpy(keyname
.buf
,spat
,prefixlen
);
3270 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
3271 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
3272 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
3273 keyname
.len
= prefixlen
+sublen
+postfixlen
;
3275 keyobj
.refcount
= 1;
3276 keyobj
.type
= REDIS_STRING
;
3277 keyobj
.ptr
= ((char*)&keyname
)+(sizeof(long)*2);
3279 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3280 return lookupKeyRead(db
,&keyobj
);
3283 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3284 * the additional parameter is not standard but a BSD-specific we have to
3285 * pass sorting parameters via the global 'server' structure */
/* qsort()/pqsort() comparator for redisSortObject elements.
 *
 * The sort mode comes from the global 'server' structure:
 *   - !server.sort_alpha: compare the precomputed u.score values
 *   - server.sort_alpha with server.sort_bypattern: compare the u.cmpobj
 *     strings with strcoll(), with special handling when at least one of
 *     them is NULL
 *   - server.sort_alpha without a pattern: compare the elements themselves
 *     with strcoll()
 * The result is negated when server.sort_desc is set.
 *
 * NOTE(review): the values assigned to 'cmp' in the numeric and NULL-object
 * branches are on lines not present in this listing. */
3286 static int sortCompare(const void *s1
, const void *s2
) {
3287 const redisSortObject
*so1
= s1
, *so2
= s2
;
3290 if (!server
.sort_alpha
) {
3291 /* Numeric sorting. Here it's trivial as we precomputed scores */
3292 if (so1
->u
.score
> so2
->u
.score
) {
3294 } else if (so1
->u
.score
< so2
->u
.score
) {
3300 /* Alphanumeric sorting */
3301 if (server
.sort_bypattern
) {
3302 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
3303 /* At least one compare object is NULL */
3304 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
3306 else if (so1
->u
.cmpobj
== NULL
)
3311 /* We have both the objects, use strcoll */
3312 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
3315 /* Compare elements directly */
3316 cmp
= strcoll(so1
->obj
->ptr
,so2
->obj
->ptr
);
3319 return server
.sort_desc
? -cmp
: cmp
;
3322 /* The SORT command is the most complex command in Redis. Warning: this code
3323 * is optimized for speed and a bit less for readability */
/* SORT key [ASC|DESC] [ALPHA] [LIMIT start count] [BY pattern]
 *          [GET|DEL|INCR|DECR pattern ...]
 *
 * Visible behaviour:
 *   1. The key must hold a list or a set (shared.nokeyerr /
 *      shared.wrongtypeerr otherwise). Its refcount is raised for the
 *      duration of the command.
 *   2. Options are parsed case-insensitively; each GET/DEL/INCR/DECR adds a
 *      redisSortOperation to 'operations'. A BY pattern without '*' sets
 *      'dontsort'. Anything unrecognised releases the resources and replies
 *      shared.syntaxerr.
 *   3. All elements are loaded into 'vector'. Unless 'dontsort' is set,
 *      scores are filled in: by-pattern values via lookupKeyByPattern(),
 *      otherwise (numeric mode) strtod() of the element itself.
 *   4. LIMIT is clamped to the vector, sort parameters are published through
 *      the global 'server' structure for sortCompare(), and the vector is
 *      sorted with pqsort() (BY pattern and a partial range) or qsort().
 *   5. The [start,end] slice is emitted as a multi-bulk reply; with GET
 *      operations the looked-up values are emitted instead.
 *   6. Cleanup: sortval, the operations list, and the by-pattern compare
 *      objects held in alpha mode are released.
 *
 * Fixes:
 *   - The DECR branch tested for "get" a second time (copy-paste of the GET
 *     branch), making it unreachable and turning "decr" into a syntax error.
 *     It now tests for "decr".
 *   - The bulk-length header for plain elements passed sdslen() (a size_t)
 *     straight to "%d"; it now carries the (int) cast used elsewhere.
 *
 * NOTE(review): a number of original lines (declarations, 'j' bookkeeping,
 * option flag assignments, parts of the output loop, freeing of 'vector')
 * are not present in this listing. */
3324 static void sortCommand(redisClient
*c
) {
3327 int desc
= 0, alpha
= 0;
3328 int limit_start
= 0, limit_count
= -1, start
, end
;
3329 int j
, dontsort
= 0, vectorlen
;
3330 int getop
= 0; /* GET operation counter */
3331 robj
*sortval
, *sortby
= NULL
;
3332 redisSortObject
*vector
; /* Resulting vector to sort */
3334 /* Lookup the key to sort. It must be of the right types */
3335 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
3336 if (sortval
== NULL
) {
3337 addReply(c
,shared
.nokeyerr
);
3340 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
) {
3341 addReply(c
,shared
.wrongtypeerr
);
3345 /* Create a list of operations to perform for every sorted element.
3346 * Operations can be GET/DEL/INCR/DECR */
3347 operations
= listCreate();
3348 listSetFreeMethod(operations
,zfree
);
3351 /* Now we need to protect sortval incrementing its count, in the future
3352 * SORT may have options able to overwrite/delete keys during the sorting
3353 * and the sorted key itself may get destroyed */
3354 incrRefCount(sortval
);
3356 /* The SORT command has an SQL-alike syntax, parse it */
3357 while(j
< c
->argc
) {
3358 int leftargs
= c
->argc
-j
-1;
3359 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
3361 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
3363 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
3365 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
3366 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
3367 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
3369 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
3370 sortby
= c
->argv
[j
+1];
3371 /* If the BY pattern does not contain '*', i.e. it is constant,
3372 * we don't need to sort nor to lookup the weight keys. */
3373 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
3375 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3376 listAddNodeTail(operations
,createSortOperation(
3377 REDIS_SORT_GET
,c
->argv
[j
+1]));
3380 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"del") && leftargs
>= 1) {
3381 listAddNodeTail(operations
,createSortOperation(
3382 REDIS_SORT_DEL
,c
->argv
[j
+1]));
3384 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"incr") && leftargs
>= 1) {
3385 listAddNodeTail(operations
,createSortOperation(
3386 REDIS_SORT_INCR
,c
->argv
[j
+1]));
/* DECR option: this branch used to test "get" again and was unreachable */
3388 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"decr") && leftargs
>= 1) {
3389 listAddNodeTail(operations
,createSortOperation(
3390 REDIS_SORT_DECR
,c
->argv
[j
+1]));
3393 decrRefCount(sortval
);
3394 listRelease(operations
);
3395 addReply(c
,shared
.syntaxerr
);
3401 /* Load the sorting vector with all the objects to sort */
3402 vectorlen
= (sortval
->type
== REDIS_LIST
) ?
3403 listLength((list
*)sortval
->ptr
) :
3404 dictSize((dict
*)sortval
->ptr
);
3405 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
3406 if (!vector
) oom("allocating objects vector for SORT");
3408 if (sortval
->type
== REDIS_LIST
) {
3409 list
*list
= sortval
->ptr
;
3413 while((ln
= listYield(list
))) {
3414 robj
*ele
= ln
->value
;
3415 vector
[j
].obj
= ele
;
3416 vector
[j
].u
.score
= 0;
3417 vector
[j
].u
.cmpobj
= NULL
;
3421 dict
*set
= sortval
->ptr
;
3425 di
= dictGetIterator(set
);
3426 if (!di
) oom("dictGetIterator");
3427 while((setele
= dictNext(di
)) != NULL
) {
3428 vector
[j
].obj
= dictGetEntryKey(setele
);
3429 vector
[j
].u
.score
= 0;
3430 vector
[j
].u
.cmpobj
= NULL
;
3433 dictReleaseIterator(di
);
3435 assert(j
== vectorlen
);
3437 /* Now it's time to load the right scores in the sorting vector */
3438 if (dontsort
== 0) {
3439 for (j
= 0; j
< vectorlen
; j
++) {
3443 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
3444 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
3446 vector
[j
].u
.cmpobj
= byval
;
3447 incrRefCount(byval
);
3449 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
3452 if (!alpha
) vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
3457 /* We are ready to sort the vector... perform a bit of sanity check
3458 * on the LIMIT option too. We'll use a partial version of quicksort. */
3459 start
= (limit_start
< 0) ? 0 : limit_start
;
3460 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
3461 if (start
>= vectorlen
) {
3462 start
= vectorlen
-1;
3465 if (end
>= vectorlen
) end
= vectorlen
-1;
3467 if (dontsort
== 0) {
3468 server
.sort_desc
= desc
;
3469 server
.sort_alpha
= alpha
;
3470 server
.sort_bypattern
= sortby
? 1 : 0;
3471 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
3472 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
3474 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
3477 /* Send command output to the output buffer, performing the specified
3478 * GET/DEL/INCR/DECR operations if any. */
3479 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
3480 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
3481 for (j
= start
; j
<= end
; j
++) {
3484 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
/* sdslen() returns size_t: cast to match the %d conversion */
3485 (int)sdslen(vector
[j
].obj
->ptr
)));
3486 addReply(c
,vector
[j
].obj
);
3487 addReply(c
,shared
.crlf
);
3489 listRewind(operations
);
3490 while((ln
= listYield(operations
))) {
3491 redisSortOperation
*sop
= ln
->value
;
3492 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
3495 if (sop
->type
== REDIS_SORT_GET
) {
3496 if (!val
|| val
->type
!= REDIS_STRING
) {
3497 addReply(c
,shared
.nullbulk
);
3499 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
3502 addReply(c
,shared
.crlf
);
3504 } else if (sop
->type
== REDIS_SORT_DEL
) {
3511 decrRefCount(sortval
);
3512 listRelease(operations
);
3513 for (j
= 0; j
< vectorlen
; j
++) {
3514 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
3515 decrRefCount(vector
[j
].u
.cmpobj
);
/* INFO
 *
 * Builds a CRLF-separated "field:value" report (version, uptime, client and
 * slave counts, memory in use, persistence and traffic counters, role) and,
 * when the server is configured as a slave (server.masterhost set), appends
 * the master host/port, the link status and the seconds elapsed since the
 * last interaction with the master. The report is sent as one bulk reply:
 * "$<len>\r\n", the text, CRLF.
 *
 * Fix: the bulk-length header passed sdslen(info) (a size_t) straight to
 * "%d"; it now carries the same (int) cast the other commands in this file
 * use.
 *
 * NOTE(review): several argument lines of both sdscatprintf() calls are not
 * present in this listing, so the pairing of conversions and arguments
 * (e.g. "%d" for the time_t 'uptime') cannot be checked here -- verify
 * against the full file. */
3520 static void infoCommand(redisClient
*c
) {
3522 time_t uptime
= time(NULL
)-server
.stat_starttime
;
3524 info
= sdscatprintf(sdsempty(),
3525 "redis_version:%s\r\n"
3526 "uptime_in_seconds:%d\r\n"
3527 "uptime_in_days:%d\r\n"
3528 "connected_clients:%d\r\n"
3529 "connected_slaves:%d\r\n"
3530 "used_memory:%zu\r\n"
3531 "changes_since_last_save:%lld\r\n"
3532 "bgsave_in_progress:%d\r\n"
3533 "last_save_time:%d\r\n"
3534 "total_connections_received:%lld\r\n"
3535 "total_commands_processed:%lld\r\n"
3540 listLength(server
.clients
)-listLength(server
.slaves
),
3541 listLength(server
.slaves
),
3544 server
.bgsaveinprogress
,
3546 server
.stat_numconnections
,
3547 server
.stat_numcommands
,
3548 server
.masterhost
== NULL
? "master" : "slave"
3550 if (server
.masterhost
) {
3551 info
= sdscatprintf(info
,
3552 "master_host:%s\r\n"
3553 "master_port:%d\r\n"
3554 "master_link_status:%s\r\n"
3555 "master_last_io_seconds_ago:%d\r\n"
3558 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
3560 (int)(time(NULL
)-server
.master
->lastinteraction
)
/* sdslen() returns size_t: cast to match the %d conversion */
3563 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(info
)));
3564 addReplySds(c
,info
);
3565 addReply(c
,shared
.crlf
);
/* MONITOR
 *
 * Does nothing when the client already carries the REDIS_SLAVE flag.
 * Otherwise flags the client as REDIS_SLAVE|REDIS_MONITOR, appends it to
 * server.monitors (aborting through oom() on allocation failure) and replies
 * shared.ok.
 * NOTE(review): one original line between the flag update and the list
 * insertion is not present in this listing. */
3568 static void monitorCommand(redisClient
*c
) {
3569 /* ignore MONITOR if already slave or in monitor mode */
3570 if (c
->flags
& REDIS_SLAVE
) return;
3572 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
3574 if (!listAddNodeTail(server
.monitors
,c
)) oom("listAddNodeTail");
3575 addReply(c
,shared
.ok
);
3578 /* ================================= Expire ================================= */
/* Drop the expire associated with 'key' by deleting the key from the
 * db->expires table.
 * NOTE(review): the return statements are not visible here; presumably the
 * result is nonzero when dictDelete() reported DICT_OK -- confirm. */
3579 static int removeExpire(redisDb
*db
, robj
*key
) {
3580 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
/* Associate the expire time 'when' with 'key' by adding an entry to the
 * db->expires table. The time_t is stored directly in the entry's value
 * pointer (cast to void*), not in separately allocated memory.
 * NOTE(review): the return statements are not visible here; expireCommand()
 * treats a nonzero result as success, and dictAdd() returning DICT_ERR is
 * handled as the other outcome -- confirm the exact values. */
3587 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
3588 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
3596 /* Return the expire time of the specified key, or -1 if no expire
3597 * is associated with this key (i.e. the key is non volatile) */
3598 static time_t getExpire(redisDb
*db
, robj
*key
) {
3601 /* No expire? return ASAP */
/* The dictSize() test skips the lookup entirely when the expires table is
 * empty. */
3602 if (dictSize(db
->expires
) == 0 ||
3603 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
/* The entry value holds the expire time itself, so it is cast back to
 * time_t rather than dereferenced. */
3605 return (time_t) dictGetEntryVal(de
);
/* Delete 'key' from the database if its expire time has passed.
 *
 * Returns 0 when the key has no expire entry or when the expire time has
 * not yet been passed (time(NULL) <= when). Otherwise the key is removed
 * from both db->expires and db->dict, and the result is 1 if the deletion
 * from db->dict returned DICT_OK, 0 if not. */
3608 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
3612 /* No expire? return ASAP */
3613 if (dictSize(db
->expires
) == 0 ||
3614 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3616 /* Lookup the expire */
3617 when
= (time_t) dictGetEntryVal(de
);
/* A key whose expire time equals the current second is still kept. */
3618 if (time(NULL
) <= when
) return 0;
3620 /* Delete the key */
3621 dictDelete(db
->expires
,key
);
3622 return dictDelete(db
->dict
,key
) == DICT_OK
;
/* Delete 'key' if it has an expire set, regardless of whether the expire
 * time has been reached (contrast with expireIfNeeded(), which compares it
 * against the current time).
 *
 * Returns 0 when the key has no entry in db->expires. Otherwise the key is
 * removed from both db->expires and db->dict, and the result is 1 if the
 * deletion from db->dict returned DICT_OK, 0 if not. */
3625 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
3628 /* No expire? return ASAP */
3629 if (dictSize(db
->expires
) == 0 ||
3630 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3632 /* Delete the key */
3634 dictDelete(db
->expires
,key
);
3635 return dictDelete(db
->dict
,key
) == DICT_OK
;
/* EXPIRE command: argv[1] is the key, argv[2] the timeout in seconds.
 * Replies with shared.cone when setExpire() returns nonzero and with
 * shared.czero in every other visible path.
 * NOTE(review): the guards in front of the two early czero replies are not
 * visible here; presumably they cover a missing key and a rejected
 * 'seconds' value -- confirm. */
3638 static void expireCommand(redisClient
*c
) {
/* NOTE(review): atoi() cannot report malformed input; a non-numeric
 * argument is parsed as 0. */
3640 int seconds
= atoi(c
->argv
[2]->ptr
);
/* Look the key up in the main keyspace of the client's selected DB. */
3642 de
= dictFind(c
->db
->dict
,c
->argv
[1]);
3644 addReply(c
,shared
.czero
);
3648 addReply(c
, shared
.czero
);
/* The expire is stored as an absolute time, not as a relative timeout. */
3651 time_t when
= time(NULL
)+seconds
;
3652 if (setExpire(c
->db
,c
->argv
[1],when
))
3653 addReply(c
,shared
.cone
);
3655 addReply(c
,shared
.czero
);
/* TTL command: replies with an integer line (":<ttl>\r\n") computed from
 * the expire time of the key in argv[1]. */
3660 static void ttlCommand(redisClient
*c
) {
/* getExpire() yields -1 for a key without an expire. */
3664 expire
= getExpire(c
->db
,c
->argv
[1]);
/* Convert the absolute expire time into seconds remaining from now. */
3666 ttl
= (int) (expire
-time(NULL
));
/* An expire time already in the past is reported as -1. */
3667 if (ttl
< 0) ttl
= -1;
3669 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
3672 /* =============================== Replication ============================= */
/* Synchronous write helper: writes to 'fd' from 'ptr', waiting for the
 * descriptor to become writable between attempts. 'timeout' is in seconds
 * (it is compared against differences of time(NULL) values).
 * Returns -1 when write() fails.
 * NOTE(review): the loop structure and the final return are not visible
 * here; 'ret' is initialised to 'size', so presumably that is the success
 * value -- confirm. */
3674 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3675 ssize_t nwritten
, ret
= size
;
3676 time_t start
= time(NULL
);
/* Wait for writability; the 1000 is presumably a per-wait cap in
 * milliseconds -- confirm against aeWait(). */
3680 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
3681 nwritten
= write(fd
,ptr
,size
);
3682 if (nwritten
== -1) return -1;
/* Overall deadline, measured from the moment the function was entered. */
3686 if ((time(NULL
)-start
) > timeout
) {
/* Synchronous read helper: reads from 'fd' into 'ptr', waiting for the
 * descriptor to become readable between attempts. 'timeout' is in seconds
 * (it is compared against differences of time(NULL) values).
 * Returns -1 when read() fails.
 * NOTE(review): the loop structure and the final return are not visible
 * here; 'totread' suggests the total byte count is returned on success --
 * confirm. */
3694 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3695 ssize_t nread
, totread
= 0;
3696 time_t start
= time(NULL
);
/* Wait for readability; the 1000 is presumably a per-wait cap in
 * milliseconds -- confirm against aeWait(). */
3700 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
3701 nread
= read(fd
,ptr
,size
);
3702 if (nread
== -1) return -1;
/* Overall deadline, measured from the moment the function was entered. */
3707 if ((time(NULL
)-start
) > timeout
) {
/* Synchronous line reader built on syncRead(): input is consumed one byte
 * at a time, and the same 'timeout' is passed to every single-byte read.
 * Returns -1 as soon as a byte cannot be read.
 * NOTE(review): the end-of-line test and the success return value are not
 * visible here. */
3715 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3722 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
/* Drop a '\r' stored just before the current position, so that CRLF line
 * endings yield the same string as plain LF ones; 'nread' guards against
 * looking before the start of the buffer. */
3725 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
/* SYNC command, issued by a slave that wants a full copy of the dataset.
 *
 * The client is put in one of two replication states depending on the
 * background save situation:
 * - REDIS_REPL_WAIT_BGSAVE_END when a usable BGSAVE is already running (one
 *   that another slave is waiting on) or when a new one was started here;
 * - REDIS_REPL_WAIT_BGSAVE_START when a BGSAVE is running but no other
 *   slave is waiting on it, so the next one is needed.
 * The client is then flagged REDIS_SLAVE and appended to server.slaves. */
3736 static void syncCommand(redisClient
*c
) {
3737 /* ignore SYNC if already slave or in monitor mode: monitors also carry
 * the REDIS_SLAVE flag, so one test covers both cases */
3738 if (c
->flags
& REDIS_SLAVE
) return;
3740 /* SYNC can't be issued when the server has pending data to send to
3741 * the client about already issued commands. We need a fresh reply
3742 * buffer registering the differences between the BGSAVE and the current
3743 * dataset, so that we can copy to other slaves if needed. */
3744 if (listLength(c
->reply
) != 0) {
3745 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3749 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
3750 /* Here we need to check if there is a background saving operation
3751 * in progress, or if it is required to start one */
3752 if (server
.bgsaveinprogress
) {
3753 /* Ok a background save is in progress. Let's check if it is a good
3754 * one for replication, i.e. if there is another slave that is
3755 * registering differences since the server forked to save */
3759 listRewind(server
.slaves
);
3760 while((ln
= listYield(server
.slaves
))) {
/* Stop at the first slave that is already waiting for this BGSAVE. */
3762 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
3765 /* Perfect, the server is already registering differences for
3766 * another slave. Set the right state, and copy the buffer. */
/* The client's own reply list is replaced by a duplicate of the other
 * slave's list. */
3767 listRelease(c
->reply
);
3768 c
->reply
= listDup(slave
->reply
);
3769 if (!c
->reply
) oom("listDup copying slave reply list");
3770 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3771 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
3773 /* No way, we need to wait for the next BGSAVE in order to
3774 * register differences */
3775 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
3776 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
3779 /* Ok we don't have a BGSAVE in progress, let's start one */
3780 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
3781 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
3782 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
3783 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
3786 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
/* From here on the client is handled as a slave. */
3789 c
->flags
|= REDIS_SLAVE
;
3791 if (!listAddNodeTail(server
.slaves
,c
)) oom("listAddNodeTail");
/* File event handler (installed for AE_WRITABLE) that streams the DB dump
 * file to a slave, at most REDIS_IOBUF_LEN bytes per invocation.
 *
 * 'privdata' is the slave's redisClient. The slave's repldboff field tracks
 * how many bytes of the dump have been written so far. When it reaches
 * repldbsize the dump descriptor is closed, the slave is marked
 * REDIS_REPL_ONLINE, and the writable handler is replaced with
 * sendReplyToClient. */
3795 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
3796 redisClient
*slave
= privdata
;
3798 REDIS_NOTUSED(mask
);
3799 char buf
[REDIS_IOBUF_LEN
];
3800 ssize_t nwritten
, buflen
;
/* Offset 0 means nothing was sent yet: emit the bulk header first. */
3802 if (slave
->repldboff
== 0) {
3803 /* Write the bulk write count before to transfer the DB. In theory here
3804 * we don't know how much room there is in the output buffer of the
3805 * socket, but in practice SO_SNDLOWAT (the minimum count for output
3806 * operations) will never be smaller than the few bytes we need. */
3809 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
/* A short write of the header is treated as a failure. */
3811 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
/* Reposition on every call: the same dump file is read at the offset
 * recorded for this particular slave. */
3819 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
3820 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
/* A zero-length read is reported as a premature EOF, any other failure
 * with the errno description. */
3822 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
3823 (buflen
== 0) ? "premature EOF" : strerror(errno
));
3827 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
3828 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
/* Advance by what was actually written; after a partial write the next
 * call re-reads from the new offset. */
3833 slave
->repldboff
+= nwritten
;
/* Whole dump transferred: switch the slave to normal reply handling. */
3834 if (slave
->repldboff
== slave
->repldbsize
) {
3835 close(slave
->repldbfd
);
3836 slave
->repldbfd
= -1;
3837 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
3838 slave
->replstate
= REDIS_REPL_ONLINE
;
3839 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
3840 sendReplyToClient
, slave
, NULL
) == AE_ERR
) {
/* NOTE(review): an empty reply is queued here; presumably this triggers
 * delivery of the output accumulated during the transfer -- confirm. */
3844 addReplySds(slave
,sdsempty());
3845 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
/* Update the replication state of every slave after a background save.
 *
 * 'bgsaveerr' is compared against REDIS_OK to decide whether the save
 * succeeded. Walking server.slaves:
 * - slaves in REDIS_REPL_WAIT_BGSAVE_START are moved to
 *   REDIS_REPL_WAIT_BGSAVE_END;
 * - slaves in REDIS_REPL_WAIT_BGSAVE_END get the dump file opened and
 *   stat'ed, and are switched to REDIS_REPL_SEND_BULK with sendBulkToSlave
 *   installed as their writable handler.
 * NOTE(review): the failure branches and the guard around the second
 * rdbSaveBackground() call are not visible here; presumably 'startbgsave'
 * gates it -- confirm. */
3849 static void updateSalvesWaitingBgsave(int bgsaveerr
) {
3851 int startbgsave
= 0;
3853 listRewind(server
.slaves
);
3854 while((ln
= listYield(server
.slaves
))) {
3855 redisClient
*slave
= ln
->value
;
3857 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
3859 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3860 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
3863 if (bgsaveerr
!= REDIS_OK
) {
3865 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
/* Open the freshly written dump and fetch its size; a failure of either
 * call takes the same error path. */
3868 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
3869 fstat(slave
->repldbfd
,&buf
) == -1) {
3871 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
/* Start the transfer at offset 0; the file size marks where it ends. */
3874 slave
->repldboff
= 0;
3875 slave
->repldbsize
= buf
.st_size
;
3876 slave
->replstate
= REDIS_REPL_SEND_BULK
;
/* Replace whatever writable handler is installed with the bulk sender. */
3877 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
3878 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
, NULL
) == AE_ERR
) {
/* A new background save is attempted; when it cannot be started, the
 * slaves list is walked again looking at the slaves still in the
 * WAIT_BGSAVE_START state. */
3885 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
3886 listRewind(server
.slaves
);
3887 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
3888 while((ln
= listYield(server
.slaves
))) {
3889 redisClient
*slave
= ln
->value
;
3891 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
/* Slave side of the initial synchronization. In order:
 * 1. connect to server.masterhost:server.masterport;
 * 2. send the SYNC command;
 * 3. read the bulk count line announcing the dump size;
 * 4. copy the dump from the socket into a temporary file;
 * 5. rename the temporary file onto server.dbfilename and load it;
 * 6. wrap the socket in a client flagged REDIS_MASTER and set the
 *    replication state to REDIS_REPL_CONNECTED.
 * NOTE(review): the error-path cleanup and the return values are not
 * visible here. */
3898 static int syncWithMaster(void) {
3899 char buf
[1024], tmpfile
[256];
3901 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
3905 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
3909 /* Issue the SYNC command */
/* 7 is the byte length of "SYNC \r\n"; the timeout is 5 seconds. */
3910 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
3912 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
3916 /* Read the bulk write count */
/* A one hour timeout is used for this read. */
3917 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
3919 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
/* buf+1 skips the first byte of the line (the '$' of a "$<count>" header).
 * NOTE(review): atoi() returns an int and cannot report malformed input. */
3923 dumpsize
= atoi(buf
+1);
3924 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
3925 /* Read the bulk write data on a temp file */
/* The name combines the current time with a random number. */
3926 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
3927 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
3930 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
3934 int nread
, nwritten
;
/* Never request more than the bytes still expected, nor more than buf
 * can hold. */
3936 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
3938 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
3944 nwritten
= write(dfd
,buf
,nread
);
3945 if (nwritten
== -1) {
3946 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
/* Move the completed dump over the configured DB file name. */
3954 if (rename(tmpfile
,server
.dbfilename
) == -1) {
3955 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
3961 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
3962 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
/* The connection used for the transfer is kept and becomes the master
 * client. */
3966 server
.master
= createClient(fd
);
3967 server
.master
->flags
|= REDIS_MASTER
;
3968 server
.replstate
= REDIS_REPL_CONNECTED
;
/* SLAVEOF command.
 *
 * "SLAVEOF no one" (both words matched case-insensitively) turns the server
 * back into a master: the master host is freed and cleared, the master
 * client (if any) is released, and the replication state becomes
 * REDIS_REPL_NONE. Any other pair of arguments is taken as host and port:
 * they are stored in server.masterhost / server.masterport, the current
 * master client is released, and the state is set to REDIS_REPL_CONNECT.
 * The visible tail of the function replies with shared.ok.
 * NOTE(review): the keyword separating the two branches is not visible
 * here; the description assumes an 'else'. */
3972 static void slaveofCommand(redisClient
*c
) {
3973 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
3974 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
/* Nothing to do when the server is not currently a slave. */
3975 if (server
.masterhost
) {
3976 sdsfree(server
.masterhost
);
3977 server
.masterhost
= NULL
;
3978 if (server
.master
) freeClient(server
.master
);
3979 server
.replstate
= REDIS_REPL_NONE
;
3980 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
/* Replace the previous master host (if any) with the requested one. */
3983 sdsfree(server
.masterhost
);
3984 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
/* NOTE(review): atoi() cannot report a malformed port argument. */
3985 server
.masterport
= atoi(c
->argv
[2]->ptr
);
3986 if (server
.master
) freeClient(server
.master
);
3987 server
.replstate
= REDIS_REPL_CONNECT
;
3988 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
3989 server
.masterhost
, server
.masterport
);
3991 addReply(c
,shared
.ok
);
3994 /* ============================ Maxmemory directive ======================== */
3996 /* This function gets called when 'maxmemory' is set on the config file to limit
3997 * the max memory used by the server, and we are out of memory.
3998 * This function will try to, in order:
4000 * - Free objects from the free list
4001 * - Try to remove keys with an EXPIRE set
4003 * If it is not possible to free enough memory to reach used-memory < maxmemory,
4004 * the server will start refusing commands that will enlarge even more the
4007 static void freeMemoryIfNeeded(void) {
4008 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
4009 if (listLength(server
.objfreelist
)) {
4012 listNode
*head
= listFirst(server
.objfreelist
);
4013 o
= listNodeValue(head
);
4014 listDelNode(server
.objfreelist
,head
);
4017 int j
, k
, freed
= 0;
4019 for (j
= 0; j
< server
.dbnum
; j
++) {
4021 robj
*minkey
= NULL
;
4022 struct dictEntry
*de
;
4024 if (dictSize(server
.db
[j
].expires
)) {
4026 /* From a sample of three keys drop the one nearest to
4027 * the natural expire */
4028 for (k
= 0; k
< 3; k
++) {
/* Pick a random entry from this DB's expires table; its value holds the
 * expire time. */
4031 de
= dictGetRandomKey(server
.db
[j
].expires
);
4032 t
= (time_t) dictGetEntryVal(de
);
/* Keep the candidate with the smallest expire time seen so far; -1 marks
 * "no candidate yet". */
4033 if (minttl
== -1 || t
< minttl
) {
4034 minkey
= dictGetEntryKey(de
);
/* Remove the selected key from database j. */
4038 deleteKey(server
.db
+j
,minkey
);
/* NOTE(review): the statements updating 'freed' are not visible here;
 * presumably it is set whenever a key was deleted in this pass, so that
 * the outer loop stops once a full pass frees nothing -- confirm. */
4041 if (!freed
) return; /* nothing to free... */
4046 /* =================================== Main! ================================ */
/* Read the Linux overcommit setting from /proc/sys/vm/overcommit_memory.
 * The first line of the file is read with fgets() into a buffer, using a
 * limit of 64.
 * NOTE(review): the return statements are not visible here; the caller
 * compares the result against 0, so presumably the parsed integer value is
 * returned, with a distinct value on failure -- confirm. */
4049 int linuxOvercommitMemoryValue(void) {
4050 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
/* A failed read is handled separately from a successful one. */
4054 if (fgets(buf
,64,fp
) == NULL
) {
/* Log a warning at REDIS_WARNING level when linuxOvercommitMemoryValue()
 * returns 0. The message explains that background saves may fail with this
 * setting and how to change it. */
4063 void linuxOvercommitMemoryWarning(void) {
4064 if (linuxOvercommitMemoryValue() == 0) {
4065 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4068 #endif /* __linux__ */
/* Turn the process into a daemon: fork and let the parent exit, start a new
 * session, point the three standard descriptors at /dev/null, and write the
 * process id to server.pidfile. */
4070 static void daemonize(void) {
/* NOTE(review): a fork() failure (-1) also satisfies "!= 0", so the process
 * exits with status 0 in that case as well. */
4074 if (fork() != 0) exit(0); /* parent exits */
4075 setsid(); /* create a new session */
4077 /* Every output goes to /dev/null. If Redis is daemonized but
4078 * the 'logfile' is set to 'stdout' in the configuration file
4079 * it will not log at all. */
4080 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
4081 dup2(fd
, STDIN_FILENO
);
4082 dup2(fd
, STDOUT_FILENO
);
4083 dup2(fd
, STDERR_FILENO
);
/* Close the extra descriptor unless it is itself one of the three standard
 * ones. */
4084 if (fd
> STDERR_FILENO
) close(fd
);
4086 /* Try to write the pid file */
4087 fp
= fopen(server
.pidfile
,"w");
/* The pid is written as a decimal number followed by a newline. */
4089 fprintf(fp
,"%d\n",getpid());
4094 int main(int argc
, char **argv
) {
4096 linuxOvercommitMemoryWarning();
4101 ResetServerSaveParams();
4102 loadServerConfig(argv
[1]);
4103 } else if (argc
> 2) {
4104 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
4107 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4110 if (server
.daemonize
) daemonize();
4111 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
4112 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
4113 redisLog(REDIS_NOTICE
,"DB loaded from disk");
4114 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
4115 acceptHandler
, NULL
, NULL
) == AE_ERR
) oom("creating file event");
4116 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
4118 aeDeleteEventLoop(server
.el
);