2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "0.101"
46 #include <arpa/inet.h>
50 #include <sys/resource.h>
54 #include "ae.h" /* Event driven programming library */
55 #include "sds.h" /* Dynamic safe strings */
56 #include "anet.h" /* Networking the easy way */
57 #include "dict.h" /* Hash tables */
58 #include "adlist.h" /* Linked lists */
59 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
60 #include "lzf.h" /* LZF compression library */
61 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
67 /* Static server configuration */
68 #define REDIS_SERVERPORT 6379 /* TCP port */
69 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
70 #define REDIS_IOBUF_LEN 1024
71 #define REDIS_LOADBUF_LEN 1024
72 #define REDIS_STATIC_ARGS 4
73 #define REDIS_DEFAULT_DBNUM 16
74 #define REDIS_CONFIGLINE_MAX 1024
75 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
76 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
77 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
79 /* Hash table parameters */
80 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
81 #define REDIS_HT_MINSLOTS 16384 /* Never resize the HT under this */
84 #define REDIS_CMD_BULK 1 /* Bulk write command */
85 #define REDIS_CMD_INLINE 2 /* Inline command */
86 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
87 this flags will return an error when the 'maxmemory' option is set in the
88 config file and the server is using more than maxmemory bytes of memory.
89 In short this commands are denied on low memory conditions. */
90 #define REDIS_CMD_DENYOOM 4
93 #define REDIS_STRING 0
98 /* Object types only used for dumping to disk */
99 #define REDIS_EXPIRETIME 253
100 #define REDIS_SELECTDB 254
101 #define REDIS_EOF 255
103 /* Defines related to the dump file format. To store 32 bits lengths for short
104 * keys requires a lot of space, so we check the most significant 2 bits of
105 * the first byte to interpreter the length:
107 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
108 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
109 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
110 * 11|000000 this means: specially encoded object will follow. The six bits
111 * number specify the kind of object that follows.
112 * See the REDIS_RDB_ENC_* defines.
114 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
115 * values, will fit inside. */
116 #define REDIS_RDB_6BITLEN 0
117 #define REDIS_RDB_14BITLEN 1
118 #define REDIS_RDB_32BITLEN 2
119 #define REDIS_RDB_ENCVAL 3
120 #define REDIS_RDB_LENERR UINT_MAX
122 /* When a length of a string object stored on disk has the first two bits
123 * set, the remaining two bits specify a special encoding for the object
124 * accordingly to the following defines: */
125 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
126 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
127 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
128 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
131 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
132 #define REDIS_SLAVE 2 /* This client is a slave server */
133 #define REDIS_MASTER 4 /* This client is a master server */
134 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
136 /* Slave replication state - slave side */
137 #define REDIS_REPL_NONE 0 /* No active replication */
138 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
139 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
141 /* Slave replication state - from the point of view of master
142 * Note that in SEND_BULK and ONLINE state the slave receives new updates
143 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
144 * to start the next background saving in order to send updates to it. */
145 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
146 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
147 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
148 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
150 /* List related stuff */
154 /* Sort operations */
155 #define REDIS_SORT_GET 0
156 #define REDIS_SORT_DEL 1
157 #define REDIS_SORT_INCR 2
158 #define REDIS_SORT_DECR 3
159 #define REDIS_SORT_ASC 4
160 #define REDIS_SORT_DESC 5
161 #define REDIS_SORTKEY_MAX 1024
164 #define REDIS_DEBUG 0
165 #define REDIS_NOTICE 1
166 #define REDIS_WARNING 2
168 /* Anti-warning macro... */
169 #define REDIS_NOTUSED(V) ((void) V)
171 /*================================= Data types ============================== */
173 /* A redis object, that is a type able to hold a string / list / set */
174 typedef struct redisObject
{
180 typedef struct redisDb
{
186 /* With multiplexing we need to take per-clinet state.
187 * Clients are taken in a liked list. */
188 typedef struct redisClient
{
195 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
198 time_t lastinteraction
; /* time of the last interaction, used for timeout */
199 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
200 int slaveseldb
; /* slave selected db, if this client is a slave */
201 int authenticated
; /* when requirepass is non-NULL */
202 int replstate
; /* replication state if this is a slave */
203 int repldbfd
; /* replication DB file descriptor */
204 long repldboff
; /* replication DB file offset */
205 off_t repldbsize
; /* replication DB file size */
213 /* Global server state structure */
219 unsigned int sharingpoolsize
;
220 long long dirty
; /* changes to DB from the last save */
222 list
*slaves
, *monitors
;
223 char neterr
[ANET_ERR_LEN
];
225 int cronloops
; /* number of times the cron function run */
226 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
227 time_t lastsave
; /* Unix time of last save succeeede */
228 size_t usedmemory
; /* Used memory in megabytes */
229 /* Fields used only for stats */
230 time_t stat_starttime
; /* server start time */
231 long long stat_numcommands
; /* number of processed commands */
232 long long stat_numconnections
; /* number of connections received */
240 int bgsaveinprogress
;
241 struct saveparam
*saveparams
;
248 /* Replication related */
252 redisClient
*master
; /* client that is master for this slave */
254 unsigned int maxclients
;
255 unsigned int maxmemory
;
256 /* Sort parameters - qsort_r() is only available under BSD so we
257 * have to take this state global, in order to pass it to sortCompare() */
263 typedef void redisCommandProc(redisClient
*c
);
264 struct redisCommand
{
266 redisCommandProc
*proc
;
271 typedef struct _redisSortObject
{
279 typedef struct _redisSortOperation
{
282 } redisSortOperation
;
284 struct sharedObjectsStruct
{
285 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
286 *colon
, *nullbulk
, *nullmultibulk
,
287 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
288 *outofrangeerr
, *plus
,
289 *select0
, *select1
, *select2
, *select3
, *select4
,
290 *select5
, *select6
, *select7
, *select8
, *select9
;
293 /*================================ Prototypes =============================== */
295 static void freeStringObject(robj
*o
);
296 static void freeListObject(robj
*o
);
297 static void freeSetObject(robj
*o
);
298 static void decrRefCount(void *o
);
299 static robj
*createObject(int type
, void *ptr
);
300 static void freeClient(redisClient
*c
);
301 static int rdbLoad(char *filename
);
302 static void addReply(redisClient
*c
, robj
*obj
);
303 static void addReplySds(redisClient
*c
, sds s
);
304 static void incrRefCount(robj
*o
);
305 static int rdbSaveBackground(char *filename
);
306 static robj
*createStringObject(char *ptr
, size_t len
);
307 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
308 static int syncWithMaster(void);
309 static robj
*tryObjectSharing(robj
*o
);
310 static int removeExpire(redisDb
*db
, robj
*key
);
311 static int expireIfNeeded(redisDb
*db
, robj
*key
);
312 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
313 static int deleteKey(redisDb
*db
, robj
*key
);
314 static time_t getExpire(redisDb
*db
, robj
*key
);
315 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
316 static void updateSalvesWaitingBgsave(int bgsaveerr
);
317 static void freeMemoryIfNeeded(void);
319 static void authCommand(redisClient
*c
);
320 static void pingCommand(redisClient
*c
);
321 static void echoCommand(redisClient
*c
);
322 static void setCommand(redisClient
*c
);
323 static void setnxCommand(redisClient
*c
);
324 static void getCommand(redisClient
*c
);
325 static void delCommand(redisClient
*c
);
326 static void existsCommand(redisClient
*c
);
327 static void incrCommand(redisClient
*c
);
328 static void decrCommand(redisClient
*c
);
329 static void incrbyCommand(redisClient
*c
);
330 static void decrbyCommand(redisClient
*c
);
331 static void selectCommand(redisClient
*c
);
332 static void randomkeyCommand(redisClient
*c
);
333 static void keysCommand(redisClient
*c
);
334 static void dbsizeCommand(redisClient
*c
);
335 static void lastsaveCommand(redisClient
*c
);
336 static void saveCommand(redisClient
*c
);
337 static void bgsaveCommand(redisClient
*c
);
338 static void shutdownCommand(redisClient
*c
);
339 static void moveCommand(redisClient
*c
);
340 static void renameCommand(redisClient
*c
);
341 static void renamenxCommand(redisClient
*c
);
342 static void lpushCommand(redisClient
*c
);
343 static void rpushCommand(redisClient
*c
);
344 static void lpopCommand(redisClient
*c
);
345 static void rpopCommand(redisClient
*c
);
346 static void llenCommand(redisClient
*c
);
347 static void lindexCommand(redisClient
*c
);
348 static void lrangeCommand(redisClient
*c
);
349 static void ltrimCommand(redisClient
*c
);
350 static void typeCommand(redisClient
*c
);
351 static void lsetCommand(redisClient
*c
);
352 static void saddCommand(redisClient
*c
);
353 static void sremCommand(redisClient
*c
);
354 static void smoveCommand(redisClient
*c
);
355 static void sismemberCommand(redisClient
*c
);
356 static void scardCommand(redisClient
*c
);
357 static void sinterCommand(redisClient
*c
);
358 static void sinterstoreCommand(redisClient
*c
);
359 static void sunionCommand(redisClient
*c
);
360 static void sunionstoreCommand(redisClient
*c
);
361 static void sdiffCommand(redisClient
*c
);
362 static void sdiffstoreCommand(redisClient
*c
);
363 static void syncCommand(redisClient
*c
);
364 static void flushdbCommand(redisClient
*c
);
365 static void flushallCommand(redisClient
*c
);
366 static void sortCommand(redisClient
*c
);
367 static void lremCommand(redisClient
*c
);
368 static void infoCommand(redisClient
*c
);
369 static void mgetCommand(redisClient
*c
);
370 static void monitorCommand(redisClient
*c
);
371 static void expireCommand(redisClient
*c
);
372 static void getSetCommand(redisClient
*c
);
373 static void ttlCommand(redisClient
*c
);
374 static void slaveofCommand(redisClient
*c
);
375 static void debugCommand(redisClient
*c
);
377 /*================================= Globals ================================= */
380 static struct redisServer server
; /* server global state */
381 static struct redisCommand cmdTable
[] = {
382 {"get",getCommand
,2,REDIS_CMD_INLINE
},
383 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
384 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
385 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
386 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
387 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
388 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
389 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
390 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
391 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
392 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
393 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
394 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
395 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
396 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
397 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
398 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
399 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
400 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
401 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
402 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
403 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
404 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
405 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
406 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
407 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
408 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
409 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
410 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
411 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
412 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
413 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
414 {"getset",getSetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
415 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
416 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
417 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
418 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
419 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
420 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
421 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
422 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
423 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
424 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
425 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
426 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
427 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
428 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
429 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
430 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
431 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
432 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
433 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
434 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
435 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
436 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
437 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
438 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
439 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
443 /*============================ Utility functions ============================ */
445 /* Glob-style pattern matching. */
446 int stringmatchlen(const char *pattern
, int patternLen
,
447 const char *string
, int stringLen
, int nocase
)
452 while (pattern
[1] == '*') {
457 return 1; /* match */
459 if (stringmatchlen(pattern
+1, patternLen
-1,
460 string
, stringLen
, nocase
))
461 return 1; /* match */
465 return 0; /* no match */
469 return 0; /* no match */
479 not = pattern
[0] == '^';
486 if (pattern
[0] == '\\') {
489 if (pattern
[0] == string
[0])
491 } else if (pattern
[0] == ']') {
493 } else if (patternLen
== 0) {
497 } else if (pattern
[1] == '-' && patternLen
>= 3) {
498 int start
= pattern
[0];
499 int end
= pattern
[2];
507 start
= tolower(start
);
513 if (c
>= start
&& c
<= end
)
517 if (pattern
[0] == string
[0])
520 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
530 return 0; /* no match */
536 if (patternLen
>= 2) {
543 if (pattern
[0] != string
[0])
544 return 0; /* no match */
546 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
547 return 0; /* no match */
555 if (stringLen
== 0) {
556 while(*pattern
== '*') {
563 if (patternLen
== 0 && stringLen
== 0)
568 void redisLog(int level
, const char *fmt
, ...)
573 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
577 if (level
>= server
.verbosity
) {
583 strftime(buf
,64,"%d %b %H:%M:%S",gmtime(&now
));
584 fprintf(fp
,"%s %c ",buf
,c
[level
]);
585 vfprintf(fp
, fmt
, ap
);
591 if (server
.logfile
) fclose(fp
);
594 /*====================== Hash table type implementation ==================== */
596 /* This is an hash table type that uses the SDS dynamic strings libary as
597 * keys and radis objects as values (objects can hold SDS strings,
600 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
604 DICT_NOTUSED(privdata
);
606 l1
= sdslen((sds
)key1
);
607 l2
= sdslen((sds
)key2
);
608 if (l1
!= l2
) return 0;
609 return memcmp(key1
, key2
, l1
) == 0;
612 static void dictRedisObjectDestructor(void *privdata
, void *val
)
614 DICT_NOTUSED(privdata
);
619 static int dictSdsKeyCompare(void *privdata
, const void *key1
,
622 const robj
*o1
= key1
, *o2
= key2
;
623 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
626 static unsigned int dictSdsHash(const void *key
) {
628 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
631 static dictType setDictType
= {
632 dictSdsHash
, /* hash function */
635 dictSdsKeyCompare
, /* key compare */
636 dictRedisObjectDestructor
, /* key destructor */
637 NULL
/* val destructor */
640 static dictType hashDictType
= {
641 dictSdsHash
, /* hash function */
644 dictSdsKeyCompare
, /* key compare */
645 dictRedisObjectDestructor
, /* key destructor */
646 dictRedisObjectDestructor
/* val destructor */
649 /* ========================= Random utility functions ======================= */
651 /* Redis generally does not try to recover from out of memory conditions
652 * when allocating objects or strings, it is not clear if it will be possible
653 * to report this condition to the client since the networking layer itself
654 * is based on heap allocation for send buffers, so we simply abort.
655 * At least the code will be simpler to read... */
656 static void oom(const char *msg
) {
657 fprintf(stderr
, "%s: Out of memory\n",msg
);
663 /* ====================== Redis server networking stuff ===================== */
664 void closeTimedoutClients(void) {
667 time_t now
= time(NULL
);
669 listRewind(server
.clients
);
670 while ((ln
= listYield(server
.clients
)) != NULL
) {
671 c
= listNodeValue(ln
);
672 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
673 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
674 (now
- c
->lastinteraction
> server
.maxidletime
)) {
675 redisLog(REDIS_DEBUG
,"Closing idle client");
681 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
682 * we resize the hash table to save memory */
683 void tryResizeHashTables(void) {
686 for (j
= 0; j
< server
.dbnum
; j
++) {
687 long long size
, used
;
689 size
= dictSlots(server
.db
[j
].dict
);
690 used
= dictSize(server
.db
[j
].dict
);
691 if (size
&& used
&& size
> REDIS_HT_MINSLOTS
&&
692 (used
*100/size
< REDIS_HT_MINFILL
)) {
693 redisLog(REDIS_NOTICE
,"The hash table %d is too sparse, resize it...",j
);
694 dictResize(server
.db
[j
].dict
);
695 redisLog(REDIS_NOTICE
,"Hash table %d resized.",j
);
700 int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
701 int j
, loops
= server
.cronloops
++;
702 REDIS_NOTUSED(eventLoop
);
704 REDIS_NOTUSED(clientData
);
706 /* Update the global state with the amount of used memory */
707 server
.usedmemory
= zmalloc_used_memory();
709 /* Show some info about non-empty databases */
710 for (j
= 0; j
< server
.dbnum
; j
++) {
711 long long size
, used
, vkeys
;
713 size
= dictSlots(server
.db
[j
].dict
);
714 used
= dictSize(server
.db
[j
].dict
);
715 vkeys
= dictSize(server
.db
[j
].expires
);
716 if (!(loops
% 5) && used
> 0) {
717 redisLog(REDIS_DEBUG
,"DB %d: %d keys (%d volatile) in %d slots HT.",j
,used
,vkeys
,size
);
718 /* dictPrintStats(server.dict); */
722 /* We don't want to resize the hash tables while a bacground saving
723 * is in progress: the saving child is created using fork() that is
724 * implemented with a copy-on-write semantic in most modern systems, so
725 * if we resize the HT while there is the saving child at work actually
726 * a lot of memory movements in the parent will cause a lot of pages
728 if (!server
.bgsaveinprogress
) tryResizeHashTables();
730 /* Show information about connected clients */
732 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use",
733 listLength(server
.clients
)-listLength(server
.slaves
),
734 listLength(server
.slaves
),
736 dictSize(server
.sharingpool
));
739 /* Close connections of timedout clients */
740 if (server
.maxidletime
&& !(loops
% 10))
741 closeTimedoutClients();
743 /* Check if a background saving in progress terminated */
744 if (server
.bgsaveinprogress
) {
746 /* XXX: TODO handle the case of the saving child killed */
747 if (wait4(-1,&statloc
,WNOHANG
,NULL
)) {
748 int exitcode
= WEXITSTATUS(statloc
);
750 redisLog(REDIS_NOTICE
,
751 "Background saving terminated with success");
753 server
.lastsave
= time(NULL
);
755 redisLog(REDIS_WARNING
,
756 "Background saving error");
758 server
.bgsaveinprogress
= 0;
759 updateSalvesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
762 /* If there is not a background saving in progress check if
763 * we have to save now */
764 time_t now
= time(NULL
);
765 for (j
= 0; j
< server
.saveparamslen
; j
++) {
766 struct saveparam
*sp
= server
.saveparams
+j
;
768 if (server
.dirty
>= sp
->changes
&&
769 now
-server
.lastsave
> sp
->seconds
) {
770 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
771 sp
->changes
, sp
->seconds
);
772 rdbSaveBackground(server
.dbfilename
);
778 /* Try to expire a few timed out keys */
779 for (j
= 0; j
< server
.dbnum
; j
++) {
780 redisDb
*db
= server
.db
+j
;
781 int num
= dictSize(db
->expires
);
784 time_t now
= time(NULL
);
786 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
787 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
792 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
793 t
= (time_t) dictGetEntryVal(de
);
795 deleteKey(db
,dictGetEntryKey(de
));
801 /* Check if we should connect to a MASTER */
802 if (server
.replstate
== REDIS_REPL_CONNECT
) {
803 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
804 if (syncWithMaster() == REDIS_OK
) {
805 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
811 static void createSharedObjects(void) {
812 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
813 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
814 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
815 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
816 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
817 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
818 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
819 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
820 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
822 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
823 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
824 "-ERR Operation against a key holding the wrong kind of value\r\n"));
825 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
826 "-ERR no such key\r\n"));
827 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
828 "-ERR syntax error\r\n"));
829 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
830 "-ERR source and destination objects are the same\r\n"));
831 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
832 "-ERR index out of range\r\n"));
833 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
834 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
835 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
836 shared
.select0
= createStringObject("select 0\r\n",10);
837 shared
.select1
= createStringObject("select 1\r\n",10);
838 shared
.select2
= createStringObject("select 2\r\n",10);
839 shared
.select3
= createStringObject("select 3\r\n",10);
840 shared
.select4
= createStringObject("select 4\r\n",10);
841 shared
.select5
= createStringObject("select 5\r\n",10);
842 shared
.select6
= createStringObject("select 6\r\n",10);
843 shared
.select7
= createStringObject("select 7\r\n",10);
844 shared
.select8
= createStringObject("select 8\r\n",10);
845 shared
.select9
= createStringObject("select 9\r\n",10);
848 static void appendServerSaveParams(time_t seconds
, int changes
) {
849 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
850 if (server
.saveparams
== NULL
) oom("appendServerSaveParams");
851 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
852 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
853 server
.saveparamslen
++;
856 static void ResetServerSaveParams() {
857 zfree(server
.saveparams
);
858 server
.saveparams
= NULL
;
859 server
.saveparamslen
= 0;
862 static void initServerConfig() {
863 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
864 server
.port
= REDIS_SERVERPORT
;
865 server
.verbosity
= REDIS_DEBUG
;
866 server
.maxidletime
= REDIS_MAXIDLETIME
;
867 server
.saveparams
= NULL
;
868 server
.logfile
= NULL
; /* NULL = log on standard output */
869 server
.bindaddr
= NULL
;
870 server
.glueoutputbuf
= 1;
871 server
.daemonize
= 0;
872 server
.pidfile
= "/var/run/redis.pid";
873 server
.dbfilename
= "dump.rdb";
874 server
.requirepass
= NULL
;
875 server
.shareobjects
= 0;
876 server
.maxclients
= 0;
877 server
.maxmemory
= 0;
878 ResetServerSaveParams();
880 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
881 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
882 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
883 /* Replication related */
885 server
.masterhost
= NULL
;
886 server
.masterport
= 6379;
887 server
.master
= NULL
;
888 server
.replstate
= REDIS_REPL_NONE
;
891 static void initServer() {
894 signal(SIGHUP
, SIG_IGN
);
895 signal(SIGPIPE
, SIG_IGN
);
897 server
.clients
= listCreate();
898 server
.slaves
= listCreate();
899 server
.monitors
= listCreate();
900 server
.objfreelist
= listCreate();
901 createSharedObjects();
902 server
.el
= aeCreateEventLoop();
903 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
904 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
905 server
.sharingpoolsize
= 1024;
906 if (!server
.db
|| !server
.clients
|| !server
.slaves
|| !server
.monitors
|| !server
.el
|| !server
.objfreelist
)
907 oom("server initialization"); /* Fatal OOM */
908 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
909 if (server
.fd
== -1) {
910 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
913 for (j
= 0; j
< server
.dbnum
; j
++) {
914 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
915 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
918 server
.cronloops
= 0;
919 server
.bgsaveinprogress
= 0;
920 server
.lastsave
= time(NULL
);
922 server
.usedmemory
= 0;
923 server
.stat_numcommands
= 0;
924 server
.stat_numconnections
= 0;
925 server
.stat_starttime
= time(NULL
);
926 aeCreateTimeEvent(server
.el
, 1000, serverCron
, NULL
, NULL
);
929 /* Empty the whole database */
930 static long long emptyDb() {
932 long long removed
= 0;
934 for (j
= 0; j
< server
.dbnum
; j
++) {
935 removed
+= dictSize(server
.db
[j
].dict
);
936 dictEmpty(server
.db
[j
].dict
);
937 dictEmpty(server
.db
[j
].expires
);
942 static int yesnotoi(char *s
) {
943 if (!strcasecmp(s
,"yes")) return 1;
944 else if (!strcasecmp(s
,"no")) return 0;
948 /* I agree, this is a very rudimental way to load a configuration...
949 will improve later if the config gets more complex */
950 static void loadServerConfig(char *filename
) {
951 FILE *fp
= fopen(filename
,"r");
952 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
957 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
960 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
966 line
= sdstrim(line
," \t\r\n");
968 /* Skip comments and blank lines*/
969 if (line
[0] == '#' || line
[0] == '\0') {
974 /* Split into arguments */
975 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
978 /* Execute config directives */
979 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
980 server
.maxidletime
= atoi(argv
[1]);
981 if (server
.maxidletime
< 0) {
982 err
= "Invalid timeout value"; goto loaderr
;
984 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
985 server
.port
= atoi(argv
[1]);
986 if (server
.port
< 1 || server
.port
> 65535) {
987 err
= "Invalid port"; goto loaderr
;
989 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
990 server
.bindaddr
= zstrdup(argv
[1]);
991 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
992 int seconds
= atoi(argv
[1]);
993 int changes
= atoi(argv
[2]);
994 if (seconds
< 1 || changes
< 0) {
995 err
= "Invalid save parameters"; goto loaderr
;
997 appendServerSaveParams(seconds
,changes
);
998 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
999 if (chdir(argv
[1]) == -1) {
1000 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1001 argv
[1], strerror(errno
));
1004 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1005 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1006 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1007 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1009 err
= "Invalid log level. Must be one of debug, notice, warning";
1012 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1015 server
.logfile
= zstrdup(argv
[1]);
1016 if (!strcasecmp(server
.logfile
,"stdout")) {
1017 zfree(server
.logfile
);
1018 server
.logfile
= NULL
;
1020 if (server
.logfile
) {
1021 /* Test if we are able to open the file. The server will not
1022 * be able to abort just for this problem later... */
1023 fp
= fopen(server
.logfile
,"a");
1025 err
= sdscatprintf(sdsempty(),
1026 "Can't open the log file: %s", strerror(errno
));
1031 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1032 server
.dbnum
= atoi(argv
[1]);
1033 if (server
.dbnum
< 1) {
1034 err
= "Invalid number of databases"; goto loaderr
;
1036 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1037 server
.maxclients
= atoi(argv
[1]);
1038 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1039 server
.maxmemory
= atoi(argv
[1]);
1040 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1041 server
.masterhost
= sdsnew(argv
[1]);
1042 server
.masterport
= atoi(argv
[2]);
1043 server
.replstate
= REDIS_REPL_CONNECT
;
1044 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1045 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1046 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1048 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1049 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1050 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1052 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1053 server
.sharingpoolsize
= atoi(argv
[1]);
1054 if (server
.sharingpoolsize
< 1) {
1055 err
= "invalid object sharing pool size"; goto loaderr
;
1057 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1058 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1059 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1061 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1062 server
.requirepass
= zstrdup(argv
[1]);
1063 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1064 server
.pidfile
= zstrdup(argv
[1]);
1065 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1066 server
.dbfilename
= zstrdup(argv
[1]);
1068 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1070 for (j
= 0; j
< argc
; j
++)
1079 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1080 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1081 fprintf(stderr
, ">>> '%s'\n", line
);
1082 fprintf(stderr
, "%s\n", err
);
1086 static void freeClientArgv(redisClient
*c
) {
1089 for (j
= 0; j
< c
->argc
; j
++)
1090 decrRefCount(c
->argv
[j
]);
1094 static void freeClient(redisClient
*c
) {
1097 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1098 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1099 sdsfree(c
->querybuf
);
1100 listRelease(c
->reply
);
1103 ln
= listSearchKey(server
.clients
,c
);
1105 listDelNode(server
.clients
,ln
);
1106 if (c
->flags
& REDIS_SLAVE
) {
1107 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1109 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1110 ln
= listSearchKey(l
,c
);
1114 if (c
->flags
& REDIS_MASTER
) {
1115 server
.master
= NULL
;
1116 server
.replstate
= REDIS_REPL_CONNECT
;
1122 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1127 listRewind(c
->reply
);
1128 while((ln
= listYield(c
->reply
))) {
1130 totlen
+= sdslen(o
->ptr
);
1131 /* This optimization makes more sense if we don't have to copy
1133 if (totlen
> 1024) return;
1139 listRewind(c
->reply
);
1140 while((ln
= listYield(c
->reply
))) {
1142 memcpy(buf
+copylen
,o
->ptr
,sdslen(o
->ptr
));
1143 copylen
+= sdslen(o
->ptr
);
1144 listDelNode(c
->reply
,ln
);
1146 /* Now the output buffer is empty, add the new single element */
1147 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,totlen
));
1148 if (!listAddNodeTail(c
->reply
,o
)) oom("listAddNodeTail");
1152 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1153 redisClient
*c
= privdata
;
1154 int nwritten
= 0, totwritten
= 0, objlen
;
1157 REDIS_NOTUSED(mask
);
1159 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1160 glueReplyBuffersIfNeeded(c
);
1161 while(listLength(c
->reply
)) {
1162 o
= listNodeValue(listFirst(c
->reply
));
1163 objlen
= sdslen(o
->ptr
);
1166 listDelNode(c
->reply
,listFirst(c
->reply
));
1170 if (c
->flags
& REDIS_MASTER
) {
1171 nwritten
= objlen
- c
->sentlen
;
1173 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1174 if (nwritten
<= 0) break;
1176 c
->sentlen
+= nwritten
;
1177 totwritten
+= nwritten
;
1178 /* If we fully sent the object on head go to the next one */
1179 if (c
->sentlen
== objlen
) {
1180 listDelNode(c
->reply
,listFirst(c
->reply
));
1184 if (nwritten
== -1) {
1185 if (errno
== EAGAIN
) {
1188 redisLog(REDIS_DEBUG
,
1189 "Error writing to client: %s", strerror(errno
));
1194 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1195 if (listLength(c
->reply
) == 0) {
1197 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1201 static struct redisCommand
*lookupCommand(char *name
) {
1203 while(cmdTable
[j
].name
!= NULL
) {
1204 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1210 /* resetClient prepare the client to process the next command */
1211 static void resetClient(redisClient
*c
) {
1216 /* If this function gets called we already read a whole
1217 * command, argments are in the client argv/argc fields.
1218 * processCommand() execute the command or prepare the
1219 * server for a bulk read from the client.
1221 * If 1 is returned the client is still alive and valid and
1222 * and other operations can be performed by the caller. Otherwise
1223 * if 0 is returned the client was destroied (i.e. after QUIT). */
1224 static int processCommand(redisClient
*c
) {
1225 struct redisCommand
*cmd
;
1228 /* Free some memory if needed (maxmemory setting) */
1229 if (server
.maxmemory
) freeMemoryIfNeeded();
1231 /* The QUIT command is handled as a special case. Normal command
1232 * procs are unable to close the client connection safely */
1233 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1237 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1239 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1242 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1243 (c
->argc
< -cmd
->arity
)) {
1244 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
1247 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1248 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1251 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1252 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1254 decrRefCount(c
->argv
[c
->argc
-1]);
1255 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1257 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1262 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1263 /* It is possible that the bulk read is already in the
1264 * buffer. Check this condition and handle it accordingly */
1265 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1266 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1268 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1273 /* Let's try to share objects on the command arguments vector */
1274 if (server
.shareobjects
) {
1276 for(j
= 1; j
< c
->argc
; j
++)
1277 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1279 /* Check if the user is authenticated */
1280 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1281 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1286 /* Exec the command */
1287 dirty
= server
.dirty
;
1289 if (server
.dirty
-dirty
!= 0 && listLength(server
.slaves
))
1290 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1291 if (listLength(server
.monitors
))
1292 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1293 server
.stat_numcommands
++;
1295 /* Prepare the client for the next command */
1296 if (c
->flags
& REDIS_CLOSE
) {
1304 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1308 /* (args*2)+1 is enough room for args, spaces, newlines */
1309 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1311 if (argc
<= REDIS_STATIC_ARGS
) {
1314 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1315 if (!outv
) oom("replicationFeedSlaves");
1318 for (j
= 0; j
< argc
; j
++) {
1319 if (j
!= 0) outv
[outc
++] = shared
.space
;
1320 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1323 lenobj
= createObject(REDIS_STRING
,
1324 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv
[j
]->ptr
)));
1325 lenobj
->refcount
= 0;
1326 outv
[outc
++] = lenobj
;
1328 outv
[outc
++] = argv
[j
];
1330 outv
[outc
++] = shared
.crlf
;
1332 /* Increment all the refcounts at start and decrement at end in order to
1333 * be sure to free objects if there is no slave in a replication state
1334 * able to be feed with commands */
1335 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1337 while((ln
= listYield(slaves
))) {
1338 redisClient
*slave
= ln
->value
;
1340 /* Don't feed slaves that are still waiting for BGSAVE to start */
1341 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1343 /* Feed all the other slaves, MONITORs and so on */
1344 if (slave
->slaveseldb
!= dictid
) {
1348 case 0: selectcmd
= shared
.select0
; break;
1349 case 1: selectcmd
= shared
.select1
; break;
1350 case 2: selectcmd
= shared
.select2
; break;
1351 case 3: selectcmd
= shared
.select3
; break;
1352 case 4: selectcmd
= shared
.select4
; break;
1353 case 5: selectcmd
= shared
.select5
; break;
1354 case 6: selectcmd
= shared
.select6
; break;
1355 case 7: selectcmd
= shared
.select7
; break;
1356 case 8: selectcmd
= shared
.select8
; break;
1357 case 9: selectcmd
= shared
.select9
; break;
1359 selectcmd
= createObject(REDIS_STRING
,
1360 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1361 selectcmd
->refcount
= 0;
1364 addReply(slave
,selectcmd
);
1365 slave
->slaveseldb
= dictid
;
1367 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1369 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1370 if (outv
!= static_outv
) zfree(outv
);
1373 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1374 redisClient
*c
= (redisClient
*) privdata
;
1375 char buf
[REDIS_IOBUF_LEN
];
1378 REDIS_NOTUSED(mask
);
1380 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1382 if (errno
== EAGAIN
) {
1385 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1389 } else if (nread
== 0) {
1390 redisLog(REDIS_DEBUG
, "Client closed connection");
1395 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1396 c
->lastinteraction
= time(NULL
);
1402 if (c
->bulklen
== -1) {
1403 /* Read the first line of the query */
1404 char *p
= strchr(c
->querybuf
,'\n');
1410 query
= c
->querybuf
;
1411 c
->querybuf
= sdsempty();
1412 querylen
= 1+(p
-(query
));
1413 if (sdslen(query
) > querylen
) {
1414 /* leave data after the first line of the query in the buffer */
1415 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1417 *p
= '\0'; /* remove "\n" */
1418 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1419 sdsupdatelen(query
);
1421 /* Now we can split the query in arguments */
1422 if (sdslen(query
) == 0) {
1423 /* Ignore empty query */
1427 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1428 if (argv
== NULL
) oom("sdssplitlen");
1431 if (c
->argv
) zfree(c
->argv
);
1432 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1433 if (c
->argv
== NULL
) oom("allocating arguments list for client");
1435 for (j
= 0; j
< argc
; j
++) {
1436 if (sdslen(argv
[j
])) {
1437 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1444 /* Execute the command. If the client is still valid
1445 * after processCommand() return and there is something
1446 * on the query buffer try to process the next command. */
1447 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1449 } else if (sdslen(c
->querybuf
) >= 1024*32) {
1450 redisLog(REDIS_DEBUG
, "Client protocol error");
1455 /* Bulk read handling. Note that if we are at this point
1456 the client already sent a command terminated with a newline,
1457 we are reading the bulk data that is actually the last
1458 argument of the command. */
1459 int qbl
= sdslen(c
->querybuf
);
1461 if (c
->bulklen
<= qbl
) {
1462 /* Copy everything but the final CRLF as final argument */
1463 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1465 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1472 static int selectDb(redisClient
*c
, int id
) {
1473 if (id
< 0 || id
>= server
.dbnum
)
1475 c
->db
= &server
.db
[id
];
1479 static void *dupClientReplyValue(void *o
) {
1480 incrRefCount((robj
*)o
);
1484 static redisClient
*createClient(int fd
) {
1485 redisClient
*c
= zmalloc(sizeof(*c
));
1487 anetNonBlock(NULL
,fd
);
1488 anetTcpNoDelay(NULL
,fd
);
1489 if (!c
) return NULL
;
1492 c
->querybuf
= sdsempty();
1498 c
->lastinteraction
= time(NULL
);
1499 c
->authenticated
= 0;
1500 c
->replstate
= REDIS_REPL_NONE
;
1501 if ((c
->reply
= listCreate()) == NULL
) oom("listCreate");
1502 listSetFreeMethod(c
->reply
,decrRefCount
);
1503 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1504 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1505 readQueryFromClient
, c
, NULL
) == AE_ERR
) {
1509 if (!listAddNodeTail(server
.clients
,c
)) oom("listAddNodeTail");
1513 static void addReply(redisClient
*c
, robj
*obj
) {
1514 if (listLength(c
->reply
) == 0 &&
1515 (c
->replstate
== REDIS_REPL_NONE
||
1516 c
->replstate
== REDIS_REPL_ONLINE
) &&
1517 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
1518 sendReplyToClient
, c
, NULL
) == AE_ERR
) return;
1519 if (!listAddNodeTail(c
->reply
,obj
)) oom("listAddNodeTail");
1523 static void addReplySds(redisClient
*c
, sds s
) {
1524 robj
*o
= createObject(REDIS_STRING
,s
);
1529 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1534 REDIS_NOTUSED(mask
);
1535 REDIS_NOTUSED(privdata
);
1537 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
1538 if (cfd
== AE_ERR
) {
1539 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
1542 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
1543 if ((c
= createClient(cfd
)) == NULL
) {
1544 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
1545 close(cfd
); /* May be already closed, just ingore errors */
1548 /* If maxclient directive is set and this is one client more... close the
1549 * connection. Note that we create the client instead to check before
1550 * for this condition, since now the socket is already set in nonblocking
1551 * mode and we can send an error for free using the Kernel I/O */
1552 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
1553 char *err
= "-ERR max number of clients reached\r\n";
1555 /* That's a best effort error message, don't check write errors */
1556 (void) write(c
->fd
,err
,strlen(err
));
1560 server
.stat_numconnections
++;
1563 /* ======================= Redis objects implementation ===================== */
1565 static robj
*createObject(int type
, void *ptr
) {
1568 if (listLength(server
.objfreelist
)) {
1569 listNode
*head
= listFirst(server
.objfreelist
);
1570 o
= listNodeValue(head
);
1571 listDelNode(server
.objfreelist
,head
);
1573 o
= zmalloc(sizeof(*o
));
1575 if (!o
) oom("createObject");
1582 static robj
*createStringObject(char *ptr
, size_t len
) {
1583 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
1586 static robj
*createListObject(void) {
1587 list
*l
= listCreate();
1589 if (!l
) oom("listCreate");
1590 listSetFreeMethod(l
,decrRefCount
);
1591 return createObject(REDIS_LIST
,l
);
1594 static robj
*createSetObject(void) {
1595 dict
*d
= dictCreate(&setDictType
,NULL
);
1596 if (!d
) oom("dictCreate");
1597 return createObject(REDIS_SET
,d
);
1600 static void freeStringObject(robj
*o
) {
1604 static void freeListObject(robj
*o
) {
1605 listRelease((list
*) o
->ptr
);
1608 static void freeSetObject(robj
*o
) {
1609 dictRelease((dict
*) o
->ptr
);
1612 static void freeHashObject(robj
*o
) {
1613 dictRelease((dict
*) o
->ptr
);
1616 static void incrRefCount(robj
*o
) {
1618 #ifdef DEBUG_REFCOUNT
1619 if (o
->type
== REDIS_STRING
)
1620 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
1624 static void decrRefCount(void *obj
) {
1627 #ifdef DEBUG_REFCOUNT
1628 if (o
->type
== REDIS_STRING
)
1629 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
1631 if (--(o
->refcount
) == 0) {
1633 case REDIS_STRING
: freeStringObject(o
); break;
1634 case REDIS_LIST
: freeListObject(o
); break;
1635 case REDIS_SET
: freeSetObject(o
); break;
1636 case REDIS_HASH
: freeHashObject(o
); break;
1637 default: assert(0 != 0); break;
1639 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
1640 !listAddNodeHead(server
.objfreelist
,o
))
1645 /* Try to share an object against the shared objects pool */
1646 static robj
*tryObjectSharing(robj
*o
) {
1647 struct dictEntry
*de
;
1650 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
1652 assert(o
->type
== REDIS_STRING
);
1653 de
= dictFind(server
.sharingpool
,o
);
1655 robj
*shared
= dictGetEntryKey(de
);
1657 c
= ((unsigned long) dictGetEntryVal(de
))+1;
1658 dictGetEntryVal(de
) = (void*) c
;
1659 incrRefCount(shared
);
1663 /* Here we are using a stream algorihtm: Every time an object is
1664 * shared we increment its count, everytime there is a miss we
1665 * recrement the counter of a random object. If this object reaches
1666 * zero we remove the object and put the current object instead. */
1667 if (dictSize(server
.sharingpool
) >=
1668 server
.sharingpoolsize
) {
1669 de
= dictGetRandomKey(server
.sharingpool
);
1671 c
= ((unsigned long) dictGetEntryVal(de
))-1;
1672 dictGetEntryVal(de
) = (void*) c
;
1674 dictDelete(server
.sharingpool
,de
->key
);
1677 c
= 0; /* If the pool is empty we want to add this object */
1682 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
1683 assert(retval
== DICT_OK
);
1690 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
1691 dictEntry
*de
= dictFind(db
->dict
,key
);
1692 return de
? dictGetEntryVal(de
) : NULL
;
1695 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
1696 expireIfNeeded(db
,key
);
1697 return lookupKey(db
,key
);
1700 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
1701 deleteIfVolatile(db
,key
);
1702 return lookupKey(db
,key
);
1705 static int deleteKey(redisDb
*db
, robj
*key
) {
1708 /* We need to protect key from destruction: after the first dictDelete()
1709 * it may happen that 'key' is no longer valid if we don't increment
1710 * it's count. This may happen when we get the object reference directly
1711 * from the hash table with dictRandomKey() or dict iterators */
1713 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
1714 retval
= dictDelete(db
->dict
,key
);
1717 return retval
== DICT_OK
;
1720 /*============================ DB saving/loading ============================ */
1722 static int rdbSaveType(FILE *fp
, unsigned char type
) {
1723 if (fwrite(&type
,1,1,fp
) == 0) return -1;
1727 static int rdbSaveTime(FILE *fp
, time_t t
) {
1728 int32_t t32
= (int32_t) t
;
1729 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
1733 /* check rdbLoadLen() comments for more info */
1734 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
1735 unsigned char buf
[2];
1738 /* Save a 6 bit len */
1739 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
1740 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1741 } else if (len
< (1<<14)) {
1742 /* Save a 14 bit len */
1743 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
1745 if (fwrite(buf
,2,1,fp
) == 0) return -1;
1747 /* Save a 32 bit len */
1748 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
1749 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1751 if (fwrite(&len
,4,1,fp
) == 0) return -1;
1756 /* String objects in the form "2391" "-100" without any space and with a
1757 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1758 * encoded as integers to save space */
1759 int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
1761 char *endptr
, buf
[32];
1763 /* Check if it's possible to encode this value as a number */
1764 value
= strtoll(s
, &endptr
, 10);
1765 if (endptr
[0] != '\0') return 0;
1766 snprintf(buf
,32,"%lld",value
);
1768 /* If the number converted back into a string is not identical
1769 * then it's not possible to encode the string as integer */
1770 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
1772 /* Finally check if it fits in our ranges */
1773 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
1774 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
1775 enc
[1] = value
&0xFF;
1777 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
1778 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
1779 enc
[1] = value
&0xFF;
1780 enc
[2] = (value
>>8)&0xFF;
1782 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
1783 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
1784 enc
[1] = value
&0xFF;
1785 enc
[2] = (value
>>8)&0xFF;
1786 enc
[3] = (value
>>16)&0xFF;
1787 enc
[4] = (value
>>24)&0xFF;
1794 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
1795 unsigned int comprlen
, outlen
;
1799 /* We require at least four bytes compression for this to be worth it */
1800 outlen
= sdslen(obj
->ptr
)-4;
1801 if (outlen
<= 0) return 0;
1802 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
1803 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
1804 if (comprlen
== 0) {
1808 /* Data compressed! Let's save it on disk */
1809 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
1810 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
1811 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
1812 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
1813 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
1822 /* Save a string objet as [len][data] on disk. If the object is a string
1823 * representation of an integer value we try to safe it in a special form */
1824 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
1825 size_t len
= sdslen(obj
->ptr
);
1828 /* Try integer encoding */
1830 unsigned char buf
[5];
1831 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
1832 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
1837 /* Try LZF compression - under 20 bytes it's unable to compress even
1838 * aaaaaaaaaaaaaaaaaa so skip it */
1839 if (1 && len
> 20) {
1842 retval
= rdbSaveLzfStringObject(fp
,obj
);
1843 if (retval
== -1) return -1;
1844 if (retval
> 0) return 0;
1845 /* retval == 0 means data can't be compressed, save the old way */
1848 /* Store verbatim */
1849 if (rdbSaveLen(fp
,len
) == -1) return -1;
1850 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
1854 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1855 static int rdbSave(char *filename
) {
1856 dictIterator
*di
= NULL
;
1861 time_t now
= time(NULL
);
1863 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
1864 fp
= fopen(tmpfile
,"w");
1866 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
1869 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
1870 for (j
= 0; j
< server
.dbnum
; j
++) {
1871 redisDb
*db
= server
.db
+j
;
1873 if (dictSize(d
) == 0) continue;
1874 di
= dictGetIterator(d
);
1880 /* Write the SELECT DB opcode */
1881 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
1882 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
1884 /* Iterate this DB writing every entry */
1885 while((de
= dictNext(di
)) != NULL
) {
1886 robj
*key
= dictGetEntryKey(de
);
1887 robj
*o
= dictGetEntryVal(de
);
1888 time_t expiretime
= getExpire(db
,key
);
1890 /* Save the expire time */
1891 if (expiretime
!= -1) {
1892 /* If this key is already expired skip it */
1893 if (expiretime
< now
) continue;
1894 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
1895 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
1897 /* Save the key and associated value */
1898 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
1899 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
1900 if (o
->type
== REDIS_STRING
) {
1901 /* Save a string value */
1902 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
1903 } else if (o
->type
== REDIS_LIST
) {
1904 /* Save a list value */
1905 list
*list
= o
->ptr
;
1909 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
1910 while((ln
= listYield(list
))) {
1911 robj
*eleobj
= listNodeValue(ln
);
1913 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
1915 } else if (o
->type
== REDIS_SET
) {
1916 /* Save a set value */
1918 dictIterator
*di
= dictGetIterator(set
);
1921 if (!set
) oom("dictGetIteraotr");
1922 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
1923 while((de
= dictNext(di
)) != NULL
) {
1924 robj
*eleobj
= dictGetEntryKey(de
);
1926 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
1928 dictReleaseIterator(di
);
1933 dictReleaseIterator(di
);
1936 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
1938 /* Make sure data will not remain on the OS's output buffers */
1943 /* Use RENAME to make sure the DB file is changed atomically only
1944 * if the generate DB file is ok. */
1945 if (rename(tmpfile
,filename
) == -1) {
1946 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destionation: %s", strerror(errno
));
1950 redisLog(REDIS_NOTICE
,"DB saved on disk");
1952 server
.lastsave
= time(NULL
);
1958 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
1959 if (di
) dictReleaseIterator(di
);
1963 static int rdbSaveBackground(char *filename
) {
1966 if (server
.bgsaveinprogress
) return REDIS_ERR
;
1967 if ((childpid
= fork()) == 0) {
1970 if (rdbSave(filename
) == REDIS_OK
) {
1977 if (childpid
== -1) {
1978 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
1982 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
1983 server
.bgsaveinprogress
= 1;
1986 return REDIS_OK
; /* unreached */
1989 static int rdbLoadType(FILE *fp
) {
1991 if (fread(&type
,1,1,fp
) == 0) return -1;
1995 static time_t rdbLoadTime(FILE *fp
) {
1997 if (fread(&t32
,4,1,fp
) == 0) return -1;
1998 return (time_t) t32
;
2001 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2002 * of this file for a description of how this are stored on disk.
2004 * isencoded is set to 1 if the readed length is not actually a length but
2005 * an "encoding type", check the above comments for more info */
2006 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2007 unsigned char buf
[2];
2010 if (isencoded
) *isencoded
= 0;
2012 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2017 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2018 type
= (buf
[0]&0xC0)>>6;
2019 if (type
== REDIS_RDB_6BITLEN
) {
2020 /* Read a 6 bit len */
2022 } else if (type
== REDIS_RDB_ENCVAL
) {
2023 /* Read a 6 bit len encoding type */
2024 if (isencoded
) *isencoded
= 1;
2026 } else if (type
== REDIS_RDB_14BITLEN
) {
2027 /* Read a 14 bit len */
2028 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2029 return ((buf
[0]&0x3F)<<8)|buf
[1];
2031 /* Read a 32 bit len */
2032 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2038 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2039 unsigned char enc
[4];
2042 if (enctype
== REDIS_RDB_ENC_INT8
) {
2043 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2044 val
= (signed char)enc
[0];
2045 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2047 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2048 v
= enc
[0]|(enc
[1]<<8);
2050 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2052 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2053 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2056 val
= 0; /* anti-warning */
2059 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2062 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2063 unsigned int len
, clen
;
2064 unsigned char *c
= NULL
;
2067 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2068 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2069 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2070 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2071 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2072 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2074 return createObject(REDIS_STRING
,val
);
2081 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2086 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2089 case REDIS_RDB_ENC_INT8
:
2090 case REDIS_RDB_ENC_INT16
:
2091 case REDIS_RDB_ENC_INT32
:
2092 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2093 case REDIS_RDB_ENC_LZF
:
2094 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2100 if (len
== REDIS_RDB_LENERR
) return NULL
;
2101 val
= sdsnewlen(NULL
,len
);
2102 if (len
&& fread(val
,len
,1,fp
) == 0) {
2106 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2109 static int rdbLoad(char *filename
) {
2111 robj
*keyobj
= NULL
;
2113 int type
, retval
, rdbver
;
2114 dict
*d
= server
.db
[0].dict
;
2115 redisDb
*db
= server
.db
+0;
2117 time_t expiretime
= -1, now
= time(NULL
);
2119 fp
= fopen(filename
,"r");
2120 if (!fp
) return REDIS_ERR
;
2121 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2123 if (memcmp(buf
,"REDIS",5) != 0) {
2125 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2128 rdbver
= atoi(buf
+5);
2131 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2138 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2139 if (type
== REDIS_EXPIRETIME
) {
2140 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2141 /* We read the time so we need to read the object type again */
2142 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2144 if (type
== REDIS_EOF
) break;
2145 /* Handle SELECT DB opcode as a special case */
2146 if (type
== REDIS_SELECTDB
) {
2147 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2149 if (dbid
>= (unsigned)server
.dbnum
) {
2150 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2153 db
= server
.db
+dbid
;
2158 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2160 if (type
== REDIS_STRING
) {
2161 /* Read string value */
2162 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2163 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2164 /* Read list/set value */
2167 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2169 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2170 /* Load every single element of the list/set */
2174 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2175 if (type
== REDIS_LIST
) {
2176 if (!listAddNodeTail((list
*)o
->ptr
,ele
))
2177 oom("listAddNodeTail");
2179 if (dictAdd((dict
*)o
->ptr
,ele
,NULL
) == DICT_ERR
)
2186 /* Add the new object in the hash table */
2187 retval
= dictAdd(d
,keyobj
,o
);
2188 if (retval
== DICT_ERR
) {
2189 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2192 /* Set the expire time if needed */
2193 if (expiretime
!= -1) {
2194 setExpire(db
,keyobj
,expiretime
);
2195 /* Delete this key if already expired */
2196 if (expiretime
< now
) deleteKey(db
,keyobj
);
2204 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2205 if (keyobj
) decrRefCount(keyobj
);
2206 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2208 return REDIS_ERR
; /* Just to avoid warning */
2211 /*================================== Commands =============================== */
2213 static void authCommand(redisClient
*c
) {
2214 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2215 c
->authenticated
= 1;
2216 addReply(c
,shared
.ok
);
2218 c
->authenticated
= 0;
2219 addReply(c
,shared
.err
);
2223 static void pingCommand(redisClient
*c
) {
2224 addReply(c
,shared
.pong
);
2227 static void echoCommand(redisClient
*c
) {
2228 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
2229 (int)sdslen(c
->argv
[1]->ptr
)));
2230 addReply(c
,c
->argv
[1]);
2231 addReply(c
,shared
.crlf
);
2234 /*=================================== Strings =============================== */
2236 static void setGenericCommand(redisClient
*c
, int nx
) {
2239 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2240 if (retval
== DICT_ERR
) {
2242 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2243 incrRefCount(c
->argv
[2]);
2245 addReply(c
,shared
.czero
);
2249 incrRefCount(c
->argv
[1]);
2250 incrRefCount(c
->argv
[2]);
2253 removeExpire(c
->db
,c
->argv
[1]);
2254 addReply(c
, nx
? shared
.cone
: shared
.ok
);
2257 static void setCommand(redisClient
*c
) {
2258 setGenericCommand(c
,0);
2261 static void setnxCommand(redisClient
*c
) {
2262 setGenericCommand(c
,1);
2265 static void getCommand(redisClient
*c
) {
2266 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2269 addReply(c
,shared
.nullbulk
);
2271 if (o
->type
!= REDIS_STRING
) {
2272 addReply(c
,shared
.wrongtypeerr
);
2274 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o
->ptr
)));
2276 addReply(c
,shared
.crlf
);
2281 static void getSetCommand(redisClient
*c
) {
2283 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
2284 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2286 incrRefCount(c
->argv
[1]);
2288 incrRefCount(c
->argv
[2]);
2290 removeExpire(c
->db
,c
->argv
[1]);
2293 static void mgetCommand(redisClient
*c
) {
2296 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
2297 for (j
= 1; j
< c
->argc
; j
++) {
2298 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
2300 addReply(c
,shared
.nullbulk
);
2302 if (o
->type
!= REDIS_STRING
) {
2303 addReply(c
,shared
.nullbulk
);
2305 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o
->ptr
)));
2307 addReply(c
,shared
.crlf
);
2313 static void incrDecrCommand(redisClient
*c
, long long incr
) {
2318 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2322 if (o
->type
!= REDIS_STRING
) {
2327 value
= strtoll(o
->ptr
, &eptr
, 10);
2332 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
2333 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
2334 if (retval
== DICT_ERR
) {
2335 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
2336 removeExpire(c
->db
,c
->argv
[1]);
2338 incrRefCount(c
->argv
[1]);
2341 addReply(c
,shared
.colon
);
2343 addReply(c
,shared
.crlf
);
2346 static void incrCommand(redisClient
*c
) {
2347 incrDecrCommand(c
,1);
2350 static void decrCommand(redisClient
*c
) {
2351 incrDecrCommand(c
,-1);
2354 static void incrbyCommand(redisClient
*c
) {
2355 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2356 incrDecrCommand(c
,incr
);
2359 static void decrbyCommand(redisClient
*c
) {
2360 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2361 incrDecrCommand(c
,-incr
);
2364 /* ========================= Type agnostic commands ========================= */
2366 static void delCommand(redisClient
*c
) {
2369 for (j
= 1; j
< c
->argc
; j
++) {
2370 if (deleteKey(c
->db
,c
->argv
[j
])) {
2377 addReply(c
,shared
.czero
);
2380 addReply(c
,shared
.cone
);
2383 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
2388 static void existsCommand(redisClient
*c
) {
2389 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
2392 static void selectCommand(redisClient
*c
) {
2393 int id
= atoi(c
->argv
[1]->ptr
);
2395 if (selectDb(c
,id
) == REDIS_ERR
) {
2396 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
2398 addReply(c
,shared
.ok
);
2402 static void randomkeyCommand(redisClient
*c
) {
2406 de
= dictGetRandomKey(c
->db
->dict
);
2407 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
2410 addReply(c
,shared
.plus
);
2411 addReply(c
,shared
.crlf
);
2413 addReply(c
,shared
.plus
);
2414 addReply(c
,dictGetEntryKey(de
));
2415 addReply(c
,shared
.crlf
);
2419 static void keysCommand(redisClient
*c
) {
2422 sds pattern
= c
->argv
[1]->ptr
;
2423 int plen
= sdslen(pattern
);
2424 int numkeys
= 0, keyslen
= 0;
2425 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
2427 di
= dictGetIterator(c
->db
->dict
);
2428 if (!di
) oom("dictGetIterator");
2430 decrRefCount(lenobj
);
2431 while((de
= dictNext(di
)) != NULL
) {
2432 robj
*keyobj
= dictGetEntryKey(de
);
2434 sds key
= keyobj
->ptr
;
2435 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
2436 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
2437 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
2439 addReply(c
,shared
.space
);
2442 keyslen
+= sdslen(key
);
2446 dictReleaseIterator(di
);
2447 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
2448 addReply(c
,shared
.crlf
);
2451 static void dbsizeCommand(redisClient
*c
) {
2453 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
2456 static void lastsaveCommand(redisClient
*c
) {
2458 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
2461 static void typeCommand(redisClient
*c
) {
2465 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2470 case REDIS_STRING
: type
= "+string"; break;
2471 case REDIS_LIST
: type
= "+list"; break;
2472 case REDIS_SET
: type
= "+set"; break;
2473 default: type
= "unknown"; break;
2476 addReplySds(c
,sdsnew(type
));
2477 addReply(c
,shared
.crlf
);
2480 static void saveCommand(redisClient
*c
) {
2481 if (server
.bgsaveinprogress
) {
2482 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
2485 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2486 addReply(c
,shared
.ok
);
2488 addReply(c
,shared
.err
);
2492 static void bgsaveCommand(redisClient
*c
) {
2493 if (server
.bgsaveinprogress
) {
2494 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
2497 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
2498 addReply(c
,shared
.ok
);
2500 addReply(c
,shared
.err
);
2504 static void shutdownCommand(redisClient
*c
) {
2505 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
2506 /* XXX: TODO kill the child if there is a bgsave in progress */
2507 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2508 if (server
.daemonize
) {
2509 unlink(server
.pidfile
);
2511 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
2512 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
2515 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
2516 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2520 static void renameGenericCommand(redisClient
*c
, int nx
) {
2523 /* To use the same key as src and dst is probably an error */
2524 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
2525 addReply(c
,shared
.sameobjecterr
);
2529 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2531 addReply(c
,shared
.nokeyerr
);
2535 deleteIfVolatile(c
->db
,c
->argv
[2]);
2536 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
2539 addReply(c
,shared
.czero
);
2542 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
2544 incrRefCount(c
->argv
[2]);
2546 deleteKey(c
->db
,c
->argv
[1]);
2548 addReply(c
,nx
? shared
.cone
: shared
.ok
);
2551 static void renameCommand(redisClient
*c
) {
2552 renameGenericCommand(c
,0);
2555 static void renamenxCommand(redisClient
*c
) {
2556 renameGenericCommand(c
,1);
2559 static void moveCommand(redisClient
*c
) {
2564 /* Obtain source and target DB pointers */
2567 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
2568 addReply(c
,shared
.outofrangeerr
);
2572 selectDb(c
,srcid
); /* Back to the source DB */
2574 /* If the user is moving using as target the same
2575 * DB as the source DB it is probably an error. */
2577 addReply(c
,shared
.sameobjecterr
);
2581 /* Check if the element exists and get a reference */
2582 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2584 addReply(c
,shared
.czero
);
2588 /* Try to add the element to the target DB */
2589 deleteIfVolatile(dst
,c
->argv
[1]);
2590 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
2591 addReply(c
,shared
.czero
);
2594 incrRefCount(c
->argv
[1]);
2597 /* OK! key moved, free the entry in the source DB */
2598 deleteKey(src
,c
->argv
[1]);
2600 addReply(c
,shared
.cone
);
2603 /* =================================== Lists ================================ */
2604 static void pushGenericCommand(redisClient
*c
, int where
) {
2608 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2610 lobj
= createListObject();
2612 if (where
== REDIS_HEAD
) {
2613 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2615 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2617 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
2618 incrRefCount(c
->argv
[1]);
2619 incrRefCount(c
->argv
[2]);
2621 if (lobj
->type
!= REDIS_LIST
) {
2622 addReply(c
,shared
.wrongtypeerr
);
2626 if (where
== REDIS_HEAD
) {
2627 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2629 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2631 incrRefCount(c
->argv
[2]);
2634 addReply(c
,shared
.ok
);
2637 static void lpushCommand(redisClient
*c
) {
2638 pushGenericCommand(c
,REDIS_HEAD
);
2641 static void rpushCommand(redisClient
*c
) {
2642 pushGenericCommand(c
,REDIS_TAIL
);
2645 static void llenCommand(redisClient
*c
) {
2649 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2651 addReply(c
,shared
.czero
);
2654 if (o
->type
!= REDIS_LIST
) {
2655 addReply(c
,shared
.wrongtypeerr
);
2658 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
2663 static void lindexCommand(redisClient
*c
) {
2665 int index
= atoi(c
->argv
[2]->ptr
);
2667 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2669 addReply(c
,shared
.nullbulk
);
2671 if (o
->type
!= REDIS_LIST
) {
2672 addReply(c
,shared
.wrongtypeerr
);
2674 list
*list
= o
->ptr
;
2677 ln
= listIndex(list
, index
);
2679 addReply(c
,shared
.nullbulk
);
2681 robj
*ele
= listNodeValue(ln
);
2682 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2684 addReply(c
,shared
.crlf
);
2690 static void lsetCommand(redisClient
*c
) {
2692 int index
= atoi(c
->argv
[2]->ptr
);
2694 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2696 addReply(c
,shared
.nokeyerr
);
2698 if (o
->type
!= REDIS_LIST
) {
2699 addReply(c
,shared
.wrongtypeerr
);
2701 list
*list
= o
->ptr
;
2704 ln
= listIndex(list
, index
);
2706 addReply(c
,shared
.outofrangeerr
);
2708 robj
*ele
= listNodeValue(ln
);
2711 listNodeValue(ln
) = c
->argv
[3];
2712 incrRefCount(c
->argv
[3]);
2713 addReply(c
,shared
.ok
);
2720 static void popGenericCommand(redisClient
*c
, int where
) {
2723 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2725 addReply(c
,shared
.nullbulk
);
2727 if (o
->type
!= REDIS_LIST
) {
2728 addReply(c
,shared
.wrongtypeerr
);
2730 list
*list
= o
->ptr
;
2733 if (where
== REDIS_HEAD
)
2734 ln
= listFirst(list
);
2736 ln
= listLast(list
);
2739 addReply(c
,shared
.nullbulk
);
2741 robj
*ele
= listNodeValue(ln
);
2742 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2744 addReply(c
,shared
.crlf
);
2745 listDelNode(list
,ln
);
2752 static void lpopCommand(redisClient
*c
) {
2753 popGenericCommand(c
,REDIS_HEAD
);
2756 static void rpopCommand(redisClient
*c
) {
2757 popGenericCommand(c
,REDIS_TAIL
);
2760 static void lrangeCommand(redisClient
*c
) {
2762 int start
= atoi(c
->argv
[2]->ptr
);
2763 int end
= atoi(c
->argv
[3]->ptr
);
2765 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2767 addReply(c
,shared
.nullmultibulk
);
2769 if (o
->type
!= REDIS_LIST
) {
2770 addReply(c
,shared
.wrongtypeerr
);
2772 list
*list
= o
->ptr
;
2774 int llen
= listLength(list
);
2778 /* convert negative indexes */
2779 if (start
< 0) start
= llen
+start
;
2780 if (end
< 0) end
= llen
+end
;
2781 if (start
< 0) start
= 0;
2782 if (end
< 0) end
= 0;
2784 /* indexes sanity checks */
2785 if (start
> end
|| start
>= llen
) {
2786 /* Out of range start or start > end result in empty list */
2787 addReply(c
,shared
.emptymultibulk
);
2790 if (end
>= llen
) end
= llen
-1;
2791 rangelen
= (end
-start
)+1;
2793 /* Return the result in form of a multi-bulk reply */
2794 ln
= listIndex(list
, start
);
2795 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
2796 for (j
= 0; j
< rangelen
; j
++) {
2797 ele
= listNodeValue(ln
);
2798 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2800 addReply(c
,shared
.crlf
);
2807 static void ltrimCommand(redisClient
*c
) {
2809 int start
= atoi(c
->argv
[2]->ptr
);
2810 int end
= atoi(c
->argv
[3]->ptr
);
2812 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2814 addReply(c
,shared
.nokeyerr
);
2816 if (o
->type
!= REDIS_LIST
) {
2817 addReply(c
,shared
.wrongtypeerr
);
2819 list
*list
= o
->ptr
;
2821 int llen
= listLength(list
);
2822 int j
, ltrim
, rtrim
;
2824 /* convert negative indexes */
2825 if (start
< 0) start
= llen
+start
;
2826 if (end
< 0) end
= llen
+end
;
2827 if (start
< 0) start
= 0;
2828 if (end
< 0) end
= 0;
2830 /* indexes sanity checks */
2831 if (start
> end
|| start
>= llen
) {
2832 /* Out of range start or start > end result in empty list */
2836 if (end
>= llen
) end
= llen
-1;
2841 /* Remove list elements to perform the trim */
2842 for (j
= 0; j
< ltrim
; j
++) {
2843 ln
= listFirst(list
);
2844 listDelNode(list
,ln
);
2846 for (j
= 0; j
< rtrim
; j
++) {
2847 ln
= listLast(list
);
2848 listDelNode(list
,ln
);
2850 addReply(c
,shared
.ok
);
2856 static void lremCommand(redisClient
*c
) {
2859 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2861 addReply(c
,shared
.nokeyerr
);
2863 if (o
->type
!= REDIS_LIST
) {
2864 addReply(c
,shared
.wrongtypeerr
);
2866 list
*list
= o
->ptr
;
2867 listNode
*ln
, *next
;
2868 int toremove
= atoi(c
->argv
[2]->ptr
);
2873 toremove
= -toremove
;
2876 ln
= fromtail
? list
->tail
: list
->head
;
2878 robj
*ele
= listNodeValue(ln
);
2880 next
= fromtail
? ln
->prev
: ln
->next
;
2881 if (sdscmp(ele
->ptr
,c
->argv
[3]->ptr
) == 0) {
2882 listDelNode(list
,ln
);
2885 if (toremove
&& removed
== toremove
) break;
2889 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
2894 /* ==================================== Sets ================================ */
2896 static void saddCommand(redisClient
*c
) {
2899 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2901 set
= createSetObject();
2902 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
2903 incrRefCount(c
->argv
[1]);
2905 if (set
->type
!= REDIS_SET
) {
2906 addReply(c
,shared
.wrongtypeerr
);
2910 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
2911 incrRefCount(c
->argv
[2]);
2913 addReply(c
,shared
.cone
);
2915 addReply(c
,shared
.czero
);
2919 static void sremCommand(redisClient
*c
) {
2922 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2924 addReply(c
,shared
.czero
);
2926 if (set
->type
!= REDIS_SET
) {
2927 addReply(c
,shared
.wrongtypeerr
);
2930 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
2932 addReply(c
,shared
.cone
);
2934 addReply(c
,shared
.czero
);
2939 static void smoveCommand(redisClient
*c
) {
2940 robj
*srcset
, *dstset
;
2942 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2943 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
2945 /* If the source key does not exist return 0, if it's of the wrong type
2947 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
2948 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
2951 /* Error if the destination key is not a set as well */
2952 if (dstset
&& dstset
->type
!= REDIS_SET
) {
2953 addReply(c
,shared
.wrongtypeerr
);
2956 /* Remove the element from the source set */
2957 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
2958 /* Key not found in the src set! return zero */
2959 addReply(c
,shared
.czero
);
2963 /* Add the element to the destination set */
2965 dstset
= createSetObject();
2966 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
2967 incrRefCount(c
->argv
[2]);
2969 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
2970 incrRefCount(c
->argv
[3]);
2971 addReply(c
,shared
.cone
);
2974 static void sismemberCommand(redisClient
*c
) {
2977 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
2979 addReply(c
,shared
.czero
);
2981 if (set
->type
!= REDIS_SET
) {
2982 addReply(c
,shared
.wrongtypeerr
);
2985 if (dictFind(set
->ptr
,c
->argv
[2]))
2986 addReply(c
,shared
.cone
);
2988 addReply(c
,shared
.czero
);
2992 static void scardCommand(redisClient
*c
) {
2996 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2998 addReply(c
,shared
.czero
);
3001 if (o
->type
!= REDIS_SET
) {
3002 addReply(c
,shared
.wrongtypeerr
);
3005 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3011 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3012 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3014 return dictSize(*d1
)-dictSize(*d2
);
3017 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
) {
3018 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3021 robj
*lenobj
= NULL
, *dstset
= NULL
;
3022 int j
, cardinality
= 0;
3024 if (!dv
) oom("sinterGenericCommand");
3025 for (j
= 0; j
< setsnum
; j
++) {
3029 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3030 lookupKeyRead(c
->db
,setskeys
[j
]);
3034 deleteKey(c
->db
,dstkey
);
3035 addReply(c
,shared
.ok
);
3037 addReply(c
,shared
.nullmultibulk
);
3041 if (setobj
->type
!= REDIS_SET
) {
3043 addReply(c
,shared
.wrongtypeerr
);
3046 dv
[j
] = setobj
->ptr
;
3048 /* Sort sets from the smallest to largest, this will improve our
3049 * algorithm's performace */
3050 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3052 /* The first thing we should output is the total number of elements...
3053 * since this is a multi-bulk write, but at this stage we don't know
3054 * the intersection set size, so we use a trick, append an empty object
3055 * to the output list and save the pointer to later modify it with the
3058 lenobj
= createObject(REDIS_STRING
,NULL
);
3060 decrRefCount(lenobj
);
3062 /* If we have a target key where to store the resulting set
3063 * create this key with an empty set inside */
3064 dstset
= createSetObject();
3067 /* Iterate all the elements of the first (smallest) set, and test
3068 * the element against all the other sets, if at least one set does
3069 * not include the element it is discarded */
3070 di
= dictGetIterator(dv
[0]);
3071 if (!di
) oom("dictGetIterator");
3073 while((de
= dictNext(di
)) != NULL
) {
3076 for (j
= 1; j
< setsnum
; j
++)
3077 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
3079 continue; /* at least one set does not contain the member */
3080 ele
= dictGetEntryKey(de
);
3082 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele
->ptr
)));
3084 addReply(c
,shared
.crlf
);
3087 dictAdd(dstset
->ptr
,ele
,NULL
);
3091 dictReleaseIterator(di
);
3094 /* Store the resulting set into the target */
3095 deleteKey(c
->db
,dstkey
);
3096 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3097 incrRefCount(dstkey
);
3101 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",cardinality
);
3103 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3104 dictSize((dict
*)dstset
->ptr
)));
3110 static void sinterCommand(redisClient
*c
) {
3111 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
3114 static void sinterstoreCommand(redisClient
*c
) {
3115 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
3118 #define REDIS_OP_UNION 0
3119 #define REDIS_OP_DIFF 1
3121 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
3122 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3125 robj
*dstset
= NULL
;
3126 int j
, cardinality
= 0;
3128 if (!dv
) oom("sunionDiffGenericCommand");
3129 for (j
= 0; j
< setsnum
; j
++) {
3133 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3134 lookupKeyRead(c
->db
,setskeys
[j
]);
3139 if (setobj
->type
!= REDIS_SET
) {
3141 addReply(c
,shared
.wrongtypeerr
);
3144 dv
[j
] = setobj
->ptr
;
3147 /* We need a temp set object to store our union. If the dstkey
3148 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3149 * this set object will be the resulting object to set into the target key*/
3150 dstset
= createSetObject();
3152 /* Iterate all the elements of all the sets, add every element a single
3153 * time to the result set */
3154 for (j
= 0; j
< setsnum
; j
++) {
3155 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
3156 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
3158 di
= dictGetIterator(dv
[j
]);
3159 if (!di
) oom("dictGetIterator");
3161 while((de
= dictNext(di
)) != NULL
) {
3164 /* dictAdd will not add the same element multiple times */
3165 ele
= dictGetEntryKey(de
);
3166 if (op
== REDIS_OP_UNION
|| j
== 0) {
3167 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
3171 } else if (op
== REDIS_OP_DIFF
) {
3172 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
3177 dictReleaseIterator(di
);
3179 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
3182 /* Output the content of the resulting set, if not in STORE mode */
3184 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
3185 di
= dictGetIterator(dstset
->ptr
);
3186 if (!di
) oom("dictGetIterator");
3187 while((de
= dictNext(di
)) != NULL
) {
3190 ele
= dictGetEntryKey(de
);
3191 addReplySds(c
,sdscatprintf(sdsempty(),
3192 "$%d\r\n",sdslen(ele
->ptr
)));
3194 addReply(c
,shared
.crlf
);
3196 dictReleaseIterator(di
);
3198 /* If we have a target key where to store the resulting set
3199 * create this key with the result set inside */
3200 deleteKey(c
->db
,dstkey
);
3201 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3202 incrRefCount(dstkey
);
3207 decrRefCount(dstset
);
3209 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3210 dictSize((dict
*)dstset
->ptr
)));
3216 static void sunionCommand(redisClient
*c
) {
3217 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
3220 static void sunionstoreCommand(redisClient
*c
) {
3221 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
3224 static void sdiffCommand(redisClient
*c
) {
3225 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
3228 static void sdiffstoreCommand(redisClient
*c
) {
3229 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
3232 static void flushdbCommand(redisClient
*c
) {
3233 server
.dirty
+= dictSize(c
->db
->dict
);
3234 dictEmpty(c
->db
->dict
);
3235 dictEmpty(c
->db
->expires
);
3236 addReply(c
,shared
.ok
);
3239 static void flushallCommand(redisClient
*c
) {
3240 server
.dirty
+= emptyDb();
3241 addReply(c
,shared
.ok
);
3242 rdbSave(server
.dbfilename
);
3246 redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
3247 redisSortOperation
*so
= zmalloc(sizeof(*so
));
3248 if (!so
) oom("createSortOperation");
3250 so
->pattern
= pattern
;
3254 /* Return the value associated to the key with a name obtained
3255 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3256 robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
3260 int prefixlen
, sublen
, postfixlen
;
3261 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3265 char buf
[REDIS_SORTKEY_MAX
+1];
3268 spat
= pattern
->ptr
;
3270 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
3271 p
= strchr(spat
,'*');
3272 if (!p
) return NULL
;
3275 sublen
= sdslen(ssub
);
3276 postfixlen
= sdslen(spat
)-(prefixlen
+1);
3277 memcpy(keyname
.buf
,spat
,prefixlen
);
3278 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
3279 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
3280 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
3281 keyname
.len
= prefixlen
+sublen
+postfixlen
;
3283 keyobj
.refcount
= 1;
3284 keyobj
.type
= REDIS_STRING
;
3285 keyobj
.ptr
= ((char*)&keyname
)+(sizeof(long)*2);
3287 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3288 return lookupKeyRead(db
,&keyobj
);
3291 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3292 * the additional parameter is not standard but a BSD-specific we have to
3293 * pass sorting parameters via the global 'server' structure */
3294 static int sortCompare(const void *s1
, const void *s2
) {
3295 const redisSortObject
*so1
= s1
, *so2
= s2
;
3298 if (!server
.sort_alpha
) {
3299 /* Numeric sorting. Here it's trivial as we precomputed scores */
3300 if (so1
->u
.score
> so2
->u
.score
) {
3302 } else if (so1
->u
.score
< so2
->u
.score
) {
3308 /* Alphanumeric sorting */
3309 if (server
.sort_bypattern
) {
3310 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
3311 /* At least one compare object is NULL */
3312 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
3314 else if (so1
->u
.cmpobj
== NULL
)
3319 /* We have both the objects, use strcoll */
3320 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
3323 /* Compare elements directly */
3324 cmp
= strcoll(so1
->obj
->ptr
,so2
->obj
->ptr
);
3327 return server
.sort_desc
? -cmp
: cmp
;
3330 /* The SORT command is the most complex command in Redis. Warning: this code
3331 * is optimized for speed and a bit less for readability */
3332 static void sortCommand(redisClient
*c
) {
3335 int desc
= 0, alpha
= 0;
3336 int limit_start
= 0, limit_count
= -1, start
, end
;
3337 int j
, dontsort
= 0, vectorlen
;
3338 int getop
= 0; /* GET operation counter */
3339 robj
*sortval
, *sortby
= NULL
;
3340 redisSortObject
*vector
; /* Resulting vector to sort */
3342 /* Lookup the key to sort. It must be of the right types */
3343 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
3344 if (sortval
== NULL
) {
3345 addReply(c
,shared
.nokeyerr
);
3348 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
) {
3349 addReply(c
,shared
.wrongtypeerr
);
3353 /* Create a list of operations to perform for every sorted element.
3354 * Operations can be GET/DEL/INCR/DECR */
3355 operations
= listCreate();
3356 listSetFreeMethod(operations
,zfree
);
3359 /* Now we need to protect sortval incrementing its count, in the future
3360 * SORT may have options able to overwrite/delete keys during the sorting
3361 * and the sorted key itself may get destroied */
3362 incrRefCount(sortval
);
3364 /* The SORT command has an SQL-alike syntax, parse it */
3365 while(j
< c
->argc
) {
3366 int leftargs
= c
->argc
-j
-1;
3367 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
3369 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
3371 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
3373 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
3374 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
3375 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
3377 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
3378 sortby
= c
->argv
[j
+1];
3379 /* If the BY pattern does not contain '*', i.e. it is constant,
3380 * we don't need to sort nor to lookup the weight keys. */
3381 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
3383 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3384 listAddNodeTail(operations
,createSortOperation(
3385 REDIS_SORT_GET
,c
->argv
[j
+1]));
3388 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"del") && leftargs
>= 1) {
3389 listAddNodeTail(operations
,createSortOperation(
3390 REDIS_SORT_DEL
,c
->argv
[j
+1]));
3392 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"incr") && leftargs
>= 1) {
3393 listAddNodeTail(operations
,createSortOperation(
3394 REDIS_SORT_INCR
,c
->argv
[j
+1]));
3396 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3397 listAddNodeTail(operations
,createSortOperation(
3398 REDIS_SORT_DECR
,c
->argv
[j
+1]));
3401 decrRefCount(sortval
);
3402 listRelease(operations
);
3403 addReply(c
,shared
.syntaxerr
);
3409 /* Load the sorting vector with all the objects to sort */
3410 vectorlen
= (sortval
->type
== REDIS_LIST
) ?
3411 listLength((list
*)sortval
->ptr
) :
3412 dictSize((dict
*)sortval
->ptr
);
3413 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
3414 if (!vector
) oom("allocating objects vector for SORT");
3416 if (sortval
->type
== REDIS_LIST
) {
3417 list
*list
= sortval
->ptr
;
3421 while((ln
= listYield(list
))) {
3422 robj
*ele
= ln
->value
;
3423 vector
[j
].obj
= ele
;
3424 vector
[j
].u
.score
= 0;
3425 vector
[j
].u
.cmpobj
= NULL
;
3429 dict
*set
= sortval
->ptr
;
3433 di
= dictGetIterator(set
);
3434 if (!di
) oom("dictGetIterator");
3435 while((setele
= dictNext(di
)) != NULL
) {
3436 vector
[j
].obj
= dictGetEntryKey(setele
);
3437 vector
[j
].u
.score
= 0;
3438 vector
[j
].u
.cmpobj
= NULL
;
3441 dictReleaseIterator(di
);
3443 assert(j
== vectorlen
);
3445 /* Now it's time to load the right scores in the sorting vector */
3446 if (dontsort
== 0) {
3447 for (j
= 0; j
< vectorlen
; j
++) {
3451 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
3452 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
3454 vector
[j
].u
.cmpobj
= byval
;
3455 incrRefCount(byval
);
3457 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
3460 if (!alpha
) vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
3465 /* We are ready to sort the vector... perform a bit of sanity check
3466 * on the LIMIT option too. We'll use a partial version of quicksort. */
3467 start
= (limit_start
< 0) ? 0 : limit_start
;
3468 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
3469 if (start
>= vectorlen
) {
3470 start
= vectorlen
-1;
3473 if (end
>= vectorlen
) end
= vectorlen
-1;
3475 if (dontsort
== 0) {
3476 server
.sort_desc
= desc
;
3477 server
.sort_alpha
= alpha
;
3478 server
.sort_bypattern
= sortby
? 1 : 0;
3479 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
3480 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
3482 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
3485 /* Send command output to the output buffer, performing the specified
3486 * GET/DEL/INCR/DECR operations if any. */
3487 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
3488 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
3489 for (j
= start
; j
<= end
; j
++) {
3492 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
3493 sdslen(vector
[j
].obj
->ptr
)));
3494 addReply(c
,vector
[j
].obj
);
3495 addReply(c
,shared
.crlf
);
3497 listRewind(operations
);
3498 while((ln
= listYield(operations
))) {
3499 redisSortOperation
*sop
= ln
->value
;
3500 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
3503 if (sop
->type
== REDIS_SORT_GET
) {
3504 if (!val
|| val
->type
!= REDIS_STRING
) {
3505 addReply(c
,shared
.nullbulk
);
3507 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
3510 addReply(c
,shared
.crlf
);
3512 } else if (sop
->type
== REDIS_SORT_DEL
) {
3519 decrRefCount(sortval
);
3520 listRelease(operations
);
3521 for (j
= 0; j
< vectorlen
; j
++) {
3522 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
3523 decrRefCount(vector
[j
].u
.cmpobj
);
3528 static void infoCommand(redisClient
*c
) {
3530 time_t uptime
= time(NULL
)-server
.stat_starttime
;
3532 info
= sdscatprintf(sdsempty(),
3533 "redis_version:%s\r\n"
3534 "uptime_in_seconds:%d\r\n"
3535 "uptime_in_days:%d\r\n"
3536 "connected_clients:%d\r\n"
3537 "connected_slaves:%d\r\n"
3538 "used_memory:%zu\r\n"
3539 "changes_since_last_save:%lld\r\n"
3540 "bgsave_in_progress:%d\r\n"
3541 "last_save_time:%d\r\n"
3542 "total_connections_received:%lld\r\n"
3543 "total_commands_processed:%lld\r\n"
3548 listLength(server
.clients
)-listLength(server
.slaves
),
3549 listLength(server
.slaves
),
3552 server
.bgsaveinprogress
,
3554 server
.stat_numconnections
,
3555 server
.stat_numcommands
,
3556 server
.masterhost
== NULL
? "master" : "slave"
3558 if (server
.masterhost
) {
3559 info
= sdscatprintf(info
,
3560 "master_host:%s\r\n"
3561 "master_port:%d\r\n"
3562 "master_link_status:%s\r\n"
3563 "master_last_io_seconds_ago:%d\r\n"
3566 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
3568 (int)(time(NULL
)-server
.master
->lastinteraction
)
3571 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info
)));
3572 addReplySds(c
,info
);
3573 addReply(c
,shared
.crlf
);
3576 static void monitorCommand(redisClient
*c
) {
3577 /* ignore MONITOR if aleady slave or in monitor mode */
3578 if (c
->flags
& REDIS_SLAVE
) return;
3580 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
3582 if (!listAddNodeTail(server
.monitors
,c
)) oom("listAddNodeTail");
3583 addReply(c
,shared
.ok
);
3586 /* ================================= Expire ================================= */
3587 static int removeExpire(redisDb
*db
, robj
*key
) {
3588 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
3595 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
3596 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
3604 /* Return the expire time of the specified key, or -1 if no expire
3605 * is associated with this key (i.e. the key is non volatile) */
3606 static time_t getExpire(redisDb
*db
, robj
*key
) {
3609 /* No expire? return ASAP */
3610 if (dictSize(db
->expires
) == 0 ||
3611 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
3613 return (time_t) dictGetEntryVal(de
);
3616 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
3620 /* No expire? return ASAP */
3621 if (dictSize(db
->expires
) == 0 ||
3622 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3624 /* Lookup the expire */
3625 when
= (time_t) dictGetEntryVal(de
);
3626 if (time(NULL
) <= when
) return 0;
3628 /* Delete the key */
3629 dictDelete(db
->expires
,key
);
3630 return dictDelete(db
->dict
,key
) == DICT_OK
;
3633 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
3636 /* No expire? return ASAP */
3637 if (dictSize(db
->expires
) == 0 ||
3638 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3640 /* Delete the key */
3642 dictDelete(db
->expires
,key
);
3643 return dictDelete(db
->dict
,key
) == DICT_OK
;
3646 static void expireCommand(redisClient
*c
) {
3648 int seconds
= atoi(c
->argv
[2]->ptr
);
3650 de
= dictFind(c
->db
->dict
,c
->argv
[1]);
3652 addReply(c
,shared
.czero
);
3656 addReply(c
, shared
.czero
);
3659 time_t when
= time(NULL
)+seconds
;
3660 if (setExpire(c
->db
,c
->argv
[1],when
))
3661 addReply(c
,shared
.cone
);
3663 addReply(c
,shared
.czero
);
3668 static void ttlCommand(redisClient
*c
) {
3672 expire
= getExpire(c
->db
,c
->argv
[1]);
3674 ttl
= (int) (expire
-time(NULL
));
3675 if (ttl
< 0) ttl
= -1;
3677 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
3680 /* =============================== Replication ============================= */
3682 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3683 ssize_t nwritten
, ret
= size
;
3684 time_t start
= time(NULL
);
3688 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
3689 nwritten
= write(fd
,ptr
,size
);
3690 if (nwritten
== -1) return -1;
3694 if ((time(NULL
)-start
) > timeout
) {
3702 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3703 ssize_t nread
, totread
= 0;
3704 time_t start
= time(NULL
);
3708 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
3709 nread
= read(fd
,ptr
,size
);
3710 if (nread
== -1) return -1;
3715 if ((time(NULL
)-start
) > timeout
) {
3723 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3730 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
3733 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
3744 static void syncCommand(redisClient
*c
) {
3745 /* ignore SYNC if aleady slave or in monitor mode */
3746 if (c
->flags
& REDIS_SLAVE
) return;
3748 /* SYNC can't be issued when the server has pending data to send to
3749 * the client about already issued commands. We need a fresh reply
3750 * buffer registering the differences between the BGSAVE and the current
3751 * dataset, so that we can copy to other slaves if needed. */
3752 if (listLength(c
->reply
) != 0) {
3753 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3757 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
3758 /* Here we need to check if there is a background saving operation
3759 * in progress, or if it is required to start one */
3760 if (server
.bgsaveinprogress
) {
3761 /* Ok a background save is in progress. Let's check if it is a good
3762 * one for replication, i.e. if there is another slave that is
3763 * registering differences since the server forked to save */
3767 listRewind(server
.slaves
);
3768 while((ln
= listYield(server
.slaves
))) {
3770 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
3773 /* Perfect, the server is already registering differences for
3774 * another slave. Set the right state, and copy the buffer. */
3775 listRelease(c
->reply
);
3776 c
->reply
= listDup(slave
->reply
);
3777 if (!c
->reply
) oom("listDup copying slave reply list");
3778 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3779 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
3781 /* No way, we need to wait for the next BGSAVE in order to
3782 * register differences */
3783 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
3784 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
3787 /* Ok we don't have a BGSAVE in progress, let's start one */
3788 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
3789 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
3790 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
3791 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
3794 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3797 c
->flags
|= REDIS_SLAVE
;
3799 if (!listAddNodeTail(server
.slaves
,c
)) oom("listAddNodeTail");
3803 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
3804 redisClient
*slave
= privdata
;
3806 REDIS_NOTUSED(mask
);
3807 char buf
[REDIS_IOBUF_LEN
];
3808 ssize_t nwritten
, buflen
;
3810 if (slave
->repldboff
== 0) {
3811 /* Write the bulk write count before to transfer the DB. In theory here
3812 * we don't know how much room there is in the output buffer of the
3813 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
3814 * operations) will never be smaller than the few bytes we need. */
3817 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
3819 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
3827 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
3828 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
3830 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
3831 (buflen
== 0) ? "premature EOF" : strerror(errno
));
3835 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
3836 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
3841 slave
->repldboff
+= nwritten
;
3842 if (slave
->repldboff
== slave
->repldbsize
) {
3843 close(slave
->repldbfd
);
3844 slave
->repldbfd
= -1;
3845 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
3846 slave
->replstate
= REDIS_REPL_ONLINE
;
3847 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
3848 sendReplyToClient
, slave
, NULL
) == AE_ERR
) {
3852 addReplySds(slave
,sdsempty());
3853 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
3857 static void updateSalvesWaitingBgsave(int bgsaveerr
) {
3859 int startbgsave
= 0;
3861 listRewind(server
.slaves
);
3862 while((ln
= listYield(server
.slaves
))) {
3863 redisClient
*slave
= ln
->value
;
3865 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
3867 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3868 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
3871 if (bgsaveerr
!= REDIS_OK
) {
3873 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
3876 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
3877 fstat(slave
->repldbfd
,&buf
) == -1) {
3879 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
3882 slave
->repldboff
= 0;
3883 slave
->repldbsize
= buf
.st_size
;
3884 slave
->replstate
= REDIS_REPL_SEND_BULK
;
3885 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
3886 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
, NULL
) == AE_ERR
) {
3893 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
3894 listRewind(server
.slaves
);
3895 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
3896 while((ln
= listYield(server
.slaves
))) {
3897 redisClient
*slave
= ln
->value
;
3899 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
3906 static int syncWithMaster(void) {
3907 char buf
[1024], tmpfile
[256];
3909 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
3913 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
3917 /* Issue the SYNC command */
3918 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
3920 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
3924 /* Read the bulk write count */
3925 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
3927 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
3931 dumpsize
= atoi(buf
+1);
3932 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
3933 /* Read the bulk write data on a temp file */
3934 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
3935 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
3938 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
3942 int nread
, nwritten
;
3944 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
3946 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
3952 nwritten
= write(dfd
,buf
,nread
);
3953 if (nwritten
== -1) {
3954 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
3962 if (rename(tmpfile
,server
.dbfilename
) == -1) {
3963 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
3969 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
3970 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
3974 server
.master
= createClient(fd
);
3975 server
.master
->flags
|= REDIS_MASTER
;
3976 server
.replstate
= REDIS_REPL_CONNECTED
;
3980 static void slaveofCommand(redisClient
*c
) {
3981 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
3982 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
3983 if (server
.masterhost
) {
3984 sdsfree(server
.masterhost
);
3985 server
.masterhost
= NULL
;
3986 if (server
.master
) freeClient(server
.master
);
3987 server
.replstate
= REDIS_REPL_NONE
;
3988 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
3991 sdsfree(server
.masterhost
);
3992 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
3993 server
.masterport
= atoi(c
->argv
[2]->ptr
);
3994 if (server
.master
) freeClient(server
.master
);
3995 server
.replstate
= REDIS_REPL_CONNECT
;
3996 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
3997 server
.masterhost
, server
.masterport
);
3999 addReply(c
,shared
.ok
);
4002 /* ============================ Maxmemory directive ======================== */
4004 /* This function gets called when 'maxmemory' is set on the config file to limit
4005 * the max memory used by the server, and we are out of memory.
4006 * This function will try to, in order:
4008 * - Free objects from the free list
4009 * - Try to remove keys with an EXPIRE set
4011 * It is not possible to free enough memory to reach used-memory < maxmemory
4012 * the server will start refusing commands that will enlarge even more the
4015 static void freeMemoryIfNeeded(void) {
4016 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
4017 if (listLength(server
.objfreelist
)) {
4020 listNode
*head
= listFirst(server
.objfreelist
);
4021 o
= listNodeValue(head
);
4022 listDelNode(server
.objfreelist
,head
);
4025 int j
, k
, freed
= 0;
4027 for (j
= 0; j
< server
.dbnum
; j
++) {
4029 robj
*minkey
= NULL
;
4030 struct dictEntry
*de
;
4032 if (dictSize(server
.db
[j
].expires
)) {
4034 /* From a sample of three keys drop the one nearest to
4035 * the natural expire */
4036 for (k
= 0; k
< 3; k
++) {
4039 de
= dictGetRandomKey(server
.db
[j
].expires
);
4040 t
= (time_t) dictGetEntryVal(de
);
4041 if (minttl
== -1 || t
< minttl
) {
4042 minkey
= dictGetEntryKey(de
);
4046 deleteKey(server
.db
+j
,minkey
);
4049 if (!freed
) return; /* nothing to free... */
4054 /* ================================= Debugging ============================== */
4056 static void debugCommand(redisClient
*c
) {
4057 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
4059 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
4060 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
4064 addReply(c
,shared
.nokeyerr
);
4067 key
= dictGetEntryKey(de
);
4068 val
= dictGetEntryVal(de
);
4069 addReplySds(c
,sdscatprintf(sdsempty(),
4070 "+Key at:%p refcount:%d, value at:%p refcount:%d\r\n",
4071 key
, key
->refcount
, val
, val
->refcount
));
4073 addReplySds(c
,sdsnew(
4074 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4078 /* =================================== Main! ================================ */
4081 int linuxOvercommitMemoryValue(void) {
4082 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
4086 if (fgets(buf
,64,fp
) == NULL
) {
4095 void linuxOvercommitMemoryWarning(void) {
4096 if (linuxOvercommitMemoryValue() == 0) {
4097 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4100 #endif /* __linux__ */
4102 static void daemonize(void) {
4106 if (fork() != 0) exit(0); /* parent exits */
4107 setsid(); /* create a new session */
4109 /* Every output goes to /dev/null. If Redis is daemonized but
4110 * the 'logfile' is set to 'stdout' in the configuration file
4111 * it will not log at all. */
4112 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
4113 dup2(fd
, STDIN_FILENO
);
4114 dup2(fd
, STDOUT_FILENO
);
4115 dup2(fd
, STDERR_FILENO
);
4116 if (fd
> STDERR_FILENO
) close(fd
);
4118 /* Try to write the pid file */
4119 fp
= fopen(server
.pidfile
,"w");
4121 fprintf(fp
,"%d\n",getpid());
4126 int main(int argc
, char **argv
) {
4128 linuxOvercommitMemoryWarning();
4133 ResetServerSaveParams();
4134 loadServerConfig(argv
[1]);
4135 } else if (argc
> 2) {
4136 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
4139 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4142 if (server
.daemonize
) daemonize();
4143 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
4144 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
4145 redisLog(REDIS_NOTICE
,"DB loaded from disk");
4146 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
4147 acceptHandler
, NULL
, NULL
) == AE_ERR
) oom("creating file event");
4148 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
4150 aeDeleteEventLoop(server
.el
);