2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "0.101"
39 #define __USE_POSIX199309
49 #include <arpa/inet.h>
53 #include <sys/resource.h>
58 #include "ae.h" /* Event driven programming library */
59 #include "sds.h" /* Dynamic safe strings */
60 #include "anet.h" /* Networking the easy way */
61 #include "dict.h" /* Hash tables */
62 #include "adlist.h" /* Linked lists */
63 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
64 #include "lzf.h" /* LZF compression library */
65 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
71 /* Static server configuration */
72 #define REDIS_SERVERPORT 6379 /* TCP port */
73 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
74 #define REDIS_IOBUF_LEN 1024
75 #define REDIS_LOADBUF_LEN 1024
76 #define REDIS_STATIC_ARGS 4
77 #define REDIS_DEFAULT_DBNUM 16
78 #define REDIS_CONFIGLINE_MAX 1024
79 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
80 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
81 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
83 /* Hash table parameters */
84 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
85 #define REDIS_HT_MINSLOTS 16384 /* Never resize the HT under this */
88 #define REDIS_CMD_BULK 1 /* Bulk write command */
89 #define REDIS_CMD_INLINE 2 /* Inline command */
90 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
91 this flag will return an error when the 'maxmemory' option is set in the
92 config file and the server is using more than maxmemory bytes of memory.
93 In short, these commands are denied on low memory conditions. */
94 #define REDIS_CMD_DENYOOM 4
97 #define REDIS_STRING 0
102 /* Object types only used for dumping to disk */
103 #define REDIS_EXPIRETIME 253
104 #define REDIS_SELECTDB 254
105 #define REDIS_EOF 255
107 /* Defines related to the dump file format. Storing 32 bit lengths for short
108 * keys requires a lot of space, so we check the most significant 2 bits of
109 * the first byte to interpret the length:
111 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
112 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
113 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
114 * 11|000000 this means: specially encoded object will follow. The six bit
115 * number specifies the kind of object that follows.
116 * See the REDIS_RDB_ENC_* defines.
118 * Lengths up to 63 are stored using a single byte, most DB keys, and many
119 * values, will fit inside. */
120 #define REDIS_RDB_6BITLEN 0
121 #define REDIS_RDB_14BITLEN 1
122 #define REDIS_RDB_32BITLEN 2
123 #define REDIS_RDB_ENCVAL 3
124 #define REDIS_RDB_LENERR UINT_MAX
126 /* When a length of a string object stored on disk has the first two bits
127 * set, the remaining six bits specify a special encoding for the object
128 * according to the following defines: */
129 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
130 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
131 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
132 #define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
135 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
136 #define REDIS_SLAVE 2 /* This client is a slave server */
137 #define REDIS_MASTER 4 /* This client is a master server */
138 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
140 /* Slave replication state - slave side */
141 #define REDIS_REPL_NONE 0 /* No active replication */
142 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
143 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
145 /* Slave replication state - from the point of view of master
146 * Note that in SEND_BULK and ONLINE state the slave receives new updates
147 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
148 * to start the next background saving in order to send updates to it. */
149 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
150 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
151 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
152 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
154 /* List related stuff */
158 /* Sort operations */
159 #define REDIS_SORT_GET 0
160 #define REDIS_SORT_DEL 1
161 #define REDIS_SORT_INCR 2
162 #define REDIS_SORT_DECR 3
163 #define REDIS_SORT_ASC 4
164 #define REDIS_SORT_DESC 5
165 #define REDIS_SORTKEY_MAX 1024
168 #define REDIS_DEBUG 0
169 #define REDIS_NOTICE 1
170 #define REDIS_WARNING 2
/* Anti-warning macro: evaluates V and discards the result, so that an
 * otherwise unused parameter or variable does not trigger a compiler warning.
 * The argument is parenthesized so the cast applies to the whole expression
 * and not just to its first operand. */
