2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "0.101"
46 #include <arpa/inet.h>
50 #include <sys/resource.h>
54 #include "ae.h" /* Event driven programming library */
55 #include "sds.h" /* Dynamic safe strings */
56 #include "anet.h" /* Networking the easy way */
57 #include "dict.h" /* Hash tables */
58 #include "adlist.h" /* Linked lists */
59 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
60 #include "lzf.h" /* LZF compression library */
61 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
69 /* Static server configuration */
70 #define REDIS_SERVERPORT 6379 /* TCP port */
71 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
72 #define REDIS_IOBUF_LEN 1024
73 #define REDIS_LOADBUF_LEN 1024
74 #define REDIS_STATIC_ARGS 4
75 #define REDIS_DEFAULT_DBNUM 16
76 #define REDIS_CONFIGLINE_MAX 1024
77 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
78 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
79 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
80 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
82 /* Hash table parameters */
83 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
84 #define REDIS_HT_MINSLOTS 16384 /* Never resize the HT under this */
87 #define REDIS_CMD_BULK 1 /* Bulk write command */
88 #define REDIS_CMD_INLINE 2 /* Inline command */
89 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
90 this flags will return an error when the 'maxmemory' option is set in the
91 config file and the server is using more than maxmemory bytes of memory.
92 In short this commands are denied on low memory conditions. */
93 #define REDIS_CMD_DENYOOM 4
96 #define REDIS_STRING 0
101 /* Object types only used for dumping to disk */
102 #define REDIS_EXPIRETIME 253
103 #define REDIS_SELECTDB 254
104 #define REDIS_EOF 255
106 /* Defines related to the dump file format. To store 32 bits lengths for short
107 * keys requires a lot of space, so we check the most significant 2 bits of
108 * the first byte to interpreter the length:
110 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
111 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
112 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
113 * 11|000000 this means: specially encoded object will follow. The six bits
114 * number specify the kind of object that follows.
115 * See the REDIS_RDB_ENC_* defines.
117 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
118 * values, will fit inside. */
119 #define REDIS_RDB_6BITLEN 0
120 #define REDIS_RDB_14BITLEN 1
121 #define REDIS_RDB_32BITLEN 2
122 #define REDIS_RDB_ENCVAL 3
123 #define REDIS_RDB_LENERR UINT_MAX
125 /* When a length of a string object stored on disk has the first two bits
126 * set, the remaining two bits specify a special encoding for the object
127 * accordingly to the following defines: */
128 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
129 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
130 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
131 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
134 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
135 #define REDIS_SLAVE 2 /* This client is a slave server */
136 #define REDIS_MASTER 4 /* This client is a master server */
137 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
139 /* Slave replication state - slave side */
140 #define REDIS_REPL_NONE 0 /* No active replication */
141 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
142 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
144 /* Slave replication state - from the point of view of master
145 * Note that in SEND_BULK and ONLINE state the slave receives new updates
146 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
147 * to start the next background saving in order to send updates to it. */
148 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
149 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
150 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
151 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
153 /* List related stuff */
157 /* Sort operations */
158 #define REDIS_SORT_GET 0
159 #define REDIS_SORT_DEL 1
160 #define REDIS_SORT_INCR 2
161 #define REDIS_SORT_DECR 3
162 #define REDIS_SORT_ASC 4
163 #define REDIS_SORT_DESC 5
164 #define REDIS_SORTKEY_MAX 1024
167 #define REDIS_DEBUG 0
168 #define REDIS_NOTICE 1
169 #define REDIS_WARNING 2
171 /* Anti-warning macro... */
172 #define REDIS_NOTUSED(V) ((void) V)
174 /*================================= Data types ============================== */
176 /* A redis object, that is a type able to hold a string / list / set */
177 typedef struct redisObject
{
183 typedef struct redisDb
{
189 /* With multiplexing we need to take per-clinet state.
190 * Clients are taken in a liked list. */
191 typedef struct redisClient
{
198 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
201 time_t lastinteraction
; /* time of the last interaction, used for timeout */
202 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
203 int slaveseldb
; /* slave selected db, if this client is a slave */
204 int authenticated
; /* when requirepass is non-NULL */
205 int replstate
; /* replication state if this is a slave */
206 int repldbfd
; /* replication DB file descriptor */
207 long repldboff
; /* replication DB file offset */
208 off_t repldbsize
; /* replication DB file size */
216 /* Global server state structure */
222 unsigned int sharingpoolsize
;
223 long long dirty
; /* changes to DB from the last save */
225 list
*slaves
, *monitors
;
226 char neterr
[ANET_ERR_LEN
];
228 int cronloops
; /* number of times the cron function run */
229 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
230 time_t lastsave
; /* Unix time of last save succeeede */
231 size_t usedmemory
; /* Used memory in megabytes */
232 /* Fields used only for stats */
233 time_t stat_starttime
; /* server start time */
234 long long stat_numcommands
; /* number of processed commands */
235 long long stat_numconnections
; /* number of connections received */
243 int bgsaveinprogress
;
244 pid_t bgsavechildpid
;
245 struct saveparam
*saveparams
;
252 /* Replication related */
256 redisClient
*master
; /* client that is master for this slave */
258 unsigned int maxclients
;
259 unsigned int maxmemory
;
260 /* Sort parameters - qsort_r() is only available under BSD so we
261 * have to take this state global, in order to pass it to sortCompare() */
267 typedef void redisCommandProc(redisClient
*c
);
268 struct redisCommand
{
270 redisCommandProc
*proc
;
275 typedef struct _redisSortObject
{
283 typedef struct _redisSortOperation
{
286 } redisSortOperation
;
288 struct sharedObjectsStruct
{
289 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
290 *colon
, *nullbulk
, *nullmultibulk
,
291 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
292 *outofrangeerr
, *plus
,
293 *select0
, *select1
, *select2
, *select3
, *select4
,
294 *select5
, *select6
, *select7
, *select8
, *select9
;
297 /*================================ Prototypes =============================== */
299 static void freeStringObject(robj
*o
);
300 static void freeListObject(robj
*o
);
301 static void freeSetObject(robj
*o
);
302 static void decrRefCount(void *o
);
303 static robj
*createObject(int type
, void *ptr
);
304 static void freeClient(redisClient
*c
);
305 static int rdbLoad(char *filename
);
306 static void addReply(redisClient
*c
, robj
*obj
);
307 static void addReplySds(redisClient
*c
, sds s
);
308 static void incrRefCount(robj
*o
);
309 static int rdbSaveBackground(char *filename
);
310 static robj
*createStringObject(char *ptr
, size_t len
);
311 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
312 static int syncWithMaster(void);
313 static robj
*tryObjectSharing(robj
*o
);
314 static int removeExpire(redisDb
*db
, robj
*key
);
315 static int expireIfNeeded(redisDb
*db
, robj
*key
);
316 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
317 static int deleteKey(redisDb
*db
, robj
*key
);
318 static time_t getExpire(redisDb
*db
, robj
*key
);
319 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
320 static void updateSalvesWaitingBgsave(int bgsaveerr
);
321 static void freeMemoryIfNeeded(void);
323 static void authCommand(redisClient
*c
);
324 static void pingCommand(redisClient
*c
);
325 static void echoCommand(redisClient
*c
);
326 static void setCommand(redisClient
*c
);
327 static void setnxCommand(redisClient
*c
);
328 static void getCommand(redisClient
*c
);
329 static void delCommand(redisClient
*c
);
330 static void existsCommand(redisClient
*c
);
331 static void incrCommand(redisClient
*c
);
332 static void decrCommand(redisClient
*c
);
333 static void incrbyCommand(redisClient
*c
);
334 static void decrbyCommand(redisClient
*c
);
335 static void selectCommand(redisClient
*c
);
336 static void randomkeyCommand(redisClient
*c
);
337 static void keysCommand(redisClient
*c
);
338 static void dbsizeCommand(redisClient
*c
);
339 static void lastsaveCommand(redisClient
*c
);
340 static void saveCommand(redisClient
*c
);
341 static void bgsaveCommand(redisClient
*c
);
342 static void shutdownCommand(redisClient
*c
);
343 static void moveCommand(redisClient
*c
);
344 static void renameCommand(redisClient
*c
);
345 static void renamenxCommand(redisClient
*c
);
346 static void lpushCommand(redisClient
*c
);
347 static void rpushCommand(redisClient
*c
);
348 static void lpopCommand(redisClient
*c
);
349 static void rpopCommand(redisClient
*c
);
350 static void llenCommand(redisClient
*c
);
351 static void lindexCommand(redisClient
*c
);
352 static void lrangeCommand(redisClient
*c
);
353 static void ltrimCommand(redisClient
*c
);
354 static void typeCommand(redisClient
*c
);
355 static void lsetCommand(redisClient
*c
);
356 static void saddCommand(redisClient
*c
);
357 static void sremCommand(redisClient
*c
);
358 static void smoveCommand(redisClient
*c
);
359 static void sismemberCommand(redisClient
*c
);
360 static void scardCommand(redisClient
*c
);
361 static void sinterCommand(redisClient
*c
);
362 static void sinterstoreCommand(redisClient
*c
);
363 static void sunionCommand(redisClient
*c
);
364 static void sunionstoreCommand(redisClient
*c
);
365 static void sdiffCommand(redisClient
*c
);
366 static void sdiffstoreCommand(redisClient
*c
);
367 static void syncCommand(redisClient
*c
);
368 static void flushdbCommand(redisClient
*c
);
369 static void flushallCommand(redisClient
*c
);
370 static void sortCommand(redisClient
*c
);
371 static void lremCommand(redisClient
*c
);
372 static void infoCommand(redisClient
*c
);
373 static void mgetCommand(redisClient
*c
);
374 static void monitorCommand(redisClient
*c
);
375 static void expireCommand(redisClient
*c
);
376 static void getSetCommand(redisClient
*c
);
377 static void ttlCommand(redisClient
*c
);
378 static void slaveofCommand(redisClient
*c
);
379 static void debugCommand(redisClient
*c
);
381 /*================================= Globals ================================= */
384 static struct redisServer server
; /* server global state */
385 static struct redisCommand cmdTable
[] = {
386 {"get",getCommand
,2,REDIS_CMD_INLINE
},
387 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
388 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
389 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
390 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
391 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
392 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
393 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
394 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
395 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
396 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
397 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
398 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
399 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
400 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
401 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
402 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
403 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
404 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
405 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
406 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
407 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
408 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
409 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
410 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
411 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
412 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
413 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
414 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
415 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
416 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
417 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
418 {"getset",getSetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
419 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
420 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
421 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
422 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
423 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
424 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
425 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
426 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
427 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
428 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
429 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
430 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
431 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
432 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
433 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
434 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
435 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
436 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
437 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
438 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
439 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
440 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
441 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
442 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
443 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
447 /*============================ Utility functions ============================ */
449 /* Glob-style pattern matching. */
450 int stringmatchlen(const char *pattern
, int patternLen
,
451 const char *string
, int stringLen
, int nocase
)
456 while (pattern
[1] == '*') {
461 return 1; /* match */
463 if (stringmatchlen(pattern
+1, patternLen
-1,
464 string
, stringLen
, nocase
))
465 return 1; /* match */
469 return 0; /* no match */
473 return 0; /* no match */
483 not = pattern
[0] == '^';
490 if (pattern
[0] == '\\') {
493 if (pattern
[0] == string
[0])
495 } else if (pattern
[0] == ']') {
497 } else if (patternLen
== 0) {
501 } else if (pattern
[1] == '-' && patternLen
>= 3) {
502 int start
= pattern
[0];
503 int end
= pattern
[2];
511 start
= tolower(start
);
517 if (c
>= start
&& c
<= end
)
521 if (pattern
[0] == string
[0])
524 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
534 return 0; /* no match */
540 if (patternLen
>= 2) {
547 if (pattern
[0] != string
[0])
548 return 0; /* no match */
550 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
551 return 0; /* no match */
559 if (stringLen
== 0) {
560 while(*pattern
== '*') {
567 if (patternLen
== 0 && stringLen
== 0)
572 void redisLog(int level
, const char *fmt
, ...)
577 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
581 if (level
>= server
.verbosity
) {
587 strftime(buf
,64,"%d %b %H:%M:%S",gmtime(&now
));
588 fprintf(fp
,"%s %c ",buf
,c
[level
]);
589 vfprintf(fp
, fmt
, ap
);
595 if (server
.logfile
) fclose(fp
);
598 /*====================== Hash table type implementation ==================== */
600 /* This is an hash table type that uses the SDS dynamic strings libary as
601 * keys and radis objects as values (objects can hold SDS strings,
604 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
608 DICT_NOTUSED(privdata
);
610 l1
= sdslen((sds
)key1
);
611 l2
= sdslen((sds
)key2
);
612 if (l1
!= l2
) return 0;
613 return memcmp(key1
, key2
, l1
) == 0;
616 static void dictRedisObjectDestructor(void *privdata
, void *val
)
618 DICT_NOTUSED(privdata
);
623 static int dictSdsKeyCompare(void *privdata
, const void *key1
,
626 const robj
*o1
= key1
, *o2
= key2
;
627 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
630 static unsigned int dictSdsHash(const void *key
) {
632 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
635 static dictType setDictType
= {
636 dictSdsHash
, /* hash function */
639 dictSdsKeyCompare
, /* key compare */
640 dictRedisObjectDestructor
, /* key destructor */
641 NULL
/* val destructor */
644 static dictType hashDictType
= {
645 dictSdsHash
, /* hash function */
648 dictSdsKeyCompare
, /* key compare */
649 dictRedisObjectDestructor
, /* key destructor */
650 dictRedisObjectDestructor
/* val destructor */
653 /* ========================= Random utility functions ======================= */
655 /* Redis generally does not try to recover from out of memory conditions
656 * when allocating objects or strings, it is not clear if it will be possible
657 * to report this condition to the client since the networking layer itself
658 * is based on heap allocation for send buffers, so we simply abort.
659 * At least the code will be simpler to read... */
660 static void oom(const char *msg
) {
661 fprintf(stderr
, "%s: Out of memory\n",msg
);
667 /* ====================== Redis server networking stuff ===================== */
668 void closeTimedoutClients(void) {
671 time_t now
= time(NULL
);
673 listRewind(server
.clients
);
674 while ((ln
= listYield(server
.clients
)) != NULL
) {
675 c
= listNodeValue(ln
);
676 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
677 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
678 (now
- c
->lastinteraction
> server
.maxidletime
)) {
679 redisLog(REDIS_DEBUG
,"Closing idle client");
685 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
686 * we resize the hash table to save memory */
687 void tryResizeHashTables(void) {
690 for (j
= 0; j
< server
.dbnum
; j
++) {
691 long long size
, used
;
693 size
= dictSlots(server
.db
[j
].dict
);
694 used
= dictSize(server
.db
[j
].dict
);
695 if (size
&& used
&& size
> REDIS_HT_MINSLOTS
&&
696 (used
*100/size
< REDIS_HT_MINFILL
)) {
697 redisLog(REDIS_NOTICE
,"The hash table %d is too sparse, resize it...",j
);
698 dictResize(server
.db
[j
].dict
);
699 redisLog(REDIS_NOTICE
,"Hash table %d resized.",j
);
704 int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
705 int j
, loops
= server
.cronloops
++;
706 REDIS_NOTUSED(eventLoop
);
708 REDIS_NOTUSED(clientData
);
710 /* Update the global state with the amount of used memory */
711 server
.usedmemory
= zmalloc_used_memory();
713 /* Show some info about non-empty databases */
714 for (j
= 0; j
< server
.dbnum
; j
++) {
715 long long size
, used
, vkeys
;
717 size
= dictSlots(server
.db
[j
].dict
);
718 used
= dictSize(server
.db
[j
].dict
);
719 vkeys
= dictSize(server
.db
[j
].expires
);
720 if (!(loops
% 5) && used
> 0) {
721 redisLog(REDIS_DEBUG
,"DB %d: %d keys (%d volatile) in %d slots HT.",j
,used
,vkeys
,size
);
722 /* dictPrintStats(server.dict); */
726 /* We don't want to resize the hash tables while a bacground saving
727 * is in progress: the saving child is created using fork() that is
728 * implemented with a copy-on-write semantic in most modern systems, so
729 * if we resize the HT while there is the saving child at work actually
730 * a lot of memory movements in the parent will cause a lot of pages
732 if (!server
.bgsaveinprogress
) tryResizeHashTables();
734 /* Show information about connected clients */
736 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use",
737 listLength(server
.clients
)-listLength(server
.slaves
),
738 listLength(server
.slaves
),
740 dictSize(server
.sharingpool
));
743 /* Close connections of timedout clients */
744 if (server
.maxidletime
&& !(loops
% 10))
745 closeTimedoutClients();
747 /* Check if a background saving in progress terminated */
748 if (server
.bgsaveinprogress
) {
750 /* XXX: TODO handle the case of the saving child killed */
751 if (wait4(-1,&statloc
,WNOHANG
,NULL
)) {
752 int exitcode
= WEXITSTATUS(statloc
);
753 int bysignal
= WIFSIGNALED(statloc
);
755 if (!bysignal
&& exitcode
== 0) {
756 redisLog(REDIS_NOTICE
,
757 "Background saving terminated with success");
759 server
.lastsave
= time(NULL
);
760 } else if (!bysignal
&& exitcode
!= 0) {
761 redisLog(REDIS_WARNING
, "Background saving error");
763 redisLog(REDIS_WARNING
,
764 "Background saving terminated by signal");
766 server
.bgsaveinprogress
= 0;
767 server
.bgsavechildpid
= -1;
768 updateSalvesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
771 /* If there is not a background saving in progress check if
772 * we have to save now */
773 time_t now
= time(NULL
);
774 for (j
= 0; j
< server
.saveparamslen
; j
++) {
775 struct saveparam
*sp
= server
.saveparams
+j
;
777 if (server
.dirty
>= sp
->changes
&&
778 now
-server
.lastsave
> sp
->seconds
) {
779 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
780 sp
->changes
, sp
->seconds
);
781 rdbSaveBackground(server
.dbfilename
);
787 /* Try to expire a few timed out keys */
788 for (j
= 0; j
< server
.dbnum
; j
++) {
789 redisDb
*db
= server
.db
+j
;
790 int num
= dictSize(db
->expires
);
793 time_t now
= time(NULL
);
795 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
796 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
801 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
802 t
= (time_t) dictGetEntryVal(de
);
804 deleteKey(db
,dictGetEntryKey(de
));
810 /* Check if we should connect to a MASTER */
811 if (server
.replstate
== REDIS_REPL_CONNECT
) {
812 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
813 if (syncWithMaster() == REDIS_OK
) {
814 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
820 static void createSharedObjects(void) {
821 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
822 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
823 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
824 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
825 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
826 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
827 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
828 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
829 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
831 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
832 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
833 "-ERR Operation against a key holding the wrong kind of value\r\n"));
834 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
835 "-ERR no such key\r\n"));
836 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
837 "-ERR syntax error\r\n"));
838 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
839 "-ERR source and destination objects are the same\r\n"));
840 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
841 "-ERR index out of range\r\n"));
842 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
843 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
844 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
845 shared
.select0
= createStringObject("select 0\r\n",10);
846 shared
.select1
= createStringObject("select 1\r\n",10);
847 shared
.select2
= createStringObject("select 2\r\n",10);
848 shared
.select3
= createStringObject("select 3\r\n",10);
849 shared
.select4
= createStringObject("select 4\r\n",10);
850 shared
.select5
= createStringObject("select 5\r\n",10);
851 shared
.select6
= createStringObject("select 6\r\n",10);
852 shared
.select7
= createStringObject("select 7\r\n",10);
853 shared
.select8
= createStringObject("select 8\r\n",10);
854 shared
.select9
= createStringObject("select 9\r\n",10);
857 static void appendServerSaveParams(time_t seconds
, int changes
) {
858 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
859 if (server
.saveparams
== NULL
) oom("appendServerSaveParams");
860 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
861 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
862 server
.saveparamslen
++;
865 static void ResetServerSaveParams() {
866 zfree(server
.saveparams
);
867 server
.saveparams
= NULL
;
868 server
.saveparamslen
= 0;
871 static void initServerConfig() {
872 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
873 server
.port
= REDIS_SERVERPORT
;
874 server
.verbosity
= REDIS_DEBUG
;
875 server
.maxidletime
= REDIS_MAXIDLETIME
;
876 server
.saveparams
= NULL
;
877 server
.logfile
= NULL
; /* NULL = log on standard output */
878 server
.bindaddr
= NULL
;
879 server
.glueoutputbuf
= 1;
880 server
.daemonize
= 0;
881 server
.pidfile
= "/var/run/redis.pid";
882 server
.dbfilename
= "dump.rdb";
883 server
.requirepass
= NULL
;
884 server
.shareobjects
= 0;
885 server
.maxclients
= 0;
886 server
.maxmemory
= 0;
887 ResetServerSaveParams();
889 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
890 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
891 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
892 /* Replication related */
894 server
.masterhost
= NULL
;
895 server
.masterport
= 6379;
896 server
.master
= NULL
;
897 server
.replstate
= REDIS_REPL_NONE
;
900 static void initServer() {
903 signal(SIGHUP
, SIG_IGN
);
904 signal(SIGPIPE
, SIG_IGN
);
906 server
.clients
= listCreate();
907 server
.slaves
= listCreate();
908 server
.monitors
= listCreate();
909 server
.objfreelist
= listCreate();
910 createSharedObjects();
911 server
.el
= aeCreateEventLoop();
912 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
913 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
914 server
.sharingpoolsize
= 1024;
915 if (!server
.db
|| !server
.clients
|| !server
.slaves
|| !server
.monitors
|| !server
.el
|| !server
.objfreelist
)
916 oom("server initialization"); /* Fatal OOM */
917 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
918 if (server
.fd
== -1) {
919 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
922 for (j
= 0; j
< server
.dbnum
; j
++) {
923 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
924 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
927 server
.cronloops
= 0;
928 server
.bgsaveinprogress
= 0;
929 server
.bgsavechildpid
= -1;
930 server
.lastsave
= time(NULL
);
932 server
.usedmemory
= 0;
933 server
.stat_numcommands
= 0;
934 server
.stat_numconnections
= 0;
935 server
.stat_starttime
= time(NULL
);
936 aeCreateTimeEvent(server
.el
, 1000, serverCron
, NULL
, NULL
);
939 /* Empty the whole database */
940 static long long emptyDb() {
942 long long removed
= 0;
944 for (j
= 0; j
< server
.dbnum
; j
++) {
945 removed
+= dictSize(server
.db
[j
].dict
);
946 dictEmpty(server
.db
[j
].dict
);
947 dictEmpty(server
.db
[j
].expires
);
952 static int yesnotoi(char *s
) {
953 if (!strcasecmp(s
,"yes")) return 1;
954 else if (!strcasecmp(s
,"no")) return 0;
958 /* I agree, this is a very rudimental way to load a configuration...
959 will improve later if the config gets more complex */
960 static void loadServerConfig(char *filename
) {
961 FILE *fp
= fopen(filename
,"r");
962 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
967 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
970 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
976 line
= sdstrim(line
," \t\r\n");
978 /* Skip comments and blank lines*/
979 if (line
[0] == '#' || line
[0] == '\0') {
984 /* Split into arguments */
985 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
988 /* Execute config directives */
989 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
990 server
.maxidletime
= atoi(argv
[1]);
991 if (server
.maxidletime
< 0) {
992 err
= "Invalid timeout value"; goto loaderr
;
994 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
995 server
.port
= atoi(argv
[1]);
996 if (server
.port
< 1 || server
.port
> 65535) {
997 err
= "Invalid port"; goto loaderr
;
999 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1000 server
.bindaddr
= zstrdup(argv
[1]);
1001 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1002 int seconds
= atoi(argv
[1]);
1003 int changes
= atoi(argv
[2]);
1004 if (seconds
< 1 || changes
< 0) {
1005 err
= "Invalid save parameters"; goto loaderr
;
1007 appendServerSaveParams(seconds
,changes
);
1008 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1009 if (chdir(argv
[1]) == -1) {
1010 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1011 argv
[1], strerror(errno
));
1014 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1015 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1016 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1017 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1019 err
= "Invalid log level. Must be one of debug, notice, warning";
1022 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1025 server
.logfile
= zstrdup(argv
[1]);
1026 if (!strcasecmp(server
.logfile
,"stdout")) {
1027 zfree(server
.logfile
);
1028 server
.logfile
= NULL
;
1030 if (server
.logfile
) {
1031 /* Test if we are able to open the file. The server will not
1032 * be able to abort just for this problem later... */
1033 fp
= fopen(server
.logfile
,"a");
1035 err
= sdscatprintf(sdsempty(),
1036 "Can't open the log file: %s", strerror(errno
));
1041 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1042 server
.dbnum
= atoi(argv
[1]);
1043 if (server
.dbnum
< 1) {
1044 err
= "Invalid number of databases"; goto loaderr
;
1046 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1047 server
.maxclients
= atoi(argv
[1]);
1048 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1049 server
.maxmemory
= atoi(argv
[1]);
1050 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1051 server
.masterhost
= sdsnew(argv
[1]);
1052 server
.masterport
= atoi(argv
[2]);
1053 server
.replstate
= REDIS_REPL_CONNECT
;
1054 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1055 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1056 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1058 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1059 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1060 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1062 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1063 server
.sharingpoolsize
= atoi(argv
[1]);
1064 if (server
.sharingpoolsize
< 1) {
1065 err
= "invalid object sharing pool size"; goto loaderr
;
1067 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1068 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1069 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1071 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1072 server
.requirepass
= zstrdup(argv
[1]);
1073 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1074 server
.pidfile
= zstrdup(argv
[1]);
1075 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1076 server
.dbfilename
= zstrdup(argv
[1]);
1078 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1080 for (j
= 0; j
< argc
; j
++)
1089 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1090 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1091 fprintf(stderr
, ">>> '%s'\n", line
);
1092 fprintf(stderr
, "%s\n", err
);
1096 static void freeClientArgv(redisClient
*c
) {
1099 for (j
= 0; j
< c
->argc
; j
++)
1100 decrRefCount(c
->argv
[j
]);
1104 static void freeClient(redisClient
*c
) {
1107 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1108 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1109 sdsfree(c
->querybuf
);
1110 listRelease(c
->reply
);
1113 ln
= listSearchKey(server
.clients
,c
);
1115 listDelNode(server
.clients
,ln
);
1116 if (c
->flags
& REDIS_SLAVE
) {
1117 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1119 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1120 ln
= listSearchKey(l
,c
);
1124 if (c
->flags
& REDIS_MASTER
) {
1125 server
.master
= NULL
;
1126 server
.replstate
= REDIS_REPL_CONNECT
;
1132 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1137 listRewind(c
->reply
);
1138 while((ln
= listYield(c
->reply
))) {
1140 totlen
+= sdslen(o
->ptr
);
1141 /* This optimization makes more sense if we don't have to copy
1143 if (totlen
> 1024) return;
1149 listRewind(c
->reply
);
1150 while((ln
= listYield(c
->reply
))) {
1152 memcpy(buf
+copylen
,o
->ptr
,sdslen(o
->ptr
));
1153 copylen
+= sdslen(o
->ptr
);
1154 listDelNode(c
->reply
,ln
);
1156 /* Now the output buffer is empty, add the new single element */
1157 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,totlen
));
1158 if (!listAddNodeTail(c
->reply
,o
)) oom("listAddNodeTail");
1162 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1163 redisClient
*c
= privdata
;
1164 int nwritten
= 0, totwritten
= 0, objlen
;
1167 REDIS_NOTUSED(mask
);
1169 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1170 glueReplyBuffersIfNeeded(c
);
1171 while(listLength(c
->reply
)) {
1172 o
= listNodeValue(listFirst(c
->reply
));
1173 objlen
= sdslen(o
->ptr
);
1176 listDelNode(c
->reply
,listFirst(c
->reply
));
1180 if (c
->flags
& REDIS_MASTER
) {
1181 /* Don't reply to a master */
1182 nwritten
= objlen
- c
->sentlen
;
1184 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1185 if (nwritten
<= 0) break;
1187 c
->sentlen
+= nwritten
;
1188 totwritten
+= nwritten
;
1189 /* If we fully sent the object on head go to the next one */
1190 if (c
->sentlen
== objlen
) {
1191 listDelNode(c
->reply
,listFirst(c
->reply
));
1194 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1195 * bytes, in a single threaded server it's a good idea to server
1196 * other clients as well, even if a very large request comes from
1197 * super fast link that is always able to accept data (in real world
1198 * terms think to 'KEYS *' against the loopback interfae) */
1199 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1201 if (nwritten
== -1) {
1202 if (errno
== EAGAIN
) {
1205 redisLog(REDIS_DEBUG
,
1206 "Error writing to client: %s", strerror(errno
));
1211 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1212 if (listLength(c
->reply
) == 0) {
1214 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1218 static struct redisCommand
*lookupCommand(char *name
) {
1220 while(cmdTable
[j
].name
!= NULL
) {
1221 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1227 /* resetClient prepare the client to process the next command */
1228 static void resetClient(redisClient
*c
) {
1233 /* If this function gets called we already read a whole
1234 * command, argments are in the client argv/argc fields.
1235 * processCommand() execute the command or prepare the
1236 * server for a bulk read from the client.
1238 * If 1 is returned the client is still alive and valid and
1239 * and other operations can be performed by the caller. Otherwise
1240 * if 0 is returned the client was destroied (i.e. after QUIT). */
1241 static int processCommand(redisClient
*c
) {
1242 struct redisCommand
*cmd
;
1245 /* Free some memory if needed (maxmemory setting) */
1246 if (server
.maxmemory
) freeMemoryIfNeeded();
1248 /* The QUIT command is handled as a special case. Normal command
1249 * procs are unable to close the client connection safely */
1250 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1254 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1256 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1259 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1260 (c
->argc
< -cmd
->arity
)) {
1261 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
1264 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1265 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1268 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1269 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1271 decrRefCount(c
->argv
[c
->argc
-1]);
1272 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1274 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1279 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1280 /* It is possible that the bulk read is already in the
1281 * buffer. Check this condition and handle it accordingly */
1282 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1283 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1285 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1290 /* Let's try to share objects on the command arguments vector */
1291 if (server
.shareobjects
) {
1293 for(j
= 1; j
< c
->argc
; j
++)
1294 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1296 /* Check if the user is authenticated */
1297 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1298 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1303 /* Exec the command */
1304 dirty
= server
.dirty
;
1306 if (server
.dirty
-dirty
!= 0 && listLength(server
.slaves
))
1307 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1308 if (listLength(server
.monitors
))
1309 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1310 server
.stat_numcommands
++;
1312 /* Prepare the client for the next command */
1313 if (c
->flags
& REDIS_CLOSE
) {
1321 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1325 /* (args*2)+1 is enough room for args, spaces, newlines */
1326 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1328 if (argc
<= REDIS_STATIC_ARGS
) {
1331 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1332 if (!outv
) oom("replicationFeedSlaves");
1335 for (j
= 0; j
< argc
; j
++) {
1336 if (j
!= 0) outv
[outc
++] = shared
.space
;
1337 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1340 lenobj
= createObject(REDIS_STRING
,
1341 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv
[j
]->ptr
)));
1342 lenobj
->refcount
= 0;
1343 outv
[outc
++] = lenobj
;
1345 outv
[outc
++] = argv
[j
];
1347 outv
[outc
++] = shared
.crlf
;
1349 /* Increment all the refcounts at start and decrement at end in order to
1350 * be sure to free objects if there is no slave in a replication state
1351 * able to be feed with commands */
1352 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1354 while((ln
= listYield(slaves
))) {
1355 redisClient
*slave
= ln
->value
;
1357 /* Don't feed slaves that are still waiting for BGSAVE to start */
1358 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1360 /* Feed all the other slaves, MONITORs and so on */
1361 if (slave
->slaveseldb
!= dictid
) {
1365 case 0: selectcmd
= shared
.select0
; break;
1366 case 1: selectcmd
= shared
.select1
; break;
1367 case 2: selectcmd
= shared
.select2
; break;
1368 case 3: selectcmd
= shared
.select3
; break;
1369 case 4: selectcmd
= shared
.select4
; break;
1370 case 5: selectcmd
= shared
.select5
; break;
1371 case 6: selectcmd
= shared
.select6
; break;
1372 case 7: selectcmd
= shared
.select7
; break;
1373 case 8: selectcmd
= shared
.select8
; break;
1374 case 9: selectcmd
= shared
.select9
; break;
1376 selectcmd
= createObject(REDIS_STRING
,
1377 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1378 selectcmd
->refcount
= 0;
1381 addReply(slave
,selectcmd
);
1382 slave
->slaveseldb
= dictid
;
1384 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1386 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1387 if (outv
!= static_outv
) zfree(outv
);
1390 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1391 redisClient
*c
= (redisClient
*) privdata
;
1392 char buf
[REDIS_IOBUF_LEN
];
1395 REDIS_NOTUSED(mask
);
1397 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1399 if (errno
== EAGAIN
) {
1402 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1406 } else if (nread
== 0) {
1407 redisLog(REDIS_DEBUG
, "Client closed connection");
1412 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1413 c
->lastinteraction
= time(NULL
);
1419 if (c
->bulklen
== -1) {
1420 /* Read the first line of the query */
1421 char *p
= strchr(c
->querybuf
,'\n');
1427 query
= c
->querybuf
;
1428 c
->querybuf
= sdsempty();
1429 querylen
= 1+(p
-(query
));
1430 if (sdslen(query
) > querylen
) {
1431 /* leave data after the first line of the query in the buffer */
1432 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1434 *p
= '\0'; /* remove "\n" */
1435 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1436 sdsupdatelen(query
);
1438 /* Now we can split the query in arguments */
1439 if (sdslen(query
) == 0) {
1440 /* Ignore empty query */
1444 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1445 if (argv
== NULL
) oom("sdssplitlen");
1448 if (c
->argv
) zfree(c
->argv
);
1449 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1450 if (c
->argv
== NULL
) oom("allocating arguments list for client");
1452 for (j
= 0; j
< argc
; j
++) {
1453 if (sdslen(argv
[j
])) {
1454 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1461 /* Execute the command. If the client is still valid
1462 * after processCommand() return and there is something
1463 * on the query buffer try to process the next command. */
1464 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1466 } else if (sdslen(c
->querybuf
) >= 1024*32) {
1467 redisLog(REDIS_DEBUG
, "Client protocol error");
1472 /* Bulk read handling. Note that if we are at this point
1473 the client already sent a command terminated with a newline,
1474 we are reading the bulk data that is actually the last
1475 argument of the command. */
1476 int qbl
= sdslen(c
->querybuf
);
1478 if (c
->bulklen
<= qbl
) {
1479 /* Copy everything but the final CRLF as final argument */
1480 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1482 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1489 static int selectDb(redisClient
*c
, int id
) {
1490 if (id
< 0 || id
>= server
.dbnum
)
1492 c
->db
= &server
.db
[id
];
1496 static void *dupClientReplyValue(void *o
) {
1497 incrRefCount((robj
*)o
);
1501 static redisClient
*createClient(int fd
) {
1502 redisClient
*c
= zmalloc(sizeof(*c
));
1504 anetNonBlock(NULL
,fd
);
1505 anetTcpNoDelay(NULL
,fd
);
1506 if (!c
) return NULL
;
1509 c
->querybuf
= sdsempty();
1515 c
->lastinteraction
= time(NULL
);
1516 c
->authenticated
= 0;
1517 c
->replstate
= REDIS_REPL_NONE
;
1518 if ((c
->reply
= listCreate()) == NULL
) oom("listCreate");
1519 listSetFreeMethod(c
->reply
,decrRefCount
);
1520 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1521 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1522 readQueryFromClient
, c
, NULL
) == AE_ERR
) {
1526 if (!listAddNodeTail(server
.clients
,c
)) oom("listAddNodeTail");
1530 static void addReply(redisClient
*c
, robj
*obj
) {
1531 if (listLength(c
->reply
) == 0 &&
1532 (c
->replstate
== REDIS_REPL_NONE
||
1533 c
->replstate
== REDIS_REPL_ONLINE
) &&
1534 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
1535 sendReplyToClient
, c
, NULL
) == AE_ERR
) return;
1536 if (!listAddNodeTail(c
->reply
,obj
)) oom("listAddNodeTail");
1540 static void addReplySds(redisClient
*c
, sds s
) {
1541 robj
*o
= createObject(REDIS_STRING
,s
);
1546 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1551 REDIS_NOTUSED(mask
);
1552 REDIS_NOTUSED(privdata
);
1554 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
1555 if (cfd
== AE_ERR
) {
1556 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
1559 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
1560 if ((c
= createClient(cfd
)) == NULL
) {
1561 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
1562 close(cfd
); /* May be already closed, just ingore errors */
1565 /* If maxclient directive is set and this is one client more... close the
1566 * connection. Note that we create the client instead to check before
1567 * for this condition, since now the socket is already set in nonblocking
1568 * mode and we can send an error for free using the Kernel I/O */
1569 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
1570 char *err
= "-ERR max number of clients reached\r\n";
1572 /* That's a best effort error message, don't check write errors */
1573 (void) write(c
->fd
,err
,strlen(err
));
1577 server
.stat_numconnections
++;
1580 /* ======================= Redis objects implementation ===================== */
1582 static robj
*createObject(int type
, void *ptr
) {
1585 if (listLength(server
.objfreelist
)) {
1586 listNode
*head
= listFirst(server
.objfreelist
);
1587 o
= listNodeValue(head
);
1588 listDelNode(server
.objfreelist
,head
);
1590 o
= zmalloc(sizeof(*o
));
1592 if (!o
) oom("createObject");
1599 static robj
*createStringObject(char *ptr
, size_t len
) {
1600 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
1603 static robj
*createListObject(void) {
1604 list
*l
= listCreate();
1606 if (!l
) oom("listCreate");
1607 listSetFreeMethod(l
,decrRefCount
);
1608 return createObject(REDIS_LIST
,l
);
1611 static robj
*createSetObject(void) {
1612 dict
*d
= dictCreate(&setDictType
,NULL
);
1613 if (!d
) oom("dictCreate");
1614 return createObject(REDIS_SET
,d
);
1617 static void freeStringObject(robj
*o
) {
1621 static void freeListObject(robj
*o
) {
1622 listRelease((list
*) o
->ptr
);
1625 static void freeSetObject(robj
*o
) {
1626 dictRelease((dict
*) o
->ptr
);
1629 static void freeHashObject(robj
*o
) {
1630 dictRelease((dict
*) o
->ptr
);
1633 static void incrRefCount(robj
*o
) {
1635 #ifdef DEBUG_REFCOUNT
1636 if (o
->type
== REDIS_STRING
)
1637 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
1641 static void decrRefCount(void *obj
) {
1644 #ifdef DEBUG_REFCOUNT
1645 if (o
->type
== REDIS_STRING
)
1646 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
1648 if (--(o
->refcount
) == 0) {
1650 case REDIS_STRING
: freeStringObject(o
); break;
1651 case REDIS_LIST
: freeListObject(o
); break;
1652 case REDIS_SET
: freeSetObject(o
); break;
1653 case REDIS_HASH
: freeHashObject(o
); break;
1654 default: assert(0 != 0); break;
1656 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
1657 !listAddNodeHead(server
.objfreelist
,o
))
1662 /* Try to share an object against the shared objects pool */
1663 static robj
*tryObjectSharing(robj
*o
) {
1664 struct dictEntry
*de
;
1667 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
1669 assert(o
->type
== REDIS_STRING
);
1670 de
= dictFind(server
.sharingpool
,o
);
1672 robj
*shared
= dictGetEntryKey(de
);
1674 c
= ((unsigned long) dictGetEntryVal(de
))+1;
1675 dictGetEntryVal(de
) = (void*) c
;
1676 incrRefCount(shared
);
1680 /* Here we are using a stream algorihtm: Every time an object is
1681 * shared we increment its count, everytime there is a miss we
1682 * recrement the counter of a random object. If this object reaches
1683 * zero we remove the object and put the current object instead. */
1684 if (dictSize(server
.sharingpool
) >=
1685 server
.sharingpoolsize
) {
1686 de
= dictGetRandomKey(server
.sharingpool
);
1688 c
= ((unsigned long) dictGetEntryVal(de
))-1;
1689 dictGetEntryVal(de
) = (void*) c
;
1691 dictDelete(server
.sharingpool
,de
->key
);
1694 c
= 0; /* If the pool is empty we want to add this object */
1699 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
1700 assert(retval
== DICT_OK
);
1707 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
1708 dictEntry
*de
= dictFind(db
->dict
,key
);
1709 return de
? dictGetEntryVal(de
) : NULL
;
1712 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
1713 expireIfNeeded(db
,key
);
1714 return lookupKey(db
,key
);
1717 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
1718 deleteIfVolatile(db
,key
);
1719 return lookupKey(db
,key
);
1722 static int deleteKey(redisDb
*db
, robj
*key
) {
1725 /* We need to protect key from destruction: after the first dictDelete()
1726 * it may happen that 'key' is no longer valid if we don't increment
1727 * it's count. This may happen when we get the object reference directly
1728 * from the hash table with dictRandomKey() or dict iterators */
1730 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
1731 retval
= dictDelete(db
->dict
,key
);
1734 return retval
== DICT_OK
;
1737 /*============================ DB saving/loading ============================ */
1739 static int rdbSaveType(FILE *fp
, unsigned char type
) {
1740 if (fwrite(&type
,1,1,fp
) == 0) return -1;
1744 static int rdbSaveTime(FILE *fp
, time_t t
) {
1745 int32_t t32
= (int32_t) t
;
1746 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
1750 /* check rdbLoadLen() comments for more info */
1751 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
1752 unsigned char buf
[2];
1755 /* Save a 6 bit len */
1756 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
1757 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1758 } else if (len
< (1<<14)) {
1759 /* Save a 14 bit len */
1760 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
1762 if (fwrite(buf
,2,1,fp
) == 0) return -1;
1764 /* Save a 32 bit len */
1765 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
1766 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1768 if (fwrite(&len
,4,1,fp
) == 0) return -1;
1773 /* String objects in the form "2391" "-100" without any space and with a
1774 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1775 * encoded as integers to save space */
1776 int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
1778 char *endptr
, buf
[32];
1780 /* Check if it's possible to encode this value as a number */
1781 value
= strtoll(s
, &endptr
, 10);
1782 if (endptr
[0] != '\0') return 0;
1783 snprintf(buf
,32,"%lld",value
);
1785 /* If the number converted back into a string is not identical
1786 * then it's not possible to encode the string as integer */
1787 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
1789 /* Finally check if it fits in our ranges */
1790 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
1791 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
1792 enc
[1] = value
&0xFF;
1794 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
1795 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
1796 enc
[1] = value
&0xFF;
1797 enc
[2] = (value
>>8)&0xFF;
1799 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
1800 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
1801 enc
[1] = value
&0xFF;
1802 enc
[2] = (value
>>8)&0xFF;
1803 enc
[3] = (value
>>16)&0xFF;
1804 enc
[4] = (value
>>24)&0xFF;
1811 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
1812 unsigned int comprlen
, outlen
;
1816 /* We require at least four bytes compression for this to be worth it */
1817 outlen
= sdslen(obj
->ptr
)-4;
1818 if (outlen
<= 0) return 0;
1819 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
1820 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
1821 if (comprlen
== 0) {
1825 /* Data compressed! Let's save it on disk */
1826 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
1827 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
1828 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
1829 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
1830 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
1839 /* Save a string objet as [len][data] on disk. If the object is a string
1840 * representation of an integer value we try to safe it in a special form */
1841 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
1842 size_t len
= sdslen(obj
->ptr
);
1845 /* Try integer encoding */
1847 unsigned char buf
[5];
1848 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
1849 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
1854 /* Try LZF compression - under 20 bytes it's unable to compress even
1855 * aaaaaaaaaaaaaaaaaa so skip it */
1856 if (1 && len
> 20) {
1859 retval
= rdbSaveLzfStringObject(fp
,obj
);
1860 if (retval
== -1) return -1;
1861 if (retval
> 0) return 0;
1862 /* retval == 0 means data can't be compressed, save the old way */
1865 /* Store verbatim */
1866 if (rdbSaveLen(fp
,len
) == -1) return -1;
1867 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
1871 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1872 static int rdbSave(char *filename
) {
1873 dictIterator
*di
= NULL
;
1878 time_t now
= time(NULL
);
1880 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
1881 fp
= fopen(tmpfile
,"w");
1883 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
1886 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
1887 for (j
= 0; j
< server
.dbnum
; j
++) {
1888 redisDb
*db
= server
.db
+j
;
1890 if (dictSize(d
) == 0) continue;
1891 di
= dictGetIterator(d
);
1897 /* Write the SELECT DB opcode */
1898 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
1899 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
1901 /* Iterate this DB writing every entry */
1902 while((de
= dictNext(di
)) != NULL
) {
1903 robj
*key
= dictGetEntryKey(de
);
1904 robj
*o
= dictGetEntryVal(de
);
1905 time_t expiretime
= getExpire(db
,key
);
1907 /* Save the expire time */
1908 if (expiretime
!= -1) {
1909 /* If this key is already expired skip it */
1910 if (expiretime
< now
) continue;
1911 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
1912 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
1914 /* Save the key and associated value */
1915 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
1916 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
1917 if (o
->type
== REDIS_STRING
) {
1918 /* Save a string value */
1919 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
1920 } else if (o
->type
== REDIS_LIST
) {
1921 /* Save a list value */
1922 list
*list
= o
->ptr
;
1926 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
1927 while((ln
= listYield(list
))) {
1928 robj
*eleobj
= listNodeValue(ln
);
1930 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
1932 } else if (o
->type
== REDIS_SET
) {
1933 /* Save a set value */
1935 dictIterator
*di
= dictGetIterator(set
);
1938 if (!set
) oom("dictGetIteraotr");
1939 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
1940 while((de
= dictNext(di
)) != NULL
) {
1941 robj
*eleobj
= dictGetEntryKey(de
);
1943 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
1945 dictReleaseIterator(di
);
1950 dictReleaseIterator(di
);
1953 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
1955 /* Make sure data will not remain on the OS's output buffers */
1960 /* Use RENAME to make sure the DB file is changed atomically only
1961 * if the generate DB file is ok. */
1962 if (rename(tmpfile
,filename
) == -1) {
1963 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destionation: %s", strerror(errno
));
1967 redisLog(REDIS_NOTICE
,"DB saved on disk");
1969 server
.lastsave
= time(NULL
);
1975 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
1976 if (di
) dictReleaseIterator(di
);
1980 static int rdbSaveBackground(char *filename
) {
1983 if (server
.bgsaveinprogress
) return REDIS_ERR
;
1984 if ((childpid
= fork()) == 0) {
1987 if (rdbSave(filename
) == REDIS_OK
) {
1994 if (childpid
== -1) {
1995 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
1999 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2000 server
.bgsaveinprogress
= 1;
2001 server
.bgsavechildpid
= childpid
;
2004 return REDIS_OK
; /* unreached */
2007 static int rdbLoadType(FILE *fp
) {
2009 if (fread(&type
,1,1,fp
) == 0) return -1;
2013 static time_t rdbLoadTime(FILE *fp
) {
2015 if (fread(&t32
,4,1,fp
) == 0) return -1;
2016 return (time_t) t32
;
2019 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2020 * of this file for a description of how this are stored on disk.
2022 * isencoded is set to 1 if the readed length is not actually a length but
2023 * an "encoding type", check the above comments for more info */
2024 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2025 unsigned char buf
[2];
2028 if (isencoded
) *isencoded
= 0;
2030 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2035 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2036 type
= (buf
[0]&0xC0)>>6;
2037 if (type
== REDIS_RDB_6BITLEN
) {
2038 /* Read a 6 bit len */
2040 } else if (type
== REDIS_RDB_ENCVAL
) {
2041 /* Read a 6 bit len encoding type */
2042 if (isencoded
) *isencoded
= 1;
2044 } else if (type
== REDIS_RDB_14BITLEN
) {
2045 /* Read a 14 bit len */
2046 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2047 return ((buf
[0]&0x3F)<<8)|buf
[1];
2049 /* Read a 32 bit len */
2050 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2056 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2057 unsigned char enc
[4];
2060 if (enctype
== REDIS_RDB_ENC_INT8
) {
2061 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2062 val
= (signed char)enc
[0];
2063 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2065 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2066 v
= enc
[0]|(enc
[1]<<8);
2068 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2070 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2071 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2074 val
= 0; /* anti-warning */
2077 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2080 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2081 unsigned int len
, clen
;
2082 unsigned char *c
= NULL
;
2085 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2086 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2087 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2088 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2089 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2090 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2092 return createObject(REDIS_STRING
,val
);
2099 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2104 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2107 case REDIS_RDB_ENC_INT8
:
2108 case REDIS_RDB_ENC_INT16
:
2109 case REDIS_RDB_ENC_INT32
:
2110 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2111 case REDIS_RDB_ENC_LZF
:
2112 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2118 if (len
== REDIS_RDB_LENERR
) return NULL
;
2119 val
= sdsnewlen(NULL
,len
);
2120 if (len
&& fread(val
,len
,1,fp
) == 0) {
2124 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2127 static int rdbLoad(char *filename
) {
2129 robj
*keyobj
= NULL
;
2131 int type
, retval
, rdbver
;
2132 dict
*d
= server
.db
[0].dict
;
2133 redisDb
*db
= server
.db
+0;
2135 time_t expiretime
= -1, now
= time(NULL
);
2137 fp
= fopen(filename
,"r");
2138 if (!fp
) return REDIS_ERR
;
2139 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2141 if (memcmp(buf
,"REDIS",5) != 0) {
2143 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2146 rdbver
= atoi(buf
+5);
2149 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2156 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2157 if (type
== REDIS_EXPIRETIME
) {
2158 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2159 /* We read the time so we need to read the object type again */
2160 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2162 if (type
== REDIS_EOF
) break;
2163 /* Handle SELECT DB opcode as a special case */
2164 if (type
== REDIS_SELECTDB
) {
2165 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2167 if (dbid
>= (unsigned)server
.dbnum
) {
2168 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2171 db
= server
.db
+dbid
;
2176 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2178 if (type
== REDIS_STRING
) {
2179 /* Read string value */
2180 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2181 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2182 /* Read list/set value */
2185 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2187 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2188 /* Load every single element of the list/set */
2192 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2193 if (type
== REDIS_LIST
) {
2194 if (!listAddNodeTail((list
*)o
->ptr
,ele
))
2195 oom("listAddNodeTail");
2197 if (dictAdd((dict
*)o
->ptr
,ele
,NULL
) == DICT_ERR
)
2204 /* Add the new object in the hash table */
2205 retval
= dictAdd(d
,keyobj
,o
);
2206 if (retval
== DICT_ERR
) {
2207 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2210 /* Set the expire time if needed */
2211 if (expiretime
!= -1) {
2212 setExpire(db
,keyobj
,expiretime
);
2213 /* Delete this key if already expired */
2214 if (expiretime
< now
) deleteKey(db
,keyobj
);
2222 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2223 if (keyobj
) decrRefCount(keyobj
);
2224 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2226 return REDIS_ERR
; /* Just to avoid warning */
2229 /*================================== Commands =============================== */
2231 static void authCommand(redisClient
*c
) {
2232 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2233 c
->authenticated
= 1;
2234 addReply(c
,shared
.ok
);
2236 c
->authenticated
= 0;
2237 addReply(c
,shared
.err
);
2241 static void pingCommand(redisClient
*c
) {
2242 addReply(c
,shared
.pong
);
2245 static void echoCommand(redisClient
*c
) {
2246 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
2247 (int)sdslen(c
->argv
[1]->ptr
)));
2248 addReply(c
,c
->argv
[1]);
2249 addReply(c
,shared
.crlf
);
2252 /*=================================== Strings =============================== */
2254 static void setGenericCommand(redisClient
*c
, int nx
) {
2257 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2258 if (retval
== DICT_ERR
) {
2260 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2261 incrRefCount(c
->argv
[2]);
2263 addReply(c
,shared
.czero
);
2267 incrRefCount(c
->argv
[1]);
2268 incrRefCount(c
->argv
[2]);
2271 removeExpire(c
->db
,c
->argv
[1]);
2272 addReply(c
, nx
? shared
.cone
: shared
.ok
);
2275 static void setCommand(redisClient
*c
) {
2276 setGenericCommand(c
,0);
2279 static void setnxCommand(redisClient
*c
) {
2280 setGenericCommand(c
,1);
2283 static void getCommand(redisClient
*c
) {
2284 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2287 addReply(c
,shared
.nullbulk
);
2289 if (o
->type
!= REDIS_STRING
) {
2290 addReply(c
,shared
.wrongtypeerr
);
2292 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o
->ptr
)));
2294 addReply(c
,shared
.crlf
);
2299 static void getSetCommand(redisClient
*c
) {
2301 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
2302 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2304 incrRefCount(c
->argv
[1]);
2306 incrRefCount(c
->argv
[2]);
2308 removeExpire(c
->db
,c
->argv
[1]);
2311 static void mgetCommand(redisClient
*c
) {
2314 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
2315 for (j
= 1; j
< c
->argc
; j
++) {
2316 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
2318 addReply(c
,shared
.nullbulk
);
2320 if (o
->type
!= REDIS_STRING
) {
2321 addReply(c
,shared
.nullbulk
);
2323 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o
->ptr
)));
2325 addReply(c
,shared
.crlf
);
2331 static void incrDecrCommand(redisClient
*c
, long long incr
) {
2336 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2340 if (o
->type
!= REDIS_STRING
) {
2345 value
= strtoll(o
->ptr
, &eptr
, 10);
2350 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
2351 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
2352 if (retval
== DICT_ERR
) {
2353 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
2354 removeExpire(c
->db
,c
->argv
[1]);
2356 incrRefCount(c
->argv
[1]);
2359 addReply(c
,shared
.colon
);
2361 addReply(c
,shared
.crlf
);
2364 static void incrCommand(redisClient
*c
) {
2365 incrDecrCommand(c
,1);
2368 static void decrCommand(redisClient
*c
) {
2369 incrDecrCommand(c
,-1);
2372 static void incrbyCommand(redisClient
*c
) {
2373 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2374 incrDecrCommand(c
,incr
);
2377 static void decrbyCommand(redisClient
*c
) {
2378 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2379 incrDecrCommand(c
,-incr
);
2382 /* ========================= Type agnostic commands ========================= */
2384 static void delCommand(redisClient
*c
) {
2387 for (j
= 1; j
< c
->argc
; j
++) {
2388 if (deleteKey(c
->db
,c
->argv
[j
])) {
2395 addReply(c
,shared
.czero
);
2398 addReply(c
,shared
.cone
);
2401 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
2406 static void existsCommand(redisClient
*c
) {
2407 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
2410 static void selectCommand(redisClient
*c
) {
2411 int id
= atoi(c
->argv
[1]->ptr
);
2413 if (selectDb(c
,id
) == REDIS_ERR
) {
2414 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
2416 addReply(c
,shared
.ok
);
2420 static void randomkeyCommand(redisClient
*c
) {
2424 de
= dictGetRandomKey(c
->db
->dict
);
2425 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
2428 addReply(c
,shared
.plus
);
2429 addReply(c
,shared
.crlf
);
2431 addReply(c
,shared
.plus
);
2432 addReply(c
,dictGetEntryKey(de
));
2433 addReply(c
,shared
.crlf
);
2437 static void keysCommand(redisClient
*c
) {
2440 sds pattern
= c
->argv
[1]->ptr
;
2441 int plen
= sdslen(pattern
);
2442 int numkeys
= 0, keyslen
= 0;
2443 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
2445 di
= dictGetIterator(c
->db
->dict
);
2446 if (!di
) oom("dictGetIterator");
2448 decrRefCount(lenobj
);
2449 while((de
= dictNext(di
)) != NULL
) {
2450 robj
*keyobj
= dictGetEntryKey(de
);
2452 sds key
= keyobj
->ptr
;
2453 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
2454 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
2455 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
2457 addReply(c
,shared
.space
);
2460 keyslen
+= sdslen(key
);
2464 dictReleaseIterator(di
);
2465 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
2466 addReply(c
,shared
.crlf
);
2469 static void dbsizeCommand(redisClient
*c
) {
2471 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
2474 static void lastsaveCommand(redisClient
*c
) {
2476 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
2479 static void typeCommand(redisClient
*c
) {
2483 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2488 case REDIS_STRING
: type
= "+string"; break;
2489 case REDIS_LIST
: type
= "+list"; break;
2490 case REDIS_SET
: type
= "+set"; break;
2491 default: type
= "unknown"; break;
2494 addReplySds(c
,sdsnew(type
));
2495 addReply(c
,shared
.crlf
);
2498 static void saveCommand(redisClient
*c
) {
2499 if (server
.bgsaveinprogress
) {
2500 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
2503 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2504 addReply(c
,shared
.ok
);
2506 addReply(c
,shared
.err
);
2510 static void bgsaveCommand(redisClient
*c
) {
2511 if (server
.bgsaveinprogress
) {
2512 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
2515 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
2516 addReply(c
,shared
.ok
);
2518 addReply(c
,shared
.err
);
2522 static void shutdownCommand(redisClient
*c
) {
2523 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
2524 if (server
.bgsaveinprogress
) {
2525 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
2526 signal(SIGCHLD
, SIG_IGN
);
2527 kill(server
.bgsavechildpid
,SIGKILL
);
2529 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2530 if (server
.daemonize
)
2531 unlink(server
.pidfile
);
2532 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
2533 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
2536 signal(SIGCHLD
, SIG_DFL
);
2537 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
2538 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2542 static void renameGenericCommand(redisClient
*c
, int nx
) {
2545 /* To use the same key as src and dst is probably an error */
2546 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
2547 addReply(c
,shared
.sameobjecterr
);
2551 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2553 addReply(c
,shared
.nokeyerr
);
2557 deleteIfVolatile(c
->db
,c
->argv
[2]);
2558 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
2561 addReply(c
,shared
.czero
);
2564 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
2566 incrRefCount(c
->argv
[2]);
2568 deleteKey(c
->db
,c
->argv
[1]);
2570 addReply(c
,nx
? shared
.cone
: shared
.ok
);
2573 static void renameCommand(redisClient
*c
) {
2574 renameGenericCommand(c
,0);
2577 static void renamenxCommand(redisClient
*c
) {
2578 renameGenericCommand(c
,1);
2581 static void moveCommand(redisClient
*c
) {
2586 /* Obtain source and target DB pointers */
2589 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
2590 addReply(c
,shared
.outofrangeerr
);
2594 selectDb(c
,srcid
); /* Back to the source DB */
2596 /* If the user is moving using as target the same
2597 * DB as the source DB it is probably an error. */
2599 addReply(c
,shared
.sameobjecterr
);
2603 /* Check if the element exists and get a reference */
2604 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2606 addReply(c
,shared
.czero
);
2610 /* Try to add the element to the target DB */
2611 deleteIfVolatile(dst
,c
->argv
[1]);
2612 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
2613 addReply(c
,shared
.czero
);
2616 incrRefCount(c
->argv
[1]);
2619 /* OK! key moved, free the entry in the source DB */
2620 deleteKey(src
,c
->argv
[1]);
2622 addReply(c
,shared
.cone
);
2625 /* =================================== Lists ================================ */
2626 static void pushGenericCommand(redisClient
*c
, int where
) {
2630 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2632 lobj
= createListObject();
2634 if (where
== REDIS_HEAD
) {
2635 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2637 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2639 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
2640 incrRefCount(c
->argv
[1]);
2641 incrRefCount(c
->argv
[2]);
2643 if (lobj
->type
!= REDIS_LIST
) {
2644 addReply(c
,shared
.wrongtypeerr
);
2648 if (where
== REDIS_HEAD
) {
2649 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2651 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2653 incrRefCount(c
->argv
[2]);
2656 addReply(c
,shared
.ok
);
2659 static void lpushCommand(redisClient
*c
) {
2660 pushGenericCommand(c
,REDIS_HEAD
);
2663 static void rpushCommand(redisClient
*c
) {
2664 pushGenericCommand(c
,REDIS_TAIL
);
2667 static void llenCommand(redisClient
*c
) {
2671 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2673 addReply(c
,shared
.czero
);
2676 if (o
->type
!= REDIS_LIST
) {
2677 addReply(c
,shared
.wrongtypeerr
);
2680 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
2685 static void lindexCommand(redisClient
*c
) {
2687 int index
= atoi(c
->argv
[2]->ptr
);
2689 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2691 addReply(c
,shared
.nullbulk
);
2693 if (o
->type
!= REDIS_LIST
) {
2694 addReply(c
,shared
.wrongtypeerr
);
2696 list
*list
= o
->ptr
;
2699 ln
= listIndex(list
, index
);
2701 addReply(c
,shared
.nullbulk
);
2703 robj
*ele
= listNodeValue(ln
);
2704 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2706 addReply(c
,shared
.crlf
);
2712 static void lsetCommand(redisClient
*c
) {
2714 int index
= atoi(c
->argv
[2]->ptr
);
2716 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2718 addReply(c
,shared
.nokeyerr
);
2720 if (o
->type
!= REDIS_LIST
) {
2721 addReply(c
,shared
.wrongtypeerr
);
2723 list
*list
= o
->ptr
;
2726 ln
= listIndex(list
, index
);
2728 addReply(c
,shared
.outofrangeerr
);
2730 robj
*ele
= listNodeValue(ln
);
2733 listNodeValue(ln
) = c
->argv
[3];
2734 incrRefCount(c
->argv
[3]);
2735 addReply(c
,shared
.ok
);
2742 static void popGenericCommand(redisClient
*c
, int where
) {
2745 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2747 addReply(c
,shared
.nullbulk
);
2749 if (o
->type
!= REDIS_LIST
) {
2750 addReply(c
,shared
.wrongtypeerr
);
2752 list
*list
= o
->ptr
;
2755 if (where
== REDIS_HEAD
)
2756 ln
= listFirst(list
);
2758 ln
= listLast(list
);
2761 addReply(c
,shared
.nullbulk
);
2763 robj
*ele
= listNodeValue(ln
);
2764 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2766 addReply(c
,shared
.crlf
);
2767 listDelNode(list
,ln
);
2774 static void lpopCommand(redisClient
*c
) {
2775 popGenericCommand(c
,REDIS_HEAD
);
2778 static void rpopCommand(redisClient
*c
) {
2779 popGenericCommand(c
,REDIS_TAIL
);
2782 static void lrangeCommand(redisClient
*c
) {
2784 int start
= atoi(c
->argv
[2]->ptr
);
2785 int end
= atoi(c
->argv
[3]->ptr
);
2787 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2789 addReply(c
,shared
.nullmultibulk
);
2791 if (o
->type
!= REDIS_LIST
) {
2792 addReply(c
,shared
.wrongtypeerr
);
2794 list
*list
= o
->ptr
;
2796 int llen
= listLength(list
);
2800 /* convert negative indexes */
2801 if (start
< 0) start
= llen
+start
;
2802 if (end
< 0) end
= llen
+end
;
2803 if (start
< 0) start
= 0;
2804 if (end
< 0) end
= 0;
2806 /* indexes sanity checks */
2807 if (start
> end
|| start
>= llen
) {
2808 /* Out of range start or start > end result in empty list */
2809 addReply(c
,shared
.emptymultibulk
);
2812 if (end
>= llen
) end
= llen
-1;
2813 rangelen
= (end
-start
)+1;
2815 /* Return the result in form of a multi-bulk reply */
2816 ln
= listIndex(list
, start
);
2817 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
2818 for (j
= 0; j
< rangelen
; j
++) {
2819 ele
= listNodeValue(ln
);
2820 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2822 addReply(c
,shared
.crlf
);
2829 static void ltrimCommand(redisClient
*c
) {
2831 int start
= atoi(c
->argv
[2]->ptr
);
2832 int end
= atoi(c
->argv
[3]->ptr
);
2834 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2836 addReply(c
,shared
.nokeyerr
);
2838 if (o
->type
!= REDIS_LIST
) {
2839 addReply(c
,shared
.wrongtypeerr
);
2841 list
*list
= o
->ptr
;
2843 int llen
= listLength(list
);
2844 int j
, ltrim
, rtrim
;
2846 /* convert negative indexes */
2847 if (start
< 0) start
= llen
+start
;
2848 if (end
< 0) end
= llen
+end
;
2849 if (start
< 0) start
= 0;
2850 if (end
< 0) end
= 0;
2852 /* indexes sanity checks */
2853 if (start
> end
|| start
>= llen
) {
2854 /* Out of range start or start > end result in empty list */
2858 if (end
>= llen
) end
= llen
-1;
2863 /* Remove list elements to perform the trim */
2864 for (j
= 0; j
< ltrim
; j
++) {
2865 ln
= listFirst(list
);
2866 listDelNode(list
,ln
);
2868 for (j
= 0; j
< rtrim
; j
++) {
2869 ln
= listLast(list
);
2870 listDelNode(list
,ln
);
2872 addReply(c
,shared
.ok
);
2878 static void lremCommand(redisClient
*c
) {
2881 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2883 addReply(c
,shared
.czero
);
2885 if (o
->type
!= REDIS_LIST
) {
2886 addReply(c
,shared
.wrongtypeerr
);
2888 list
*list
= o
->ptr
;
2889 listNode
*ln
, *next
;
2890 int toremove
= atoi(c
->argv
[2]->ptr
);
2895 toremove
= -toremove
;
2898 ln
= fromtail
? list
->tail
: list
->head
;
2900 robj
*ele
= listNodeValue(ln
);
2902 next
= fromtail
? ln
->prev
: ln
->next
;
2903 if (sdscmp(ele
->ptr
,c
->argv
[3]->ptr
) == 0) {
2904 listDelNode(list
,ln
);
2907 if (toremove
&& removed
== toremove
) break;
2911 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
2916 /* ==================================== Sets ================================ */
2918 static void saddCommand(redisClient
*c
) {
2921 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2923 set
= createSetObject();
2924 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
2925 incrRefCount(c
->argv
[1]);
2927 if (set
->type
!= REDIS_SET
) {
2928 addReply(c
,shared
.wrongtypeerr
);
2932 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
2933 incrRefCount(c
->argv
[2]);
2935 addReply(c
,shared
.cone
);
2937 addReply(c
,shared
.czero
);
2941 static void sremCommand(redisClient
*c
) {
2944 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2946 addReply(c
,shared
.czero
);
2948 if (set
->type
!= REDIS_SET
) {
2949 addReply(c
,shared
.wrongtypeerr
);
2952 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
2954 addReply(c
,shared
.cone
);
2956 addReply(c
,shared
.czero
);
2961 static void smoveCommand(redisClient
*c
) {
2962 robj
*srcset
, *dstset
;
2964 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2965 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
2967 /* If the source key does not exist return 0, if it's of the wrong type
2969 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
2970 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
2973 /* Error if the destination key is not a set as well */
2974 if (dstset
&& dstset
->type
!= REDIS_SET
) {
2975 addReply(c
,shared
.wrongtypeerr
);
2978 /* Remove the element from the source set */
2979 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
2980 /* Key not found in the src set! return zero */
2981 addReply(c
,shared
.czero
);
2985 /* Add the element to the destination set */
2987 dstset
= createSetObject();
2988 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
2989 incrRefCount(c
->argv
[2]);
2991 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
2992 incrRefCount(c
->argv
[3]);
2993 addReply(c
,shared
.cone
);
2996 static void sismemberCommand(redisClient
*c
) {
2999 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3001 addReply(c
,shared
.czero
);
3003 if (set
->type
!= REDIS_SET
) {
3004 addReply(c
,shared
.wrongtypeerr
);
3007 if (dictFind(set
->ptr
,c
->argv
[2]))
3008 addReply(c
,shared
.cone
);
3010 addReply(c
,shared
.czero
);
3014 static void scardCommand(redisClient
*c
) {
3018 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3020 addReply(c
,shared
.czero
);
3023 if (o
->type
!= REDIS_SET
) {
3024 addReply(c
,shared
.wrongtypeerr
);
3027 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3033 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3034 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3036 return dictSize(*d1
)-dictSize(*d2
);
3039 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
) {
3040 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3043 robj
*lenobj
= NULL
, *dstset
= NULL
;
3044 int j
, cardinality
= 0;
3046 if (!dv
) oom("sinterGenericCommand");
3047 for (j
= 0; j
< setsnum
; j
++) {
3051 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3052 lookupKeyRead(c
->db
,setskeys
[j
]);
3056 deleteKey(c
->db
,dstkey
);
3057 addReply(c
,shared
.ok
);
3059 addReply(c
,shared
.nullmultibulk
);
3063 if (setobj
->type
!= REDIS_SET
) {
3065 addReply(c
,shared
.wrongtypeerr
);
3068 dv
[j
] = setobj
->ptr
;
3070 /* Sort sets from the smallest to largest, this will improve our
3071 * algorithm's performace */
3072 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3074 /* The first thing we should output is the total number of elements...
3075 * since this is a multi-bulk write, but at this stage we don't know
3076 * the intersection set size, so we use a trick, append an empty object
3077 * to the output list and save the pointer to later modify it with the
3080 lenobj
= createObject(REDIS_STRING
,NULL
);
3082 decrRefCount(lenobj
);
3084 /* If we have a target key where to store the resulting set
3085 * create this key with an empty set inside */
3086 dstset
= createSetObject();
3089 /* Iterate all the elements of the first (smallest) set, and test
3090 * the element against all the other sets, if at least one set does
3091 * not include the element it is discarded */
3092 di
= dictGetIterator(dv
[0]);
3093 if (!di
) oom("dictGetIterator");
3095 while((de
= dictNext(di
)) != NULL
) {
3098 for (j
= 1; j
< setsnum
; j
++)
3099 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
3101 continue; /* at least one set does not contain the member */
3102 ele
= dictGetEntryKey(de
);
3104 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele
->ptr
)));
3106 addReply(c
,shared
.crlf
);
3109 dictAdd(dstset
->ptr
,ele
,NULL
);
3113 dictReleaseIterator(di
);
3116 /* Store the resulting set into the target */
3117 deleteKey(c
->db
,dstkey
);
3118 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3119 incrRefCount(dstkey
);
3123 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",cardinality
);
3125 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3126 dictSize((dict
*)dstset
->ptr
)));
3132 static void sinterCommand(redisClient
*c
) {
3133 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
3136 static void sinterstoreCommand(redisClient
*c
) {
3137 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
3140 #define REDIS_OP_UNION 0
3141 #define REDIS_OP_DIFF 1
3143 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
3144 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3147 robj
*dstset
= NULL
;
3148 int j
, cardinality
= 0;
3150 if (!dv
) oom("sunionDiffGenericCommand");
3151 for (j
= 0; j
< setsnum
; j
++) {
3155 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3156 lookupKeyRead(c
->db
,setskeys
[j
]);
3161 if (setobj
->type
!= REDIS_SET
) {
3163 addReply(c
,shared
.wrongtypeerr
);
3166 dv
[j
] = setobj
->ptr
;
3169 /* We need a temp set object to store our union. If the dstkey
3170 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3171 * this set object will be the resulting object to set into the target key*/
3172 dstset
= createSetObject();
3174 /* Iterate all the elements of all the sets, add every element a single
3175 * time to the result set */
3176 for (j
= 0; j
< setsnum
; j
++) {
3177 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
3178 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
3180 di
= dictGetIterator(dv
[j
]);
3181 if (!di
) oom("dictGetIterator");
3183 while((de
= dictNext(di
)) != NULL
) {
3186 /* dictAdd will not add the same element multiple times */
3187 ele
= dictGetEntryKey(de
);
3188 if (op
== REDIS_OP_UNION
|| j
== 0) {
3189 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
3193 } else if (op
== REDIS_OP_DIFF
) {
3194 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
3199 dictReleaseIterator(di
);
3201 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
3204 /* Output the content of the resulting set, if not in STORE mode */
3206 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
3207 di
= dictGetIterator(dstset
->ptr
);
3208 if (!di
) oom("dictGetIterator");
3209 while((de
= dictNext(di
)) != NULL
) {
3212 ele
= dictGetEntryKey(de
);
3213 addReplySds(c
,sdscatprintf(sdsempty(),
3214 "$%d\r\n",sdslen(ele
->ptr
)));
3216 addReply(c
,shared
.crlf
);
3218 dictReleaseIterator(di
);
3220 /* If we have a target key where to store the resulting set
3221 * create this key with the result set inside */
3222 deleteKey(c
->db
,dstkey
);
3223 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3224 incrRefCount(dstkey
);
3229 decrRefCount(dstset
);
3231 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3232 dictSize((dict
*)dstset
->ptr
)));
3238 static void sunionCommand(redisClient
*c
) {
3239 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
3242 static void sunionstoreCommand(redisClient
*c
) {
3243 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
3246 static void sdiffCommand(redisClient
*c
) {
3247 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
3250 static void sdiffstoreCommand(redisClient
*c
) {
3251 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
3254 static void flushdbCommand(redisClient
*c
) {
3255 server
.dirty
+= dictSize(c
->db
->dict
);
3256 dictEmpty(c
->db
->dict
);
3257 dictEmpty(c
->db
->expires
);
3258 addReply(c
,shared
.ok
);
3261 static void flushallCommand(redisClient
*c
) {
3262 server
.dirty
+= emptyDb();
3263 addReply(c
,shared
.ok
);
3264 rdbSave(server
.dbfilename
);
3268 redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
3269 redisSortOperation
*so
= zmalloc(sizeof(*so
));
3270 if (!so
) oom("createSortOperation");
3272 so
->pattern
= pattern
;
3276 /* Return the value associated to the key with a name obtained
3277 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3278 robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
3282 int prefixlen
, sublen
, postfixlen
;
3283 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3287 char buf
[REDIS_SORTKEY_MAX
+1];
3290 spat
= pattern
->ptr
;
3292 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
3293 p
= strchr(spat
,'*');
3294 if (!p
) return NULL
;
3297 sublen
= sdslen(ssub
);
3298 postfixlen
= sdslen(spat
)-(prefixlen
+1);
3299 memcpy(keyname
.buf
,spat
,prefixlen
);
3300 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
3301 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
3302 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
3303 keyname
.len
= prefixlen
+sublen
+postfixlen
;
3305 keyobj
.refcount
= 1;
3306 keyobj
.type
= REDIS_STRING
;
3307 keyobj
.ptr
= ((char*)&keyname
)+(sizeof(long)*2);
3309 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3310 return lookupKeyRead(db
,&keyobj
);
3313 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3314 * the additional parameter is not standard but a BSD-specific we have to
3315 * pass sorting parameters via the global 'server' structure */
3316 static int sortCompare(const void *s1
, const void *s2
) {
3317 const redisSortObject
*so1
= s1
, *so2
= s2
;
3320 if (!server
.sort_alpha
) {
3321 /* Numeric sorting. Here it's trivial as we precomputed scores */
3322 if (so1
->u
.score
> so2
->u
.score
) {
3324 } else if (so1
->u
.score
< so2
->u
.score
) {
3330 /* Alphanumeric sorting */
3331 if (server
.sort_bypattern
) {
3332 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
3333 /* At least one compare object is NULL */
3334 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
3336 else if (so1
->u
.cmpobj
== NULL
)
3341 /* We have both the objects, use strcoll */
3342 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
3345 /* Compare elements directly */
3346 cmp
= strcoll(so1
->obj
->ptr
,so2
->obj
->ptr
);
3349 return server
.sort_desc
? -cmp
: cmp
;
3352 /* The SORT command is the most complex command in Redis. Warning: this code
3353 * is optimized for speed and a bit less for readability */
3354 static void sortCommand(redisClient
*c
) {
3357 int desc
= 0, alpha
= 0;
3358 int limit_start
= 0, limit_count
= -1, start
, end
;
3359 int j
, dontsort
= 0, vectorlen
;
3360 int getop
= 0; /* GET operation counter */
3361 robj
*sortval
, *sortby
= NULL
;
3362 redisSortObject
*vector
; /* Resulting vector to sort */
3364 /* Lookup the key to sort. It must be of the right types */
3365 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
3366 if (sortval
== NULL
) {
3367 addReply(c
,shared
.nokeyerr
);
3370 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
) {
3371 addReply(c
,shared
.wrongtypeerr
);
3375 /* Create a list of operations to perform for every sorted element.
3376 * Operations can be GET/DEL/INCR/DECR */
3377 operations
= listCreate();
3378 listSetFreeMethod(operations
,zfree
);
3381 /* Now we need to protect sortval incrementing its count, in the future
3382 * SORT may have options able to overwrite/delete keys during the sorting
3383 * and the sorted key itself may get destroied */
3384 incrRefCount(sortval
);
3386 /* The SORT command has an SQL-alike syntax, parse it */
3387 while(j
< c
->argc
) {
3388 int leftargs
= c
->argc
-j
-1;
3389 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
3391 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
3393 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
3395 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
3396 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
3397 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
3399 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
3400 sortby
= c
->argv
[j
+1];
3401 /* If the BY pattern does not contain '*', i.e. it is constant,
3402 * we don't need to sort nor to lookup the weight keys. */
3403 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
3405 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3406 listAddNodeTail(operations
,createSortOperation(
3407 REDIS_SORT_GET
,c
->argv
[j
+1]));
3410 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"del") && leftargs
>= 1) {
3411 listAddNodeTail(operations
,createSortOperation(
3412 REDIS_SORT_DEL
,c
->argv
[j
+1]));
3414 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"incr") && leftargs
>= 1) {
3415 listAddNodeTail(operations
,createSortOperation(
3416 REDIS_SORT_INCR
,c
->argv
[j
+1]));
3418 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3419 listAddNodeTail(operations
,createSortOperation(
3420 REDIS_SORT_DECR
,c
->argv
[j
+1]));
3423 decrRefCount(sortval
);
3424 listRelease(operations
);
3425 addReply(c
,shared
.syntaxerr
);
3431 /* Load the sorting vector with all the objects to sort */
3432 vectorlen
= (sortval
->type
== REDIS_LIST
) ?
3433 listLength((list
*)sortval
->ptr
) :
3434 dictSize((dict
*)sortval
->ptr
);
3435 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
3436 if (!vector
) oom("allocating objects vector for SORT");
3438 if (sortval
->type
== REDIS_LIST
) {
3439 list
*list
= sortval
->ptr
;
3443 while((ln
= listYield(list
))) {
3444 robj
*ele
= ln
->value
;
3445 vector
[j
].obj
= ele
;
3446 vector
[j
].u
.score
= 0;
3447 vector
[j
].u
.cmpobj
= NULL
;
3451 dict
*set
= sortval
->ptr
;
3455 di
= dictGetIterator(set
);
3456 if (!di
) oom("dictGetIterator");
3457 while((setele
= dictNext(di
)) != NULL
) {
3458 vector
[j
].obj
= dictGetEntryKey(setele
);
3459 vector
[j
].u
.score
= 0;
3460 vector
[j
].u
.cmpobj
= NULL
;
3463 dictReleaseIterator(di
);
3465 assert(j
== vectorlen
);
3467 /* Now it's time to load the right scores in the sorting vector */
3468 if (dontsort
== 0) {
3469 for (j
= 0; j
< vectorlen
; j
++) {
3473 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
3474 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
3476 vector
[j
].u
.cmpobj
= byval
;
3477 incrRefCount(byval
);
3479 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
3482 if (!alpha
) vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
3487 /* We are ready to sort the vector... perform a bit of sanity check
3488 * on the LIMIT option too. We'll use a partial version of quicksort. */
3489 start
= (limit_start
< 0) ? 0 : limit_start
;
3490 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
3491 if (start
>= vectorlen
) {
3492 start
= vectorlen
-1;
3495 if (end
>= vectorlen
) end
= vectorlen
-1;
3497 if (dontsort
== 0) {
3498 server
.sort_desc
= desc
;
3499 server
.sort_alpha
= alpha
;
3500 server
.sort_bypattern
= sortby
? 1 : 0;
3501 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
3502 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
3504 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
3507 /* Send command output to the output buffer, performing the specified
3508 * GET/DEL/INCR/DECR operations if any. */
3509 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
3510 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
3511 for (j
= start
; j
<= end
; j
++) {
3514 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
3515 sdslen(vector
[j
].obj
->ptr
)));
3516 addReply(c
,vector
[j
].obj
);
3517 addReply(c
,shared
.crlf
);
3519 listRewind(operations
);
3520 while((ln
= listYield(operations
))) {
3521 redisSortOperation
*sop
= ln
->value
;
3522 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
3525 if (sop
->type
== REDIS_SORT_GET
) {
3526 if (!val
|| val
->type
!= REDIS_STRING
) {
3527 addReply(c
,shared
.nullbulk
);
3529 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
3532 addReply(c
,shared
.crlf
);
3534 } else if (sop
->type
== REDIS_SORT_DEL
) {
3541 decrRefCount(sortval
);
3542 listRelease(operations
);
3543 for (j
= 0; j
< vectorlen
; j
++) {
3544 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
3545 decrRefCount(vector
[j
].u
.cmpobj
);
3550 static void infoCommand(redisClient
*c
) {
3552 time_t uptime
= time(NULL
)-server
.stat_starttime
;
3554 info
= sdscatprintf(sdsempty(),
3555 "redis_version:%s\r\n"
3556 "uptime_in_seconds:%d\r\n"
3557 "uptime_in_days:%d\r\n"
3558 "connected_clients:%d\r\n"
3559 "connected_slaves:%d\r\n"
3560 "used_memory:%zu\r\n"
3561 "changes_since_last_save:%lld\r\n"
3562 "bgsave_in_progress:%d\r\n"
3563 "last_save_time:%d\r\n"
3564 "total_connections_received:%lld\r\n"
3565 "total_commands_processed:%lld\r\n"
3570 listLength(server
.clients
)-listLength(server
.slaves
),
3571 listLength(server
.slaves
),
3574 server
.bgsaveinprogress
,
3576 server
.stat_numconnections
,
3577 server
.stat_numcommands
,
3578 server
.masterhost
== NULL
? "master" : "slave"
3580 if (server
.masterhost
) {
3581 info
= sdscatprintf(info
,
3582 "master_host:%s\r\n"
3583 "master_port:%d\r\n"
3584 "master_link_status:%s\r\n"
3585 "master_last_io_seconds_ago:%d\r\n"
3588 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
3590 (int)(time(NULL
)-server
.master
->lastinteraction
)
3593 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info
)));
3594 addReplySds(c
,info
);
3595 addReply(c
,shared
.crlf
);
3598 static void monitorCommand(redisClient
*c
) {
3599 /* ignore MONITOR if aleady slave or in monitor mode */
3600 if (c
->flags
& REDIS_SLAVE
) return;
3602 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
3604 if (!listAddNodeTail(server
.monitors
,c
)) oom("listAddNodeTail");
3605 addReply(c
,shared
.ok
);
3608 /* ================================= Expire ================================= */
3609 static int removeExpire(redisDb
*db
, robj
*key
) {
3610 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
3617 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
3618 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
3626 /* Return the expire time of the specified key, or -1 if no expire
3627 * is associated with this key (i.e. the key is non volatile) */
3628 static time_t getExpire(redisDb
*db
, robj
*key
) {
3631 /* No expire? return ASAP */
3632 if (dictSize(db
->expires
) == 0 ||
3633 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
3635 return (time_t) dictGetEntryVal(de
);
3638 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
3642 /* No expire? return ASAP */
3643 if (dictSize(db
->expires
) == 0 ||
3644 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3646 /* Lookup the expire */
3647 when
= (time_t) dictGetEntryVal(de
);
3648 if (time(NULL
) <= when
) return 0;
3650 /* Delete the key */
3651 dictDelete(db
->expires
,key
);
3652 return dictDelete(db
->dict
,key
) == DICT_OK
;
3655 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
3658 /* No expire? return ASAP */
3659 if (dictSize(db
->expires
) == 0 ||
3660 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3662 /* Delete the key */
3664 dictDelete(db
->expires
,key
);
3665 return dictDelete(db
->dict
,key
) == DICT_OK
;
3668 static void expireCommand(redisClient
*c
) {
3670 int seconds
= atoi(c
->argv
[2]->ptr
);
3672 de
= dictFind(c
->db
->dict
,c
->argv
[1]);
3674 addReply(c
,shared
.czero
);
3678 addReply(c
, shared
.czero
);
3681 time_t when
= time(NULL
)+seconds
;
3682 if (setExpire(c
->db
,c
->argv
[1],when
))
3683 addReply(c
,shared
.cone
);
3685 addReply(c
,shared
.czero
);
3690 static void ttlCommand(redisClient
*c
) {
3694 expire
= getExpire(c
->db
,c
->argv
[1]);
3696 ttl
= (int) (expire
-time(NULL
));
3697 if (ttl
< 0) ttl
= -1;
3699 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
3702 /* =============================== Replication ============================= */
3704 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3705 ssize_t nwritten
, ret
= size
;
3706 time_t start
= time(NULL
);
3710 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
3711 nwritten
= write(fd
,ptr
,size
);
3712 if (nwritten
== -1) return -1;
3716 if ((time(NULL
)-start
) > timeout
) {
3724 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3725 ssize_t nread
, totread
= 0;
3726 time_t start
= time(NULL
);
3730 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
3731 nread
= read(fd
,ptr
,size
);
3732 if (nread
== -1) return -1;
3737 if ((time(NULL
)-start
) > timeout
) {
3745 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3752 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
3755 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
3766 static void syncCommand(redisClient
*c
) {
3767 /* ignore SYNC if aleady slave or in monitor mode */
3768 if (c
->flags
& REDIS_SLAVE
) return;
3770 /* SYNC can't be issued when the server has pending data to send to
3771 * the client about already issued commands. We need a fresh reply
3772 * buffer registering the differences between the BGSAVE and the current
3773 * dataset, so that we can copy to other slaves if needed. */
3774 if (listLength(c
->reply
) != 0) {
3775 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3779 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
3780 /* Here we need to check if there is a background saving operation
3781 * in progress, or if it is required to start one */
3782 if (server
.bgsaveinprogress
) {
3783 /* Ok a background save is in progress. Let's check if it is a good
3784 * one for replication, i.e. if there is another slave that is
3785 * registering differences since the server forked to save */
3789 listRewind(server
.slaves
);
3790 while((ln
= listYield(server
.slaves
))) {
3792 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
3795 /* Perfect, the server is already registering differences for
3796 * another slave. Set the right state, and copy the buffer. */
3797 listRelease(c
->reply
);
3798 c
->reply
= listDup(slave
->reply
);
3799 if (!c
->reply
) oom("listDup copying slave reply list");
3800 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3801 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
3803 /* No way, we need to wait for the next BGSAVE in order to
3804 * register differences */
3805 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
3806 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
3809 /* Ok we don't have a BGSAVE in progress, let's start one */
3810 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
3811 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
3812 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
3813 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
3816 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3819 c
->flags
|= REDIS_SLAVE
;
3821 if (!listAddNodeTail(server
.slaves
,c
)) oom("listAddNodeTail");
3825 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
3826 redisClient
*slave
= privdata
;
3828 REDIS_NOTUSED(mask
);
3829 char buf
[REDIS_IOBUF_LEN
];
3830 ssize_t nwritten
, buflen
;
3832 if (slave
->repldboff
== 0) {
3833 /* Write the bulk write count before to transfer the DB. In theory here
3834 * we don't know how much room there is in the output buffer of the
3835 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
3836 * operations) will never be smaller than the few bytes we need. */
3839 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
3841 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
3849 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
3850 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
3852 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
3853 (buflen
== 0) ? "premature EOF" : strerror(errno
));
3857 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
3858 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
3863 slave
->repldboff
+= nwritten
;
3864 if (slave
->repldboff
== slave
->repldbsize
) {
3865 close(slave
->repldbfd
);
3866 slave
->repldbfd
= -1;
3867 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
3868 slave
->replstate
= REDIS_REPL_ONLINE
;
3869 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
3870 sendReplyToClient
, slave
, NULL
) == AE_ERR
) {
3874 addReplySds(slave
,sdsempty());
3875 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
3879 static void updateSalvesWaitingBgsave(int bgsaveerr
) {
3881 int startbgsave
= 0;
3883 listRewind(server
.slaves
);
3884 while((ln
= listYield(server
.slaves
))) {
3885 redisClient
*slave
= ln
->value
;
3887 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
3889 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3890 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
3891 struct redis_stat buf
;
3893 if (bgsaveerr
!= REDIS_OK
) {
3895 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
3898 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
3899 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
3901 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
3904 slave
->repldboff
= 0;
3905 slave
->repldbsize
= buf
.st_size
;
3906 slave
->replstate
= REDIS_REPL_SEND_BULK
;
3907 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
3908 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
, NULL
) == AE_ERR
) {
3915 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
3916 listRewind(server
.slaves
);
3917 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
3918 while((ln
= listYield(server
.slaves
))) {
3919 redisClient
*slave
= ln
->value
;
3921 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
3928 static int syncWithMaster(void) {
3929 char buf
[1024], tmpfile
[256];
3931 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
3935 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
3939 /* Issue the SYNC command */
3940 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
3942 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
3946 /* Read the bulk write count */
3947 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
3949 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
3953 dumpsize
= atoi(buf
+1);
3954 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
3955 /* Read the bulk write data on a temp file */
3956 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
3957 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
3960 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
3964 int nread
, nwritten
;
3966 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
3968 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
3974 nwritten
= write(dfd
,buf
,nread
);
3975 if (nwritten
== -1) {
3976 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
3984 if (rename(tmpfile
,server
.dbfilename
) == -1) {
3985 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
3991 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
3992 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
3996 server
.master
= createClient(fd
);
3997 server
.master
->flags
|= REDIS_MASTER
;
3998 server
.replstate
= REDIS_REPL_CONNECTED
;
4002 static void slaveofCommand(redisClient
*c
) {
4003 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
4004 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
4005 if (server
.masterhost
) {
4006 sdsfree(server
.masterhost
);
4007 server
.masterhost
= NULL
;
4008 if (server
.master
) freeClient(server
.master
);
4009 server
.replstate
= REDIS_REPL_NONE
;
4010 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
4013 sdsfree(server
.masterhost
);
4014 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
4015 server
.masterport
= atoi(c
->argv
[2]->ptr
);
4016 if (server
.master
) freeClient(server
.master
);
4017 server
.replstate
= REDIS_REPL_CONNECT
;
4018 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
4019 server
.masterhost
, server
.masterport
);
4021 addReply(c
,shared
.ok
);
4024 /* ============================ Maxmemory directive ======================== */
4026 /* This function gets called when 'maxmemory' is set on the config file to limit
4027 * the max memory used by the server, and we are out of memory.
4028 * This function will try to, in order:
4030 * - Free objects from the free list
4031 * - Try to remove keys with an EXPIRE set
4033 * It is not possible to free enough memory to reach used-memory < maxmemory
4034 * the server will start refusing commands that will enlarge even more the
4037 static void freeMemoryIfNeeded(void) {
4038 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
4039 if (listLength(server
.objfreelist
)) {
4042 listNode
*head
= listFirst(server
.objfreelist
);
4043 o
= listNodeValue(head
);
4044 listDelNode(server
.objfreelist
,head
);
4047 int j
, k
, freed
= 0;
4049 for (j
= 0; j
< server
.dbnum
; j
++) {
4051 robj
*minkey
= NULL
;
4052 struct dictEntry
*de
;
4054 if (dictSize(server
.db
[j
].expires
)) {
4056 /* From a sample of three keys drop the one nearest to
4057 * the natural expire */
4058 for (k
= 0; k
< 3; k
++) {
4061 de
= dictGetRandomKey(server
.db
[j
].expires
);
4062 t
= (time_t) dictGetEntryVal(de
);
4063 if (minttl
== -1 || t
< minttl
) {
4064 minkey
= dictGetEntryKey(de
);
4068 deleteKey(server
.db
+j
,minkey
);
4071 if (!freed
) return; /* nothing to free... */
4076 /* ================================= Debugging ============================== */
4078 static void debugCommand(redisClient
*c
) {
4079 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
4081 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
4082 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
4086 addReply(c
,shared
.nokeyerr
);
4089 key
= dictGetEntryKey(de
);
4090 val
= dictGetEntryVal(de
);
4091 addReplySds(c
,sdscatprintf(sdsempty(),
4092 "+Key at:%p refcount:%d, value at:%p refcount:%d\r\n",
4093 key
, key
->refcount
, val
, val
->refcount
));
4095 addReplySds(c
,sdsnew(
4096 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4100 /* =================================== Main! ================================ */
4103 int linuxOvercommitMemoryValue(void) {
4104 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
4108 if (fgets(buf
,64,fp
) == NULL
) {
4117 void linuxOvercommitMemoryWarning(void) {
4118 if (linuxOvercommitMemoryValue() == 0) {
4119 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4122 #endif /* __linux__ */
4124 static void daemonize(void) {
4128 if (fork() != 0) exit(0); /* parent exits */
4129 setsid(); /* create a new session */
4131 /* Every output goes to /dev/null. If Redis is daemonized but
4132 * the 'logfile' is set to 'stdout' in the configuration file
4133 * it will not log at all. */
4134 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
4135 dup2(fd
, STDIN_FILENO
);
4136 dup2(fd
, STDOUT_FILENO
);
4137 dup2(fd
, STDERR_FILENO
);
4138 if (fd
> STDERR_FILENO
) close(fd
);
4140 /* Try to write the pid file */
4141 fp
= fopen(server
.pidfile
,"w");
4143 fprintf(fp
,"%d\n",getpid());
4148 int main(int argc
, char **argv
) {
4150 linuxOvercommitMemoryWarning();
4155 ResetServerSaveParams();
4156 loadServerConfig(argv
[1]);
4157 } else if (argc
> 2) {
4158 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
4161 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4164 if (server
.daemonize
) daemonize();
4165 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
4166 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
4167 redisLog(REDIS_NOTICE
,"DB loaded from disk");
4168 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
4169 acceptHandler
, NULL
, NULL
) == AE_ERR
) oom("creating file event");
4170 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
4172 aeDeleteEventLoop(server
.el
);