2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "0.101"
46 #include <arpa/inet.h>
50 #include <sys/resource.h>
53 #include "ae.h" /* Event driven programming library */
54 #include "sds.h" /* Dynamic safe strings */
55 #include "anet.h" /* Networking the easy way */
56 #include "dict.h" /* Hash tables */
57 #include "adlist.h" /* Linked lists */
58 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
59 #include "lzf.h" /* LZF compression library */
60 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
66 /* Static server configuration */
67 #define REDIS_SERVERPORT 6379 /* TCP port */
68 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
69 #define REDIS_IOBUF_LEN 1024
70 #define REDIS_LOADBUF_LEN 1024
71 #define REDIS_STATIC_ARGS 4
72 #define REDIS_DEFAULT_DBNUM 16
73 #define REDIS_CONFIGLINE_MAX 1024
74 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
75 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
76 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
78 /* Hash table parameters */
79 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
80 #define REDIS_HT_MINSLOTS 16384 /* Never resize the HT under this */
83 #define REDIS_CMD_BULK 1 /* Bulk write command */
84 #define REDIS_CMD_INLINE 2 /* Inline command */
85 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
86 this flag will return an error when the 'maxmemory' option is set in the
87 config file and the server is using more than maxmemory bytes of memory.
88 In short, these commands are denied on low memory conditions. */
89 #define REDIS_CMD_DENYOOM 4
92 #define REDIS_STRING 0
97 /* Object types only used for dumping to disk */
98 #define REDIS_EXPIRETIME 253
99 #define REDIS_SELECTDB 254
100 #define REDIS_EOF 255
102 /* Defines related to the dump file format. To store 32 bits lengths for short
103 * keys requires a lot of space, so we check the most significant 2 bits of
104 * the first byte to interpret the length:
106 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
107 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
108 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
109 * 11|000000 this means: specially encoded object will follow. The six bits
110 * number specify the kind of object that follows.
111 * See the REDIS_RDB_ENC_* defines.
113 * Lengths up to 63 are stored using a single byte, most DB keys, and many
114 * values, will fit inside. */
115 #define REDIS_RDB_6BITLEN 0
116 #define REDIS_RDB_14BITLEN 1
117 #define REDIS_RDB_32BITLEN 2
118 #define REDIS_RDB_ENCVAL 3
119 #define REDIS_RDB_LENERR UINT_MAX
121 /* When a length of a string object stored on disk has the first two bits
122 * set, the remaining six bits specify a special encoding for the object
123 * according to the following defines: */
124 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
125 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
126 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
127 #define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
130 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
131 #define REDIS_SLAVE 2 /* This client is a slave server */
132 #define REDIS_MASTER 4 /* This client is a master server */
133 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
135 /* Slave replication state - slave side */
136 #define REDIS_REPL_NONE 0 /* No active replication */
137 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
138 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
140 /* Slave replication state - from the point of view of master
141 * Note that in SEND_BULK and ONLINE state the slave receives new updates
142 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
143 * to start the next background saving in order to send updates to it. */
144 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
145 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
146 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
147 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
149 /* List related stuff */
153 /* Sort operations */
154 #define REDIS_SORT_GET 0
155 #define REDIS_SORT_DEL 1
156 #define REDIS_SORT_INCR 2
157 #define REDIS_SORT_DECR 3
158 #define REDIS_SORT_ASC 4
159 #define REDIS_SORT_DESC 5
160 #define REDIS_SORTKEY_MAX 1024
163 #define REDIS_DEBUG 0
164 #define REDIS_NOTICE 1
165 #define REDIS_WARNING 2
/* Anti-warning macro: evaluates V and discards the result, so the compiler
 * does not complain about an otherwise unused parameter or variable.
 * The argument is parenthesized so that any expression, not only a plain
 * identifier, can be passed without changing how the cast binds. */