#define REDIS_NOTUSED(V) ((void)(V))
176 sprintf(err
, "gonner");
181 /*================================= Data types ============================== */
183 /* A redis object, that is a type able to hold a string / list / set */
184 typedef struct redisObject
{
190 typedef struct redisDb
{
196 /* With multiplexing we need to keep per-client state.
197 * Clients are kept in a linked list. */
198 typedef struct redisClient
{
205 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
208 time_t lastinteraction
; /* time of the last interaction, used for timeout */
209 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
210 int slaveseldb
; /* slave selected db, if this client is a slave */
211 int authenticated
; /* when requirepass is non-NULL */
212 int replstate
; /* replication state if this is a slave */
213 int repldbfd
; /* replication DB file descriptor */
214 long repldboff
; /* replication DB file offset */
215 off_t repldbsize
; /* replication DB file size */
223 /* Global server state structure */
229 unsigned int sharingpoolsize
;
230 long long dirty
; /* changes to DB from the last save */
232 list
*slaves
, *monitors
;
233 char neterr
[ANET_ERR_LEN
];
235 int cronloops
; /* number of times the cron function run */
236 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
237 time_t lastsave
; /* Unix time of last successful save */
238 size_t usedmemory
; /* Used memory in bytes */
239 /* Fields used only for stats */
240 time_t stat_starttime
; /* server start time */
241 long long stat_numcommands
; /* number of processed commands */
242 long long stat_numconnections
; /* number of connections received */
250 int bgsaveinprogress
;
251 struct saveparam
*saveparams
;
258 /* Replication related */
262 redisClient
*master
; /* client that is master for this slave */
264 unsigned int maxclients
;
265 unsigned int maxmemory
;
266 /* Sort parameters - qsort_r() is only available under BSD so we
267 * have to take this state global, in order to pass it to sortCompare() */
273 typedef void redisCommandProc(redisClient
*c
);
274 struct redisCommand
{
276 redisCommandProc
*proc
;
281 typedef struct _redisSortObject
{
289 typedef struct _redisSortOperation
{
292 } redisSortOperation
;
294 struct sharedObjectsStruct
{
295 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
296 *colon
, *nullbulk
, *nullmultibulk
,
297 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
298 *outofrangeerr
, *plus
,
299 *select0
, *select1
, *select2
, *select3
, *select4
,
300 *select5
, *select6
, *select7
, *select8
, *select9
;
303 /*================================ Prototypes =============================== */
305 static void freeStringObject(robj
*o
);
306 static void freeListObject(robj
*o
);
307 static void freeSetObject(robj
*o
);
308 static void decrRefCount(void *o
);
309 static robj
*createObject(int type
, void *ptr
);
310 static void freeClient(redisClient
*c
);
311 static int rdbLoad(char *filename
);
312 static void addReply(redisClient
*c
, robj
*obj
);
313 static void addReplySds(redisClient
*c
, sds s
);
314 static void incrRefCount(robj
*o
);
315 static int rdbSaveBackground(char *filename
);
316 static robj
*createStringObject(char *ptr
, size_t len
);
317 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
318 static int syncWithMaster(void);
319 static robj
*tryObjectSharing(robj
*o
);
320 static int removeExpire(redisDb
*db
, robj
*key
);
321 static int expireIfNeeded(redisDb
*db
, robj
*key
);
322 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
323 static int deleteKey(redisDb
*db
, robj
*key
);
324 static time_t getExpire(redisDb
*db
, robj
*key
);
325 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
326 static void updateSalvesWaitingBgsave(int bgsaveerr
);
327 static void freeMemoryIfNeeded(void);
329 static void authCommand(redisClient
*c
);
330 static void pingCommand(redisClient
*c
);
331 static void echoCommand(redisClient
*c
);
332 static void setCommand(redisClient
*c
);
333 static void setnxCommand(redisClient
*c
);
334 static void getCommand(redisClient
*c
);
335 static void delCommand(redisClient
*c
);
336 static void existsCommand(redisClient
*c
);
337 static void incrCommand(redisClient
*c
);
338 static void decrCommand(redisClient
*c
);
339 static void incrbyCommand(redisClient
*c
);
340 static void decrbyCommand(redisClient
*c
);
341 static void selectCommand(redisClient
*c
);
342 static void randomkeyCommand(redisClient
*c
);
343 static void keysCommand(redisClient
*c
);
344 static void dbsizeCommand(redisClient
*c
);
345 static void lastsaveCommand(redisClient
*c
);
346 static void saveCommand(redisClient
*c
);
347 static void bgsaveCommand(redisClient
*c
);
348 static void shutdownCommand(redisClient
*c
);
349 static void moveCommand(redisClient
*c
);
350 static void renameCommand(redisClient
*c
);
351 static void renamenxCommand(redisClient
*c
);
352 static void lpushCommand(redisClient
*c
);
353 static void rpushCommand(redisClient
*c
);
354 static void lpopCommand(redisClient
*c
);
355 static void rpopCommand(redisClient
*c
);
356 static void llenCommand(redisClient
*c
);
357 static void lindexCommand(redisClient
*c
);
358 static void lrangeCommand(redisClient
*c
);
359 static void ltrimCommand(redisClient
*c
);
360 static void typeCommand(redisClient
*c
);
361 static void lsetCommand(redisClient
*c
);
362 static void saddCommand(redisClient
*c
);
363 static void sremCommand(redisClient
*c
);
364 static void smoveCommand(redisClient
*c
);
365 static void sismemberCommand(redisClient
*c
);
366 static void scardCommand(redisClient
*c
);
367 static void sinterCommand(redisClient
*c
);
368 static void sinterstoreCommand(redisClient
*c
);
369 static void sunionCommand(redisClient
*c
);
370 static void sunionstoreCommand(redisClient
*c
);
371 static void sdiffCommand(redisClient
*c
);
372 static void sdiffstoreCommand(redisClient
*c
);
373 static void syncCommand(redisClient
*c
);
374 static void flushdbCommand(redisClient
*c
);
375 static void flushallCommand(redisClient
*c
);
376 static void sortCommand(redisClient
*c
);
377 static void lremCommand(redisClient
*c
);
378 static void infoCommand(redisClient
*c
);
379 static void mgetCommand(redisClient
*c
);
380 static void monitorCommand(redisClient
*c
);
381 static void expireCommand(redisClient
*c
);
382 static void getSetCommand(redisClient
*c
);
383 static void ttlCommand(redisClient
*c
);
384 static void slaveofCommand(redisClient
*c
);
385 static void debugCommand(redisClient
*c
);
387 /*================================= Globals ================================= */
390 static struct redisServer server
; /* server global state */
391 static struct redisCommand cmdTable
[] = {
392 {"get",getCommand
,2,REDIS_CMD_INLINE
},
393 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
394 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
395 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
396 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
397 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
398 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
399 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
400 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
401 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
402 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
403 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
404 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
405 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
406 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
407 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
408 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
409 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
410 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
411 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
412 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
413 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
414 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
415 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
416 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
417 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
418 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
419 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
420 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
421 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
422 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
423 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
424 {"getset",getSetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
425 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
426 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
427 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
428 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
429 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
430 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
431 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
432 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
433 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
434 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
435 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
436 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
437 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
438 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
439 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
440 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
441 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
442 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
443 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
444 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
445 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
446 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
447 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
448 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
449 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
453 /*============================ Utility functions ============================ */
455 /* Glob-style pattern matching. */
456 int stringmatchlen(const char *pattern
, int patternLen
,
457 const char *string
, int stringLen
, int nocase
)
462 while (pattern
[1] == '*') {
467 return 1; /* match */
469 if (stringmatchlen(pattern
+1, patternLen
-1,
470 string
, stringLen
, nocase
))
471 return 1; /* match */
475 return 0; /* no match */
479 return 0; /* no match */
489 not = pattern
[0] == '^';
496 if (pattern
[0] == '\\') {
499 if (pattern
[0] == string
[0])
501 } else if (pattern
[0] == ']') {
503 } else if (patternLen
== 0) {
507 } else if (pattern
[1] == '-' && patternLen
>= 3) {
508 int start
= pattern
[0];
509 int end
= pattern
[2];
517 start
= tolower(start
);
523 if (c
>= start
&& c
<= end
)
527 if (pattern
[0] == string
[0])
530 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
540 return 0; /* no match */
546 if (patternLen
>= 2) {
553 if (pattern
[0] != string
[0])
554 return 0; /* no match */
556 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
557 return 0; /* no match */
565 if (stringLen
== 0) {
566 while(*pattern
== '*') {
573 if (patternLen
== 0 && stringLen
== 0)
578 void redisLog(int level
, const char *fmt
, ...)
583 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
587 if (level
>= server
.verbosity
) {
593 strftime(buf
,64,"%d %b %H:%M:%S",gmtime(&now
));
594 fprintf(fp
,"%s %c ",buf
,c
[level
]);
595 vfprintf(fp
, fmt
, ap
);
601 if (server
.logfile
) fclose(fp
);
604 /*====================== Hash table type implementation ==================== */
606 /* This is a hash table type that uses the SDS dynamic strings library as
607 * keys and redis objects as values (objects can hold SDS strings,
610 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
614 DICT_NOTUSED(privdata
);
616 l1
= sdslen((sds
)key1
);
617 l2
= sdslen((sds
)key2
);
618 if (l1
!= l2
) return 0;
619 return memcmp(key1
, key2
, l1
) == 0;
622 static void dictRedisObjectDestructor(void *privdata
, void *val
)
624 DICT_NOTUSED(privdata
);
629 static int dictSdsKeyCompare(void *privdata
, const void *key1
,
632 const robj
*o1
= key1
, *o2
= key2
;
633 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
636 static unsigned int dictSdsHash(const void *key
) {
638 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
641 static dictType setDictType
= {
642 dictSdsHash
, /* hash function */
645 dictSdsKeyCompare
, /* key compare */
646 dictRedisObjectDestructor
, /* key destructor */
647 NULL
/* val destructor */
650 static dictType hashDictType
= {
651 dictSdsHash
, /* hash function */
654 dictSdsKeyCompare
, /* key compare */
655 dictRedisObjectDestructor
, /* key destructor */
656 dictRedisObjectDestructor
/* val destructor */
659 /* ========================= Random utility functions ======================= */
661 /* Redis generally does not try to recover from out of memory conditions
662 * when allocating objects or strings, it is not clear if it will be possible
663 * to report this condition to the client since the networking layer itself
664 * is based on heap allocation for send buffers, so we simply abort.
665 * At least the code will be simpler to read... */
666 static void oom(const char *msg
) {
667 fprintf(stderr
, "%s: Out of memory\n",msg
);
673 /* ====================== Redis server networking stuff ===================== */
674 void closeTimedoutClients(void) {
677 time_t now
= time(NULL
);
679 listRewind(server
.clients
);
680 while ((ln
= listYield(server
.clients
)) != NULL
) {
681 c
= listNodeValue(ln
);
682 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
683 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
684 (now
- c
->lastinteraction
> server
.maxidletime
)) {
685 redisLog(REDIS_DEBUG
,"Closing idle client");
691 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
692 * we resize the hash table to save memory */
693 void tryResizeHashTables(void) {
696 for (j
= 0; j
< server
.dbnum
; j
++) {
697 long long size
, used
;
699 size
= dictSlots(server
.db
[j
].dict
);
700 used
= dictSize(server
.db
[j
].dict
);
701 if (size
&& used
&& size
> REDIS_HT_MINSLOTS
&&
702 (used
*100/size
< REDIS_HT_MINFILL
)) {
703 redisLog(REDIS_NOTICE
,"The hash table %d is too sparse, resize it...",j
);
704 dictResize(server
.db
[j
].dict
);
705 redisLog(REDIS_NOTICE
,"Hash table %d resized.",j
);
710 int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
711 int j
, loops
= server
.cronloops
++;
712 REDIS_NOTUSED(eventLoop
);
714 REDIS_NOTUSED(clientData
);
716 /* Update the global state with the amount of used memory */
717 server
.usedmemory
= zmalloc_used_memory();
719 /* Show some info about non-empty databases */
720 for (j
= 0; j
< server
.dbnum
; j
++) {
721 long long size
, used
, vkeys
;
723 size
= dictSlots(server
.db
[j
].dict
);
724 used
= dictSize(server
.db
[j
].dict
);
725 vkeys
= dictSize(server
.db
[j
].expires
);
726 if (!(loops
% 5) && used
> 0) {
727 redisLog(REDIS_DEBUG
,"DB %d: %d keys (%d volatile) in %d slots HT.",j
,used
,vkeys
,size
);
728 /* dictPrintStats(server.dict); */
732 /* We don't want to resize the hash tables while a background saving
733 * is in progress: the saving child is created using fork() that is
734 * implemented with a copy-on-write semantic in most modern systems, so
735 * if we resize the HT while there is the saving child at work actually
736 * a lot of memory movements in the parent will cause a lot of pages
738 if (!server
.bgsaveinprogress
) tryResizeHashTables();
740 /* Show information about connected clients */
742 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use",
743 listLength(server
.clients
)-listLength(server
.slaves
),
744 listLength(server
.slaves
),
746 dictSize(server
.sharingpool
));
749 /* Close connections of timedout clients */
750 if (server
.maxidletime
&& !(loops
% 10))
751 closeTimedoutClients();
753 /* Check if a background saving in progress terminated */
754 if (server
.bgsaveinprogress
) {
756 /* XXX: TODO handle the case of the saving child killed */
757 if (wait4(-1,&statloc
,WNOHANG
,NULL
)) {
758 int exitcode
= WEXITSTATUS(statloc
);
760 redisLog(REDIS_NOTICE
,
761 "Background saving terminated with success");
763 server
.lastsave
= time(NULL
);
765 redisLog(REDIS_WARNING
,
766 "Background saving error");
768 server
.bgsaveinprogress
= 0;
769 updateSalvesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
772 /* If there is not a background saving in progress check if
773 * we have to save now */
774 time_t now
= time(NULL
);
775 for (j
= 0; j
< server
.saveparamslen
; j
++) {
776 struct saveparam
*sp
= server
.saveparams
+j
;
778 if (server
.dirty
>= sp
->changes
&&
779 now
-server
.lastsave
> sp
->seconds
) {
780 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
781 sp
->changes
, sp
->seconds
);
782 rdbSaveBackground(server
.dbfilename
);
788 /* Try to expire a few timed out keys */
789 for (j
= 0; j
< server
.dbnum
; j
++) {
790 redisDb
*db
= server
.db
+j
;
791 int num
= dictSize(db
->expires
);
794 time_t now
= time(NULL
);
796 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
797 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
802 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
803 t
= (time_t) dictGetEntryVal(de
);
805 deleteKey(db
,dictGetEntryKey(de
));
811 /* Check if we should connect to a MASTER */
812 if (server
.replstate
== REDIS_REPL_CONNECT
) {
813 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
814 if (syncWithMaster() == REDIS_OK
) {
815 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
821 static void createSharedObjects(void) {
822 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
823 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
824 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
825 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
826 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
827 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
828 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
829 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
830 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
832 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
833 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
834 "-ERR Operation against a key holding the wrong kind of value\r\n"));
835 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
836 "-ERR no such key\r\n"));
837 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
838 "-ERR syntax error\r\n"));
839 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
840 "-ERR source and destination objects are the same\r\n"));
841 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
842 "-ERR index out of range\r\n"));
843 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
844 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
845 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
846 shared
.select0
= createStringObject("select 0\r\n",10);
847 shared
.select1
= createStringObject("select 1\r\n",10);
848 shared
.select2
= createStringObject("select 2\r\n",10);
849 shared
.select3
= createStringObject("select 3\r\n",10);
850 shared
.select4
= createStringObject("select 4\r\n",10);
851 shared
.select5
= createStringObject("select 5\r\n",10);
852 shared
.select6
= createStringObject("select 6\r\n",10);
853 shared
.select7
= createStringObject("select 7\r\n",10);
854 shared
.select8
= createStringObject("select 8\r\n",10);
855 shared
.select9
= createStringObject("select 9\r\n",10);
858 static void appendServerSaveParams(time_t seconds
, int changes
) {
859 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
860 if (server
.saveparams
== NULL
) oom("appendServerSaveParams");
861 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
862 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
863 server
.saveparamslen
++;
866 static void ResetServerSaveParams() {
867 zfree(server
.saveparams
);
868 server
.saveparams
= NULL
;
869 server
.saveparamslen
= 0;
872 static void initServerConfig() {
873 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
874 server
.port
= REDIS_SERVERPORT
;
875 server
.verbosity
= REDIS_DEBUG
;
876 server
.maxidletime
= REDIS_MAXIDLETIME
;
877 server
.saveparams
= NULL
;
878 server
.logfile
= NULL
; /* NULL = log on standard output */
879 server
.bindaddr
= NULL
;
880 server
.glueoutputbuf
= 1;
881 server
.daemonize
= 0;
882 server
.pidfile
= "/var/run/redis.pid";
883 server
.dbfilename
= "dump.rdb";
884 server
.requirepass
= NULL
;
885 server
.shareobjects
= 0;
886 server
.maxclients
= 0;
887 server
.maxmemory
= 0;
888 ResetServerSaveParams();
890 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
891 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
892 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
893 /* Replication related */
895 server
.masterhost
= NULL
;
896 server
.masterport
= 6379;
897 server
.master
= NULL
;
898 server
.replstate
= REDIS_REPL_NONE
;
901 static void initServer() {
904 signal(SIGHUP
, SIG_IGN
);
905 signal(SIGPIPE
, SIG_IGN
);
907 server
.clients
= listCreate();
908 server
.slaves
= listCreate();
909 server
.monitors
= listCreate();
910 server
.objfreelist
= listCreate();
911 createSharedObjects();
912 server
.el
= aeCreateEventLoop();
913 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
914 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
915 server
.sharingpoolsize
= 1024;
916 if (!server
.db
|| !server
.clients
|| !server
.slaves
|| !server
.monitors
|| !server
.el
|| !server
.objfreelist
)
917 oom("server initialization"); /* Fatal OOM */
918 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
919 if (server
.fd
== -1) {
920 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
923 for (j
= 0; j
< server
.dbnum
; j
++) {
924 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
925 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
928 server
.cronloops
= 0;
929 server
.bgsaveinprogress
= 0;
930 server
.lastsave
= time(NULL
);
932 server
.usedmemory
= 0;
933 server
.stat_numcommands
= 0;
934 server
.stat_numconnections
= 0;
935 server
.stat_starttime
= time(NULL
);
936 aeCreateTimeEvent(server
.el
, 1000, serverCron
, NULL
, NULL
);
939 /* Empty the whole database */
940 static long long emptyDb() {
942 long long removed
= 0;
944 for (j
= 0; j
< server
.dbnum
; j
++) {
945 removed
+= dictSize(server
.db
[j
].dict
);
946 dictEmpty(server
.db
[j
].dict
);
947 dictEmpty(server
.db
[j
].expires
);
952 static int yesnotoi(char *s
) {
953 if (!strcasecmp(s
,"yes")) return 1;
954 else if (!strcasecmp(s
,"no")) return 0;
958 /* I agree, this is a very rudimentary way to load a configuration...
959 will improve later if the config gets more complex */
960 static void loadServerConfig(char *filename
) {
961 FILE *fp
= fopen(filename
,"r");
962 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
967 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
970 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
976 line
= sdstrim(line
," \t\r\n");
978 /* Skip comments and blank lines*/
979 if (line
[0] == '#' || line
[0] == '\0') {
984 /* Split into arguments */
985 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
988 /* Execute config directives */
989 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
990 server
.maxidletime
= atoi(argv
[1]);
991 if (server
.maxidletime
< 0) {
992 err
= "Invalid timeout value"; goto loaderr
;
994 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
995 server
.port
= atoi(argv
[1]);
996 if (server
.port
< 1 || server
.port
> 65535) {
997 err
= "Invalid port"; goto loaderr
;
999 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1000 server
.bindaddr
= zstrdup(argv
[1]);
1001 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1002 int seconds
= atoi(argv
[1]);
1003 int changes
= atoi(argv
[2]);
1004 if (seconds
< 1 || changes
< 0) {
1005 err
= "Invalid save parameters"; goto loaderr
;
1007 appendServerSaveParams(seconds
,changes
);
1008 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1009 if (chdir(argv
[1]) == -1) {
1010 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1011 argv
[1], strerror(errno
));
1014 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1015 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1016 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1017 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1019 err
= "Invalid log level. Must be one of debug, notice, warning";
1022 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1025 server
.logfile
= zstrdup(argv
[1]);
1026 if (!strcasecmp(server
.logfile
,"stdout")) {
1027 zfree(server
.logfile
);
1028 server
.logfile
= NULL
;
1030 if (server
.logfile
) {
1031 /* Test if we are able to open the file. The server will not
1032 * be able to abort just for this problem later... */
1033 fp
= fopen(server
.logfile
,"a");
1035 err
= sdscatprintf(sdsempty(),
1036 "Can't open the log file: %s", strerror(errno
));
1041 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1042 server
.dbnum
= atoi(argv
[1]);
1043 if (server
.dbnum
< 1) {
1044 err
= "Invalid number of databases"; goto loaderr
;
1046 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1047 server
.maxclients
= atoi(argv
[1]);
1048 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1049 server
.maxmemory
= atoi(argv
[1]);
1050 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1051 server
.masterhost
= sdsnew(argv
[1]);
1052 server
.masterport
= atoi(argv
[2]);
1053 server
.replstate
= REDIS_REPL_CONNECT
;
1054 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1055 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1056 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1058 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1059 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1060 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1062 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1063 server
.sharingpoolsize
= atoi(argv
[1]);
1064 if (server
.sharingpoolsize
< 1) {
1065 err
= "invalid object sharing pool size"; goto loaderr
;
1067 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1068 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1069 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1071 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1072 server
.requirepass
= zstrdup(argv
[1]);
1073 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1074 server
.pidfile
= zstrdup(argv
[1]);
1075 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1076 server
.dbfilename
= zstrdup(argv
[1]);
1078 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1080 for (j
= 0; j
< argc
; j
++)
1089 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1090 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1091 fprintf(stderr
, ">>> '%s'\n", line
);
1092 fprintf(stderr
, "%s\n", err
);
1096 static void freeClientArgv(redisClient
*c
) {
1099 for (j
= 0; j
< c
->argc
; j
++)
1100 decrRefCount(c
->argv
[j
]);
1104 static void freeClient(redisClient
*c
) {
1107 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1108 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1109 sdsfree(c
->querybuf
);
1110 listRelease(c
->reply
);
1113 ln
= listSearchKey(server
.clients
,c
);
1115 listDelNode(server
.clients
,ln
);
1116 if (c
->flags
& REDIS_SLAVE
) {
1117 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1119 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1120 ln
= listSearchKey(l
,c
);
1124 if (c
->flags
& REDIS_MASTER
) {
1125 server
.master
= NULL
;
1126 server
.replstate
= REDIS_REPL_CONNECT
;
1132 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1137 listRewind(c
->reply
);
1138 while((ln
= listYield(c
->reply
))) {
1140 totlen
+= sdslen(o
->ptr
);
1141 /* This optimization makes more sense if we don't have to copy
1143 if (totlen
> 1024) return;
1149 listRewind(c
->reply
);
1150 while((ln
= listYield(c
->reply
))) {
1152 memcpy(buf
+copylen
,o
->ptr
,sdslen(o
->ptr
));
1153 copylen
+= sdslen(o
->ptr
);
1154 listDelNode(c
->reply
,ln
);
1156 /* Now the output buffer is empty, add the new single element */
1157 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,totlen
));
1158 if (!listAddNodeTail(c
->reply
,o
)) oom("listAddNodeTail");
1162 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1163 redisClient
*c
= privdata
;
1164 int nwritten
= 0, totwritten
= 0, objlen
;
1167 REDIS_NOTUSED(mask
);
1169 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1170 glueReplyBuffersIfNeeded(c
);
1171 while(listLength(c
->reply
)) {
1172 o
= listNodeValue(listFirst(c
->reply
));
1173 objlen
= sdslen(o
->ptr
);
1176 listDelNode(c
->reply
,listFirst(c
->reply
));
1180 if (c
->flags
& REDIS_MASTER
) {
1181 nwritten
= objlen
- c
->sentlen
;
1183 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1184 if (nwritten
<= 0) break;
1186 c
->sentlen
+= nwritten
;
1187 totwritten
+= nwritten
;
1188 /* If we fully sent the object on head go to the next one */
1189 if (c
->sentlen
== objlen
) {
1190 listDelNode(c
->reply
,listFirst(c
->reply
));
1194 if (nwritten
== -1) {
1195 if (errno
== EAGAIN
) {
1198 redisLog(REDIS_DEBUG
,
1199 "Error writing to client: %s", strerror(errno
));
1204 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1205 if (listLength(c
->reply
) == 0) {
1207 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1211 static struct redisCommand
*lookupCommand(char *name
) {
1213 while(cmdTable
[j
].name
!= NULL
) {
1214 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1220 /* resetClient prepare the client to process the next command */
1221 static void resetClient(redisClient
*c
) {
1226 /* If this function gets called we already read a whole
1227 * command, arguments are in the client argv/argc fields.
1228 * processCommand() executes the command or prepares the
1229 * server for a bulk read from the client.
1231 * If 1 is returned the client is still alive and valid and
1232 * other operations can be performed by the caller. Otherwise
1233 * if 0 is returned the client was destroyed (i.e. after QUIT). */
1234 static int processCommand(redisClient
*c
) {
1235 struct redisCommand
*cmd
;
1238 /* Free some memory if needed (maxmemory setting) */
1239 if (server
.maxmemory
) freeMemoryIfNeeded();
1241 /* The QUIT command is handled as a special case. Normal command
1242 * procs are unable to close the client connection safely */
1243 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1247 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1249 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1252 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1253 (c
->argc
< -cmd
->arity
)) {
1254 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
1257 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1258 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1261 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1262 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1264 decrRefCount(c
->argv
[c
->argc
-1]);
1265 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1267 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1272 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1273 /* It is possible that the bulk read is already in the
1274 * buffer. Check this condition and handle it accordingly */
1275 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1276 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1278 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1283 /* Let's try to share objects on the command arguments vector */
1284 if (server
.shareobjects
) {
1286 for(j
= 1; j
< c
->argc
; j
++)
1287 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1289 /* Check if the user is authenticated */
1290 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1291 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1296 /* Exec the command */
1297 dirty
= server
.dirty
;
1299 if (server
.dirty
-dirty
!= 0 && listLength(server
.slaves
))
1300 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1301 if (listLength(server
.monitors
))
1302 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1303 server
.stat_numcommands
++;
1305 /* Prepare the client for the next command */
1306 if (c
->flags
& REDIS_CLOSE
) {
1314 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1318 /* (args*2)+1 is enough room for args, spaces, newlines */
1319 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1321 if (argc
<= REDIS_STATIC_ARGS
) {
1324 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1325 if (!outv
) oom("replicationFeedSlaves");
1328 for (j
= 0; j
< argc
; j
++) {
1329 if (j
!= 0) outv
[outc
++] = shared
.space
;
1330 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1333 lenobj
= createObject(REDIS_STRING
,
1334 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv
[j
]->ptr
)));
1335 lenobj
->refcount
= 0;
1336 outv
[outc
++] = lenobj
;
1338 outv
[outc
++] = argv
[j
];
1340 outv
[outc
++] = shared
.crlf
;
1342 /* Increment all the refcounts at start and decrement at end in order to
1343 * be sure to free objects if there is no slave in a replication state
1344 * able to be feed with commands */
1345 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1347 while((ln
= listYield(slaves
))) {
1348 redisClient
*slave
= ln
->value
;
1350 /* Don't feed slaves that are still waiting for BGSAVE to start */
1351 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1353 /* Feed all the other slaves, MONITORs and so on */
1354 if (slave
->slaveseldb
!= dictid
) {
1358 case 0: selectcmd
= shared
.select0
; break;
1359 case 1: selectcmd
= shared
.select1
; break;
1360 case 2: selectcmd
= shared
.select2
; break;
1361 case 3: selectcmd
= shared
.select3
; break;
1362 case 4: selectcmd
= shared
.select4
; break;
1363 case 5: selectcmd
= shared
.select5
; break;
1364 case 6: selectcmd
= shared
.select6
; break;
1365 case 7: selectcmd
= shared
.select7
; break;
1366 case 8: selectcmd
= shared
.select8
; break;
1367 case 9: selectcmd
= shared
.select9
; break;
1369 selectcmd
= createObject(REDIS_STRING
,
1370 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1371 selectcmd
->refcount
= 0;
1374 addReply(slave
,selectcmd
);
1375 slave
->slaveseldb
= dictid
;
1377 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1379 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1380 if (outv
!= static_outv
) zfree(outv
);
1383 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1384 redisClient
*c
= (redisClient
*) privdata
;
1385 char buf
[REDIS_IOBUF_LEN
];
1388 REDIS_NOTUSED(mask
);
1390 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1392 if (errno
== EAGAIN
) {
1395 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1399 } else if (nread
== 0) {
1400 redisLog(REDIS_DEBUG
, "Client closed connection");
1405 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1406 c
->lastinteraction
= time(NULL
);
1412 if (c
->bulklen
== -1) {
1413 /* Read the first line of the query */
1414 char *p
= strchr(c
->querybuf
,'\n');
1420 query
= c
->querybuf
;
1421 c
->querybuf
= sdsempty();
1422 querylen
= 1+(p
-(query
));
1423 if (sdslen(query
) > querylen
) {
1424 /* leave data after the first line of the query in the buffer */
1425 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1427 *p
= '\0'; /* remove "\n" */
1428 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1429 sdsupdatelen(query
);
1431 /* Now we can split the query in arguments */
1432 if (sdslen(query
) == 0) {
1433 /* Ignore empty query */
1437 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1438 if (argv
== NULL
) oom("sdssplitlen");
1441 if (c
->argv
) zfree(c
->argv
);
1442 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1443 if (c
->argv
== NULL
) oom("allocating arguments list for client");
1445 for (j
= 0; j
< argc
; j
++) {
1446 if (sdslen(argv
[j
])) {
1447 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1454 /* Execute the command. If the client is still valid
1455 * after processCommand() return and there is something
1456 * on the query buffer try to process the next command. */
1457 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1459 } else if (sdslen(c
->querybuf
) >= 1024*32) {
1460 redisLog(REDIS_DEBUG
, "Client protocol error");
1465 /* Bulk read handling. Note that if we are at this point
1466 the client already sent a command terminated with a newline,
1467 we are reading the bulk data that is actually the last
1468 argument of the command. */
1469 int qbl
= sdslen(c
->querybuf
);
1471 if (c
->bulklen
<= qbl
) {
1472 /* Copy everything but the final CRLF as final argument */
1473 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1475 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1482 static int selectDb(redisClient
*c
, int id
) {
1483 if (id
< 0 || id
>= server
.dbnum
)
1485 c
->db
= &server
.db
[id
];
1489 static void *dupClientReplyValue(void *o
) {
1490 incrRefCount((robj
*)o
);
1494 static redisClient
*createClient(int fd
) {
1495 redisClient
*c
= zmalloc(sizeof(*c
));
1497 anetNonBlock(NULL
,fd
);
1498 anetTcpNoDelay(NULL
,fd
);
1499 if (!c
) return NULL
;
1502 c
->querybuf
= sdsempty();
1508 c
->lastinteraction
= time(NULL
);
1509 c
->authenticated
= 0;
1510 c
->replstate
= REDIS_REPL_NONE
;
1511 if ((c
->reply
= listCreate()) == NULL
) oom("listCreate");
1512 listSetFreeMethod(c
->reply
,decrRefCount
);
1513 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1514 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1515 readQueryFromClient
, c
, NULL
) == AE_ERR
) {
1519 if (!listAddNodeTail(server
.clients
,c
)) oom("listAddNodeTail");
1523 static void addReply(redisClient
*c
, robj
*obj
) {
1524 if (listLength(c
->reply
) == 0 &&
1525 (c
->replstate
== REDIS_REPL_NONE
||
1526 c
->replstate
== REDIS_REPL_ONLINE
) &&
1527 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
1528 sendReplyToClient
, c
, NULL
) == AE_ERR
) return;
1529 if (!listAddNodeTail(c
->reply
,obj
)) oom("listAddNodeTail");
1533 static void addReplySds(redisClient
*c
, sds s
) {
1534 robj
*o
= createObject(REDIS_STRING
,s
);
1539 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1544 REDIS_NOTUSED(mask
);
1545 REDIS_NOTUSED(privdata
);
1547 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
1548 if (cfd
== AE_ERR
) {
1549 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
1552 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
1553 if ((c
= createClient(cfd
)) == NULL
) {
1554 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
1555 close(cfd
); /* May be already closed, just ingore errors */
1558 /* If maxclient directive is set and this is one client more... close the
1559 * connection. Note that we create the client instead to check before
1560 * for this condition, since now the socket is already set in nonblocking
1561 * mode and we can send an error for free using the Kernel I/O */
1562 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
1563 char *err
= "-ERR max number of clients reached\r\n";
1565 /* That's a best effort error message, don't check write errors */
1566 (void) write(c
->fd
,err
,strlen(err
));
1570 server
.stat_numconnections
++;
1573 /* ======================= Redis objects implementation ===================== */
1575 static robj
*createObject(int type
, void *ptr
) {
1578 if (listLength(server
.objfreelist
)) {
1579 listNode
*head
= listFirst(server
.objfreelist
);
1580 o
= listNodeValue(head
);
1581 listDelNode(server
.objfreelist
,head
);
1583 o
= zmalloc(sizeof(*o
));
1585 if (!o
) oom("createObject");
1592 static robj
*createStringObject(char *ptr
, size_t len
) {
1593 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
1596 static robj
*createListObject(void) {
1597 list
*l
= listCreate();
1599 if (!l
) oom("listCreate");
1600 listSetFreeMethod(l
,decrRefCount
);
1601 return createObject(REDIS_LIST
,l
);
1604 static robj
*createSetObject(void) {
1605 dict
*d
= dictCreate(&setDictType
,NULL
);
1606 if (!d
) oom("dictCreate");
1608 return createObject(REDIS_SET
,d
);
1611 static void freeStringObject(robj
*o
) {
1615 static void freeListObject(robj
*o
) {
1616 listRelease((list
*) o
->ptr
);
1619 static void freeSetObject(robj
*o
) {
1620 dictRelease((dict
*) o
->ptr
);
1623 static void freeHashObject(robj
*o
) {
1624 dictRelease((dict
*) o
->ptr
);
1627 static void incrRefCount(robj
*o
) {
1629 #ifdef DEBUG_REFCOUNT
1630 if (o
->type
== REDIS_STRING
)
1631 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
1635 static void decrRefCount(void *obj
) {
1638 #ifdef DEBUG_REFCOUNT
1639 if (o
->type
== REDIS_STRING
)
1640 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
1642 if (--(o
->refcount
) == 0) {
1644 case REDIS_STRING
: freeStringObject(o
); break;
1645 case REDIS_LIST
: freeListObject(o
); break;
1646 case REDIS_SET
: freeSetObject(o
); break;
1647 case REDIS_HASH
: freeHashObject(o
); break;
1648 default: assert(0 != 0); break;
1650 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
1651 !listAddNodeHead(server
.objfreelist
,o
))
1656 /* Try to share an object against the shared objects pool */
1657 static robj
*tryObjectSharing(robj
*o
) {
1658 struct dictEntry
*de
;
1661 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
1663 assert(o
->type
== REDIS_STRING
);
1664 de
= dictFind(server
.sharingpool
,o
);
1666 robj
*shared
= dictGetEntryKey(de
);
1668 c
= ((unsigned long) dictGetEntryVal(de
))+1;
1669 dictGetEntryVal(de
) = (void*) c
;
1670 incrRefCount(shared
);
1674 /* Here we are using a stream algorihtm: Every time an object is
1675 * shared we increment its count, everytime there is a miss we
1676 * recrement the counter of a random object. If this object reaches
1677 * zero we remove the object and put the current object instead. */
1678 if (dictSize(server
.sharingpool
) >=
1679 server
.sharingpoolsize
) {
1680 de
= dictGetRandomKey(server
.sharingpool
);
1682 c
= ((unsigned long) dictGetEntryVal(de
))-1;
1683 dictGetEntryVal(de
) = (void*) c
;
1685 dictDelete(server
.sharingpool
,de
->key
);
1688 c
= 0; /* If the pool is empty we want to add this object */
1693 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
1694 assert(retval
== DICT_OK
);
1701 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
1702 dictEntry
*de
= dictFind(db
->dict
,key
);
1703 return de
? dictGetEntryVal(de
) : NULL
;
1706 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
1707 expireIfNeeded(db
,key
);
1708 return lookupKey(db
,key
);
1711 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
1712 deleteIfVolatile(db
,key
);
1713 return lookupKey(db
,key
);
1716 static int deleteKey(redisDb
*db
, robj
*key
) {
1719 /* We need to protect key from destruction: after the first dictDelete()
1720 * it may happen that 'key' is no longer valid if we don't increment
1721 * it's count. This may happen when we get the object reference directly
1722 * from the hash table with dictRandomKey() or dict iterators */
1724 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
1725 retval
= dictDelete(db
->dict
,key
);
1728 return retval
== DICT_OK
;
1731 /*============================ DB saving/loading ============================ */
/* Write the single byte 'type' to 'fp'.
 * Returns 0 on success, -1 if the byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
/* Write 't' to 'fp' as a 4 byte int32_t in host byte order (the value is
 * narrowed with a plain cast).
 * Returns 0 on success, -1 if the write failed. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;
    size_t written = fwrite(&stamp,4,1,fp);

    return (written == 0) ? -1 : 0;
}
1744 /* check rdbLoadLen() comments for more info */
1745 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
1746 unsigned char buf
[2];
1749 /* Save a 6 bit len */
1750 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
1751 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1752 } else if (len
< (1<<14)) {
1753 /* Save a 14 bit len */
1754 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
1756 if (fwrite(buf
,2,1,fp
) == 0) return -1;
1758 /* Save a 32 bit len */
1759 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
1760 if (fwrite(buf
,1,1,fp
) == 0) return -1;
1762 if (fwrite(&len
,4,1,fp
) == 0) return -1;
1767 /* String objects in the form "2391" "-100" without any space and with a
1768 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1769 * encoded as integers to save space */
1770 int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
1772 char *endptr
, buf
[32];
1774 /* Check if it's possible to encode this value as a number */
1775 value
= strtoll(s
, &endptr
, 10);
1776 if (endptr
[0] != '\0') return 0;
1777 snprintf(buf
,32,"%lld",value
);
1779 /* If the number converted back into a string is not identical
1780 * then it's not possible to encode the string as integer */
1781 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
1783 /* Finally check if it fits in our ranges */
1784 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
1785 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
1786 enc
[1] = value
&0xFF;
1788 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
1789 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
1790 enc
[1] = value
&0xFF;
1791 enc
[2] = (value
>>8)&0xFF;
1793 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
1794 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
1795 enc
[1] = value
&0xFF;
1796 enc
[2] = (value
>>8)&0xFF;
1797 enc
[3] = (value
>>16)&0xFF;
1798 enc
[4] = (value
>>24)&0xFF;
1805 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
1806 unsigned int comprlen
, outlen
;
1810 /* We require at least four bytes compression for this to be worth it */
1811 outlen
= sdslen(obj
->ptr
)-4;
1812 if (outlen
<= 0) return 0;
1813 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
1814 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
1815 if (comprlen
== 0) {
1819 /* Data compressed! Let's save it on disk */
1820 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
1821 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
1822 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
1823 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
1824 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
1833 /* Save a string object as [len][data] on disk. If the object is a string
1834 * representation of an integer value we try to save it in a special form */
1835 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
1836 size_t len
= sdslen(obj
->ptr
);
1839 /* Try integer encoding */
1841 unsigned char buf
[5];
1842 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
1843 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
1848 /* Try LZF compression - under 20 bytes it's unable to compress even
1849 * aaaaaaaaaaaaaaaaaa so skip it */
1850 if (1 && len
> 20) {
1853 retval
= rdbSaveLzfStringObject(fp
,obj
);
1854 if (retval
== -1) return -1;
1855 if (retval
> 0) return 0;
1856 /* retval == 0 means data can't be compressed, save the old way */
1859 /* Store verbatim */
1860 if (rdbSaveLen(fp
,len
) == -1) return -1;
1861 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
1865 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1866 static int rdbSave(char *filename
) {
1867 dictIterator
*di
= NULL
;
1872 time_t now
= time(NULL
);
1874 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
1875 fp
= fopen(tmpfile
,"w");
1877 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
1880 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
1881 for (j
= 0; j
< server
.dbnum
; j
++) {
1882 redisDb
*db
= server
.db
+j
;
1884 if (dictSize(d
) == 0) continue;
1885 di
= dictGetIterator(d
);
1891 /* Write the SELECT DB opcode */
1892 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
1893 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
1895 /* Iterate this DB writing every entry */
1896 while((de
= dictNext(di
)) != NULL
) {
1897 robj
*key
= dictGetEntryKey(de
);
1898 robj
*o
= dictGetEntryVal(de
);
1899 time_t expiretime
= getExpire(db
,key
);
1901 /* Save the expire time */
1902 if (expiretime
!= -1) {
1903 /* If this key is already expired skip it */
1904 if (expiretime
< now
) continue;
1905 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
1906 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
1908 /* Save the key and associated value */
1909 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
1910 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
1911 if (o
->type
== REDIS_STRING
) {
1912 /* Save a string value */
1913 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
1914 } else if (o
->type
== REDIS_LIST
) {
1915 /* Save a list value */
1916 list
*list
= o
->ptr
;
1920 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
1921 while((ln
= listYield(list
))) {
1922 robj
*eleobj
= listNodeValue(ln
);
1924 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
1926 } else if (o
->type
== REDIS_SET
) {
1927 /* Save a set value */
1929 dictIterator
*di
= dictGetIterator(set
);
1932 if (!set
) oom("dictGetIteraotr");
1933 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
1934 while((de
= dictNext(di
)) != NULL
) {
1935 robj
*eleobj
= dictGetEntryKey(de
);
1937 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
1939 dictReleaseIterator(di
);
1944 dictReleaseIterator(di
);
1947 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
1949 /* Make sure data will not remain on the OS's output buffers */
1954 /* Use RENAME to make sure the DB file is changed atomically only
1955 * if the generate DB file is ok. */
1956 if (rename(tmpfile
,filename
) == -1) {
1957 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destionation: %s", strerror(errno
));
1961 redisLog(REDIS_NOTICE
,"DB saved on disk");
1963 server
.lastsave
= time(NULL
);
1969 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
1970 if (di
) dictReleaseIterator(di
);
1974 static int rdbSaveBackground(char *filename
) {
1977 if (server
.bgsaveinprogress
) return REDIS_ERR
;
1978 if ((childpid
= fork()) == 0) {
1981 if (rdbSave(filename
) == REDIS_OK
) {
1988 if (childpid
== -1) {
1989 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
1993 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
1994 server
.bgsaveinprogress
= 1;
1997 return REDIS_OK
; /* unreached */
/* Read a single byte from 'fp' and return it as a non negative int.
 * Returns -1 if the byte can't be read (EOF or error). */
static int rdbLoadType(FILE *fp) {
    unsigned char opcode;
    size_t got = fread(&opcode,1,1,fp);

    if (got == 0) return -1;
    return opcode;
}
/* Read a 4 byte int32_t (host byte order) from 'fp' and return it as a
 * time_t. Returns -1 if the four bytes can't be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;
    size_t got = fread(&stamp,4,1,fp);

    if (got == 0) return -1;
    return (time_t) stamp;
}
2012 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2013 * of this file for a description of how this are stored on disk.
2015 * isencoded is set to 1 if the readed length is not actually a length but
2016 * an "encoding type", check the above comments for more info */
2017 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2018 unsigned char buf
[2];
2021 if (isencoded
) *isencoded
= 0;
2023 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2028 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2029 type
= (buf
[0]&0xC0)>>6;
2030 if (type
== REDIS_RDB_6BITLEN
) {
2031 /* Read a 6 bit len */
2033 } else if (type
== REDIS_RDB_ENCVAL
) {
2034 /* Read a 6 bit len encoding type */
2035 if (isencoded
) *isencoded
= 1;
2037 } else if (type
== REDIS_RDB_14BITLEN
) {
2038 /* Read a 14 bit len */
2039 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2040 return ((buf
[0]&0x3F)<<8)|buf
[1];
2042 /* Read a 32 bit len */
2043 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2049 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2050 unsigned char enc
[4];
2053 if (enctype
== REDIS_RDB_ENC_INT8
) {
2054 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2055 val
= (signed char)enc
[0];
2056 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2058 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2059 v
= enc
[0]|(enc
[1]<<8);
2061 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2063 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2064 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2067 val
= 0; /* anti-warning */
2070 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2073 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2074 unsigned int len
, clen
;
2075 unsigned char *c
= NULL
;
2078 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2079 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2080 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2081 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2082 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2083 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2085 return createObject(REDIS_STRING
,val
);
2092 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2097 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2100 case REDIS_RDB_ENC_INT8
:
2101 case REDIS_RDB_ENC_INT16
:
2102 case REDIS_RDB_ENC_INT32
:
2103 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2104 case REDIS_RDB_ENC_LZF
:
2105 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2111 if (len
== REDIS_RDB_LENERR
) return NULL
;
2112 val
= sdsnewlen(NULL
,len
);
2113 if (len
&& fread(val
,len
,1,fp
) == 0) {
2117 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2120 static int rdbLoad(char *filename
) {
2122 robj
*keyobj
= NULL
;
2124 int type
, retval
, rdbver
;
2125 dict
*d
= server
.db
[0].dict
;
2126 redisDb
*db
= server
.db
+0;
2128 time_t expiretime
= -1, now
= time(NULL
);
2130 fp
= fopen(filename
,"r");
2131 if (!fp
) return REDIS_ERR
;
2132 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2134 if (memcmp(buf
,"REDIS",5) != 0) {
2136 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2139 rdbver
= atoi(buf
+5);
2142 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2149 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2150 if (type
== REDIS_EXPIRETIME
) {
2151 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2152 /* We read the time so we need to read the object type again */
2153 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2155 if (type
== REDIS_EOF
) break;
2156 /* Handle SELECT DB opcode as a special case */
2157 if (type
== REDIS_SELECTDB
) {
2158 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2160 if (dbid
>= (unsigned)server
.dbnum
) {
2161 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2164 db
= server
.db
+dbid
;
2169 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2171 if (type
== REDIS_STRING
) {
2172 /* Read string value */
2173 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2174 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2175 /* Read list/set value */
2178 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2180 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2181 /* Load every single element of the list/set */
2185 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2186 if (type
== REDIS_LIST
) {
2187 if (!listAddNodeTail((list
*)o
->ptr
,ele
))
2188 oom("listAddNodeTail");
2190 if (dictAdd((dict
*)o
->ptr
,ele
,NULL
) == DICT_ERR
)
2197 /* Add the new object in the hash table */
2198 retval
= dictAdd(d
,keyobj
,o
);
2199 if (retval
== DICT_ERR
) {
2200 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2203 /* Set the expire time if needed */
2204 if (expiretime
!= -1) {
2205 setExpire(db
,keyobj
,expiretime
);
2206 /* Delete this key if already expired */
2207 if (expiretime
< now
) deleteKey(db
,keyobj
);
2215 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2216 if (keyobj
) decrRefCount(keyobj
);
2217 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2219 return REDIS_ERR
; /* Just to avoid warning */
2222 /*================================== Commands =============================== */
2224 static void authCommand(redisClient
*c
) {
2225 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2226 c
->authenticated
= 1;
2227 addReply(c
,shared
.ok
);
2229 c
->authenticated
= 0;
2230 addReply(c
,shared
.err
);
2234 static void pingCommand(redisClient
*c
) {
2235 addReply(c
,shared
.pong
);
2238 static void echoCommand(redisClient
*c
) {
2239 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
2240 (int)sdslen(c
->argv
[1]->ptr
)));
2241 addReply(c
,c
->argv
[1]);
2242 addReply(c
,shared
.crlf
);
2245 /*=================================== Strings =============================== */
2247 static void setGenericCommand(redisClient
*c
, int nx
) {
2250 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2251 if (retval
== DICT_ERR
) {
2253 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2254 incrRefCount(c
->argv
[2]);
2256 addReply(c
,shared
.czero
);
2260 incrRefCount(c
->argv
[1]);
2261 incrRefCount(c
->argv
[2]);
2264 removeExpire(c
->db
,c
->argv
[1]);
2265 addReply(c
, nx
? shared
.cone
: shared
.ok
);
2268 static void setCommand(redisClient
*c
) {
2269 setGenericCommand(c
,0);
2272 static void setnxCommand(redisClient
*c
) {
2273 setGenericCommand(c
,1);
2276 static void getCommand(redisClient
*c
) {
2277 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2280 addReply(c
,shared
.nullbulk
);
2282 if (o
->type
!= REDIS_STRING
) {
2283 addReply(c
,shared
.wrongtypeerr
);
2285 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o
->ptr
)));
2287 addReply(c
,shared
.crlf
);
2292 static void getSetCommand(redisClient
*c
) {
2294 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
2295 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2297 incrRefCount(c
->argv
[1]);
2299 incrRefCount(c
->argv
[2]);
2301 removeExpire(c
->db
,c
->argv
[1]);
2304 static void mgetCommand(redisClient
*c
) {
2307 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
2308 for (j
= 1; j
< c
->argc
; j
++) {
2309 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
2311 addReply(c
,shared
.nullbulk
);
2313 if (o
->type
!= REDIS_STRING
) {
2314 addReply(c
,shared
.nullbulk
);
2316 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o
->ptr
)));
2318 addReply(c
,shared
.crlf
);
2324 static void incrDecrCommand(redisClient
*c
, long long incr
) {
2329 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2333 if (o
->type
!= REDIS_STRING
) {
2338 value
= strtoll(o
->ptr
, &eptr
, 10);
2343 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
2344 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
2345 if (retval
== DICT_ERR
) {
2346 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
2347 removeExpire(c
->db
,c
->argv
[1]);
2349 incrRefCount(c
->argv
[1]);
2352 addReply(c
,shared
.colon
);
2354 addReply(c
,shared
.crlf
);
2357 static void incrCommand(redisClient
*c
) {
2358 incrDecrCommand(c
,1);
2361 static void decrCommand(redisClient
*c
) {
2362 incrDecrCommand(c
,-1);
2365 static void incrbyCommand(redisClient
*c
) {
2366 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2367 incrDecrCommand(c
,incr
);
2370 static void decrbyCommand(redisClient
*c
) {
2371 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2372 incrDecrCommand(c
,-incr
);
2375 /* ========================= Type agnostic commands ========================= */
2377 static void delCommand(redisClient
*c
) {
2380 for (j
= 1; j
< c
->argc
; j
++) {
2381 if (deleteKey(c
->db
,c
->argv
[j
])) {
2388 addReply(c
,shared
.czero
);
2391 addReply(c
,shared
.cone
);
2394 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
2399 static void existsCommand(redisClient
*c
) {
2400 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
2403 static void selectCommand(redisClient
*c
) {
2404 int id
= atoi(c
->argv
[1]->ptr
);
2406 if (selectDb(c
,id
) == REDIS_ERR
) {
2407 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
2409 addReply(c
,shared
.ok
);
2413 static void randomkeyCommand(redisClient
*c
) {
2417 de
= dictGetRandomKey(c
->db
->dict
);
2418 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
2421 addReply(c
,shared
.plus
);
2422 addReply(c
,shared
.crlf
);
2424 addReply(c
,shared
.plus
);
2425 addReply(c
,dictGetEntryKey(de
));
2426 addReply(c
,shared
.crlf
);
2430 static void keysCommand(redisClient
*c
) {
2433 sds pattern
= c
->argv
[1]->ptr
;
2434 int plen
= sdslen(pattern
);
2435 int numkeys
= 0, keyslen
= 0;
2436 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
2438 di
= dictGetIterator(c
->db
->dict
);
2439 if (!di
) oom("dictGetIterator");
2441 decrRefCount(lenobj
);
2442 while((de
= dictNext(di
)) != NULL
) {
2443 robj
*keyobj
= dictGetEntryKey(de
);
2445 sds key
= keyobj
->ptr
;
2446 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
2447 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
2448 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
2450 addReply(c
,shared
.space
);
2453 keyslen
+= sdslen(key
);
2457 dictReleaseIterator(di
);
2458 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
2459 addReply(c
,shared
.crlf
);
2462 static void dbsizeCommand(redisClient
*c
) {
2464 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
2467 static void lastsaveCommand(redisClient
*c
) {
2469 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
2472 static void typeCommand(redisClient
*c
) {
2476 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2481 case REDIS_STRING
: type
= "+string"; break;
2482 case REDIS_LIST
: type
= "+list"; break;
2483 case REDIS_SET
: type
= "+set"; break;
2484 default: type
= "unknown"; break;
2487 addReplySds(c
,sdsnew(type
));
2488 addReply(c
,shared
.crlf
);
2491 static void saveCommand(redisClient
*c
) {
2492 if (server
.bgsaveinprogress
) {
2493 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
2496 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2497 addReply(c
,shared
.ok
);
2499 addReply(c
,shared
.err
);
2503 static void bgsaveCommand(redisClient
*c
) {
2504 if (server
.bgsaveinprogress
) {
2505 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
2508 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
2509 addReply(c
,shared
.ok
);
2511 addReply(c
,shared
.err
);
2515 static void shutdownCommand(redisClient
*c
) {
2516 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
2517 /* XXX: TODO kill the child if there is a bgsave in progress */
2518 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
2519 if (server
.daemonize
) {
2520 unlink(server
.pidfile
);
2522 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
2523 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
2526 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
2527 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2531 static void renameGenericCommand(redisClient
*c
, int nx
) {
2534 /* To use the same key as src and dst is probably an error */
2535 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
2536 addReply(c
,shared
.sameobjecterr
);
2540 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2542 addReply(c
,shared
.nokeyerr
);
2546 deleteIfVolatile(c
->db
,c
->argv
[2]);
2547 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
2550 addReply(c
,shared
.czero
);
2553 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
2555 incrRefCount(c
->argv
[2]);
2557 deleteKey(c
->db
,c
->argv
[1]);
2559 addReply(c
,nx
? shared
.cone
: shared
.ok
);
2562 static void renameCommand(redisClient
*c
) {
2563 renameGenericCommand(c
,0);
2566 static void renamenxCommand(redisClient
*c
) {
2567 renameGenericCommand(c
,1);
2570 static void moveCommand(redisClient
*c
) {
2575 /* Obtain source and target DB pointers */
2578 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
2579 addReply(c
,shared
.outofrangeerr
);
2583 selectDb(c
,srcid
); /* Back to the source DB */
2585 /* If the user is moving using as target the same
2586 * DB as the source DB it is probably an error. */
2588 addReply(c
,shared
.sameobjecterr
);
2592 /* Check if the element exists and get a reference */
2593 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2595 addReply(c
,shared
.czero
);
2599 /* Try to add the element to the target DB */
2600 deleteIfVolatile(dst
,c
->argv
[1]);
2601 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
2602 addReply(c
,shared
.czero
);
2605 incrRefCount(c
->argv
[1]);
2608 /* OK! key moved, free the entry in the source DB */
2609 deleteKey(src
,c
->argv
[1]);
2611 addReply(c
,shared
.cone
);
2614 /* =================================== Lists ================================ */
2615 static void pushGenericCommand(redisClient
*c
, int where
) {
2619 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2621 lobj
= createListObject();
2623 if (where
== REDIS_HEAD
) {
2624 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2626 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2628 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
2629 incrRefCount(c
->argv
[1]);
2630 incrRefCount(c
->argv
[2]);
2632 if (lobj
->type
!= REDIS_LIST
) {
2633 addReply(c
,shared
.wrongtypeerr
);
2637 if (where
== REDIS_HEAD
) {
2638 if (!listAddNodeHead(list
,c
->argv
[2])) oom("listAddNodeHead");
2640 if (!listAddNodeTail(list
,c
->argv
[2])) oom("listAddNodeTail");
2642 incrRefCount(c
->argv
[2]);
2645 addReply(c
,shared
.ok
);
2648 static void lpushCommand(redisClient
*c
) {
2649 pushGenericCommand(c
,REDIS_HEAD
);
2652 static void rpushCommand(redisClient
*c
) {
2653 pushGenericCommand(c
,REDIS_TAIL
);
2656 static void llenCommand(redisClient
*c
) {
2660 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2662 addReply(c
,shared
.czero
);
2665 if (o
->type
!= REDIS_LIST
) {
2666 addReply(c
,shared
.wrongtypeerr
);
2669 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
2674 static void lindexCommand(redisClient
*c
) {
2676 int index
= atoi(c
->argv
[2]->ptr
);
2678 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2680 addReply(c
,shared
.nullbulk
);
2682 if (o
->type
!= REDIS_LIST
) {
2683 addReply(c
,shared
.wrongtypeerr
);
2685 list
*list
= o
->ptr
;
2688 ln
= listIndex(list
, index
);
2690 addReply(c
,shared
.nullbulk
);
2692 robj
*ele
= listNodeValue(ln
);
2693 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2695 addReply(c
,shared
.crlf
);
2701 static void lsetCommand(redisClient
*c
) {
2703 int index
= atoi(c
->argv
[2]->ptr
);
2705 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2707 addReply(c
,shared
.nokeyerr
);
2709 if (o
->type
!= REDIS_LIST
) {
2710 addReply(c
,shared
.wrongtypeerr
);
2712 list
*list
= o
->ptr
;
2715 ln
= listIndex(list
, index
);
2717 addReply(c
,shared
.outofrangeerr
);
2719 robj
*ele
= listNodeValue(ln
);
2722 listNodeValue(ln
) = c
->argv
[3];
2723 incrRefCount(c
->argv
[3]);
2724 addReply(c
,shared
.ok
);
2731 static void popGenericCommand(redisClient
*c
, int where
) {
2734 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2736 addReply(c
,shared
.nullbulk
);
2738 if (o
->type
!= REDIS_LIST
) {
2739 addReply(c
,shared
.wrongtypeerr
);
2741 list
*list
= o
->ptr
;
2744 if (where
== REDIS_HEAD
)
2745 ln
= listFirst(list
);
2747 ln
= listLast(list
);
2750 addReply(c
,shared
.nullbulk
);
2752 robj
*ele
= listNodeValue(ln
);
2753 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2755 addReply(c
,shared
.crlf
);
2756 listDelNode(list
,ln
);
2763 static void lpopCommand(redisClient
*c
) {
2764 popGenericCommand(c
,REDIS_HEAD
);
2767 static void rpopCommand(redisClient
*c
) {
2768 popGenericCommand(c
,REDIS_TAIL
);
2771 static void lrangeCommand(redisClient
*c
) {
2773 int start
= atoi(c
->argv
[2]->ptr
);
2774 int end
= atoi(c
->argv
[3]->ptr
);
2776 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2778 addReply(c
,shared
.nullmultibulk
);
2780 if (o
->type
!= REDIS_LIST
) {
2781 addReply(c
,shared
.wrongtypeerr
);
2783 list
*list
= o
->ptr
;
2785 int llen
= listLength(list
);
2789 /* convert negative indexes */
2790 if (start
< 0) start
= llen
+start
;
2791 if (end
< 0) end
= llen
+end
;
2792 if (start
< 0) start
= 0;
2793 if (end
< 0) end
= 0;
2795 /* indexes sanity checks */
2796 if (start
> end
|| start
>= llen
) {
2797 /* Out of range start or start > end result in empty list */
2798 addReply(c
,shared
.emptymultibulk
);
2801 if (end
>= llen
) end
= llen
-1;
2802 rangelen
= (end
-start
)+1;
2804 /* Return the result in form of a multi-bulk reply */
2805 ln
= listIndex(list
, start
);
2806 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
2807 for (j
= 0; j
< rangelen
; j
++) {
2808 ele
= listNodeValue(ln
);
2809 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele
->ptr
)));
2811 addReply(c
,shared
.crlf
);
2818 static void ltrimCommand(redisClient
*c
) {
2820 int start
= atoi(c
->argv
[2]->ptr
);
2821 int end
= atoi(c
->argv
[3]->ptr
);
2823 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2825 addReply(c
,shared
.nokeyerr
);
2827 if (o
->type
!= REDIS_LIST
) {
2828 addReply(c
,shared
.wrongtypeerr
);
2830 list
*list
= o
->ptr
;
2832 int llen
= listLength(list
);
2833 int j
, ltrim
, rtrim
;
2835 /* convert negative indexes */
2836 if (start
< 0) start
= llen
+start
;
2837 if (end
< 0) end
= llen
+end
;
2838 if (start
< 0) start
= 0;
2839 if (end
< 0) end
= 0;
2841 /* indexes sanity checks */
2842 if (start
> end
|| start
>= llen
) {
2843 /* Out of range start or start > end result in empty list */
2847 if (end
>= llen
) end
= llen
-1;
2852 /* Remove list elements to perform the trim */
2853 for (j
= 0; j
< ltrim
; j
++) {
2854 ln
= listFirst(list
);
2855 listDelNode(list
,ln
);
2857 for (j
= 0; j
< rtrim
; j
++) {
2858 ln
= listLast(list
);
2859 listDelNode(list
,ln
);
2861 addReply(c
,shared
.ok
);
2867 static void lremCommand(redisClient
*c
) {
2870 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2872 addReply(c
,shared
.nokeyerr
);
2874 if (o
->type
!= REDIS_LIST
) {
2875 addReply(c
,shared
.wrongtypeerr
);
2877 list
*list
= o
->ptr
;
2878 listNode
*ln
, *next
;
2879 int toremove
= atoi(c
->argv
[2]->ptr
);
2884 toremove
= -toremove
;
2887 ln
= fromtail
? list
->tail
: list
->head
;
2889 robj
*ele
= listNodeValue(ln
);
2891 next
= fromtail
? ln
->prev
: ln
->next
;
2892 if (sdscmp(ele
->ptr
,c
->argv
[3]->ptr
) == 0) {
2893 listDelNode(list
,ln
);
2896 if (toremove
&& removed
== toremove
) break;
2900 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
2905 /* ==================================== Sets ================================ */
2907 static void saddCommand(redisClient
*c
) {
2910 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2912 set
= createSetObject();
2913 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
2914 incrRefCount(c
->argv
[1]);
2916 if (set
->type
!= REDIS_SET
) {
2917 addReply(c
,shared
.wrongtypeerr
);
2921 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
2922 incrRefCount(c
->argv
[2]);
2924 addReply(c
,shared
.cone
);
2926 addReply(c
,shared
.czero
);
2930 static void sremCommand(redisClient
*c
) {
2933 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2935 addReply(c
,shared
.czero
);
2937 if (set
->type
!= REDIS_SET
) {
2938 addReply(c
,shared
.wrongtypeerr
);
2941 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
2943 addReply(c
,shared
.cone
);
2945 addReply(c
,shared
.czero
);
2950 static void smoveCommand(redisClient
*c
) {
2951 robj
*srcset
, *dstset
;
2953 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2954 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
2956 /* If the source key does not exist return 0, if it's of the wrong type
2958 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
2959 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
2962 /* Error if the destination key is not a set as well */
2963 if (dstset
&& dstset
->type
!= REDIS_SET
) {
2964 addReply(c
,shared
.wrongtypeerr
);
2967 /* Remove the element from the source set */
2968 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
2969 /* Key not found in the src set! return zero */
2970 addReply(c
,shared
.czero
);
2974 /* Add the element to the destination set */
2976 dstset
= createSetObject();
2977 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
2978 incrRefCount(c
->argv
[2]);
2980 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
2981 incrRefCount(c
->argv
[3]);
2982 addReply(c
,shared
.cone
);
2985 static void sismemberCommand(redisClient
*c
) {
2988 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
2990 addReply(c
,shared
.czero
);
2992 if (set
->type
!= REDIS_SET
) {
2993 addReply(c
,shared
.wrongtypeerr
);
2996 if (dictFind(set
->ptr
,c
->argv
[2]))
2997 addReply(c
,shared
.cone
);
2999 addReply(c
,shared
.czero
);
3003 static void scardCommand(redisClient
*c
) {
3007 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3009 addReply(c
,shared
.czero
);
3012 if (o
->type
!= REDIS_SET
) {
3013 addReply(c
,shared
.wrongtypeerr
);
3016 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3022 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3023 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3025 return dictSize(*d1
)-dictSize(*d2
);
3028 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
) {
3029 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3032 robj
*lenobj
= NULL
, *dstset
= NULL
;
3033 int j
, cardinality
= 0;
3035 if (!dv
) oom("sinterGenericCommand");
3036 for (j
= 0; j
< setsnum
; j
++) {
3040 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3041 lookupKeyRead(c
->db
,setskeys
[j
]);
3045 deleteKey(c
->db
,dstkey
);
3046 addReply(c
,shared
.ok
);
3048 addReply(c
,shared
.nullmultibulk
);
3052 if (setobj
->type
!= REDIS_SET
) {
3054 addReply(c
,shared
.wrongtypeerr
);
3057 dv
[j
] = setobj
->ptr
;
3059 /* Sort sets from the smallest to largest, this will improve our
3060 * algorithm's performance */
3061 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3063 /* The first thing we should output is the total number of elements...
3064 * since this is a multi-bulk write, but at this stage we don't know
3065 * the intersection set size, so we use a trick, append an empty object
3066 * to the output list and save the pointer to later modify it with the
3069 lenobj
= createObject(REDIS_STRING
,NULL
);
3071 decrRefCount(lenobj
);
3073 /* If we have a target key where to store the resulting set
3074 * create this key with an empty set inside */
3075 dstset
= createSetObject();
3078 /* Iterate all the elements of the first (smallest) set, and test
3079 * the element against all the other sets, if at least one set does
3080 * not include the element it is discarded */
3081 di
= dictGetIterator(dv
[0]);
3082 if (!di
) oom("dictGetIterator");
3084 while((de
= dictNext(di
)) != NULL
) {
3087 for (j
= 1; j
< setsnum
; j
++)
3088 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
3090 continue; /* at least one set does not contain the member */
3091 ele
= dictGetEntryKey(de
);
3093 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele
->ptr
)));
3095 addReply(c
,shared
.crlf
);
3098 dictAdd(dstset
->ptr
,ele
,NULL
);
3102 dictReleaseIterator(di
);
3105 /* Store the resulting set into the target */
3106 deleteKey(c
->db
,dstkey
);
3107 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3108 incrRefCount(dstkey
);
3112 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",cardinality
);
3114 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3115 dictSize((dict
*)dstset
->ptr
)));
3121 static void sinterCommand(redisClient
*c
) {
3122 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
3125 static void sinterstoreCommand(redisClient
*c
) {
3126 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
3129 #define REDIS_OP_UNION 0
3130 #define REDIS_OP_DIFF 1
3132 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
3133 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3136 robj
*dstset
= NULL
;
3137 int j
, cardinality
= 0;
3139 if (!dv
) oom("sunionDiffGenericCommand");
3140 for (j
= 0; j
< setsnum
; j
++) {
3144 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3145 lookupKeyRead(c
->db
,setskeys
[j
]);
3150 if (setobj
->type
!= REDIS_SET
) {
3152 addReply(c
,shared
.wrongtypeerr
);
3155 dv
[j
] = setobj
->ptr
;
3158 /* We need a temp set object to store our union. If the dstkey
3159 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3160 * this set object will be the resulting object to set into the target key*/
3161 dstset
= createSetObject();
3163 /* Iterate all the elements of all the sets, add every element a single
3164 * time to the result set */
3165 for (j
= 0; j
< setsnum
; j
++) {
3166 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
3167 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
3169 di
= dictGetIterator(dv
[j
]);
3170 if (!di
) oom("dictGetIterator");
3172 while((de
= dictNext(di
)) != NULL
) {
3175 /* dictAdd will not add the same element multiple times */
3176 ele
= dictGetEntryKey(de
);
3177 if (op
== REDIS_OP_UNION
|| j
== 0) {
3178 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
3182 } else if (op
== REDIS_OP_DIFF
) {
3183 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
3188 dictReleaseIterator(di
);
3190 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
3193 /* Output the content of the resulting set, if not in STORE mode */
3195 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
3196 di
= dictGetIterator(dstset
->ptr
);
3197 if (!di
) oom("dictGetIterator");
3198 while((de
= dictNext(di
)) != NULL
) {
3201 ele
= dictGetEntryKey(de
);
3202 addReplySds(c
,sdscatprintf(sdsempty(),
3203 "$%d\r\n",sdslen(ele
->ptr
)));
3205 addReply(c
,shared
.crlf
);
3207 dictReleaseIterator(di
);
3209 /* If we have a target key where to store the resulting set
3210 * create this key with the result set inside */
3211 deleteKey(c
->db
,dstkey
);
3212 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3213 incrRefCount(dstkey
);
3218 decrRefCount(dstset
);
3220 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3221 dictSize((dict
*)dstset
->ptr
)));
3227 static void sunionCommand(redisClient
*c
) {
3228 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
3231 static void sunionstoreCommand(redisClient
*c
) {
3232 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
3235 static void sdiffCommand(redisClient
*c
) {
3236 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
3239 static void sdiffstoreCommand(redisClient
*c
) {
3240 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
3243 static void flushdbCommand(redisClient
*c
) {
3244 server
.dirty
+= dictSize(c
->db
->dict
);
3245 dictEmpty(c
->db
->dict
);
3246 dictEmpty(c
->db
->expires
);
3247 addReply(c
,shared
.ok
);
3250 static void flushallCommand(redisClient
*c
) {
3251 server
.dirty
+= emptyDb();
3252 addReply(c
,shared
.ok
);
3253 rdbSave(server
.dbfilename
);
3257 redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
3258 redisSortOperation
*so
= zmalloc(sizeof(*so
));
3259 if (!so
) oom("createSortOperation");
3261 so
->pattern
= pattern
;
3265 /* Return the value associated to the key with a name obtained
3266 * substituting the first occurrence of '*' in 'pattern' with 'subst' */
3267 robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
3271 int prefixlen
, sublen
, postfixlen
;
3272 /* Exploit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3276 char buf
[REDIS_SORTKEY_MAX
+1];
3279 spat
= pattern
->ptr
;
3281 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
3282 p
= strchr(spat
,'*');
3283 if (!p
) return NULL
;
3286 sublen
= sdslen(ssub
);
3287 postfixlen
= sdslen(spat
)-(prefixlen
+1);
3288 memcpy(keyname
.buf
,spat
,prefixlen
);
3289 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
3290 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
3291 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
3292 keyname
.len
= prefixlen
+sublen
+postfixlen
;
3294 keyobj
.refcount
= 1;
3295 keyobj
.type
= REDIS_STRING
;
3296 keyobj
.ptr
= ((char*)&keyname
)+(sizeof(long)*2);
3298 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3299 return lookupKeyRead(db
,&keyobj
);
3302 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3303 * the additional parameter is not standard but a BSD-specific we have to
3304 * pass sorting parameters via the global 'server' structure */
3305 static int sortCompare(const void *s1
, const void *s2
) {
3306 const redisSortObject
*so1
= s1
, *so2
= s2
;
3309 if (!server
.sort_alpha
) {
3310 /* Numeric sorting. Here it's trivial as we precomputed scores */
3311 if (so1
->u
.score
> so2
->u
.score
) {
3313 } else if (so1
->u
.score
< so2
->u
.score
) {
3319 /* Alphanumeric sorting */
3320 if (server
.sort_bypattern
) {
3321 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
3322 /* At least one compare object is NULL */
3323 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
3325 else if (so1
->u
.cmpobj
== NULL
)
3330 /* We have both the objects, use strcoll */
3331 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
3334 /* Compare elements directly */
3335 cmp
= strcoll(so1
->obj
->ptr
,so2
->obj
->ptr
);
3338 return server
.sort_desc
? -cmp
: cmp
;
3341 /* The SORT command is the most complex command in Redis. Warning: this code
3342 * is optimized for speed and a bit less for readability */
3343 static void sortCommand(redisClient
*c
) {
3346 int desc
= 0, alpha
= 0;
3347 int limit_start
= 0, limit_count
= -1, start
, end
;
3348 int j
, dontsort
= 0, vectorlen
;
3349 int getop
= 0; /* GET operation counter */
3350 robj
*sortval
, *sortby
= NULL
;
3351 redisSortObject
*vector
; /* Resulting vector to sort */
3353 /* Lookup the key to sort. It must be of the right types */
3354 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
3355 if (sortval
== NULL
) {
3356 addReply(c
,shared
.nokeyerr
);
3359 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
) {
3360 addReply(c
,shared
.wrongtypeerr
);
3364 /* Create a list of operations to perform for every sorted element.
3365 * Operations can be GET/DEL/INCR/DECR */
3366 operations
= listCreate();
3367 listSetFreeMethod(operations
,zfree
);
3370 /* Now we need to protect sortval incrementing its count, in the future
3371 * SORT may have options able to overwrite/delete keys during the sorting
3372 * and the sorted key itself may get destroyed */
3373 incrRefCount(sortval
);
3375 /* The SORT command has an SQL-alike syntax, parse it */
3376 while(j
< c
->argc
) {
3377 int leftargs
= c
->argc
-j
-1;
3378 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
3380 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
3382 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
3384 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
3385 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
3386 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
3388 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
3389 sortby
= c
->argv
[j
+1];
3390 /* If the BY pattern does not contain '*', i.e. it is constant,
3391 * we don't need to sort nor to lookup the weight keys. */
3392 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
3394 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3395 listAddNodeTail(operations
,createSortOperation(
3396 REDIS_SORT_GET
,c
->argv
[j
+1]));
3399 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"del") && leftargs
>= 1) {
3400 listAddNodeTail(operations
,createSortOperation(
3401 REDIS_SORT_DEL
,c
->argv
[j
+1]));
3403 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"incr") && leftargs
>= 1) {
3404 listAddNodeTail(operations
,createSortOperation(
3405 REDIS_SORT_INCR
,c
->argv
[j
+1]));
3407 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
3408 listAddNodeTail(operations
,createSortOperation(
3409 REDIS_SORT_DECR
,c
->argv
[j
+1]));
3412 decrRefCount(sortval
);
3413 listRelease(operations
);
3414 addReply(c
,shared
.syntaxerr
);
3420 /* Load the sorting vector with all the objects to sort */
3421 vectorlen
= (sortval
->type
== REDIS_LIST
) ?
3422 listLength((list
*)sortval
->ptr
) :
3423 dictSize((dict
*)sortval
->ptr
);
3424 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
3425 if (!vector
) oom("allocating objects vector for SORT");
3427 if (sortval
->type
== REDIS_LIST
) {
3428 list
*list
= sortval
->ptr
;
3432 while((ln
= listYield(list
))) {
3433 robj
*ele
= ln
->value
;
3434 vector
[j
].obj
= ele
;
3435 vector
[j
].u
.score
= 0;
3436 vector
[j
].u
.cmpobj
= NULL
;
3440 dict
*set
= sortval
->ptr
;
3444 di
= dictGetIterator(set
);
3445 if (!di
) oom("dictGetIterator");
3446 while((setele
= dictNext(di
)) != NULL
) {
3447 vector
[j
].obj
= dictGetEntryKey(setele
);
3448 vector
[j
].u
.score
= 0;
3449 vector
[j
].u
.cmpobj
= NULL
;
3452 dictReleaseIterator(di
);
3454 assert(j
== vectorlen
);
3456 /* Now it's time to load the right scores in the sorting vector */
3457 if (dontsort
== 0) {
3458 for (j
= 0; j
< vectorlen
; j
++) {
3462 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
3463 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
3465 vector
[j
].u
.cmpobj
= byval
;
3466 incrRefCount(byval
);
3468 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
3471 if (!alpha
) vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
3476 /* We are ready to sort the vector... perform a bit of sanity check
3477 * on the LIMIT option too. We'll use a partial version of quicksort. */
3478 start
= (limit_start
< 0) ? 0 : limit_start
;
3479 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
3480 if (start
>= vectorlen
) {
3481 start
= vectorlen
-1;
3484 if (end
>= vectorlen
) end
= vectorlen
-1;
3486 if (dontsort
== 0) {
3487 server
.sort_desc
= desc
;
3488 server
.sort_alpha
= alpha
;
3489 server
.sort_bypattern
= sortby
? 1 : 0;
3490 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
3491 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
3493 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
3496 /* Send command output to the output buffer, performing the specified
3497 * GET/DEL/INCR/DECR operations if any. */
3498 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
3499 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
3500 for (j
= start
; j
<= end
; j
++) {
3503 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
3504 sdslen(vector
[j
].obj
->ptr
)));
3505 addReply(c
,vector
[j
].obj
);
3506 addReply(c
,shared
.crlf
);
3508 listRewind(operations
);
3509 while((ln
= listYield(operations
))) {
3510 redisSortOperation
*sop
= ln
->value
;
3511 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
3514 if (sop
->type
== REDIS_SORT_GET
) {
3515 if (!val
|| val
->type
!= REDIS_STRING
) {
3516 addReply(c
,shared
.nullbulk
);
3518 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",
3521 addReply(c
,shared
.crlf
);
3523 } else if (sop
->type
== REDIS_SORT_DEL
) {
3530 decrRefCount(sortval
);
3531 listRelease(operations
);
3532 for (j
= 0; j
< vectorlen
; j
++) {
3533 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
3534 decrRefCount(vector
[j
].u
.cmpobj
);
3539 static void infoCommand(redisClient
*c
) {
3541 time_t uptime
= time(NULL
)-server
.stat_starttime
;
3543 info
= sdscatprintf(sdsempty(),
3544 "redis_version:%s\r\n"
3545 "uptime_in_seconds:%d\r\n"
3546 "uptime_in_days:%d\r\n"
3547 "connected_clients:%d\r\n"
3548 "connected_slaves:%d\r\n"
3549 "used_memory:%zu\r\n"
3550 "changes_since_last_save:%lld\r\n"
3551 "bgsave_in_progress:%d\r\n"
3552 "last_save_time:%d\r\n"
3553 "total_connections_received:%lld\r\n"
3554 "total_commands_processed:%lld\r\n"
3559 listLength(server
.clients
)-listLength(server
.slaves
),
3560 listLength(server
.slaves
),
3563 server
.bgsaveinprogress
,
3565 server
.stat_numconnections
,
3566 server
.stat_numcommands
,
3567 server
.masterhost
== NULL
? "master" : "slave"
3569 if (server
.masterhost
) {
3570 info
= sdscatprintf(info
,
3571 "master_host:%s\r\n"
3572 "master_port:%d\r\n"
3573 "master_link_status:%s\r\n"
3574 "master_last_io_seconds_ago:%d\r\n"
3577 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
3579 (int)(time(NULL
)-server
.master
->lastinteraction
)
3582 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info
)));
3583 addReplySds(c
,info
);
3584 addReply(c
,shared
.crlf
);
3587 static void monitorCommand(redisClient
*c
) {
3588 /* ignore MONITOR if aleady slave or in monitor mode */
3589 if (c
->flags
& REDIS_SLAVE
) return;
3591 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
3593 if (!listAddNodeTail(server
.monitors
,c
)) oom("listAddNodeTail");
3594 addReply(c
,shared
.ok
);
3597 /* ================================= Expire ================================= */
3598 static int removeExpire(redisDb
*db
, robj
*key
) {
3599 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
3606 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
3607 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
3615 /* Return the expire time of the specified key, or -1 if no expire
3616 * is associated with this key (i.e. the key is non volatile) */
3617 static time_t getExpire(redisDb
*db
, robj
*key
) {
3620 /* No expire? return ASAP */
3621 if (dictSize(db
->expires
) == 0 ||
3622 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
3624 return (time_t) dictGetEntryVal(de
);
3627 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
3631 /* No expire? return ASAP */
3632 if (dictSize(db
->expires
) == 0 ||
3633 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3635 /* Lookup the expire */
3636 when
= (time_t) dictGetEntryVal(de
);
3637 if (time(NULL
) <= when
) return 0;
3639 /* Delete the key */
3640 dictDelete(db
->expires
,key
);
3641 return dictDelete(db
->dict
,key
) == DICT_OK
;
3644 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
3647 /* No expire? return ASAP */
3648 if (dictSize(db
->expires
) == 0 ||
3649 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
3651 /* Delete the key */
3653 dictDelete(db
->expires
,key
);
3654 return dictDelete(db
->dict
,key
) == DICT_OK
;
3657 static void expireCommand(redisClient
*c
) {
3659 int seconds
= atoi(c
->argv
[2]->ptr
);
3661 de
= dictFind(c
->db
->dict
,c
->argv
[1]);
3663 addReply(c
,shared
.czero
);
3667 addReply(c
, shared
.czero
);
3670 time_t when
= time(NULL
)+seconds
;
3671 if (setExpire(c
->db
,c
->argv
[1],when
))
3672 addReply(c
,shared
.cone
);
3674 addReply(c
,shared
.czero
);
3679 static void ttlCommand(redisClient
*c
) {
3683 expire
= getExpire(c
->db
,c
->argv
[1]);
3685 ttl
= (int) (expire
-time(NULL
));
3686 if (ttl
< 0) ttl
= -1;
3688 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
3691 /* =============================== Replication ============================= */
3693 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3694 ssize_t nwritten
, ret
= size
;
3695 time_t start
= time(NULL
);
3699 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
3700 nwritten
= write(fd
,ptr
,size
);
3701 if (nwritten
== -1) return -1;
3705 if ((time(NULL
)-start
) > timeout
) {
3713 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
3714 ssize_t nread
, totread
= 0;
3715 time_t start
= time(NULL
);
3719 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
3720 nread
= read(fd
,ptr
,size
);
3721 if (nread
== -1) return -1;
3726 if ((time(NULL
)-start
) > timeout
) {
/* Read a line, one byte at a time, from 'fd' into the buffer 'ptr' of
 * 'size' bytes. The terminating "\n" (or "\r\n") is stripped and the buffer
 * is always left null terminated.
 * Returns the number of bytes stored before the newline was seen (a
 * trailing '\r' is overwritten but still counted), or -1 if syncRead()
 * fails. A line longer than size-1 bytes is truncated: reading stops as
 * soon as the buffer is full. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return 0; /* no room, not even for the terminator */
    *ptr = '\0';
    size--; /* reserve one byte for the null term */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* Account for the byte just stored. Without this the loop never
         * terminates on a long line and writes past the end of the
         * caller's buffer. */
        size--;
    }
    return nread;
}
3755 static void syncCommand(redisClient
*c
) {
3756 /* ignore SYNC if aleady slave or in monitor mode */
3757 if (c
->flags
& REDIS_SLAVE
) return;
3759 /* SYNC can't be issued when the server has pending data to send to
3760 * the client about already issued commands. We need a fresh reply
3761 * buffer registering the differences between the BGSAVE and the current
3762 * dataset, so that we can copy to other slaves if needed. */
3763 if (listLength(c
->reply
) != 0) {
3764 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3768 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
3769 /* Here we need to check if there is a background saving operation
3770 * in progress, or if it is required to start one */
3771 if (server
.bgsaveinprogress
) {
3772 /* Ok a background save is in progress. Let's check if it is a good
3773 * one for replication, i.e. if there is another slave that is
3774 * registering differences since the server forked to save */
3778 listRewind(server
.slaves
);
3779 while((ln
= listYield(server
.slaves
))) {
3781 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
3784 /* Perfect, the server is already registering differences for
3785 * another slave. Set the right state, and copy the buffer. */
3786 listRelease(c
->reply
);
3787 c
->reply
= listDup(slave
->reply
);
3788 if (!c
->reply
) oom("listDup copying slave reply list");
3789 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3790 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
3792 /* No way, we need to wait for the next BGSAVE in order to
3793 * register differences */
3794 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
3795 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
3798 /* Ok we don't have a BGSAVE in progress, let's start one */
3799 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
3800 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
3801 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
3802 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
3805 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3808 c
->flags
|= REDIS_SLAVE
;
3810 if (!listAddNodeTail(server
.slaves
,c
)) oom("listAddNodeTail");
3814 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
3815 redisClient
*slave
= privdata
;
3817 REDIS_NOTUSED(mask
);
3818 char buf
[REDIS_IOBUF_LEN
];
3819 ssize_t nwritten
, buflen
;
3821 if (slave
->repldboff
== 0) {
3822 /* Write the bulk write count before to transfer the DB. In theory here
3823 * we don't know how much room there is in the output buffer of the
3824 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
3825 * operations) will never be smaller than the few bytes we need. */
3828 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
3830 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
3838 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
3839 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
3841 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
3842 (buflen
== 0) ? "premature EOF" : strerror(errno
));
3846 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
3847 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
3852 slave
->repldboff
+= nwritten
;
3853 if (slave
->repldboff
== slave
->repldbsize
) {
3854 close(slave
->repldbfd
);
3855 slave
->repldbfd
= -1;
3856 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
3857 slave
->replstate
= REDIS_REPL_ONLINE
;
3858 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
3859 sendReplyToClient
, slave
, NULL
) == AE_ERR
) {
3863 addReplySds(slave
,sdsempty());
3864 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
3868 static void updateSalvesWaitingBgsave(int bgsaveerr
) {
3870 int startbgsave
= 0;
3872 listRewind(server
.slaves
);
3873 while((ln
= listYield(server
.slaves
))) {
3874 redisClient
*slave
= ln
->value
;
3876 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
3878 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
3879 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
3882 if (bgsaveerr
!= REDIS_OK
) {
3884 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
3887 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
3888 fstat(slave
->repldbfd
,&buf
) == -1) {
3890 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
3893 slave
->repldboff
= 0;
3894 slave
->repldbsize
= buf
.st_size
;
3895 slave
->replstate
= REDIS_REPL_SEND_BULK
;
3896 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
3897 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
, NULL
) == AE_ERR
) {
3904 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
3905 listRewind(server
.slaves
);
3906 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
3907 while((ln
= listYield(server
.slaves
))) {
3908 redisClient
*slave
= ln
->value
;
3910 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
3917 static int syncWithMaster(void) {
3918 char buf
[1024], tmpfile
[256];
3920 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
3924 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
3928 /* Issue the SYNC command */
3929 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
3931 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
3935 /* Read the bulk write count */
3936 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
3938 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
3942 dumpsize
= atoi(buf
+1);
3943 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
3944 /* Read the bulk write data on a temp file */
3945 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
3946 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
3949 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
3953 int nread
, nwritten
;
3955 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
3957 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
3963 nwritten
= write(dfd
,buf
,nread
);
3964 if (nwritten
== -1) {
3965 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
3973 if (rename(tmpfile
,server
.dbfilename
) == -1) {
3974 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
3980 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
3981 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
3985 server
.master
= createClient(fd
);
3986 server
.master
->flags
|= REDIS_MASTER
;
3987 server
.replstate
= REDIS_REPL_CONNECTED
;
3991 static void slaveofCommand(redisClient
*c
) {
3992 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
3993 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
3994 if (server
.masterhost
) {
3995 sdsfree(server
.masterhost
);
3996 server
.masterhost
= NULL
;
3997 if (server
.master
) freeClient(server
.master
);
3998 server
.replstate
= REDIS_REPL_NONE
;
3999 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
4002 sdsfree(server
.masterhost
);
4003 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
4004 server
.masterport
= atoi(c
->argv
[2]->ptr
);
4005 if (server
.master
) freeClient(server
.master
);
4006 server
.replstate
= REDIS_REPL_CONNECT
;
4007 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
4008 server
.masterhost
, server
.masterport
);
4010 addReply(c
,shared
.ok
);
4013 /* ============================ Maxmemory directive ======================== */
4015 /* This function gets called when 'maxmemory' is set on the config file to limit
4016 * the max memory used by the server, and we are out of memory.
4017 * This function will try to, in order:
4019 * - Free objects from the free list
4020 * - Try to remove keys with an EXPIRE set
4022 * It is not possible to free enough memory to reach used-memory < maxmemory
4023 * the server will start refusing commands that will enlarge even more the
4026 static void freeMemoryIfNeeded(void) {
4027 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
4028 if (listLength(server
.objfreelist
)) {
4031 listNode
*head
= listFirst(server
.objfreelist
);
4032 o
= listNodeValue(head
);
4033 listDelNode(server
.objfreelist
,head
);
4036 int j
, k
, freed
= 0;
4038 for (j
= 0; j
< server
.dbnum
; j
++) {
4040 robj
*minkey
= NULL
;
4041 struct dictEntry
*de
;
4043 if (dictSize(server
.db
[j
].expires
)) {
4045 /* From a sample of three keys drop the one nearest to
4046 * the natural expire */
4047 for (k
= 0; k
< 3; k
++) {
4050 de
= dictGetRandomKey(server
.db
[j
].expires
);
4051 t
= (time_t) dictGetEntryVal(de
);
4052 if (minttl
== -1 || t
< minttl
) {
4053 minkey
= dictGetEntryKey(de
);
4057 deleteKey(server
.db
+j
,minkey
);
4060 if (!freed
) return; /* nothing to free... */
4065 /* ================================= Debugging ============================== */
4067 static void debugCommand(redisClient
*c
) {
4068 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
4070 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
4071 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
4075 addReply(c
,shared
.nokeyerr
);
4078 key
= dictGetEntryKey(de
);
4079 val
= dictGetEntryVal(de
);
4080 addReplySds(c
,sdscatprintf(sdsempty(),
4081 "+Key at:%p refcount:%d, value at:%p refcount:%d\r\n",
4082 key
, key
->refcount
, val
, val
->refcount
));
4084 addReplySds(c
,sdsnew(
4085 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4089 /* =================================== Main! ================================ */
/* Return the integer stored in /proc/sys/vm/overcommit_memory, or -1 if
 * the file cannot be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp == NULL) return -1;
    if (fgets(line,64,fp) != NULL) value = atoi(line);
    fclose(fp);
    return value;
}
4106 void linuxOvercommitMemoryWarning(void) {
4107 if (linuxOvercommitMemoryValue() == 0) {
4108 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4111 #endif /* __linux__ */
4113 static void daemonize(void) {
4117 if (fork() != 0) exit(0); /* parent exits */
4118 setsid(); /* create a new session */
4120 /* Every output goes to /dev/null. If Redis is daemonized but
4121 * the 'logfile' is set to 'stdout' in the configuration file
4122 * it will not log at all. */
4123 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
4124 dup2(fd
, STDIN_FILENO
);
4125 dup2(fd
, STDOUT_FILENO
);
4126 dup2(fd
, STDERR_FILENO
);
4127 if (fd
> STDERR_FILENO
) close(fd
);
4129 /* Try to write the pid file */
4130 fp
= fopen(server
.pidfile
,"w");
4132 fprintf(fp
,"%d\n",getpid());
/* Signal handler taking the three arguments of an SA_SIGINFO handler:
 * the signal number, a siginfo_t and an opaque context that is cast to
 * ucontext_t.
 *
 * It prints the signal number, and in one of the two printf() calls also
 * the faulting address (info->si_addr) and the EIP register saved in the
 * context. It then collects up to 16 backtrace frames, overwrites frame 1
 * with the saved EIP, resolves the frames with backtrace_symbols() and
 * prints them starting from index 1.
 *
 * The use of gregs[REG_EIP] ties this code to 32 bit x86.
 *
 * NOTE(review): several lines of this definition are not visible in this
 * view (the declaration of 'trace', the condition selecting between the
 * two printf() calls, and the end of the function) -- confirm against the
 * full file before relying on this description. */
4137 static void segvHandler (int sig
, siginfo_t
*info
, void *secret
) {
4140 char **messages
= (char **)NULL
;
4141 int i
, trace_size
= 0;
4142 ucontext_t
*uc
= (ucontext_t
*)secret
;
4144 /* Do something useful with siginfo_t */
4146 printf("Got signal %d, faulty address is %p, from %p\n", sig
, info
->si_addr
,
4147 (void *)uc
->uc_mcontext
.gregs
[REG_EIP
]);
4149 printf("Got signal %d\n", sig
);
4151 trace_size
= backtrace(trace
, 16);
4152 /* overwrite sigaction with caller's address */
4153 trace
[1] = (void *) uc
->uc_mcontext
.gregs
[REG_EIP
];
4155 messages
= backtrace_symbols(trace
, trace_size
);
4156 /* skip first stack frame (points here) */
4157 printf("[bt] Execution path:\n");
4158 for (i
=1; i
<trace_size
; ++i
)
4159 printf("[bt] %s\n", messages
[i
]);
4164 void setupSigAction(){
4165 struct sigaction act
;
4166 sigemptyset (&act
.sa_mask
);
4167 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used. Otherwise, sa_handler is used */
4168 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
4169 act
.sa_sigaction
= segvHandler
;
4170 sigaction (SIGSEGV
, &act
, NULL
);
4173 int main(int argc
, char **argv
) {
4176 linuxOvercommitMemoryWarning();
4181 ResetServerSaveParams();
4182 loadServerConfig(argv
[1]);
4183 } else if (argc
> 2) {
4184 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
4187 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4190 if (server
.daemonize
) daemonize();
4191 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
4192 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
4193 redisLog(REDIS_NOTICE
,"DB loaded from disk");
4194 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
4195 acceptHandler
, NULL
, NULL
) == AE_ERR
) oom("creating file event");
4196 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
4198 aeDeleteEventLoop(server
.el
);