#define REDIS_NOTUSED(V) ((void)(V))
170 /*================================= Data types ============================== */
172 /* A redis object, that is a type able to hold a string / list / set */
173 typedef struct redisObject
{
179 typedef struct redisDb
{
185 /* With multiplexing we need to keep per-client state.
186 * Clients are kept in a linked list. */
187 typedef struct redisClient
{
194 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
197 time_t lastinteraction
; /* time of the last interaction, used for timeout */
198 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
199 int slaveseldb
; /* slave selected db, if this client is a slave */
200 int authenticated
; /* when requirepass is non-NULL */
201 int replstate
; /* replication state if this is a slave */
202 int repldbfd
; /* replication DB file descriptor */
203 long repldboff
; /* replication DB file offset */
204 off_t repldbsize
; /* replication DB file size */
212 /* Global server state structure */
218 unsigned int sharingpoolsize
;
219 long long dirty
; /* changes to DB from the last save */
221 list
*slaves
, *monitors
;
222 char neterr
[ANET_ERR_LEN
];
224 int cronloops
; /* number of times the cron function run */
225 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
226 time_t lastsave
; /* Unix time of last save succeeede */
227 size_t usedmemory
; /* Used memory in megabytes */
228 /* Fields used only for stats */
229 time_t stat_starttime
; /* server start time */
230 long long stat_numcommands
; /* number of processed commands */
231 long long stat_numconnections
; /* number of connections received */
239 int bgsaveinprogress
;
240 struct saveparam
*saveparams
;
247 /* Replication related */
251 redisClient
*master
; /* client that is master for this slave */
253 unsigned int maxclients
;
254 unsigned int maxmemory
;
255 /* Sort parameters - qsort_r() is only available under BSD so we
256 * have to take this state global, in order to pass it to sortCompare() */
262 typedef void redisCommandProc(redisClient
*c
);
263 struct redisCommand
{
265 redisCommandProc
*proc
;
270 typedef struct _redisSortObject
{
278 typedef struct _redisSortOperation
{
281 } redisSortOperation
;
283 struct sharedObjectsStruct
{
284 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
285 *colon
, *nullbulk
, *nullmultibulk
,
286 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
287 *outofrangeerr
, *plus
,
288 *select0
, *select1
, *select2
, *select3
, *select4
,
289 *select5
, *select6
, *select7
, *select8
, *select9
;
292 /*================================ Prototypes =============================== */
294 static void freeStringObject(robj
*o
);
295 static void freeListObject(robj
*o
);
296 static void freeSetObject(robj
*o
);
297 static void decrRefCount(void *o
);
298 static robj
*createObject(int type
, void *ptr
);
299 static void freeClient(redisClient
*c
);
300 static int rdbLoad(char *filename
);
301 static void addReply(redisClient
*c
, robj
*obj
);
302 static void addReplySds(redisClient
*c
, sds s
);
303 static void incrRefCount(robj
*o
);
304 static int rdbSaveBackground(char *filename
);
305 static robj
*createStringObject(char *ptr
, size_t len
);
306 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
307 static int syncWithMaster(void);
308 static robj
*tryObjectSharing(robj
*o
);
309 static int removeExpire(redisDb
*db
, robj
*key
);
310 static int expireIfNeeded(redisDb
*db
, robj
*key
);
311 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
312 static int deleteKey(redisDb
*db
, robj
*key
);
313 static time_t getExpire(redisDb
*db
, robj
*key
);
314 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
315 static void updateSalvesWaitingBgsave(int bgsaveerr
);
316 static void freeMemoryIfNeeded(void);
318 static void authCommand(redisClient
*c
);
319 static void pingCommand(redisClient
*c
);
320 static void echoCommand(redisClient
*c
);
321 static void setCommand(redisClient
*c
);
322 static void setnxCommand(redisClient
*c
);
323 static void getCommand(redisClient
*c
);
324 static void delCommand(redisClient
*c
);
325 static void existsCommand(redisClient
*c
);
326 static void incrCommand(redisClient
*c
);
327 static void decrCommand(redisClient
*c
);
328 static void incrbyCommand(redisClient
*c
);
329 static void decrbyCommand(redisClient
*c
);
330 static void selectCommand(redisClient
*c
);
331 static void randomkeyCommand(redisClient
*c
);
332 static void keysCommand(redisClient
*c
);
333 static void dbsizeCommand(redisClient
*c
);
334 static void lastsaveCommand(redisClient
*c
);
335 static void saveCommand(redisClient
*c
);
336 static void bgsaveCommand(redisClient
*c
);
337 static void shutdownCommand(redisClient
*c
);
338 static void moveCommand(redisClient
*c
);
339 static void renameCommand(redisClient
*c
);
340 static void renamenxCommand(redisClient
*c
);
341 static void lpushCommand(redisClient
*c
);
342 static void rpushCommand(redisClient
*c
);
343 static void lpopCommand(redisClient
*c
);
344 static void rpopCommand(redisClient
*c
);
345 static void llenCommand(redisClient
*c
);
346 static void lindexCommand(redisClient
*c
);
347 static void lrangeCommand(redisClient
*c
);
348 static void ltrimCommand(redisClient
*c
);
349 static void typeCommand(redisClient
*c
);
350 static void lsetCommand(redisClient
*c
);
351 static void saddCommand(redisClient
*c
);
352 static void sremCommand(redisClient
*c
);
353 static void smoveCommand(redisClient
*c
);
354 static void sismemberCommand(redisClient
*c
);
355 static void scardCommand(redisClient
*c
);
356 static void sinterCommand(redisClient
*c
);
357 static void sinterstoreCommand(redisClient
*c
);
358 static void sunionCommand(redisClient
*c
);
359 static void sunionstoreCommand(redisClient
*c
);
360 static void sdiffCommand(redisClient
*c
);
361 static void sdiffstoreCommand(redisClient
*c
);
362 static void syncCommand(redisClient
*c
);
363 static void flushdbCommand(redisClient
*c
);
364 static void flushallCommand(redisClient
*c
);
365 static void sortCommand(redisClient
*c
);
366 static void lremCommand(redisClient
*c
);
367 static void infoCommand(redisClient
*c
);
368 static void mgetCommand(redisClient
*c
);
369 static void monitorCommand(redisClient
*c
);
370 static void expireCommand(redisClient
*c
);
371 static void getSetCommand(redisClient
*c
);
372 static void ttlCommand(redisClient
*c
);
373 static void slaveofCommand(redisClient
*c
);
375 /*================================= Globals ================================= */
378 static struct redisServer server
; /* server global state */
379 static struct redisCommand cmdTable
[] = {
380 {"get",getCommand
,2,REDIS_CMD_INLINE
},
381 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
382 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
383 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
384 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
385 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
386 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
387 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
388 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
389 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
390 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
391 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
392 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
393 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
394 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
395 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
396 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
397 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
398 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
399 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
400 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
401 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
402 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
403 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
404 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
405 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
406 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
407 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
408 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
409 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
410 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
411 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
412 {"getset",getSetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
413 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
414 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
415 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
416 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
417 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
418 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
419 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
420 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
421 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
422 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
423 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
424 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
425 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
426 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
427 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
428 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
429 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
430 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
431 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
432 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
433 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
434 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
435 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
436 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
440 /*============================ Utility functions ============================ */
442 /* Glob-style pattern matching. */
443 int stringmatchlen(const char *pattern
, int patternLen
,
444 const char *string
, int stringLen
, int nocase
)
449 while (pattern
[1] == '*') {
454 return 1; /* match */
456 if (stringmatchlen(pattern
+1, patternLen
-1,
457 string
, stringLen
, nocase
))
458 return 1; /* match */
462 return 0; /* no match */
466 return 0; /* no match */
476 not = pattern
[0] == '^';
483 if (pattern
[0] == '\\') {
486 if (pattern
[0] == string
[0])
488 } else if (pattern
[0] == ']') {
490 } else if (patternLen
== 0) {
494 } else if (pattern
[1] == '-' && patternLen
>= 3) {
495 int start
= pattern
[0];
496 int end
= pattern
[2];
504 start
= tolower(start
);
510 if (c
>= start
&& c
<= end
)
514 if (pattern
[0] == string
[0])
517 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
527 return 0; /* no match */
533 if (patternLen
>= 2) {
540 if (pattern
[0] != string
[0])
541 return 0; /* no match */
543 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
544 return 0; /* no match */
552 if (stringLen
== 0) {
553 while(*pattern
== '*') {
560 if (patternLen
== 0 && stringLen
== 0)
565 void redisLog(int level
, const char *fmt
, ...)
570 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
574 if (level
>= server
.verbosity
) {
580 strftime(buf
,64,"%d %b %H:%M:%S",gmtime(&now
));
581 fprintf(fp
,"%s %c ",buf
,c
[level
]);
582 vfprintf(fp
, fmt
, ap
);
588 if (server
.logfile
) fclose(fp
);
591 /*====================== Hash table type implementation ==================== */
593 /* This is a hash table type that uses the SDS dynamic strings library as
594 * keys and redis objects as values (objects can hold SDS strings,
597 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
601 DICT_NOTUSED(privdata
);
603 l1
= sdslen((sds
)key1
);
604 l2
= sdslen((sds
)key2
);
605 if (l1
!= l2
) return 0;
606 return memcmp(key1
, key2
, l1
) == 0;
609 static void dictRedisObjectDestructor(void *privdata
, void *val
)
611 DICT_NOTUSED(privdata
);
616 static int dictSdsKeyCompare(void *privdata
, const void *key1
,
619 const robj
*o1
= key1
, *o2
= key2
;
620 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
623 static unsigned int dictSdsHash(const void *key
) {
625 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
628 static dictType setDictType
= {
629 dictSdsHash
, /* hash function */
632 dictSdsKeyCompare
, /* key compare */
633 dictRedisObjectDestructor
, /* key destructor */
634 NULL
/* val destructor */
637 static dictType hashDictType
= {
638 dictSdsHash
, /* hash function */
641 dictSdsKeyCompare
, /* key compare */
642 dictRedisObjectDestructor
, /* key destructor */
643 dictRedisObjectDestructor
/* val destructor */
646 /* ========================= Random utility functions ======================= */
648 /* Redis generally does not try to recover from out of memory conditions
649 * when allocating objects or strings, it is not clear if it will be possible
650 * to report this condition to the client since the networking layer itself
651 * is based on heap allocation for send buffers, so we simply abort.
652 * At least the code will be simpler to read... */
653 static void oom(const char *msg
) {
654 fprintf(stderr
, "%s: Out of memory\n",msg
);
660 /* ====================== Redis server networking stuff ===================== */
661 void closeTimedoutClients(void) {
664 time_t now
= time(NULL
);
666 listRewind(server
.clients
);
667 while ((ln
= listYield(server
.clients
)) != NULL
) {
668 c
= listNodeValue(ln
);
669 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
670 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
671 (now
- c
->lastinteraction
> server
.maxidletime
)) {
672 redisLog(REDIS_DEBUG
,"Closing idle client");
678 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
679 * we resize the hash table to save memory */
680 void tryResizeHashTables(void) {
683 for (j
= 0; j
< server
.dbnum
; j
++) {
684 long long size
, used
;
686 size
= dictSlots(server
.db
[j
].dict
);
687 used
= dictSize(server
.db
[j
].dict
);
688 if (size
&& used
&& size
> REDIS_HT_MINSLOTS
&&
689 (used
*100/size
< REDIS_HT_MINFILL
)) {
690 redisLog(REDIS_NOTICE
,"The hash table %d is too sparse, resize it...",j
);
691 dictResize(server
.db
[j
].dict
);
692 redisLog(REDIS_NOTICE
,"Hash table %d resized.",j
);
697 int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
698 int j
, loops
= server
.cronloops
++;
699 REDIS_NOTUSED(eventLoop
);
701 REDIS_NOTUSED(clientData
);
703 /* Update the global state with the amount of used memory */
704 server
.usedmemory
= zmalloc_used_memory();
706 /* Show some info about non-empty databases */
707 for (j
= 0; j
< server
.dbnum
; j
++) {
708 long long size
, used
, vkeys
;
710 size
= dictSlots(server
.db
[j
].dict
);
711 used
= dictSize(server
.db
[j
].dict
);
712 vkeys
= dictSize(server
.db
[j
].expires
);
713 if (!(loops
% 5) && used
> 0) {
714 redisLog(REDIS_DEBUG
,"DB %d: %d keys (%d volatile) in %d slots HT.",j
,used
,vkeys
,size
);
715 /* dictPrintStats(server.dict); */
719 /* We don't want to resize the hash tables while a background saving
720 * is in progress: the saving child is created using fork() that is
721 * implemented with a copy-on-write semantic in most modern systems, so
722 * if we resize the HT while there is the saving child at work actually
723 * a lot of memory movements in the parent will cause a lot of pages
725 if (!server
.bgsaveinprogress
) tryResizeHashTables();
727 /* Show information about connected clients */
729 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use",
730 listLength(server
.clients
)-listLength(server
.slaves
),
731 listLength(server
.slaves
),
733 dictSize(server
.sharingpool
));
736 /* Close connections of timedout clients */
737 if (server
.maxidletime
&& !(loops
% 10))
738 closeTimedoutClients();
740 /* Check if a background saving in progress terminated */
741 if (server
.bgsaveinprogress
) {
743 /* XXX: TODO handle the case of the saving child killed */
744 if (wait4(-1,&statloc
,WNOHANG
,NULL
)) {
745 int exitcode
= WEXITSTATUS(statloc
);
747 redisLog(REDIS_NOTICE
,
748 "Background saving terminated with success");
750 server
.lastsave
= time(NULL
);
752 redisLog(REDIS_WARNING
,
753 "Background saving error");
755 server
.bgsaveinprogress
= 0;
756 updateSalvesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
759 /* If there is not a background saving in progress check if
760 * we have to save now */
761 time_t now
= time(NULL
);
762 for (j
= 0; j
< server
.saveparamslen
; j
++) {
763 struct saveparam
*sp
= server
.saveparams
+j
;
765 if (server
.dirty
>= sp
->changes
&&
766 now
-server
.lastsave
> sp
->seconds
) {
767 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
768 sp
->changes
, sp
->seconds
);
769 rdbSaveBackground(server
.dbfilename
);
775 /* Try to expire a few timed out keys */
776 for (j
= 0; j
< server
.dbnum
; j
++) {
777 redisDb
*db
= server
.db
+j
;
778 int num
= dictSize(db
->expires
);
781 time_t now
= time(NULL
);
783 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
784 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
789 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
790 t
= (time_t) dictGetEntryVal(de
);
792 deleteKey(db
,dictGetEntryKey(de
));
798 /* Check if we should connect to a MASTER */
799 if (server
.replstate
== REDIS_REPL_CONNECT
) {
800 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
801 if (syncWithMaster() == REDIS_OK
) {
802 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
808 static void createSharedObjects(void) {
809 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
810 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
811 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
812 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
813 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
814 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
815 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
816 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
817 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
819 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
820 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
821 "-ERR Operation against a key holding the wrong kind of value\r\n"));
822 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
823 "-ERR no such key\r\n"));
824 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
825 "-ERR syntax error\r\n"));
826 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
827 "-ERR source and destination objects are the same\r\n"));
828 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
829 "-ERR index out of range\r\n"));
830 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
831 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
832 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
833 shared
.select0
= createStringObject("select 0\r\n",10);
834 shared
.select1
= createStringObject("select 1\r\n",10);
835 shared
.select2
= createStringObject("select 2\r\n",10);
836 shared
.select3
= createStringObject("select 3\r\n",10);
837 shared
.select4
= createStringObject("select 4\r\n",10);
838 shared
.select5
= createStringObject("select 5\r\n",10);
839 shared
.select6
= createStringObject("select 6\r\n",10);
840 shared
.select7
= createStringObject("select 7\r\n",10);
841 shared
.select8
= createStringObject("select 8\r\n",10);
842 shared
.select9
= createStringObject("select 9\r\n",10);
845 static void appendServerSaveParams(time_t seconds
, int changes
) {
846 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
847 if (server
.saveparams
== NULL
) oom("appendServerSaveParams");
848 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
849 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
850 server
.saveparamslen
++;
853 static void ResetServerSaveParams() {
854 zfree(server
.saveparams
);
855 server
.saveparams
= NULL
;
856 server
.saveparamslen
= 0;
859 static void initServerConfig() {
860 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
861 server
.port
= REDIS_SERVERPORT
;
862 server
.verbosity
= REDIS_DEBUG
;
863 server
.maxidletime
= REDIS_MAXIDLETIME
;
864 server
.saveparams
= NULL
;
865 server
.logfile
= NULL
; /* NULL = log on standard output */
866 server
.bindaddr
= NULL
;
867 server
.glueoutputbuf
= 1;
868 server
.daemonize
= 0;
869 server
.pidfile
= "/var/run/redis.pid";
870 server
.dbfilename
= "dump.rdb";
871 server
.requirepass
= NULL
;
872 server
.shareobjects
= 0;
873 server
.maxclients
= 0;
874 server
.maxmemory
= 0;
875 ResetServerSaveParams();
877 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
878 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
879 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
880 /* Replication related */
882 server
.masterhost
= NULL
;
883 server
.masterport
= 6379;
884 server
.master
= NULL
;
885 server
.replstate
= REDIS_REPL_NONE
;
888 static void initServer() {
891 signal(SIGHUP
, SIG_IGN
);
892 signal(SIGPIPE
, SIG_IGN
);
894 server
.clients
= listCreate();
895 server
.slaves
= listCreate();
896 server
.monitors
= listCreate();
897 server
.objfreelist
= listCreate();
898 createSharedObjects();
899 server
.el
= aeCreateEventLoop();
900 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
901 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
902 server
.sharingpoolsize
= 1024;
903 if (!server
.db
|| !server
.clients
|| !server
.slaves
|| !server
.monitors
|| !server
.el
|| !server
.objfreelist
)
904 oom("server initialization"); /* Fatal OOM */
905 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
906 if (server
.fd
== -1) {
907 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
910 for (j
= 0; j
< server
.dbnum
; j
++) {
911 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
912 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
915 server
.cronloops
= 0;
916 server
.bgsaveinprogress
= 0;
917 server
.lastsave
= time(NULL
);
919 server
.usedmemory
= 0;
920 server
.stat_numcommands
= 0;
921 server
.stat_numconnections
= 0;
922 server
.stat_starttime
= time(NULL
);
923 aeCreateTimeEvent(server
.el
, 1000, serverCron
, NULL
, NULL
);
926 /* Empty the whole database */
927 static long long emptyDb() {
929 long long removed
= 0;
931 for (j
= 0; j
< server
.dbnum
; j
++) {
932 removed
+= dictSize(server
.db
[j
].dict
);
933 dictEmpty(server
.db
[j
].dict
);
934 dictEmpty(server
.db
[j
].expires
);
939 static int yesnotoi(char *s
) {
940 if (!strcasecmp(s
,"yes")) return 1;
941 else if (!strcasecmp(s
,"no")) return 0;
945 /* I agree, this is a very rudimentary way to load a configuration...
946 will improve later if the config gets more complex */
947 static void loadServerConfig(char *filename
) {
948 FILE *fp
= fopen(filename
,"r");
949 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
954 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
957 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
963 line
= sdstrim(line
," \t\r\n");
965 /* Skip comments and blank lines*/
966 if (line
[0] == '#' || line
[0] == '\0') {
971 /* Split into arguments */
972 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
975 /* Execute config directives */
976 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
977 server
.maxidletime
= atoi(argv
[1]);
978 if (server
.maxidletime
< 0) {
979 err
= "Invalid timeout value"; goto loaderr
;
981 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
982 server
.port
= atoi(argv
[1]);
983 if (server
.port
< 1 || server
.port
> 65535) {
984 err
= "Invalid port"; goto loaderr
;
986 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
987 server
.bindaddr
= zstrdup(argv
[1]);
988 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
989 int seconds
= atoi(argv
[1]);
990 int changes
= atoi(argv
[2]);
991 if (seconds
< 1 || changes
< 0) {
992 err
= "Invalid save parameters"; goto loaderr
;
994 appendServerSaveParams(seconds
,changes
);
995 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
996 if (chdir(argv
[1]) == -1) {
997 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
998 argv
[1], strerror(errno
));
1001 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1002 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1003 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1004 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1006 err
= "Invalid log level. Must be one of debug, notice, warning";
1009 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1012 server
.logfile
= zstrdup(argv
[1]);
1013 if (!strcasecmp(server
.logfile
,"stdout")) {
1014 zfree(server
.logfile
);
1015 server
.logfile
= NULL
;
1017 if (server
.logfile
) {
1018 /* Test if we are able to open the file. The server will not
1019 * be able to abort just for this problem later... */
1020 fp
= fopen(server
.logfile
,"a");
1022 err
= sdscatprintf(sdsempty(),
1023 "Can't open the log file: %s", strerror(errno
));
1028 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1029 server
.dbnum
= atoi(argv
[1]);
1030 if (server
.dbnum
< 1) {
1031 err
= "Invalid number of databases"; goto loaderr
;
1033 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1034 server
.maxclients
= atoi(argv
[1]);
1035 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1036 server
.maxmemory
= atoi(argv
[1]);
1037 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1038 server
.masterhost
= sdsnew(argv
[1]);
1039 server
.masterport
= atoi(argv
[2]);
1040 server
.replstate
= REDIS_REPL_CONNECT
;
1041 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1042 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1043 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1045 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1046 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1047 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1049 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1050 server
.sharingpoolsize
= atoi(argv
[1]);
1051 if (server
.sharingpoolsize
< 1) {
1052 err
= "invalid object sharing pool size"; goto loaderr
;
1054 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1055 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1056 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1058 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1059 server
.requirepass
= zstrdup(argv
[1]);
1060 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1061 server
.pidfile
= zstrdup(argv
[1]);
1062 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1063 server
.dbfilename
= zstrdup(argv
[1]);
1065 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1067 for (j
= 0; j
< argc
; j
++)
1076 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1077 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1078 fprintf(stderr
, ">>> '%s'\n", line
);
1079 fprintf(stderr
, "%s\n", err
);
/* Walk the client's argument vector and call decrRefCount() on each of the
 * c->argc objects stored in c->argv.
 * NOTE(review): this listing has lost lines around the loop (the
 * declaration of j and anything that follows the loop are not visible
 * here) -- confirm the rest of the body against the real source. */
1083 static void freeClientArgv(redisClient
*c
) {
1086 for (j
= 0; j
< c
->argc
; j
++)
1087 decrRefCount(c
->argv
[j
]);
1091 static void freeClient(redisClient
*c
) {
1094 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1095 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1096 sdsfree(c
->querybuf
);
1097 listRelease(c
->reply
);
1100 ln
= listSearchKey(server
.clients
,c
);
1102 listDelNode(server
.clients
,ln
);
1103 if (c
->flags
& REDIS_SLAVE
) {
1104 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1106 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1107 ln
= listSearchKey(l
,c
);
1111 if (c
->flags
& REDIS_MASTER
) {
1112 server
.master
= NULL
;
1113 server
.replstate
= REDIS_REPL_CONNECT
;
1119 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1124 listRewind(c
->reply
);
1125 while((ln
= listYield(c
->reply
))) {
1127 totlen
+= sdslen(o
->ptr
);
1128 /* This optimization makes more sense if we don't have to copy
1130 if (totlen
> 1024) return;
1136 listRewind(c
->reply
);
1137 while((ln
= listYield(c
->reply
))) {
1139 memcpy(buf
+copylen
,o
->ptr
,sdslen(o
->ptr
));
1140 copylen
+= sdslen(o
->ptr
);
1141 listDelNode(c
->reply
,ln
);
1143 /* Now the output buffer is empty, add the new single element */
1144 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,totlen
));
1145 if (!listAddNodeTail(c
->reply
,o
)) oom("listAddNodeTail");
1149 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1150 redisClient
*c
= privdata
;
1151 int nwritten
= 0, totwritten
= 0, objlen
;
1154 REDIS_NOTUSED(mask
);
1156 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1157 glueReplyBuffersIfNeeded(c
);
1158 while(listLength(c
->reply
)) {
1159 o
= listNodeValue(listFirst(c
->reply
));
1160 objlen
= sdslen(o
->ptr
);
1163 listDelNode(c
->reply
,listFirst(c
->reply
));
1167 if (c
->flags
& REDIS_MASTER
) {
1168 nwritten
= objlen
- c
->sentlen
;
1170 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1171 if (nwritten
<= 0) break;
1173 c
->sentlen
+= nwritten
;
1174 totwritten
+= nwritten
;
1175 /* If we fully sent the object on head go to the next one */
1176 if (c
->sentlen
== objlen
) {
1177 listDelNode(c
->reply
,listFirst(c
->reply
));
1181 if (nwritten
== -1) {
1182 if (errno
== EAGAIN
) {
1185 redisLog(REDIS_DEBUG
,
1186 "Error writing to client: %s", strerror(errno
));
1191 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1192 if (listLength(c
->reply
) == 0) {
1194 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
/* Find a command descriptor by name. cmdTable[] is scanned until an entry
 * whose name is NULL is reached; names are compared case-insensitively
 * with strcasecmp() and a pointer to the first matching entry is returned.
 * NOTE(review): the initialisation/advance of j and the value returned when
 * nothing matches are not visible in this listing -- confirm. */
1198 static struct redisCommand
*lookupCommand(char *name
) {
1200 while(cmdTable
[j
].name
!= NULL
) {
1201 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
/* resetClient prepares the client to process the next command */
1208 static void resetClient(redisClient
*c
) {
/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() executes the command or prepares the
 * server for a bulk read from the client.
 *
 * If 1 is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if 0 is returned the client was destroyed (i.e. after QUIT). */
1221 static int processCommand(redisClient
*c
) {
1222 struct redisCommand
*cmd
;
1225 /* Free some memory if needed (maxmemory setting) */
1226 if (server
.maxmemory
) freeMemoryIfNeeded();
1228 /* The QUIT command is handled as a special case. Normal command
1229 * procs are unable to close the client connection safely */
1230 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1234 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1236 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1239 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1240 (c
->argc
< -cmd
->arity
)) {
1241 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
1244 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1245 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1248 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1249 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1251 decrRefCount(c
->argv
[c
->argc
-1]);
1252 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1254 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1259 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1260 /* It is possible that the bulk read is already in the
1261 * buffer. Check this condition and handle it accordingly */
1262 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1263 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1265 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1270 /* Let's try to share objects on the command arguments vector */
1271 if (server
.shareobjects
) {
1273 for(j
= 1; j
< c
->argc
; j
++)
1274 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1276 /* Check if the user is authenticated */
1277 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1278 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1283 /* Exec the command */
1284 dirty
= server
.dirty
;
1286 if (server
.dirty
-dirty
!= 0 && listLength(server
.slaves
))
1287 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1288 if (listLength(server
.monitors
))
1289 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1290 server
.stat_numcommands
++;
1292 /* Prepare the client for the next command */
1293 if (c
->flags
& REDIS_CLOSE
) {
1301 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1305 /* (args*2)+1 is enough room for args, spaces, newlines */
1306 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1308 if (argc
<= REDIS_STATIC_ARGS
) {
1311 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1312 if (!outv
) oom("replicationFeedSlaves");
1315 for (j
= 0; j
< argc
; j
++) {
1316 if (j
!= 0) outv
[outc
++] = shared
.space
;
1317 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1320 lenobj
= createObject(REDIS_STRING
,
1321 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv
[j
]->ptr
)));
1322 lenobj
->refcount
= 0;
1323 outv
[outc
++] = lenobj
;
1325 outv
[outc
++] = argv
[j
];
1327 outv
[outc
++] = shared
.crlf
;
1329 /* Increment all the refcounts at start and decrement at end in order to
1330 * be sure to free objects if there is no slave in a replication state
1331 * able to be feed with commands */
1332 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1334 while((ln
= listYield(slaves
))) {
1335 redisClient
*slave
= ln
->value
;
1337 /* Don't feed slaves that are still waiting for BGSAVE to start */
1338 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1340 /* Feed all the other slaves, MONITORs and so on */
1341 if (slave
->slaveseldb
!= dictid
) {
1345 case 0: selectcmd
= shared
.select0
; break;
1346 case 1: selectcmd
= shared
.select1
; break;
1347 case 2: selectcmd
= shared
.select2
; break;
1348 case 3: selectcmd
= shared
.select3
; break;
1349 case 4: selectcmd
= shared
.select4
; break;
1350 case 5: selectcmd
= shared
.select5
; break;
1351 case 6: selectcmd
= shared
.select6
; break;
1352 case 7: selectcmd
= shared
.select7
; break;
1353 case 8: selectcmd
= shared
.select8
; break;
1354 case 9: selectcmd
= shared
.select9
; break;
1356 selectcmd
= createObject(REDIS_STRING
,
1357 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1358 selectcmd
->refcount
= 0;
1361 addReply(slave
,selectcmd
);
1362 slave
->slaveseldb
= dictid
;
1364 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1366 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1367 if (outv
!= static_outv
) zfree(outv
);
1370 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1371 redisClient
*c
= (redisClient
*) privdata
;
1372 char buf
[REDIS_IOBUF_LEN
];
1375 REDIS_NOTUSED(mask
);
1377 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1379 if (errno
== EAGAIN
) {
1382 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1386 } else if (nread
== 0) {
1387 redisLog(REDIS_DEBUG
, "Client closed connection");
1392 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1393 c
->lastinteraction
= time(NULL
);
1399 if (c
->bulklen
== -1) {
1400 /* Read the first line of the query */
1401 char *p
= strchr(c
->querybuf
,'\n');
1407 query
= c
->querybuf
;
1408 c
->querybuf
= sdsempty();
1409 querylen
= 1+(p
-(query
));
1410 if (sdslen(query
) > querylen
) {
1411 /* leave data after the first line of the query in the buffer */
1412 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1414 *p
= '\0'; /* remove "\n" */
1415 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1416 sdsupdatelen(query
);
1418 /* Now we can split the query in arguments */
1419 if (sdslen(query
) == 0) {
1420 /* Ignore empty query */
1424 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1425 if (argv
== NULL
) oom("sdssplitlen");
1428 if (c
->argv
) zfree(c
->argv
);
1429 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1430 if (c
->argv
== NULL
) oom("allocating arguments list for client");
1432 for (j
= 0; j
< argc
; j
++) {
1433 if (sdslen(argv
[j
])) {
1434 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1441 /* Execute the command. If the client is still valid
1442 * after processCommand() return and there is something
1443 * on the query buffer try to process the next command. */
1444 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1446 } else if (sdslen(c
->querybuf
) >= 1024*32) {
1447 redisLog(REDIS_DEBUG
, "Client protocol error");
1452 /* Bulk read handling. Note that if we are at this point
1453 the client already sent a command terminated with a newline,
1454 we are reading the bulk data that is actually the last
1455 argument of the command. */
1456 int qbl
= sdslen(c
->querybuf
);
1458 if (c
->bulklen
<= qbl
) {
1459 /* Copy everything but the final CRLF as final argument */
1460 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1462 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
/* Make database number `id` the client's current database by pointing
 * c->db at &server.db[id]. The id is first range-checked against
 * [0, server.dbnum).
 * NOTE(review): the statement guarded by the range check and the function's
 * return values are not visible in this listing -- presumably an error is
 * returned for an out-of-range id; confirm. */
1469 static int selectDb(redisClient
*c
, int id
) {
1470 if (id
< 0 || id
>= server
.dbnum
)
1472 c
->db
= &server
.db
[id
];
/* Value-duplication callback registered on client reply lists with
 * listSetDupMethod() in createClient(). It treats `o` as an robj and takes
 * one more reference on it via incrRefCount().
 * NOTE(review): the return statement is not visible in this listing --
 * presumably the same pointer is handed back; confirm. */
1476 static void *dupClientReplyValue(void *o
) {
1477 incrRefCount((robj
*)o
);
1481 static redisClient
*createClient(int fd
) {
1482 redisClient
*c
= zmalloc(sizeof(*c
));
1484 anetNonBlock(NULL
,fd
);
1485 anetTcpNoDelay(NULL
,fd
);
1486 if (!c
) return NULL
;
1489 c
->querybuf
= sdsempty();
1495 c
->lastinteraction
= time(NULL
);
1496 c
->authenticated
= 0;
1497 c
->replstate
= REDIS_REPL_NONE
;
1498 if ((c
->reply
= listCreate()) == NULL
) oom("listCreate");
1499 listSetFreeMethod(c
->reply
,decrRefCount
);
1500 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1501 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1502 readQueryFromClient
, c
, NULL
) == AE_ERR
) {
1506 if (!listAddNodeTail(server
.clients
,c
)) oom("listAddNodeTail");
1510 static void addReply(redisClient
*c
, robj
*obj
) {
1511 if (listLength(c
->reply
) == 0 &&
1512 (c
->replstate
== REDIS_REPL_NONE
||
1513 c
->replstate
== REDIS_REPL_ONLINE
) &&
1514 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
1515 sendReplyToClient
, c
, NULL
) == AE_ERR
) return;
1516 if (!listAddNodeTail(c
->reply
,obj
)) oom("listAddNodeTail");
1520 static void addReplySds(redisClient
*c
, sds s
) {
1521 robj
*o
= createObject(REDIS_STRING
,s
);
1526 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1531 REDIS_NOTUSED(mask
);
1532 REDIS_NOTUSED(privdata
);
1534 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
1535 if (cfd
== AE_ERR
) {
1536 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
1539 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
1540 if ((c
= createClient(cfd
)) == NULL
) {
1541 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
1542 close(cfd
); /* May be already closed, just ingore errors */
1545 /* If maxclient directive is set and this is one client more... close the
1546 * connection. Note that we create the client instead to check before
1547 * for this condition, since now the socket is already set in nonblocking
1548 * mode and we can send an error for free using the Kernel I/O */
1549 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
1550 char *err
= "-ERR max number of clients reached\r\n";
1552 /* That's a best effort error message, don't check write errors */
1553 (void) write(c
->fd
,err
,strlen(err
));
1557 server
.stat_numconnections
++;
1560 /* ======================= Redis objects implementation ===================== */
1562 static robj
*createObject(int type
, void *ptr
) {
1565 if (listLength(server
.objfreelist
)) {
1566 listNode
*head
= listFirst(server
.objfreelist
);
1567 o
= listNodeValue(head
);
1568 listDelNode(server
.objfreelist
,head
);
1570 o
= zmalloc(sizeof(*o
));
1572 if (!o
) oom("createObject");
1579 static robj
*createStringObject(char *ptr
, size_t len
) {
1580 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
1583 static robj
*createListObject(void) {
1584 list
*l
= listCreate();
1586 if (!l
) oom("listCreate");
1587 listSetFreeMethod(l
,decrRefCount
);
1588 return createObject(REDIS_LIST
,l
);
1591 static robj
*createSetObject(void) {
1592 dict
*d
= dictCreate(&setDictType
,NULL
);
1593 if (!d
) oom("dictCreate");
1594 return createObject(REDIS_SET
,d
);
1597 static void freeStringObject(robj
*o
) {
1601 static void freeListObject(robj
*o
) {
1602 listRelease((list
*) o
->ptr
);
1605 static void freeSetObject(robj
*o
) {
1606 dictRelease((dict
*) o
->ptr
);
1609 static void freeHashObject(robj
*o
) {
1610 dictRelease((dict
*) o
->ptr
);
1613 static void incrRefCount(robj
*o
) {
1615 #ifdef DEBUG_REFCOUNT
1616 if (o
->type
== REDIS_STRING
)
1617 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
1621 static void decrRefCount(void *obj
) {
1624 #ifdef DEBUG_REFCOUNT
1625 if (o
->type
== REDIS_STRING
)
1626 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
1628 if (--(o
->refcount
) == 0) {
1630 case REDIS_STRING
: freeStringObject(o
); break;
1631 case REDIS_LIST
: freeListObject(o
); break;
1632 case REDIS_SET
: freeSetObject(o
); break;
1633 case REDIS_HASH
: freeHashObject(o
); break;
1634 default: assert(0 != 0); break;
1636 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
1637 !listAddNodeHead(server
.objfreelist
,o
))
1642 /* Try to share an object against the shared objects pool */
1643 static robj
*tryObjectSharing(robj
*o
) {
1644 struct dictEntry
*de
;
1647 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
1649 assert(o
->type
== REDIS_STRING
);
1650 de
= dictFind(server
.sharingpool
,o
);
1652 robj
*shared
= dictGetEntryKey(de
);
1654 c
= ((unsigned long) dictGetEntryVal(de
))+1;
1655 dictGetEntryVal(de
) = (void*) c
;
1656 incrRefCount(shared
);
/* Here we are using a stream algorithm: every time an object is
 * shared we increment its count, every time there is a miss we
 * decrement the counter of a random object. If this object reaches
 * zero we remove the object and put the current object instead. */
1664 if (dictSize(server
.sharingpool
) >=
1665 server
.sharingpoolsize
) {
1666 de
= dictGetRandomKey(server
.sharingpool
);
1668 c
= ((unsigned long) dictGetEntryVal(de
))-1;
1669 dictGetEntryVal(de
) = (void*) c
;
1671 dictDelete(server
.sharingpool
,de
->key
);
1674 c
= 0; /* If the pool is empty we want to add this object */
1679 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
1680 assert(retval
== DICT_OK
);
1687 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
1688 dictEntry
*de
= dictFind(db
->dict
,key
);
1689 return de
? dictGetEntryVal(de
) : NULL
;
1692 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
1693 expireIfNeeded(db
,key
);
1694 return lookupKey(db
,key
);
1697 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
1698 deleteIfVolatile(db
,key
);
1699 return lookupKey(db
,key
);
1702 static int deleteKey(redisDb
*db
, robj
*key
) {
1705 /* We need to protect key from destruction: after the first dictDelete()
1706 * it may happen that 'key' is no longer valid if we don't increment
1707 * it's count. This may happen when we get the object reference directly
1708 * from the hash table with dictRandomKey() or dict iterators */
1710 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
1711 retval
= dictDelete(db
->dict
,key
);
1714 return retval
== DICT_OK
;
1717 /*============================ DB saving/loading ============================ */
/* Write the single byte `type` to fp. Returns -1 when fwrite() reports
 * that no item was written.
 * NOTE(review): the success return is not visible in this listing --
 * confirm against the real source. */
1719 static int rdbSaveType(FILE *fp
, unsigned char type
) {
1720 if (fwrite(&type
,1,1,fp
) == 0) return -1;
/* Write the time `t` to fp as a 4-byte value: t is first narrowed to
 * int32_t and the bytes of that variable are written as-is (no byte-order
 * conversion is applied in the visible code). Returns -1 when fwrite()
 * reports that no item was written.
 * NOTE(review): the success return is not visible in this listing --
 * confirm against the real source. */
1724 static int rdbSaveTime(FILE *fp
, time_t t
) {
1725 int32_t t32
= (int32_t) t
;
1726 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
1730 /* check rdbLoadLen() comments for more info */
1731 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
1732 unsigned char buf
[2];
1735 /* Save a 6 bit len */
1736 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
1737 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1738 } else if (len
< (1<<14)) {
1739 /* Save a 14 bit len */
1740 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
1742 if (fwrite(buf
,2,1,fp
) == 0) return -1;
1744 /* Save a 32 bit len */
1745 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
1746 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1748 if (fwrite(&len
,4,1,fp
) == 0) return -1;
1753 /* String objects in the form "2391" "-100" without any space and with a
1754 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1755 * encoded as integers to save space */
1756 int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
1758 char *endptr
, buf
[32];
1760 /* Check if it's possible to encode this value as a number */
1761 value
= strtoll(s
, &endptr
, 10);
1762 if (endptr
[0] != '\0') return 0;
1763 snprintf(buf
,32,"%lld",value
);
1765 /* If the number converted back into a string is not identical
1766 * then it's not possible to encode the string as integer */
1767 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
1769 /* Finally check if it fits in our ranges */
1770 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
1771 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
1772 enc
[1] = value
&0xFF;
1774 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
1775 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
1776 enc
[1] = value
&0xFF;
1777 enc
[2] = (value
>>8)&0xFF;
1779 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
1780 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
1781 enc
[1] = value
&0xFF;
1782 enc
[2] = (value
>>8)&0xFF;
1783 enc
[3] = (value
>>16)&0xFF;
1784 enc
[4] = (value
>>24)&0xFF;
1791 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
1792 unsigned int comprlen
, outlen
;
1796 /* We require at least four bytes compression for this to be worth it */
1797 outlen
= sdslen(obj
->ptr
)-4;
1798 if (outlen
<= 0) return 0;
1799 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
1800 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
1801 if (comprlen
== 0) {
1805 /* Data compressed! Let's save it on disk */
1806 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
1807 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
1808 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
1809 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
1810 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
/* Save a string object as [len][data] on disk. If the object is a string
 * representation of an integer value we try to save it in a special form */
1821 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
1822 size_t len
= sdslen(obj
->ptr
);
1825 /* Try integer encoding */
1827 unsigned char buf
[5];
1828 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
1829 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
1834 /* Try LZF compression - under 20 bytes it's unable to compress even
1835 * aaaaaaaaaaaaaaaaaa so skip it */
1836 if (1 && len
> 20) {
1839 retval
= rdbSaveLzfStringObject(fp
,obj
);
1840 if (retval
== -1) return -1;
1841 if (retval
> 0) return 0;
1842 /* retval == 0 means data can't be compressed, save the old way */
1845 /* Store verbatim */
1846 if (rdbSaveLen(fp
,len
) == -1) return -1;
1847 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
1851 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1852 static int rdbSave(char *filename
) {
1853 dictIterator
*di
= NULL
;
1858 time_t now
= time(NULL
);
1860 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
1861 fp
= fopen(tmpfile
,"w");
1863 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
1866 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
1867 for (j
= 0; j
< server
.dbnum
; j
++) {
1868 redisDb
*db
= server
.db
+j
;
1870 if (dictSize(d
) == 0) continue;
1871 di
= dictGetIterator(d
);
1877 /* Write the SELECT DB opcode */
1878 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
1879 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
1881 /* Iterate this DB writing every entry */
1882 while((de
= dictNext(di
)) != NULL
) {
1883 robj
*key
= dictGetEntryKey(de
);
1884 robj
*o
= dictGetEntryVal(de
);
1885 time_t expiretime
= getExpire(db
,key
);
1887 /* Save the expire time */
1888 if (expiretime
!= -1) {
1889 /* If this key is already expired skip it */
1890 if (expiretime
< now
) continue;
1891 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
1892 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
1894 /* Save the key and associated value */
1895 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
1896 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
1897 if (o
->type
== REDIS_STRING
) {
1898 /* Save a string value */
1899 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
1900 } else if (o
->type
== REDIS_LIST
) {
1901 /* Save a list value */
1902 list
*list
= o
->ptr
;
1906 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
1907 while((ln
= listYield(list
))) {
1908 robj
*eleobj
= listNodeValue(ln
);
1910 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
1912 } else if (o
->type
== REDIS_SET
) {
1913 /* Save a set value */
1915 dictIterator
*di
= dictGetIterator(set
);
1918 if (!set
) oom("dictGetIteraotr");
1919 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
1920 while((de
= dictNext(di
)) != NULL
) {
1921 robj
*eleobj
= dictGetEntryKey(de
);
1923 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
1925 dictReleaseIterator(di
);
1930 dictReleaseIterator(di
);
1933 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
1935 /* Make sure data will not remain on the OS's output buffers */
1940 /* Use RENAME to make sure the DB file is changed atomically only
1941 * if the generate DB file is ok. */
1942 if (rename(tmpfile
,filename
) == -1) {
1943 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destionation: %s", strerror(errno
));
1947 redisLog(REDIS_NOTICE
,"DB saved on disk");
1949 server
.lastsave
= time(NULL
);
1955 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
1956 if (di
) dictReleaseIterator(di
);
1960 static int rdbSaveBackground(char *filename
) {
1963 if (server
.bgsaveinprogress
) return REDIS_ERR
;
1964 if ((childpid
= fork()) == 0) {
1967 if (rdbSave(filename
) == REDIS_OK
) {
1974 if (childpid
== -1) {
1975 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
1979 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
1980 server
.bgsaveinprogress
= 1;
1983 return REDIS_OK
; /* unreached */
1986 static int rdbLoadType(FILE *fp
) {
1988 if (fread(&type
,1,1,fp
) == 0) return -1;
1992 static time_t rdbLoadTime(FILE *fp
) {
1994 if (fread(&t32
,4,1,fp
) == 0) return -1;
1995 return (time_t) t32
;
/* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
 * of this file for a description of how these are stored on disk.
 *
 * isencoded is set to 1 if the length read is not actually a length but
 * an "encoding type", check the above comments for more info */
2003 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2004 unsigned char buf
[2];
2007 if (isencoded
) *isencoded
= 0;
2009 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2014 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2015 type
= (buf
[0]&0xC0)>>6;
2016 if (type
== REDIS_RDB_6BITLEN
) {
2017 /* Read a 6 bit len */
2019 } else if (type
== REDIS_RDB_ENCVAL
) {
2020 /* Read a 6 bit len encoding type */
2021 if (isencoded
) *isencoded
= 1;
2023 } else if (type
== REDIS_RDB_14BITLEN
) {
2024 /* Read a 14 bit len */
2025 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2026 return ((buf
[0]&0x3F)<<8)|buf
[1];
2028 /* Read a 32 bit len */
2029 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2035 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2036 unsigned char enc
[4];
2039 if (enctype
== REDIS_RDB_ENC_INT8
) {
2040 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2041 val
= (signed char)enc
[0];
2042 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2044 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2045 v
= enc
[0]|(enc
[1]<<8);
2047 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2049 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2050 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2053 val
= 0; /* anti-warning */
2056 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2059 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2060 unsigned int len
, clen
;
2061 unsigned char *c
= NULL
;
2064 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2065 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2066 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2067 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2068 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2069 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2071 return createObject(REDIS_STRING
,val
);
2078 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2083 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2086 case REDIS_RDB_ENC_INT8
:
2087 case REDIS_RDB_ENC_INT16
:
2088 case REDIS_RDB_ENC_INT32
:
2089 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2090 case REDIS_RDB_ENC_LZF
:
2091 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2097 if (len
== REDIS_RDB_LENERR
) return NULL
;
2098 val
= sdsnewlen(NULL
,len
);
2099 if (len
&& fread(val
,len
,1,fp
) == 0) {
2103 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2106 static int rdbLoad(char *filename
) {
2108 robj
*keyobj
= NULL
;
2110 int type
, retval
, rdbver
;
2111 dict
*d
= server
.db
[0].dict
;
2112 redisDb
*db
= server
.db
+0;
2114 time_t expiretime
= -1, now
= time(NULL
);
2116 fp
= fopen(filename
,"r");
2117 if (!fp
) return REDIS_ERR
;
2118 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2120 if (memcmp(buf
,"REDIS",5) != 0) {
2122 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2125 rdbver
= atoi(buf
+5);
2128 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2135 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2136 if (type
== REDIS_EXPIRETIME
) {
2137 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2138 /* We read the time so we need to read the object type again */
2139 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2141 if (type
== REDIS_EOF
) break;
2142 /* Handle SELECT DB opcode as a special case */
2143 if (type
== REDIS_SELECTDB
) {
2144 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2146 if (dbid
>= (unsigned)server
.dbnum
) {
2147 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2150 db
= server
.db
+dbid
;
2155 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2157 if (type
== REDIS_STRING
) {
2158 /* Read string value */
2159 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2160 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2161 /* Read list/set value */
2164 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2166 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2167 /* Load every single element of the list/set */
2171 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2172 if (type
== REDIS_LIST
) {
2173 if (!listAddNodeTail((list
*)o
->ptr
,ele
))
2174 oom("listAddNodeTail");
2176 if (dictAdd((dict
*)o
->ptr
,ele
,NULL
) == DICT_ERR
)
2183 /* Add the new object in the hash table */
2184 retval
= dictAdd(d
,keyobj
,o
);
2185 if (retval
== DICT_ERR
) {
2186 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2189 /* Set the expire time if needed */
2190 if (expiretime
!= -1) {
2191 setExpire(db
,keyobj
,expiretime
);
2192 /* Delete this key if already expired */
2193 if (expiretime
< now
) deleteKey(db
,keyobj
);
2201 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2202 if (keyobj
) decrRefCount(keyobj
);
2203 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2205 return REDIS_ERR
; /* Just to avoid warning */
2208 /*================================== Commands =============================== */
2210 static void authCommand(redisClient
*c
) {
2211 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2212 c
->authenticated
= 1;
2213 addReply(c
,shared
.ok
);
2215 c
->authenticated
= 0;
2216 addReply(c
,shared
.err
);
2220 static void pingCommand(redisClient
*c
) {
2221 addReply(c
,shared
.pong
);
2224 static void echoCommand(redisClient
*c
) {
2225 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
2226 (int)sdslen(c
->argv
[1]->ptr
)));
2227 addReply(c
,c
->argv
[1]);
2228 addReply(c
,shared
.crlf
);
2231 /*=================================== Strings =============================== */
2233 static void setGenericCommand(redisClient
*c
, int nx
) {
2236 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2237 if (retval
== DICT_ERR
) {
2239 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2240 incrRefCount(c
->argv
[2]);
2242 addReply(c
,shared
.czero
);
2246 incrRefCount(c
->argv
[1]);
2247 incrRefCount(c
->argv
[2]);
2250 removeExpire(c
->db
,c
->argv
[1]);
2251 addReply(c
, nx
? shared
.cone
: shared
.ok
);
2254 static void setCommand(redisClient
*c
) {
2255 setGenericCommand(c
,0);
2258 static void setnxCommand(redisClient
*c
) {
2259 setGenericCommand(c
,1);
2262 static void getCommand(redisClient
*c
) {
2263 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2266 addReply(c
,shared
.nullbulk
);
2268 if (o
->type
!= REDIS_STRING
) {
2269 addReply(c
,shared
.wrongtypeerr
);
2271 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o
->ptr
)));
2273 addReply(c
,shared
.crlf
);
2278 static void getSetCommand(redisClient
*c
) {
2280 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
2281 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2283 incrRefCount(c
->argv
[1]);
2285 incrRefCount(c
->argv
[2]);
2287 removeExpire(c
->db
,c
->argv
[1]);
2290 static void mgetCommand(redisClient
*c
) {
2293 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
2294 for (j
= 1; j
< c
->argc
; j
++) {
2295 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
2297 addReply(c
,shared
.nullbulk
);
2299 if (o
->type
!= REDIS_STRING
) {
2300 addReply(c
,shared
.nullbulk
);
2302 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o
->ptr
)));
2304 addReply(c
,shared
.crlf
);
2310 static void incrDecrCommand(redisClient
*c
, long long incr
) {
2315 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2319 if (o
->type
!= REDIS_STRING
) {
2324 value
= strtoll(o
->ptr
, &eptr
, 10);
2329 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
2330 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
2331 if (retval
== DICT_ERR
) {
2332 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
2333 removeExpire(c
->db
,c
->argv
[1]);
2335 incrRefCount(c
->argv
[1]);
2338 addReply(c
,shared
.colon
);
2340 addReply(c
,shared
.crlf
);
2343 static void incrCommand(redisClient
*c
) {
2344 incrDecrCommand(c
,1);
2347 static void decrCommand(redisClient
*c
) {
2348 incrDecrCommand(c
,-1);
2351 static void incrbyCommand(redisClient
*c
) {
2352 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2353 incrDecrCommand(c
,incr
);
2356 static void decrbyCommand(redisClient
*c
) {
2357 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2358 incrDecrCommand(c
,-incr
);
2361 /* ========================= Type agnostic commands ========================= */
2363 static void delCommand(redisClient
*c
) {
2366 for (j
= 1; j
< c
->argc
; j
++) {
2367 if (deleteKey(c
->db
,c
->argv
[j
])) {
2374 addReply(c
,shared
.czero
);
2377 addReply(c
,shared
.cone
);
2380 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
2385 static void existsCommand(redisClient
*c
) {
2386 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
2389 static void selectCommand(redisClient
*c
) {
2390 int id
= atoi(c
->argv
[1]->ptr
);
2392 if (selectDb(c
,id
) == REDIS_ERR
) {
2393 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
2395 addReply(c
,shared
.ok
);
2399 static void randomkeyCommand(redisClient
*c
) {
2403 de
= dictGetRandomKey(c
->db
->dict
);
2404 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
2407 addReply(c
,shared
.plus
);
2408 addReply(c
,shared
.crlf
);
2410 addReply(c
,shared
.plus
);
2411 addReply(c
,dictGetEntryKey(de
));
2412 addReply(c
,shared
.crlf
);
2416 static void keysCommand(redisClient
*c
) {
2419 sds pattern
= c
->argv
[1]->ptr
;
2420 int plen
= sdslen(pattern
);
2421 int numkeys
= 0, keyslen
= 0;
2422 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
2424 di
= dictGetIterator(c
->db
->dict
);
2425 if (!di
) oom("dictGetIterator");
2427 decrRefCount(lenobj
);
2428 while((de
= dictNext(di
)) != NULL
) {
2429 robj
*keyobj
= dictGetEntryKey(de
);
2431 sds key
= keyobj
->ptr
;
2432 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
2433 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
2434 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
2436 addReply(c
,shared
.space
);
2439 keyslen
+= sdslen(key
);
2443 dictReleaseIterator(di
);
2444 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
2445 addReply(c
,shared
.crlf
);
2448 static void dbsizeCommand(redisClient
*c
) {
2450 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
2453 static void lastsaveCommand(redisClient
*c
) {
2455 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
2458 static void typeCommand(redisClient
*c
) {
2462 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2467 case REDIS_STRING
: type
= "+string"; break;
2468 case REDIS_LIST
: type
= "+list"; break;
2469 case REDIS_SET
: type
= "+set"; break;
2470 default: type
= "unknown"; break;
2473 addReplySds(c
,sdsnew(type
));
2474 addReply(c
,shared
.crlf
);
2477 static void saveCommand(redisClient
*c
) {
2478 if (server
.bgsaveinprogress
) {
2479 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
2482 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2483 addReply(c
,shared
.ok
);
2485 addReply(c
,shared
.err
);
2489 static void bgsaveCommand(redisClient
*c
) {
2490 if (server
.bgsaveinprogress
) {
2491 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
2494 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
2495 addReply(c
,shared
.ok
);
2497 addReply(c
,shared
.err
);
2501 static void shutdownCommand(redisClient
*c
) {
2502 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
2503 /* XXX: TODO kill the child if there is a bgsave in progress */
2504 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2505 if (server
.daemonize
) {
2506 unlink(server
.pidfile
);
2508 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
2509 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
2512 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
2513 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2517 static void renameGenericCommand(redisClient
*c
, int nx
) {
2520 /* To use the same key as src and dst is probably an error */
2521 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
2522 addReply(c
,shared
.sameobjecterr
);
2526 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2528 addReply(c
,shared
.nokeyerr
);
2532 deleteIfVolatile(c
->db
,c
->argv
[2]);
2533 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
2536 addReply(c
,shared
.czero
);
2539 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
2541 incrRefCount(c
->argv
[2]);
2543 deleteKey(c
->db
,c
->argv
[1]);
2545 addReply(c
,nx
? shared
.cone
: shared
.ok
);
2548 static void renameCommand(redisClient
*c
) {
2549 renameGenericCommand(c
,0);
2552 static void renamenxCommand(redisClient
*c
) {
2553 renameGenericCommand(c
,1);
2556 static void moveCommand(redisClient
*c
) {
2561 /* Obtain source and target DB pointers */
2564 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
2565 addReply(c
,shared
.outofrangeerr
);
2569 selectDb(c
,srcid
); /* Back to the source DB */
2571 /* If the user is moving using as target the same
2572 * DB as the source DB it is probably an error. */
2574 addReply(c
,shared
.sameobjecterr
);
2578 /* Check if the element exists and get a reference */
2579 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2581 addReply(c
,shared
.czero
);
2585 /* Try to add the element to the target DB */
2586 deleteIfVolatile(dst
,c
->argv
[1]);
2587 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
2588 addReply(c
,shared
.czero
);
2591 incrRefCount(c
->argv
[1]);
2594 /* OK! key moved, free the entry in the source DB */
2595 deleteKey(src
,c
->argv
[1]);
2597 addReply(c
,shared
.cone
);
2600 /* =================================== Lists ================================ */
2601 static void pushGenericCommand(redisClient
*c
, int where
) {
2605 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2607 lobj
= createListObject();
2609 if (where
== REDIS_HEAD
) {
2610 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2612 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2614 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
2615 incrRefCount(c
->argv
[1]);
2616 incrRefCount(c
->argv
[2]);
2618 if (lobj
->type
!= REDIS_LIST
) {
2619 addReply(c
,shared
.wrongtypeerr
);
2623 if (where
== REDIS_HEAD
) {
2624 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2626 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2628 incrRefCount(c
->argv
[2]);
2631 addReply(c
,shared
.ok
);
2634 static void lpushCommand(redisClient
*c
) {
2635 pushGenericCommand(c
,REDIS_HEAD
);
2638 static void rpushCommand(redisClient
*c
) {
2639 pushGenericCommand(c
,REDIS_TAIL
);
2642 static void llenCommand(redisClient
*c
) {
2646 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2648 addReply(c
,shared
.czero
);
2651 if (o
->type
!= REDIS_LIST
) {
2652 addReply(c
,shared
.wrongtypeerr
);
2655 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
2660 static void lindexCommand(redisClient
*c
) {
2662 int index
= atoi(c
->argv
[2]->ptr
);
2664 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2666 addReply(c
,shared
.nullbulk
);
2668 if (o
->type
!= REDIS_LIST
) {
2669 addReply(c
,shared
.wrongtypeerr
);
2671 list
*list
= o
->ptr
;
2674 ln
= listIndex(list
, index
);
2676 addReply(c
,shared
.nullbulk
);
2678 robj
*ele
= listNodeValue(ln
);
2679 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2681 addReply(c
,shared
.crlf
);
2687 static void lsetCommand(redisClient
*c
) {
2689 int index
= atoi(c
->argv
[2]->ptr
);
2691 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2693 addReply(c
,shared
.nokeyerr
);
2695 if (o
->type
!= REDIS_LIST
) {
2696 addReply(c
,shared
.wrongtypeerr
);
2698 list
*list
= o
->ptr
;
2701 ln
= listIndex(list
, index
);
2703 addReply(c
,shared
.outofrangeerr
);
2705 robj
*ele
= listNodeValue(ln
);
2708 listNodeValue(ln
) = c
->argv
[3];
2709 incrRefCount(c
->argv
[3]);
2710 addReply(c
,shared
.ok
);
2717 static void popGenericCommand(redisClient
*c
, int where
) {
2720 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2722 addReply(c
,shared
.nullbulk
);
2724 if (o
->type
!= REDIS_LIST
) {
2725 addReply(c
,shared
.wrongtypeerr
);
2727 list
*list
= o
->ptr
;
2730 if (where
== REDIS_HEAD
)
2731 ln
= listFirst(list
);
2733 ln
= listLast(list
);
2736 addReply(c
,shared
.nullbulk
);
2738 robj
*ele
= listNodeValue(ln
);
2739 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2741 addReply(c
,shared
.crlf
);
2742 listDelNode(list
,ln
);
2749 static void lpopCommand(redisClient
*c
) {
2750 popGenericCommand(c
,REDIS_HEAD
);
2753 static void rpopCommand(redisClient
*c
) {
2754 popGenericCommand(c
,REDIS_TAIL
);
2757 static void lrangeCommand(redisClient
*c
) {
2759 int start
= atoi(c
->argv
[2]->ptr
);
2760 int end
= atoi(c
->argv
[3]->ptr
);
2762 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2764 addReply(c
,shared
.nullmultibulk
);
2766 if (o
->type
!= REDIS_LIST
) {
2767 addReply(c
,shared
.wrongtypeerr
);
2769 list
*list
= o
->ptr
;
2771 int llen
= listLength(list
);
2775 /* convert negative indexes */
2776 if (start
< 0) start
= llen
+start
;
2777 if (end
< 0) end
= llen
+end
;
2778 if (start
< 0) start
= 0;
2779 if (end
< 0) end
= 0;
2781 /* indexes sanity checks */
2782 if (start
> end
|| start
>= llen
) {
2783 /* Out of range start or start > end result in empty list */
2784 addReply(c
,shared
.emptymultibulk
);
2787 if (end
>= llen
) end
= llen
-1;
2788 rangelen
= (end
-start
)+1;
2790 /* Return the result in form of a multi-bulk reply */
2791 ln
= listIndex(list
, start
);
2792 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
2793 for (j
= 0; j
< rangelen
; j
++) {
2794 ele
= listNodeValue(ln
);
2795 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2797 addReply(c
,shared
.crlf
);
2804 static void ltrimCommand(redisClient
*c
) {
2806 int start
= atoi(c
->argv
[2]->ptr
);
2807 int end
= atoi(c
->argv
[3]->ptr
);
2809 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2811 addReply(c
,shared
.nokeyerr
);
2813 if (o
->type
!= REDIS_LIST
) {
2814 addReply(c
,shared
.wrongtypeerr
);
2816 list
*list
= o
->ptr
;
2818 int llen
= listLength(list
);
2819 int j
, ltrim
, rtrim
;
2821 /* convert negative indexes */
2822 if (start
< 0) start
= llen
+start
;
2823 if (end
< 0) end
= llen
+end
;
2824 if (start
< 0) start
= 0;
2825 if (end
< 0) end
= 0;
2827 /* indexes sanity checks */
2828 if (start
> end
|| start
>= llen
) {
2829 /* Out of range start or start > end result in empty list */
2833 if (end
>= llen
) end
= llen
-1;
2838 /* Remove list elements to perform the trim */
2839 for (j
= 0; j
< ltrim
; j
++) {
2840 ln
= listFirst(list
);
2841 listDelNode(list
,ln
);
2843 for (j
= 0; j
< rtrim
; j
++) {
2844 ln
= listLast(list
);
2845 listDelNode(list
,ln
);
2847 addReply(c
,shared
.ok
);
2853 static void lremCommand(redisClient
*c
) {
2856 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2858 addReply(c
,shared
.nokeyerr
);
2860 if (o
->type
!= REDIS_LIST
) {
2861 addReply(c
,shared
.wrongtypeerr
);
2863 list
*list
= o
->ptr
;
2864 listNode
*ln
, *next
;
2865 int toremove
= atoi(c
->argv
[2]->ptr
);
2870 toremove
= -toremove
;
2873 ln
= fromtail
? list
->tail
: list
->head
;
2875 robj
*ele
= listNodeValue(ln
);
2877 next
= fromtail
? ln
->prev
: ln
->next
;
2878 if (sdscmp(ele
->ptr
,c
->argv
[3]->ptr
) == 0) {
2879 listDelNode(list
,ln
);
2882 if (toremove
&& removed
== toremove
) break;
2886 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
2891 /* ==================================== Sets ================================ */
2893 static void saddCommand(redisClient
*c
) {
2896 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2898 set
= createSetObject();
2899 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
2900 incrRefCount(c
->argv
[1]);
2902 if (set
->type
!= REDIS_SET
) {
2903 addReply(c
,shared
.wrongtypeerr
);
2907 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
2908 incrRefCount(c
->argv
[2]);
2910 addReply(c
,shared
.cone
);
2912 addReply(c
,shared
.czero
);
2916 static void sremCommand(redisClient
*c
) {
2919 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2921 addReply(c
,shared
.czero
);
2923 if (set
->type
!= REDIS_SET
) {
2924 addReply(c
,shared
.wrongtypeerr
);
2927 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
2929 addReply(c
,shared
.cone
);
2931 addReply(c
,shared
.czero
);
2936 static void smoveCommand(redisClient
*c
) {
2937 robj
*srcset
, *dstset
;
2939 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2940 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
2942 /* If the source key does not exist return 0, if it's of the wrong type
2944 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
2945 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
2948 /* Error if the destination key is not a set as well */
2949 if (dstset
&& dstset
->type
!= REDIS_SET
) {
2950 addReply(c
,shared
.wrongtypeerr
);
2953 /* Remove the element from the source set */
2954 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
2955 /* Key not found in the src set! return zero */
2956 addReply(c
,shared
.czero
);
2960 /* Add the element to the destination set */
2962 dstset
= createSetObject();
2963 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
2964 incrRefCount(c
->argv
[2]);
2966 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
2967 incrRefCount(c
->argv
[3]);
2968 addReply(c
,shared
.cone
);
2971 static void sismemberCommand(redisClient
*c
) {
2974 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
2976 addReply(c
,shared
.czero
);
2978 if (set
->type
!= REDIS_SET
) {
2979 addReply(c
,shared
.wrongtypeerr
);
2982 if (dictFind(set
->ptr
,c
->argv
[2]))
2983 addReply(c
,shared
.cone
);
2985 addReply(c
,shared
.czero
);
2989 static void scardCommand(redisClient
*c
) {
2993 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2995 addReply(c
,shared
.czero
);
2998 if (o
->type
!= REDIS_SET
) {
2999 addReply(c
,shared
.wrongtypeerr
);
3002 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3008 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3009 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3011 return dictSize(*d1
)-dictSize(*d2
);
3014 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
) {
3015 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3018 robj
*lenobj
= NULL
, *dstset
= NULL
;
3019 int j
, cardinality
= 0;
3021 if (!dv
) oom("sinterGenericCommand");
3022 for (j
= 0; j
< setsnum
; j
++) {
3026 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3027 lookupKeyRead(c
->db
,setskeys
[j
]);
3031 deleteKey(c
->db
,dstkey
);
3032 addReply(c
,shared
.ok
);
3034 addReply(c
,shared
.nullmultibulk
);
3038 if (setobj
->type
!= REDIS_SET
) {
3040 addReply(c
,shared
.wrongtypeerr
);
3043 dv
[j
] = setobj
->ptr
;
3045 /* Sort sets from the smallest to largest, this will improve our
3046 * algorithm's performace */
3047 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3049 /* The first thing we should output is the total number of elements...
3050 * since this is a multi-bulk write, but at this stage we don't know
3051 * the intersection set size, so we use a trick, append an empty object
3052 * to the output list and save the pointer to later modify it with the
3055 lenobj
= createObject(REDIS_STRING
,NULL
);
3057 decrRefCount(lenobj
);
3059 /* If we have a target key where to store the resulting set
3060 * create this key with an empty set inside */
3061 dstset
= createSetObject();
3064 /* Iterate all the elements of the first (smallest) set, and test
3065 * the element against all the other sets, if at least one set does
3066 * not include the element it is discarded */
3067 di
= dictGetIterator(dv
[0]);
3068 if (!di
) oom("dictGetIterator");
3070 while((de
= dictNext(di
)) != NULL
) {
3073 for (j
= 1; j
< setsnum
; j
++)
3074 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
3076 continue; /* at least one set does not contain the member */
3077 ele
= dictGetEntryKey(de
);
3079 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele
->ptr
)));
3081 addReply(c
,shared
.crlf
);
3084 dictAdd(dstset
->ptr
,ele
,NULL
);
3088 dictReleaseIterator(di
);
3091 /* Store the resulting set into the target */
3092 deleteKey(c
->db
,dstkey
);
3093 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3094 incrRefCount(dstkey
);
3098 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",cardinality
);
3100 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3101 dictSize((dict
*)dstset
->ptr
)));
3107 static void sinterCommand(redisClient
*c
) {
3108 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
3111 static void sinterstoreCommand(redisClient
*c
) {
3112 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
3115 #define REDIS_OP_UNION 0
3116 #define REDIS_OP_DIFF 1
3118 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
3119 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3122 robj
*dstset
= NULL
;
3123 int j
, cardinality
= 0;
3125 if (!dv
) oom("sunionDiffGenericCommand");
3126 for (j
= 0; j
< setsnum
; j
++) {
3130 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3131 lookupKeyRead(c
->db
,setskeys
[j
]);
3136 if (setobj
->type
!= REDIS_SET
) {
3138 addReply(c
,shared
.wrongtypeerr
);
3141 dv
[j
] = setobj
->ptr
;
3144 /* We need a temp set object to store our union. If the dstkey
3145 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3146 * this set object will be the resulting object to set into the target key*/
3147 dstset
= createSetObject();
3149 /* Iterate all the elements of all the sets, add every element a single
3150 * time to the result set */
3151 for (j
= 0; j
< setsnum
; j
++) {
3152 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
3153 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
3155 di
= dictGetIterator(dv
[j
]);
3156 if (!di
) oom("dictGetIterator");
3158 while((de
= dictNext(di
)) != NULL
) {
3161 /* dictAdd will not add the same element multiple times */
3162 ele
= dictGetEntryKey(de
);
3163 if (op
== REDIS_OP_UNION
|| j
== 0) {
3164 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
3168 } else if (op
== REDIS_OP_DIFF
) {
3169 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
3174 dictReleaseIterator(di
);
3176 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
3179 /* Output the content of the resulting set, if not in STORE mode */
3181 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
3182 di
= dictGetIterator(dstset
->ptr
);
3183 if (!di
) oom("dictGetIterator");
3184 while((de
= dictNext(di
)) != NULL
) {
3187 ele
= dictGetEntryKey(de
);
3188 addReplySds(c
,sdscatprintf(sdsempty(),
3189 "$%d\r\n",sdslen(ele
->ptr
)));
3191 addReply(c
,shared
.crlf
);
3193 dictReleaseIterator(di
);
3195 /* If we have a target key where to store the resulting set
3196 * create this key with the result set inside */
3197 deleteKey(c
->db
,dstkey
);
3198 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3199 incrRefCount(dstkey
);
3204 decrRefCount(dstset
);
3206 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3207 dictSize((dict
*)dstset
->ptr
)));
3213 static void sunionCommand(redisClient
*c
) {
3214 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
3217 static void sunionstoreCommand(redisClient
*c
) {
3218 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
3221 static void sdiffCommand(redisClient
*c
) {
3222 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
3225 static void sdiffstoreCommand(redisClient
*c
) {
3226 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
3229 static void flushdbCommand(redisClient
*c
) {
3230 server
.dirty
+= dictSize(c
->db
->dict
);
3231 dictEmpty(c
->db
->dict
);
3232 dictEmpty(c
->db
->expires
);
3233 addReply(c
,shared
.ok
);
3236 static void flushallCommand(redisClient
*c
) {
3237 server
.dirty
+= emptyDb();
3238 addReply(c
,shared
.ok
);
3239 rdbSave(server
.dbfilename
);
3243 redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
3244 redisSortOperation
*so
= zmalloc(sizeof(*so
));
3245 if (!so
) oom("createSortOperation");
3247 so
->pattern
= pattern
;
3251 /* Return the value associated to the key with a name obtained
3252 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3253 robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
3257 int prefixlen
, sublen
, postfixlen
;
3258 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3262 char buf
[REDIS_SORTKEY_MAX
+1];
3265 spat
= pattern
->ptr
;
3267 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
3268 p
= strchr(spat
,'*');
3269 if (!p
) return NULL
;
3272 sublen
= sdslen(ssub
);
3273 postfixlen
= sdslen(spat
)-(prefixlen
+1);
3274 memcpy(keyname
.buf
,spat
,prefixlen
);
3275 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
3276 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
3277 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
3278 keyname
.len
= prefixlen
+sublen
+postfixlen
;
3280 keyobj
.refcount
= 1;
3281 keyobj
.type
= REDIS_STRING
;
3282 keyobj
.ptr
= ((char*)&keyname
)+(sizeof(long)*2);
3284 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3285 return lookupKeyRead(db
,&keyobj
);
3288 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3289 * the additional parameter is not standard but a BSD-specific we have to
3290 * pass sorting parameters via the global 'server' structure */
3291 static int sortCompare(const void *s1
, const void *s2
) {
3292 const redisSortObject
*so1
= s1
, *so2
= s2
;
3295 if (!server
.sort_alpha
) {
3296 /* Numeric sorting. Here it's trivial as we precomputed scores */
3297 if (so1
->u
.score
> so2
->u
.score
) {
3299 } else if (so1
->u
.score
< so2
->u
.score
) {
3305 /* Alphanumeric sorting */
3306 if (server
.sort_bypattern
) {
3307 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
3308 /* At least one compare object is NULL */
3309 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
3311 else if (so1
->u
.cmpobj
== NULL
)
3316 /* We have both the objects, use strcoll */
3317 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
3320 /* Compare elements directly */
3321 cmp
= strcoll(so1
->obj
->ptr
,so2
->obj
->ptr
);
3324 return server
.sort_desc
? -cmp
: cmp
;
3327 /* The SORT command is the most complex command in Redis. Warning: this code
3328 * is optimized for speed and a bit less for readability */
3329 static void sortCommand(redisClient
*c
) {
3332 int desc
= 0, alpha
= 0;
3333 int limit_start
= 0, limit_count
= -1, start
, end
;
3334 int j
, dontsort
= 0, vectorlen
;
3335 int getop
= 0; /* GET operation counter */
3336 robj
*sortval
, *sortby
= NULL
;
3337 redisSortObject
*vector
; /* Resulting vector to sort */
3339 /* Lookup the key to sort. It must be of the right types */
3340 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
3341 if (sortval
== NULL
) {
3342 addReply(c
,shared
.nokeyerr
);
3345 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
) {
3346 addReply(c
,shared
.wrongtypeerr
);
3350 /* Create a list of operations to perform for every sorted element.
3351 * Operations can be GET/DEL/INCR/DECR */
3352 operations
= listCreate();
3353 listSetFreeMethod(operations
,zfree
);
3356 /* Now we need to protect sortval incrementing its count, in the future
3357 * SORT may have options able to overwrite/delete keys during the sorting
3358 * and the sorted key itself may get destroied */
3359 incrRefCount(sortval
);
3361 /* The SORT command has an SQL-alike syntax, parse it */
3362 while(j
< c
->argc
) {
3363 int leftargs
= c
->argc
-j
-1;
3364 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
3366 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
3368 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
3370 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
3371 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
3372 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
3374 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
3375 sortby
= c
->argv
[j
+1];
3376 /* If the BY pattern does not contain '*', i.e. it is constant,
3377 * we don't need to sort nor to lookup the weight keys. */
3378 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
3380 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3381 listAddNodeTail(operations
,createSortOperation(
3382 REDIS_SORT_GET
,c
->argv
[j
+1]));
3385 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"del") && leftargs
>= 1) {
3386 listAddNodeTail(operations
,createSortOperation(
3387 REDIS_SORT_DEL
,c
->argv
[j
+1]));
3389 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"incr") && leftargs
>= 1) {
3390 listAddNodeTail(operations
,createSortOperation(
3391 REDIS_SORT_INCR
,c
->argv
[j
+1]));
3393 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3394 listAddNodeTail(operations
,createSortOperation(
3395 REDIS_SORT_DECR
,c
->argv
[j
+1]));
3398 decrRefCount(sortval
);
3399 listRelease(operations
);
3400 addReply(c
,shared
.syntaxerr
);
3406 /* Load the sorting vector with all the objects to sort */
3407 vectorlen
= (sortval
->type
== REDIS_LIST
) ?
3408 listLength((list
*)sortval
->ptr
) :
3409 dictSize((dict
*)sortval
->ptr
);
3410 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
3411 if (!vector
) oom("allocating objects vector for SORT");
3413 if (sortval
->type
== REDIS_LIST
) {
3414 list
*list
= sortval
->ptr
;
3418 while((ln
= listYield(list
))) {
3419 robj
*ele
= ln
->value
;
3420 vector
[j
].obj
= ele
;
3421 vector
[j
].u
.score
= 0;
3422 vector
[j
].u
.cmpobj
= NULL
;
3426 dict
*set
= sortval
->ptr
;
3430 di
= dictGetIterator(set
);
3431 if (!di
) oom("dictGetIterator");
3432 while((setele
= dictNext(di
)) != NULL
) {
3433 vector
[j
].obj
= dictGetEntryKey(setele
);
3434 vector
[j
].u
.score
= 0;
3435 vector
[j
].u
.cmpobj
= NULL
;
3438 dictReleaseIterator(di
);
3440 assert(j
== vectorlen
);
3442 /* Now it's time to load the right scores in the sorting vector */
3443 if (dontsort
== 0) {
3444 for (j
= 0; j
< vectorlen
; j
++) {
3448 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
3449 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
3451 vector
[j
].u
.cmpobj
= byval
;
3452 incrRefCount(byval
);
3454 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
3457 if (!alpha
) vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
3462 /* We are ready to sort the vector... perform a bit of sanity check
3463 * on the LIMIT option too. We'll use a partial version of quicksort. */
3464 start
= (limit_start
< 0) ? 0 : limit_start
;
3465 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
3466 if (start
>= vectorlen
) {
3467 start
= vectorlen
-1;
3470 if (end
>= vectorlen
) end
= vectorlen
-1;
3472 if (dontsort
== 0) {
3473 server
.sort_desc
= desc
;
3474 server
.sort_alpha
= alpha
;
3475 server
.sort_bypattern
= sortby
? 1 : 0;
3476 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
3477 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
3479 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
3482 /* Send command output to the output buffer, performing the specified
3483 * GET/DEL/INCR/DECR operations if any. */
3484 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
3485 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
3486 for (j
= start
; j
<= end
; j
++) {
3489 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
3490 sdslen(vector
[j
].obj
->ptr
)));
3491 addReply(c
,vector
[j
].obj
);
3492 addReply(c
,shared
.crlf
);
3494 listRewind(operations
);
3495 while((ln
= listYield(operations
))) {
3496 redisSortOperation
*sop
= ln
->value
;
3497 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
3500 if (sop
->type
== REDIS_SORT_GET
) {
3501 if (!val
|| val
->type
!= REDIS_STRING
) {
3502 addReply(c
,shared
.nullbulk
);
3504 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
3507 addReply(c
,shared
.crlf
);
3509 } else if (sop
->type
== REDIS_SORT_DEL
) {
3516 decrRefCount(sortval
);
3517 listRelease(operations
);
3518 for (j
= 0; j
< vectorlen
; j
++) {
3519 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
3520 decrRefCount(vector
[j
].u
.cmpobj
);
/* INFO command: build an sds report of server statistics and send it to the
 * client as a bulk reply ("$<len>\r\n", the payload, then CRLF).
 *
 * The report always contains the generic fields named in the first format
 * string; when server.masterhost is set, the master_* fields are appended.
 * "connected_clients" is computed as the length of server.clients minus the
 * length of server.slaves.
 *
 * NOTE(review): several argument lines of both sdscatprintf() calls are not
 * visible in this excerpt, so the format/argument pairing cannot be checked
 * from here -- confirm against the complete file. */
3525 static void infoCommand(redisClient
*c
) {
3527 time_t uptime
= time(NULL
)-server
.stat_starttime
;
3529 info
= sdscatprintf(sdsempty(),
3530 "redis_version:%s\r\n"
3531 "uptime_in_seconds:%d\r\n"
3532 "uptime_in_days:%d\r\n"
3533 "connected_clients:%d\r\n"
3534 "connected_slaves:%d\r\n"
3535 "used_memory:%zu\r\n"
3536 "changes_since_last_save:%lld\r\n"
3537 "bgsave_in_progress:%d\r\n"
3538 "last_save_time:%d\r\n"
3539 "total_connections_received:%lld\r\n"
3540 "total_commands_processed:%lld\r\n"
3545 listLength(server
.clients
)-listLength(server
.slaves
),
3546 listLength(server
.slaves
),
3549 server
.bgsaveinprogress
,
3551 server
.stat_numconnections
,
3552 server
.stat_numcommands
,
3553 server
.masterhost
== NULL
? "master" : "slave"
/* Replication details are only reported when this instance is a slave. */
3555 if (server
.masterhost
) {
3556 info
= sdscatprintf(info
,
3557 "master_host:%s\r\n"
3558 "master_port:%d\r\n"
3559 "master_link_status:%s\r\n"
3560 "master_last_io_seconds_ago:%d\r\n"
3563 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
3565 (int)(time(NULL
)-server
.master
->lastinteraction
)
/* Emit the bulk header, the report itself, and the trailing CRLF. */
3568 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info
)));
3569 addReplySds(c
,info
);
3570 addReply(c
,shared
.crlf
);
/* MONITOR command: flag the client as REDIS_SLAVE|REDIS_MONITOR, append it
 * to server.monitors and reply with shared.ok.
 *
 * A client that already has REDIS_SLAVE set is ignored: the function returns
 * without queueing any reply. */
3573 static void monitorCommand(redisClient
*c
) {
3574 /* ignore MONITOR if already slave or in monitor mode */
3575 if (c
->flags
& REDIS_SLAVE
) return;
3577 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
/* Failing to append to the monitors list is treated as out-of-memory. */
3579 if (!listAddNodeTail(server
.monitors
,c
)) oom("listAddNodeTail");
3580 addReply(c
,shared
.ok
);
3583 /* ================================= Expire ================================= */
/* Remove the entry for 'key' from the db->expires dictionary.
 *
 * The result depends on whether dictDelete() reports DICT_OK.
 * NOTE(review): the two branch bodies are not visible in this excerpt;
 * presumably non-zero means "an expire was removed" -- confirm. */
3584 static int removeExpire(redisDb
*db
, robj
*key
) {
3585 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
/* Associate the absolute expire time 'when' with 'key' in db->expires.
 *
 * The time_t is stored by value, cast into the dictionary's void* value slot
 * rather than allocated separately. dictAdd() returning DICT_ERR selects the
 * failure branch.
 * NOTE(review): the branch bodies (return values, any reference counting of
 * 'key') are not visible in this excerpt -- confirm against the full file. */
3592 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
3593 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
3601 /* Return the expire time of the specified key, or -1 if no expire
3602 * is associated with this key (i.e. the key is non volatile) */
3603 static time_t getExpire(redisDb
*db
, robj
*key
) {
3606 /* No expire? return ASAP */
/* The dictSize() test skips the lookup entirely when the expires table is
 * empty. */
3607 if (dictSize(db
->expires
) == 0 ||
3608 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
/* The entry's value slot is read back as the time_t itself (cast), not as a
 * pointer to one. */
3610 return (time_t) dictGetEntryVal(de
);
/* Delete 'key' if its expire time has already passed.
 *
 * Returns 0 when the key has no entry in db->expires, or when the stored
 * time is still in the future or equal to now (time(NULL) <= when).
 * Otherwise the key is removed from both db->expires and db->dict, and the
 * return value is 1 if the deletion from db->dict reported DICT_OK, else 0. */
3613 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
3617 /* No expire? return ASAP */
3618 if (dictSize(db
->expires
) == 0 ||
3619 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3621 /* Lookup the expire */
3622 when
= (time_t) dictGetEntryVal(de
);
3623 if (time(NULL
) <= when
) return 0;
3625 /* Delete the key */
/* The expires entry is dropped first; only the result of the delete on the
 * main dictionary is reported to the caller. */
3626 dictDelete(db
->expires
,key
);
3627 return dictDelete(db
->dict
,key
) == DICT_OK
;
/* Delete 'key' if it has any expire set, whether or not that time has
 * passed yet.
 *
 * Returns 0 when the key has no entry in db->expires. Otherwise the key is
 * removed from both db->expires and db->dict, and the return value is 1 if
 * the deletion from db->dict reported DICT_OK, else 0.
 * NOTE(review): one source line between the "Delete the key" comment and the
 * first dictDelete() is not visible in this excerpt. */
3630 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
3633 /* No expire? return ASAP */
3634 if (dictSize(db
->expires
) == 0 ||
3635 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3637 /* Delete the key */
3639 dictDelete(db
->expires
,key
);
3640 return dictDelete(db
->dict
,key
) == DICT_OK
;
/* EXPIRE command: argv[1] is the key, argv[2] the timeout in seconds.
 *
 * The key is looked up in c->db->dict. The reply is shared.cone when
 * setExpire() succeeds with when = now + seconds, and shared.czero on the
 * other paths.
 *
 * NOTE(review): the conditions guarding the first two czero replies are not
 * visible in this excerpt. The seconds argument is parsed with atoi(), which
 * cannot report a non-numeric argument (it yields 0). */
3643 static void expireCommand(redisClient
*c
) {
3645 int seconds
= atoi(c
->argv
[2]->ptr
);
3647 de
= dictFind(c
->db
->dict
,c
->argv
[1]);
3649 addReply(c
,shared
.czero
);
3653 addReply(c
, shared
.czero
);
/* Convert the relative timeout into an absolute unix time. */
3656 time_t when
= time(NULL
)+seconds
;
3657 if (setExpire(c
->db
,c
->argv
[1],when
))
3658 addReply(c
,shared
.cone
);
3660 addReply(c
,shared
.czero
);
/* TTL command: reply with an integer (":<n>\r\n") for the key in argv[1].
 *
 * The value is the expire time returned by getExpire() minus the current
 * time, clamped to -1 when the difference is negative.
 * NOTE(review): the declarations and the guard around the subtraction are
 * not visible in this excerpt -- confirm how a key without expire is
 * handled. */
3665 static void ttlCommand(redisClient
*c
) {
3669 expire
= getExpire(c
->db
,c
->argv
[1]);
3671 ttl
= (int) (expire
-time(NULL
));
3672 if (ttl
< 0) ttl
= -1;
3674 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
3677 /* =============================== Replication ============================= */
/* Blocking write helper used by replication: write 'size' bytes from 'ptr'
 * to 'fd' with an overall time limit.
 *
 * Writability is polled with aeWait() in slices of 1000 (milliseconds,
 * judging by the seconds-based timeout check). A write() failure returns -1
 * immediately. The elapsed wall-clock time since entry is compared against
 * 'timeout' (seconds) after each attempt.
 *
 * NOTE(review): the loop header, the pointer/size bookkeeping and the final
 * return are not visible in this excerpt; 'ret' is initialised to 'size', so
 * presumably that is the success value -- confirm. */
3679 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3680 ssize_t nwritten
, ret
= size
;
3681 time_t start
= time(NULL
);
3685 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
3686 nwritten
= write(fd
,ptr
,size
);
3687 if (nwritten
== -1) return -1;
3691 if ((time(NULL
)-start
) > timeout
) {
3699 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3700 ssize_t nread
, totread
= 0;
3701 time_t start
= time(NULL
);
3705 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
3706 nread
= read(fd
,ptr
,size
);
3707 if (nread
== -1) return -1;
3712 if ((time(NULL
)-start
) > timeout
) {
/* Read a single line from 'fd' into 'ptr', one byte at a time via
 * syncRead(), with 'timeout' seconds allowed for every byte.
 *
 * At most size-1 bytes are stored and the buffer is always nul terminated.
 * The terminating "\n" is not stored, and a "\r" immediately before it is
 * stripped from the buffer.
 *
 * Returns the number of bytes stored before the newline (a stripped "\r" is
 * still counted), or the number stored so far if the buffer filled up before
 * a newline was seen. Returns -1 if syncRead() fails or if 'size' leaves no
 * room for the terminator. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return -1;
    size--; /* reserve one byte for the nul terminator */
    *ptr = '\0';
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        }
        *ptr++ = c;
        *ptr = '\0';
        nread++;
        size--; /* every stored byte consumes buffer space */
    }
    return nread;
}
/* SYNC command: register the client as a slave and arrange for it to
 * receive a full dump produced by a background save.
 *
 * Visible behaviour:
 *  - A client that already has REDIS_SLAVE set is ignored (no reply).
 *  - A client with a non-empty reply list gets an -ERR reply.
 *  - If a BGSAVE is in progress and another slave is already in
 *    REDIS_REPL_WAIT_BGSAVE_END, this client's reply list is replaced by a
 *    copy of that slave's list and it enters the same state; otherwise it
 *    waits for the next BGSAVE (REDIS_REPL_WAIT_BGSAVE_START).
 *  - If no BGSAVE is in progress one is started with rdbSaveBackground().
 *  - Finally the client is flagged REDIS_SLAVE and appended to
 *    server.slaves.
 *
 * NOTE(review): declarations, early returns and the branch structure
 * between the visible statements are not part of this excerpt. */
3741 static void syncCommand(redisClient
*c
) {
3742 /* ignore SYNC if already slave or in monitor mode */
3743 if (c
->flags
& REDIS_SLAVE
) return;
3745 /* SYNC can't be issued when the server has pending data to send to
3746 * the client about already issued commands. We need a fresh reply
3747 * buffer registering the differences between the BGSAVE and the current
3748 * dataset, so that we can copy to other slaves if needed. */
3749 if (listLength(c
->reply
) != 0) {
3750 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3754 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
3755 /* Here we need to check if there is a background saving operation
3756 * in progress, or if it is required to start one */
3757 if (server
.bgsaveinprogress
) {
3758 /* Ok a background save is in progress. Let's check if it is a good
3759 * one for replication, i.e. if there is another slave that is
3760 * registering differences since the server forked to save */
3764 listRewind(server
.slaves
);
3765 while((ln
= listYield(server
.slaves
))) {
3767 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
3770 /* Perfect, the server is already registering differences for
3771 * another slave. Set the right state, and copy the buffer. */
3772 listRelease(c
->reply
);
3773 c
->reply
= listDup(slave
->reply
);
3774 if (!c
->reply
) oom("listDup copying slave reply list");
3775 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3776 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
3778 /* No way, we need to wait for the next BGSAVE in order to
3779 * register differences */
3780 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
3781 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
3784 /* Ok we don't have a BGSAVE in progress, let's start one */
3785 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
3786 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
3787 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
/* NOTE(review): the reply text below misspells "Unable". It is sent to the
 * client at runtime, so it is left untouched here; fix it in a code change. */
3788 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
3791 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3794 c
->flags
|= REDIS_SLAVE
;
3796 if (!listAddNodeTail(server
.slaves
,c
)) oom("listAddNodeTail");
/* Writable-event handler that streams the dump file to a slave.
 *
 * 'privdata' is the slave's redisClient. On the first call (repldboff == 0)
 * a "$<count>\r\n" bulk header is written. Every call then seeks the dump
 * descriptor to repldboff, reads up to REDIS_IOBUF_LEN bytes and writes them
 * to the socket, advancing repldboff by the number of bytes actually
 * written. When repldboff reaches repldbsize the dump descriptor is closed,
 * the slave is put in REDIS_REPL_ONLINE, and the writable handler is
 * replaced by sendReplyToClient.
 *
 * NOTE(review): the error-path bodies (what happens to the slave after a
 * read/write failure) are not visible in this excerpt. */
3800 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
3801 redisClient
*slave
= privdata
;
3803 REDIS_NOTUSED(mask
);
3804 char buf
[REDIS_IOBUF_LEN
];
3805 ssize_t nwritten
, buflen
;
3807 if (slave
->repldboff
== 0) {
3808 /* Write the bulk write count before transferring the DB. In theory here
3809 * we don't know how much room there is in the output buffer of the
3810 * socket, but in practice SO_SNDLOWAT (the minimum count for output
3811 * operations) will never be smaller than the few bytes we need. */
/* NOTE(review): the format uses %lld while the argument is cast to
 * unsigned long long -- confirm the intended signedness. */
3814 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
3816 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
/* Re-seek on every call: the offset only advances by what the socket
 * accepted, so a partial write is resumed from the right place. */
3824 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
3825 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
3827 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
3828 (buflen
== 0) ? "premature EOF" : strerror(errno
));
3832 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
3833 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
3838 slave
->repldboff
+= nwritten
;
/* Whole dump sent: switch the slave to normal reply streaming. */
3839 if (slave
->repldboff
== slave
->repldbsize
) {
3840 close(slave
->repldbfd
);
3841 slave
->repldbfd
= -1;
3842 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
3843 slave
->replstate
= REDIS_REPL_ONLINE
;
3844 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
3845 sendReplyToClient
, slave
, NULL
) == AE_ERR
) {
3849 addReplySds(slave
,sdsempty());
3850 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
/* Update the state of every slave in server.slaves after a background save
 * has finished. 'bgsaveerr' is compared against REDIS_OK to tell success
 * from failure.
 *
 *  - Slaves in REDIS_REPL_WAIT_BGSAVE_START are moved to
 *    REDIS_REPL_WAIT_BGSAVE_END.
 *  - For slaves in REDIS_REPL_WAIT_BGSAVE_END, a failed save is logged;
 *    otherwise server.dbfilename is opened read-only and fstat()ed, the
 *    transfer offset/size are initialised, the slave enters
 *    REDIS_REPL_SEND_BULK and its writable handler becomes sendBulkToSlave.
 *  - Later in the function another rdbSaveBackground() is attempted and, if
 *    it fails, the slaves still in REDIS_REPL_WAIT_BGSAVE_START are visited
 *    again.
 *
 * NOTE(review): the guard around that second save (presumably the
 * 'startbgsave' flag) and every error-path body are not visible in this
 * excerpt -- confirm against the full file. */
3854 static void updateSalvesWaitingBgsave(int bgsaveerr
) {
3856 int startbgsave
= 0;
3858 listRewind(server
.slaves
);
3859 while((ln
= listYield(server
.slaves
))) {
3860 redisClient
*slave
= ln
->value
;
3862 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
3864 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3865 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
3868 if (bgsaveerr
!= REDIS_OK
) {
3870 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
3873 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
3874 fstat(slave
->repldbfd
,&buf
) == -1) {
3876 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
/* Prepare the bulk transfer: start at offset 0, total size from fstat(). */
3879 slave
->repldboff
= 0;
3880 slave
->repldbsize
= buf
.st_size
;
3881 slave
->replstate
= REDIS_REPL_SEND_BULK
;
/* Replace whatever writable handler was installed with the dump sender. */
3882 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
3883 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
, NULL
) == AE_ERR
) {
3890 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
3891 listRewind(server
.slaves
);
3892 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
3893 while((ln
= listYield(server
.slaves
))) {
3894 redisClient
*slave
= ln
->value
;
3896 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
3903 static int syncWithMaster(void) {
3904 char buf
[1024], tmpfile
[256];
3906 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
3910 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
3914 /* Issue the SYNC command */
3915 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
3917 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
3921 /* Read the bulk write count */
3922 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
3924 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
3928 dumpsize
= atoi(buf
+1);
3929 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
3930 /* Read the bulk write data on a temp file */
3931 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
3932 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
3935 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
3939 int nread
, nwritten
;
3941 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
3943 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
3949 nwritten
= write(dfd
,buf
,nread
);
3950 if (nwritten
== -1) {
3951 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
3959 if (rename(tmpfile
,server
.dbfilename
) == -1) {
3960 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
3966 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
3967 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
3971 server
.master
= createClient(fd
);
3972 server
.master
->flags
|= REDIS_MASTER
;
3973 server
.replstate
= REDIS_REPL_CONNECTED
;
/* SLAVEOF command.
 *
 * "SLAVEOF no one" (both words compared case-insensitively): if a master is
 * configured, its host string is freed and cleared, the master client (if
 * any) is released and replication is switched off (REDIS_REPL_NONE).
 *
 * Otherwise argv[1] is taken as the master host and argv[2] as the port
 * (parsed with atoi()); any current master client is released and the state
 * is set to REDIS_REPL_CONNECT.
 *
 * The reply is shared.ok.
 * NOTE(review): the closing braces/else between the two halves are not
 * visible in this excerpt. */
3977 static void slaveofCommand(redisClient
*c
) {
3978 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
3979 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
3980 if (server
.masterhost
) {
3981 sdsfree(server
.masterhost
);
3982 server
.masterhost
= NULL
;
3983 if (server
.master
) freeClient(server
.master
);
3984 server
.replstate
= REDIS_REPL_NONE
;
3985 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
/* Replace the configured master with the requested host/port. */
3988 sdsfree(server
.masterhost
);
3989 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
3990 server
.masterport
= atoi(c
->argv
[2]->ptr
);
3991 if (server
.master
) freeClient(server
.master
);
3992 server
.replstate
= REDIS_REPL_CONNECT
;
3993 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
3994 server
.masterhost
, server
.masterport
);
3996 addReply(c
,shared
.ok
);
3999 /* ============================ Maxmemory directive ======================== */
4001 /* This function gets called when 'maxmemory' is set on the config file to limit
4002 * the max memory used by the server, and we are out of memory.
4003 * This function will try to, in order:
4005 * - Free objects from the free list
4006 * - Try to remove keys with an EXPIRE set
4008 * If it is not possible to free enough memory to reach used-memory < maxmemory
4009 * the server will start refusing commands that will enlarge even more the
/* Try to bring memory usage back under server.maxmemory.
 *
 * Runs while maxmemory is non-zero and zmalloc_used_memory() exceeds it.
 * Each pass either takes the head node off server.objfreelist, or, when
 * that list is empty, walks every database that has entries in its expires
 * dictionary, samples three random entries and calls deleteKey() on a
 * candidate chosen by smallest stored expire time. The function returns as
 * soon as a pass frees nothing.
 *
 * NOTE(review): the release of the object taken from the free list, the
 * bookkeeping of 'minttl' and 'freed', and the exact position of the
 * deleteKey() call relative to the sampling loop are not visible in this
 * excerpt -- confirm against the full file. */
4012 static void freeMemoryIfNeeded(void) {
4013 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
4014 if (listLength(server
.objfreelist
)) {
4017 listNode
*head
= listFirst(server
.objfreelist
);
4018 o
= listNodeValue(head
);
4019 listDelNode(server
.objfreelist
,head
);
4022 int j
, k
, freed
= 0;
4024 for (j
= 0; j
< server
.dbnum
; j
++) {
4026 robj
*minkey
= NULL
;
4027 struct dictEntry
*de
;
4029 if (dictSize(server
.db
[j
].expires
)) {
4031 /* From a sample of three keys drop the one nearest to
4032 * the natural expire */
4033 for (k
= 0; k
< 3; k
++) {
4036 de
= dictGetRandomKey(server
.db
[j
].expires
);
/* The expires dict stores the time_t directly in the value slot. */
4037 t
= (time_t) dictGetEntryVal(de
);
4038 if (minttl
== -1 || t
< minttl
) {
4039 minkey
= dictGetEntryKey(de
);
4043 deleteKey(server
.db
+j
,minkey
);
4046 if (!freed
) return; /* nothing to free... */
4051 /* =================================== Main! ================================ */
/* Return the value stored in /proc/sys/vm/overcommit_memory, or -1 if the
 * file cannot be opened, cannot be read, or does not start with a
 * non-negative decimal number that fits in an int.
 *
 * The value is parsed with strtol() rather than atoi(): atoi() yields 0 for
 * unparsable input, and 0 is exactly the setting callers warn about. */
int linuxOvercommitMemoryValue(void) {
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
    char buf[64];
    char *eptr;
    long val;

    if (!fp) return -1;
    if (fgets(buf,sizeof(buf),fp) == NULL) {
        fclose(fp);
        return -1;
    }
    fclose(fp);

    errno = 0;
    val = strtol(buf,&eptr,10);
    /* eptr == buf: no digits were consumed at all. */
    if (eptr == buf || errno == ERANGE) return -1;
    /* Reject negatives and anything that would not survive the int cast. */
    if (val < 0 || (long)(int)val != val) return -1;
    return (int)val;
}
/* Log a REDIS_WARNING message when linuxOvercommitMemoryValue() returns 0.
 * Any other result, including a failure to read the setting, produces no
 * output. */
4068 void linuxOvercommitMemoryWarning(void) {
4069 if (linuxOvercommitMemoryValue() == 0) {
4070 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4073 #endif /* __linux__ */
4075 static void daemonize(void) {
4079 if (fork() != 0) exit(0); /* parent exits */
4080 setsid(); /* create a new session */
4082 /* Every output goes to /dev/null. If Redis is daemonized but
4083 * the 'logfile' is set to 'stdout' in the configuration file
4084 * it will not log at all. */
4085 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
4086 dup2(fd
, STDIN_FILENO
);
4087 dup2(fd
, STDOUT_FILENO
);
4088 dup2(fd
, STDERR_FILENO
);
4089 if (fd
> STDERR_FILENO
) close(fd
);
4091 /* Try to write the pid file */
4092 fp
= fopen(server
.pidfile
,"w");
4094 fprintf(fp
,"%d\n",getpid());
4099 int main(int argc
, char **argv
) {
4101 linuxOvercommitMemoryWarning();
4106 ResetServerSaveParams();
4107 loadServerConfig(argv
[1]);
4108 } else if (argc
> 2) {
4109 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
4112 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4115 if (server
.daemonize
) daemonize();
4116 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
4117 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
4118 redisLog(REDIS_NOTICE
,"DB loaded from disk");
4119 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
4120 acceptHandler
, NULL
, NULL
) == AE_ERR
) oom("creating file event");
4121 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
4123 aeDeleteEventLoop(server
.el
);