2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.050"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
63 #include "solarisfixes.h"
67 #include "ae.h" /* Event driven programming library */
68 #include "sds.h" /* Dynamic safe strings */
69 #include "anet.h" /* Networking the easy way */
70 #include "dict.h" /* Hash tables */
71 #include "adlist.h" /* Linked lists */
72 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
73 #include "lzf.h" /* LZF compression library */
74 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
80 /* Static server configuration */
81 #define REDIS_SERVERPORT 6379 /* TCP port */
82 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
83 #define REDIS_IOBUF_LEN 1024
84 #define REDIS_LOADBUF_LEN 1024
85 #define REDIS_STATIC_ARGS 4
86 #define REDIS_DEFAULT_DBNUM 16
87 #define REDIS_CONFIGLINE_MAX 1024
88 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
89 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
90 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
91 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
92 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
94 /* Hash table parameters */
95 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
98 #define REDIS_CMD_BULK 1 /* Bulk write command */
99 #define REDIS_CMD_INLINE 2 /* Inline command */
100 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
101 this flags will return an error when the 'maxmemory' option is set in the
102 config file and the server is using more than maxmemory bytes of memory.
103 In short this commands are denied on low memory conditions. */
104 #define REDIS_CMD_DENYOOM 4
107 #define REDIS_STRING 0
113 /* Objects encoding */
114 #define REDIS_ENCODING_RAW 0 /* Raw representation */
115 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
117 /* Object types only used for dumping to disk */
118 #define REDIS_EXPIRETIME 253
119 #define REDIS_SELECTDB 254
120 #define REDIS_EOF 255
122 /* Defines related to the dump file format. To store 32 bits lengths for short
123 * keys requires a lot of space, so we check the most significant 2 bits of
124 * the first byte to interpreter the length:
126 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
127 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
128 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
129 * 11|000000 this means: specially encoded object will follow. The six bits
130 * number specify the kind of object that follows.
131 * See the REDIS_RDB_ENC_* defines.
133 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
134 * values, will fit inside. */
135 #define REDIS_RDB_6BITLEN 0
136 #define REDIS_RDB_14BITLEN 1
137 #define REDIS_RDB_32BITLEN 2
138 #define REDIS_RDB_ENCVAL 3
139 #define REDIS_RDB_LENERR UINT_MAX
141 /* When a length of a string object stored on disk has the first two bits
142 * set, the remaining two bits specify a special encoding for the object
143 * accordingly to the following defines: */
144 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
145 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
146 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
147 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
150 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
151 #define REDIS_SLAVE 2 /* This client is a slave server */
152 #define REDIS_MASTER 4 /* This client is a master server */
153 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
155 /* Slave replication state - slave side */
156 #define REDIS_REPL_NONE 0 /* No active replication */
157 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
158 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
160 /* Slave replication state - from the point of view of master
161 * Note that in SEND_BULK and ONLINE state the slave receives new updates
162 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
163 * to start the next background saving in order to send updates to it. */
164 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
165 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
166 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
167 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
169 /* List related stuff */
173 /* Sort operations */
174 #define REDIS_SORT_GET 0
175 #define REDIS_SORT_DEL 1
176 #define REDIS_SORT_INCR 2
177 #define REDIS_SORT_DECR 3
178 #define REDIS_SORT_ASC 4
179 #define REDIS_SORT_DESC 5
180 #define REDIS_SORTKEY_MAX 1024
183 #define REDIS_DEBUG 0
184 #define REDIS_NOTICE 1
185 #define REDIS_WARNING 2
187 /* Anti-warning macro... */
188 #define REDIS_NOTUSED(V) ((void) V)
190 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
191 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
193 /*================================= Data types ============================== */
195 /* A redis object, that is a type able to hold a string / list / set */
196 typedef struct redisObject
{
199 unsigned char encoding
;
200 unsigned char notused
[2];
204 typedef struct redisDb
{
210 /* With multiplexing we need to take per-clinet state.
211 * Clients are taken in a liked list. */
212 typedef struct redisClient
{
217 robj
**argv
, **mbargv
;
219 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
220 int multibulk
; /* multi bulk command format active */
223 time_t lastinteraction
; /* time of the last interaction, used for timeout */
224 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
225 int slaveseldb
; /* slave selected db, if this client is a slave */
226 int authenticated
; /* when requirepass is non-NULL */
227 int replstate
; /* replication state if this is a slave */
228 int repldbfd
; /* replication DB file descriptor */
229 long repldboff
; /* replication DB file offset */
230 off_t repldbsize
; /* replication DB file size */
238 /* Global server state structure */
244 unsigned int sharingpoolsize
;
245 long long dirty
; /* changes to DB from the last save */
247 list
*slaves
, *monitors
;
248 char neterr
[ANET_ERR_LEN
];
250 int cronloops
; /* number of times the cron function run */
251 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
252 time_t lastsave
; /* Unix time of last save succeeede */
253 size_t usedmemory
; /* Used memory in megabytes */
254 /* Fields used only for stats */
255 time_t stat_starttime
; /* server start time */
256 long long stat_numcommands
; /* number of processed commands */
257 long long stat_numconnections
; /* number of connections received */
268 int bgsaveinprogress
;
269 pid_t bgsavechildpid
;
270 struct saveparam
*saveparams
;
275 char *appendfilename
;
278 /* Replication related */
282 redisClient
*master
; /* client that is master for this slave */
284 unsigned int maxclients
;
285 unsigned long maxmemory
;
286 /* Sort parameters - qsort_r() is only available under BSD so we
287 * have to take this state global, in order to pass it to sortCompare() */
293 typedef void redisCommandProc(redisClient
*c
);
294 struct redisCommand
{
296 redisCommandProc
*proc
;
301 struct redisFunctionSym
{
303 unsigned long pointer
;
306 typedef struct _redisSortObject
{
314 typedef struct _redisSortOperation
{
317 } redisSortOperation
;
319 /* ZSETs use a specialized version of Skiplists */
321 typedef struct zskiplistNode
{
322 struct zskiplistNode
**forward
;
323 struct zskiplistNode
*backward
;
328 typedef struct zskiplist
{
329 struct zskiplistNode
*header
, *tail
;
330 unsigned long length
;
334 typedef struct zset
{
339 /* Our shared "common" objects */
341 struct sharedObjectsStruct
{
342 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
343 *colon
, *nullbulk
, *nullmultibulk
,
344 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
345 *outofrangeerr
, *plus
,
346 *select0
, *select1
, *select2
, *select3
, *select4
,
347 *select5
, *select6
, *select7
, *select8
, *select9
;
350 /* Global vars that are actally used as constants. The following double
351 * values are used for double on-disk serialization, and are initialized
352 * at runtime to avoid strange compiler optimizations. */
354 static double R_Zero
, R_PosInf
, R_NegInf
, R_Nan
;
356 /*================================ Prototypes =============================== */
358 static void freeStringObject(robj
*o
);
359 static void freeListObject(robj
*o
);
360 static void freeSetObject(robj
*o
);
361 static void decrRefCount(void *o
);
362 static robj
*createObject(int type
, void *ptr
);
363 static void freeClient(redisClient
*c
);
364 static int rdbLoad(char *filename
);
365 static void addReply(redisClient
*c
, robj
*obj
);
366 static void addReplySds(redisClient
*c
, sds s
);
367 static void incrRefCount(robj
*o
);
368 static int rdbSaveBackground(char *filename
);
369 static robj
*createStringObject(char *ptr
, size_t len
);
370 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
371 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
372 static int syncWithMaster(void);
373 static robj
*tryObjectSharing(robj
*o
);
374 static int tryObjectEncoding(robj
*o
);
375 static robj
*getDecodedObject(const robj
*o
);
376 static int removeExpire(redisDb
*db
, robj
*key
);
377 static int expireIfNeeded(redisDb
*db
, robj
*key
);
378 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
379 static int deleteKey(redisDb
*db
, robj
*key
);
380 static time_t getExpire(redisDb
*db
, robj
*key
);
381 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
382 static void updateSlavesWaitingBgsave(int bgsaveerr
);
383 static void freeMemoryIfNeeded(void);
384 static int processCommand(redisClient
*c
);
385 static void setupSigSegvAction(void);
386 static void rdbRemoveTempFile(pid_t childpid
);
387 static size_t stringObjectLen(robj
*o
);
388 static void processInputBuffer(redisClient
*c
);
389 static zskiplist
*zslCreate(void);
390 static void zslFree(zskiplist
*zsl
);
391 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
393 static void authCommand(redisClient
*c
);
394 static void pingCommand(redisClient
*c
);
395 static void echoCommand(redisClient
*c
);
396 static void setCommand(redisClient
*c
);
397 static void setnxCommand(redisClient
*c
);
398 static void getCommand(redisClient
*c
);
399 static void delCommand(redisClient
*c
);
400 static void existsCommand(redisClient
*c
);
401 static void incrCommand(redisClient
*c
);
402 static void decrCommand(redisClient
*c
);
403 static void incrbyCommand(redisClient
*c
);
404 static void decrbyCommand(redisClient
*c
);
405 static void selectCommand(redisClient
*c
);
406 static void randomkeyCommand(redisClient
*c
);
407 static void keysCommand(redisClient
*c
);
408 static void dbsizeCommand(redisClient
*c
);
409 static void lastsaveCommand(redisClient
*c
);
410 static void saveCommand(redisClient
*c
);
411 static void bgsaveCommand(redisClient
*c
);
412 static void shutdownCommand(redisClient
*c
);
413 static void moveCommand(redisClient
*c
);
414 static void renameCommand(redisClient
*c
);
415 static void renamenxCommand(redisClient
*c
);
416 static void lpushCommand(redisClient
*c
);
417 static void rpushCommand(redisClient
*c
);
418 static void lpopCommand(redisClient
*c
);
419 static void rpopCommand(redisClient
*c
);
420 static void llenCommand(redisClient
*c
);
421 static void lindexCommand(redisClient
*c
);
422 static void lrangeCommand(redisClient
*c
);
423 static void ltrimCommand(redisClient
*c
);
424 static void typeCommand(redisClient
*c
);
425 static void lsetCommand(redisClient
*c
);
426 static void saddCommand(redisClient
*c
);
427 static void sremCommand(redisClient
*c
);
428 static void smoveCommand(redisClient
*c
);
429 static void sismemberCommand(redisClient
*c
);
430 static void scardCommand(redisClient
*c
);
431 static void spopCommand(redisClient
*c
);
432 static void srandmemberCommand(redisClient
*c
);
433 static void sinterCommand(redisClient
*c
);
434 static void sinterstoreCommand(redisClient
*c
);
435 static void sunionCommand(redisClient
*c
);
436 static void sunionstoreCommand(redisClient
*c
);
437 static void sdiffCommand(redisClient
*c
);
438 static void sdiffstoreCommand(redisClient
*c
);
439 static void syncCommand(redisClient
*c
);
440 static void flushdbCommand(redisClient
*c
);
441 static void flushallCommand(redisClient
*c
);
442 static void sortCommand(redisClient
*c
);
443 static void lremCommand(redisClient
*c
);
444 static void infoCommand(redisClient
*c
);
445 static void mgetCommand(redisClient
*c
);
446 static void monitorCommand(redisClient
*c
);
447 static void expireCommand(redisClient
*c
);
448 static void expireatCommand(redisClient
*c
);
449 static void getsetCommand(redisClient
*c
);
450 static void ttlCommand(redisClient
*c
);
451 static void slaveofCommand(redisClient
*c
);
452 static void debugCommand(redisClient
*c
);
453 static void msetCommand(redisClient
*c
);
454 static void msetnxCommand(redisClient
*c
);
455 static void zaddCommand(redisClient
*c
);
456 static void zrangeCommand(redisClient
*c
);
457 static void zrangebyscoreCommand(redisClient
*c
);
458 static void zrevrangeCommand(redisClient
*c
);
459 static void zcardCommand(redisClient
*c
);
460 static void zremCommand(redisClient
*c
);
461 static void zscoreCommand(redisClient
*c
);
462 static void zremrangebyscoreCommand(redisClient
*c
);
464 /*================================= Globals ================================= */
467 static struct redisServer server
; /* server global state */
468 static struct redisCommand cmdTable
[] = {
469 {"get",getCommand
,2,REDIS_CMD_INLINE
},
470 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
471 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
472 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
473 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
474 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
475 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
476 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
477 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
478 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
479 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
480 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
481 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
482 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
483 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
484 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
485 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
486 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
487 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
488 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
489 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
490 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
491 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
492 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
493 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
494 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
495 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
496 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
497 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
498 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
499 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
500 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
501 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
502 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
503 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
504 {"zrange",zrangeCommand
,4,REDIS_CMD_INLINE
},
505 {"zrangebyscore",zrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
506 {"zrevrange",zrevrangeCommand
,4,REDIS_CMD_INLINE
},
507 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
508 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
509 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
510 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
511 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
512 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
513 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
514 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
515 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
516 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
517 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
518 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
519 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
520 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
521 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
522 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
523 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
524 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
525 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
526 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
527 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
528 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
529 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
530 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
531 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
532 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
533 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
534 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
535 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
536 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
537 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
538 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
539 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
542 /*============================ Utility functions ============================ */
544 /* Glob-style pattern matching. */
545 int stringmatchlen(const char *pattern
, int patternLen
,
546 const char *string
, int stringLen
, int nocase
)
551 while (pattern
[1] == '*') {
556 return 1; /* match */
558 if (stringmatchlen(pattern
+1, patternLen
-1,
559 string
, stringLen
, nocase
))
560 return 1; /* match */
564 return 0; /* no match */
568 return 0; /* no match */
578 not = pattern
[0] == '^';
585 if (pattern
[0] == '\\') {
588 if (pattern
[0] == string
[0])
590 } else if (pattern
[0] == ']') {
592 } else if (patternLen
== 0) {
596 } else if (pattern
[1] == '-' && patternLen
>= 3) {
597 int start
= pattern
[0];
598 int end
= pattern
[2];
606 start
= tolower(start
);
612 if (c
>= start
&& c
<= end
)
616 if (pattern
[0] == string
[0])
619 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
629 return 0; /* no match */
635 if (patternLen
>= 2) {
642 if (pattern
[0] != string
[0])
643 return 0; /* no match */
645 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
646 return 0; /* no match */
654 if (stringLen
== 0) {
655 while(*pattern
== '*') {
662 if (patternLen
== 0 && stringLen
== 0)
667 static void redisLog(int level
, const char *fmt
, ...) {
671 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
675 if (level
>= server
.verbosity
) {
681 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
682 fprintf(fp
,"%s %c ",buf
,c
[level
]);
683 vfprintf(fp
, fmt
, ap
);
689 if (server
.logfile
) fclose(fp
);
692 /*====================== Hash table type implementation ==================== */
694 /* This is an hash table type that uses the SDS dynamic strings libary as
695 * keys and radis objects as values (objects can hold SDS strings,
698 static void dictVanillaFree(void *privdata
, void *val
)
700 DICT_NOTUSED(privdata
);
704 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
708 DICT_NOTUSED(privdata
);
710 l1
= sdslen((sds
)key1
);
711 l2
= sdslen((sds
)key2
);
712 if (l1
!= l2
) return 0;
713 return memcmp(key1
, key2
, l1
) == 0;
716 static void dictRedisObjectDestructor(void *privdata
, void *val
)
718 DICT_NOTUSED(privdata
);
723 static int dictObjKeyCompare(void *privdata
, const void *key1
,
726 const robj
*o1
= key1
, *o2
= key2
;
727 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
730 static unsigned int dictObjHash(const void *key
) {
732 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
735 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
738 const robj
*o1
= key1
, *o2
= key2
;
740 if (o1
->encoding
== REDIS_ENCODING_RAW
&&
741 o2
->encoding
== REDIS_ENCODING_RAW
)
742 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
747 dec1
= o1
->encoding
!= REDIS_ENCODING_RAW
?
748 getDecodedObject(o1
) : (robj
*)o1
;
749 dec2
= o2
->encoding
!= REDIS_ENCODING_RAW
?
750 getDecodedObject(o2
) : (robj
*)o2
;
751 cmp
= sdsDictKeyCompare(privdata
,dec1
->ptr
,dec2
->ptr
);
752 if (dec1
!= o1
) decrRefCount(dec1
);
753 if (dec2
!= o2
) decrRefCount(dec2
);
758 static unsigned int dictEncObjHash(const void *key
) {
761 if (o
->encoding
== REDIS_ENCODING_RAW
)
762 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
764 robj
*dec
= getDecodedObject(o
);
765 unsigned int hash
= dictGenHashFunction(dec
->ptr
, sdslen((sds
)dec
->ptr
));
771 static dictType setDictType
= {
772 dictEncObjHash
, /* hash function */
775 dictEncObjKeyCompare
, /* key compare */
776 dictRedisObjectDestructor
, /* key destructor */
777 NULL
/* val destructor */
780 static dictType zsetDictType
= {
781 dictEncObjHash
, /* hash function */
784 dictEncObjKeyCompare
, /* key compare */
785 dictRedisObjectDestructor
, /* key destructor */
786 dictVanillaFree
/* val destructor */
789 static dictType hashDictType
= {
790 dictObjHash
, /* hash function */
793 dictObjKeyCompare
, /* key compare */
794 dictRedisObjectDestructor
, /* key destructor */
795 dictRedisObjectDestructor
/* val destructor */
798 /* ========================= Random utility functions ======================= */
800 /* Redis generally does not try to recover from out of memory conditions
801 * when allocating objects or strings, it is not clear if it will be possible
802 * to report this condition to the client since the networking layer itself
803 * is based on heap allocation for send buffers, so we simply abort.
804 * At least the code will be simpler to read... */
805 static void oom(const char *msg
) {
806 fprintf(stderr
, "%s: Out of memory\n",msg
);
812 /* ====================== Redis server networking stuff ===================== */
813 static void closeTimedoutClients(void) {
816 time_t now
= time(NULL
);
818 listRewind(server
.clients
);
819 while ((ln
= listYield(server
.clients
)) != NULL
) {
820 c
= listNodeValue(ln
);
821 if (!(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
822 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
823 (now
- c
->lastinteraction
> server
.maxidletime
)) {
824 redisLog(REDIS_DEBUG
,"Closing idle client");
830 static int htNeedsResize(dict
*dict
) {
831 long long size
, used
;
833 size
= dictSlots(dict
);
834 used
= dictSize(dict
);
835 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
836 (used
*100/size
< REDIS_HT_MINFILL
));
839 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
840 * we resize the hash table to save memory */
841 static void tryResizeHashTables(void) {
844 for (j
= 0; j
< server
.dbnum
; j
++) {
845 if (htNeedsResize(server
.db
[j
].dict
)) {
846 redisLog(REDIS_DEBUG
,"The hash table %d is too sparse, resize it...",j
);
847 dictResize(server
.db
[j
].dict
);
848 redisLog(REDIS_DEBUG
,"Hash table %d resized.",j
);
850 if (htNeedsResize(server
.db
[j
].expires
))
851 dictResize(server
.db
[j
].expires
);
855 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
856 int j
, loops
= server
.cronloops
++;
857 REDIS_NOTUSED(eventLoop
);
859 REDIS_NOTUSED(clientData
);
861 /* Update the global state with the amount of used memory */
862 server
.usedmemory
= zmalloc_used_memory();
864 /* Show some info about non-empty databases */
865 for (j
= 0; j
< server
.dbnum
; j
++) {
866 long long size
, used
, vkeys
;
868 size
= dictSlots(server
.db
[j
].dict
);
869 used
= dictSize(server
.db
[j
].dict
);
870 vkeys
= dictSize(server
.db
[j
].expires
);
871 if (!(loops
% 5) && (used
|| vkeys
)) {
872 redisLog(REDIS_DEBUG
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
873 /* dictPrintStats(server.dict); */
877 /* We don't want to resize the hash tables while a bacground saving
878 * is in progress: the saving child is created using fork() that is
879 * implemented with a copy-on-write semantic in most modern systems, so
880 * if we resize the HT while there is the saving child at work actually
881 * a lot of memory movements in the parent will cause a lot of pages
883 if (!server
.bgsaveinprogress
) tryResizeHashTables();
885 /* Show information about connected clients */
887 redisLog(REDIS_DEBUG
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
888 listLength(server
.clients
)-listLength(server
.slaves
),
889 listLength(server
.slaves
),
891 dictSize(server
.sharingpool
));
894 /* Close connections of timedout clients */
895 if (server
.maxidletime
&& !(loops
% 10))
896 closeTimedoutClients();
898 /* Check if a background saving in progress terminated */
899 if (server
.bgsaveinprogress
) {
901 if (wait4(-1,&statloc
,WNOHANG
,NULL
)) {
902 int exitcode
= WEXITSTATUS(statloc
);
903 int bysignal
= WIFSIGNALED(statloc
);
905 if (!bysignal
&& exitcode
== 0) {
906 redisLog(REDIS_NOTICE
,
907 "Background saving terminated with success");
909 server
.lastsave
= time(NULL
);
910 } else if (!bysignal
&& exitcode
!= 0) {
911 redisLog(REDIS_WARNING
, "Background saving error");
913 redisLog(REDIS_WARNING
,
914 "Background saving terminated by signal");
915 rdbRemoveTempFile(server
.bgsavechildpid
);
917 server
.bgsaveinprogress
= 0;
918 server
.bgsavechildpid
= -1;
919 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
922 /* If there is not a background saving in progress check if
923 * we have to save now */
924 time_t now
= time(NULL
);
925 for (j
= 0; j
< server
.saveparamslen
; j
++) {
926 struct saveparam
*sp
= server
.saveparams
+j
;
928 if (server
.dirty
>= sp
->changes
&&
929 now
-server
.lastsave
> sp
->seconds
) {
930 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
931 sp
->changes
, sp
->seconds
);
932 rdbSaveBackground(server
.dbfilename
);
938 /* Try to expire a few timed out keys */
939 for (j
= 0; j
< server
.dbnum
; j
++) {
940 redisDb
*db
= server
.db
+j
;
941 int num
= dictSize(db
->expires
);
944 time_t now
= time(NULL
);
946 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
947 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
952 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
953 t
= (time_t) dictGetEntryVal(de
);
955 deleteKey(db
,dictGetEntryKey(de
));
961 /* Check if we should connect to a MASTER */
962 if (server
.replstate
== REDIS_REPL_CONNECT
) {
963 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
964 if (syncWithMaster() == REDIS_OK
) {
965 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
971 static void createSharedObjects(void) {
972 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
973 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
974 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
975 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
976 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
977 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
978 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
979 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
980 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
982 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
983 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
984 "-ERR Operation against a key holding the wrong kind of value\r\n"));
985 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
986 "-ERR no such key\r\n"));
987 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
988 "-ERR syntax error\r\n"));
989 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
990 "-ERR source and destination objects are the same\r\n"));
991 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
992 "-ERR index out of range\r\n"));
993 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
994 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
995 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
996 shared
.select0
= createStringObject("select 0\r\n",10);
997 shared
.select1
= createStringObject("select 1\r\n",10);
998 shared
.select2
= createStringObject("select 2\r\n",10);
999 shared
.select3
= createStringObject("select 3\r\n",10);
1000 shared
.select4
= createStringObject("select 4\r\n",10);
1001 shared
.select5
= createStringObject("select 5\r\n",10);
1002 shared
.select6
= createStringObject("select 6\r\n",10);
1003 shared
.select7
= createStringObject("select 7\r\n",10);
1004 shared
.select8
= createStringObject("select 8\r\n",10);
1005 shared
.select9
= createStringObject("select 9\r\n",10);
1008 static void appendServerSaveParams(time_t seconds
, int changes
) {
1009 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1010 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1011 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1012 server
.saveparamslen
++;
1015 static void ResetServerSaveParams() {
1016 zfree(server
.saveparams
);
1017 server
.saveparams
= NULL
;
1018 server
.saveparamslen
= 0;
1021 static void initServerConfig() {
1022 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1023 server
.port
= REDIS_SERVERPORT
;
1024 server
.verbosity
= REDIS_DEBUG
;
1025 server
.maxidletime
= REDIS_MAXIDLETIME
;
1026 server
.saveparams
= NULL
;
1027 server
.logfile
= NULL
; /* NULL = log on standard output */
1028 server
.bindaddr
= NULL
;
1029 server
.glueoutputbuf
= 1;
1030 server
.daemonize
= 0;
1031 server
.appendonly
= 0;
1032 server
.appendfd
= -1;
1033 server
.appendseldb
= -1; /* Make sure the first time will not match */
1034 server
.pidfile
= "/var/run/redis.pid";
1035 server
.dbfilename
= "dump.rdb";
1036 server
.appendfilename
= "appendonly.log";
1037 server
.requirepass
= NULL
;
1038 server
.shareobjects
= 0;
1039 server
.sharingpoolsize
= 1024;
1040 server
.maxclients
= 0;
1041 server
.maxmemory
= 0;
1042 ResetServerSaveParams();
1044 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1045 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1046 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1047 /* Replication related */
1049 server
.masterhost
= NULL
;
1050 server
.masterport
= 6379;
1051 server
.master
= NULL
;
1052 server
.replstate
= REDIS_REPL_NONE
;
1054 /* Double constants initialization */
1056 R_PosInf
= 1.0/R_Zero
;
1057 R_NegInf
= -1.0/R_Zero
;
1058 R_Nan
= R_Zero
/R_Zero
;
1061 static void initServer() {
1064 signal(SIGHUP
, SIG_IGN
);
1065 signal(SIGPIPE
, SIG_IGN
);
1066 setupSigSegvAction();
1068 server
.clients
= listCreate();
1069 server
.slaves
= listCreate();
1070 server
.monitors
= listCreate();
1071 server
.objfreelist
= listCreate();
1072 createSharedObjects();
1073 server
.el
= aeCreateEventLoop();
1074 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1075 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
1076 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1077 if (server
.fd
== -1) {
1078 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
1081 for (j
= 0; j
< server
.dbnum
; j
++) {
1082 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1083 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1084 server
.db
[j
].id
= j
;
1086 server
.cronloops
= 0;
1087 server
.bgsaveinprogress
= 0;
1088 server
.bgsavechildpid
= -1;
1089 server
.lastsave
= time(NULL
);
1091 server
.usedmemory
= 0;
1092 server
.stat_numcommands
= 0;
1093 server
.stat_numconnections
= 0;
1094 server
.stat_starttime
= time(NULL
);
1095 aeCreateTimeEvent(server
.el
, 1000, serverCron
, NULL
, NULL
);
1097 if (server
.appendonly
) {
1098 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
);
1099 if (server
.appendfd
== -1) {
1100 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1107 /* Empty the whole database */
1108 static long long emptyDb() {
1110 long long removed
= 0;
1112 for (j
= 0; j
< server
.dbnum
; j
++) {
1113 removed
+= dictSize(server
.db
[j
].dict
);
1114 dictEmpty(server
.db
[j
].dict
);
1115 dictEmpty(server
.db
[j
].expires
);
1120 static int yesnotoi(char *s
) {
1121 if (!strcasecmp(s
,"yes")) return 1;
1122 else if (!strcasecmp(s
,"no")) return 0;
1126 /* I agree, this is a very rudimental way to load a configuration...
1127 will improve later if the config gets more complex */
1128 static void loadServerConfig(char *filename
) {
1130 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1134 if (filename
[0] == '-' && filename
[1] == '\0')
1137 if ((fp
= fopen(filename
,"r")) == NULL
) {
1138 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1143 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1149 line
= sdstrim(line
," \t\r\n");
1151 /* Skip comments and blank lines*/
1152 if (line
[0] == '#' || line
[0] == '\0') {
1157 /* Split into arguments */
1158 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1159 sdstolower(argv
[0]);
1161 /* Execute config directives */
1162 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1163 server
.maxidletime
= atoi(argv
[1]);
1164 if (server
.maxidletime
< 0) {
1165 err
= "Invalid timeout value"; goto loaderr
;
1167 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1168 server
.port
= atoi(argv
[1]);
1169 if (server
.port
< 1 || server
.port
> 65535) {
1170 err
= "Invalid port"; goto loaderr
;
1172 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1173 server
.bindaddr
= zstrdup(argv
[1]);
1174 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1175 int seconds
= atoi(argv
[1]);
1176 int changes
= atoi(argv
[2]);
1177 if (seconds
< 1 || changes
< 0) {
1178 err
= "Invalid save parameters"; goto loaderr
;
1180 appendServerSaveParams(seconds
,changes
);
1181 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1182 if (chdir(argv
[1]) == -1) {
1183 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1184 argv
[1], strerror(errno
));
1187 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1188 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1189 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1190 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1192 err
= "Invalid log level. Must be one of debug, notice, warning";
1195 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1198 server
.logfile
= zstrdup(argv
[1]);
1199 if (!strcasecmp(server
.logfile
,"stdout")) {
1200 zfree(server
.logfile
);
1201 server
.logfile
= NULL
;
1203 if (server
.logfile
) {
1204 /* Test if we are able to open the file. The server will not
1205 * be able to abort just for this problem later... */
1206 logfp
= fopen(server
.logfile
,"a");
1207 if (logfp
== NULL
) {
1208 err
= sdscatprintf(sdsempty(),
1209 "Can't open the log file: %s", strerror(errno
));
1214 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1215 server
.dbnum
= atoi(argv
[1]);
1216 if (server
.dbnum
< 1) {
1217 err
= "Invalid number of databases"; goto loaderr
;
1219 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1220 server
.maxclients
= atoi(argv
[1]);
1221 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1222 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1223 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1224 server
.masterhost
= sdsnew(argv
[1]);
1225 server
.masterport
= atoi(argv
[2]);
1226 server
.replstate
= REDIS_REPL_CONNECT
;
1227 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1228 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1229 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1231 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1232 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1233 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1235 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1236 server
.sharingpoolsize
= atoi(argv
[1]);
1237 if (server
.sharingpoolsize
< 1) {
1238 err
= "invalid object sharing pool size"; goto loaderr
;
1240 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1241 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1242 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1244 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1245 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1246 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1248 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1249 server
.requirepass
= zstrdup(argv
[1]);
1250 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1251 server
.pidfile
= zstrdup(argv
[1]);
1252 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1253 server
.dbfilename
= zstrdup(argv
[1]);
1255 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1257 for (j
= 0; j
< argc
; j
++)
1262 if (fp
!= stdin
) fclose(fp
);
1266 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1267 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1268 fprintf(stderr
, ">>> '%s'\n", line
);
1269 fprintf(stderr
, "%s\n", err
);
1273 static void freeClientArgv(redisClient
*c
) {
1276 for (j
= 0; j
< c
->argc
; j
++)
1277 decrRefCount(c
->argv
[j
]);
1278 for (j
= 0; j
< c
->mbargc
; j
++)
1279 decrRefCount(c
->mbargv
[j
]);
1284 static void freeClient(redisClient
*c
) {
1287 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1288 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1289 sdsfree(c
->querybuf
);
1290 listRelease(c
->reply
);
1293 ln
= listSearchKey(server
.clients
,c
);
1295 listDelNode(server
.clients
,ln
);
1296 if (c
->flags
& REDIS_SLAVE
) {
1297 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1299 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1300 ln
= listSearchKey(l
,c
);
1304 if (c
->flags
& REDIS_MASTER
) {
1305 server
.master
= NULL
;
1306 server
.replstate
= REDIS_REPL_CONNECT
;
1313 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1318 listRewind(c
->reply
);
1319 while((ln
= listYield(c
->reply
))) {
1321 totlen
+= sdslen(o
->ptr
);
1322 /* This optimization makes more sense if we don't have to copy
1324 if (totlen
> 1024) return;
1330 listRewind(c
->reply
);
1331 while((ln
= listYield(c
->reply
))) {
1333 memcpy(buf
+copylen
,o
->ptr
,sdslen(o
->ptr
));
1334 copylen
+= sdslen(o
->ptr
);
1335 listDelNode(c
->reply
,ln
);
1337 /* Now the output buffer is empty, add the new single element */
1338 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,totlen
));
1339 listAddNodeTail(c
->reply
,o
);
1343 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1344 redisClient
*c
= privdata
;
1345 int nwritten
= 0, totwritten
= 0, objlen
;
1348 REDIS_NOTUSED(mask
);
1350 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1351 glueReplyBuffersIfNeeded(c
);
1352 while(listLength(c
->reply
)) {
1353 o
= listNodeValue(listFirst(c
->reply
));
1354 objlen
= sdslen(o
->ptr
);
1357 listDelNode(c
->reply
,listFirst(c
->reply
));
1361 if (c
->flags
& REDIS_MASTER
) {
1362 /* Don't reply to a master */
1363 nwritten
= objlen
- c
->sentlen
;
1365 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1366 if (nwritten
<= 0) break;
1368 c
->sentlen
+= nwritten
;
1369 totwritten
+= nwritten
;
1370 /* If we fully sent the object on head go to the next one */
1371 if (c
->sentlen
== objlen
) {
1372 listDelNode(c
->reply
,listFirst(c
->reply
));
1375 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1376 * bytes, in a single threaded server it's a good idea to server
1377 * other clients as well, even if a very large request comes from
1378 * super fast link that is always able to accept data (in real world
1379 * terms think to 'KEYS *' against the loopback interfae) */
1380 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1382 if (nwritten
== -1) {
1383 if (errno
== EAGAIN
) {
1386 redisLog(REDIS_DEBUG
,
1387 "Error writing to client: %s", strerror(errno
));
1392 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1393 if (listLength(c
->reply
) == 0) {
1395 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1399 static struct redisCommand
*lookupCommand(char *name
) {
1401 while(cmdTable
[j
].name
!= NULL
) {
1402 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1408 /* resetClient prepare the client to process the next command */
1409 static void resetClient(redisClient
*c
) {
1415 /* If this function gets called we already read a whole
1416 * command, argments are in the client argv/argc fields.
1417 * processCommand() execute the command or prepare the
1418 * server for a bulk read from the client.
1420 * If 1 is returned the client is still alive and valid and
1421 * and other operations can be performed by the caller. Otherwise
1422 * if 0 is returned the client was destroied (i.e. after QUIT). */
1423 static int processCommand(redisClient
*c
) {
1424 struct redisCommand
*cmd
;
1427 /* Free some memory if needed (maxmemory setting) */
1428 if (server
.maxmemory
) freeMemoryIfNeeded();
1430 /* Handle the multi bulk command type. This is an alternative protocol
1431 * supported by Redis in order to receive commands that are composed of
1432 * multiple binary-safe "bulk" arguments. The latency of processing is
1433 * a bit higher but this allows things like multi-sets, so if this
1434 * protocol is used only for MSET and similar commands this is a big win. */
1435 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1436 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1437 if (c
->multibulk
<= 0) {
1441 decrRefCount(c
->argv
[c
->argc
-1]);
1445 } else if (c
->multibulk
) {
1446 if (c
->bulklen
== -1) {
1447 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1448 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1452 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1453 decrRefCount(c
->argv
[0]);
1454 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1456 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1461 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1465 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1466 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1470 if (c
->multibulk
== 0) {
1474 /* Here we need to swap the multi-bulk argc/argv with the
1475 * normal argc/argv of the client structure. */
1477 c
->argv
= c
->mbargv
;
1478 c
->mbargv
= auxargv
;
1481 c
->argc
= c
->mbargc
;
1482 c
->mbargc
= auxargc
;
1484 /* We need to set bulklen to something different than -1
1485 * in order for the code below to process the command without
1486 * to try to read the last argument of a bulk command as
1487 * a special argument. */
1489 /* continue below and process the command */
1496 /* -- end of multi bulk commands processing -- */
1498 /* The QUIT command is handled as a special case. Normal command
1499 * procs are unable to close the client connection safely */
1500 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1504 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1506 addReplySds(c
,sdsnew("-ERR unknown command\r\n"));
1509 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1510 (c
->argc
< -cmd
->arity
)) {
1511 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
1514 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1515 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1518 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1519 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1521 decrRefCount(c
->argv
[c
->argc
-1]);
1522 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1524 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1529 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1530 /* It is possible that the bulk read is already in the
1531 * buffer. Check this condition and handle it accordingly.
1532 * This is just a fast path, alternative to call processInputBuffer().
1533 * It's a good idea since the code is small and this condition
1534 * happens most of the times. */
1535 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1536 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1538 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1543 /* Let's try to share objects on the command arguments vector */
1544 if (server
.shareobjects
) {
1546 for(j
= 1; j
< c
->argc
; j
++)
1547 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1549 /* Let's try to encode the bulk object to save space. */
1550 if (cmd
->flags
& REDIS_CMD_BULK
)
1551 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1553 /* Check if the user is authenticated */
1554 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1555 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
1560 /* Exec the command */
1561 dirty
= server
.dirty
;
1563 if (server
.appendonly
!= 0)
1564 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1565 if (server
.dirty
-dirty
!= 0 && listLength(server
.slaves
))
1566 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1567 if (listLength(server
.monitors
))
1568 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1569 server
.stat_numcommands
++;
1571 /* Prepare the client for the next command */
1572 if (c
->flags
& REDIS_CLOSE
) {
1580 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1584 /* (args*2)+1 is enough room for args, spaces, newlines */
1585 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
1587 if (argc
<= REDIS_STATIC_ARGS
) {
1590 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
1593 for (j
= 0; j
< argc
; j
++) {
1594 if (j
!= 0) outv
[outc
++] = shared
.space
;
1595 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
1598 lenobj
= createObject(REDIS_STRING
,
1599 sdscatprintf(sdsempty(),"%d\r\n",
1600 stringObjectLen(argv
[j
])));
1601 lenobj
->refcount
= 0;
1602 outv
[outc
++] = lenobj
;
1604 outv
[outc
++] = argv
[j
];
1606 outv
[outc
++] = shared
.crlf
;
1608 /* Increment all the refcounts at start and decrement at end in order to
1609 * be sure to free objects if there is no slave in a replication state
1610 * able to be feed with commands */
1611 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
1613 while((ln
= listYield(slaves
))) {
1614 redisClient
*slave
= ln
->value
;
1616 /* Don't feed slaves that are still waiting for BGSAVE to start */
1617 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
1619 /* Feed all the other slaves, MONITORs and so on */
1620 if (slave
->slaveseldb
!= dictid
) {
1624 case 0: selectcmd
= shared
.select0
; break;
1625 case 1: selectcmd
= shared
.select1
; break;
1626 case 2: selectcmd
= shared
.select2
; break;
1627 case 3: selectcmd
= shared
.select3
; break;
1628 case 4: selectcmd
= shared
.select4
; break;
1629 case 5: selectcmd
= shared
.select5
; break;
1630 case 6: selectcmd
= shared
.select6
; break;
1631 case 7: selectcmd
= shared
.select7
; break;
1632 case 8: selectcmd
= shared
.select8
; break;
1633 case 9: selectcmd
= shared
.select9
; break;
1635 selectcmd
= createObject(REDIS_STRING
,
1636 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
1637 selectcmd
->refcount
= 0;
1640 addReply(slave
,selectcmd
);
1641 slave
->slaveseldb
= dictid
;
1643 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
1645 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
1646 if (outv
!= static_outv
) zfree(outv
);
1649 /* TODO: translate EXPIREs into EXPIRETOs */
1650 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
1651 sds buf
= sdsempty();
1655 /* The DB this command was targetting is not the same as the last command
1656 * we appendend. To issue a SELECT command is needed. */
1657 if (dictid
!= server
.appendseldb
) {
1660 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
1661 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
1662 strlen(seldb
),seldb
);
1663 server
.appendseldb
= dictid
;
1665 /* Append the actual command */
1666 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
1667 for (j
= 0; j
< argc
; j
++) {
1670 if (o
->encoding
!= REDIS_ENCODING_RAW
)
1671 o
= getDecodedObject(o
);
1672 buf
= sdscatprintf(buf
,"$%d\r\n",sdslen(o
->ptr
));
1673 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
1674 buf
= sdscatlen(buf
,"\r\n",2);
1678 /* We want to perform a single write. This should be guaranteed atomic
1679 * at least if the filesystem we are writing is a real physical one.
1680 * While this will save us against the server being killed I don't think
1681 * there is much to do about the whole server stopping for power problems
1683 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
1684 if (nwritten
!= (unsigned)sdslen(buf
)) {
1685 /* Ooops, we are in troubles. The best thing to do for now is
1686 * to simply exit instead to give the illusion that everything is
1687 * working as expected. */
1688 if (nwritten
== -1) {
1689 redisLog(REDIS_WARNING
,"Aborting on error writing to the append-only file: %s",strerror(errno
));
1691 redisLog(REDIS_WARNING
,"Aborting on short write while writing to the append-only file: %s",strerror(errno
));
1695 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
1698 static void processInputBuffer(redisClient
*c
) {
1700 if (c
->bulklen
== -1) {
1701 /* Read the first line of the query */
1702 char *p
= strchr(c
->querybuf
,'\n');
1709 query
= c
->querybuf
;
1710 c
->querybuf
= sdsempty();
1711 querylen
= 1+(p
-(query
));
1712 if (sdslen(query
) > querylen
) {
1713 /* leave data after the first line of the query in the buffer */
1714 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
1716 *p
= '\0'; /* remove "\n" */
1717 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
1718 sdsupdatelen(query
);
1720 /* Now we can split the query in arguments */
1721 if (sdslen(query
) == 0) {
1722 /* Ignore empty query */
1726 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
1729 if (c
->argv
) zfree(c
->argv
);
1730 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
1732 for (j
= 0; j
< argc
; j
++) {
1733 if (sdslen(argv
[j
])) {
1734 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
1741 /* Execute the command. If the client is still valid
1742 * after processCommand() return and there is something
1743 * on the query buffer try to process the next command. */
1744 if (c
->argc
&& processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1746 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
1747 redisLog(REDIS_DEBUG
, "Client protocol error");
1752 /* Bulk read handling. Note that if we are at this point
1753 the client already sent a command terminated with a newline,
1754 we are reading the bulk data that is actually the last
1755 argument of the command. */
1756 int qbl
= sdslen(c
->querybuf
);
1758 if (c
->bulklen
<= qbl
) {
1759 /* Copy everything but the final CRLF as final argument */
1760 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1762 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1763 /* Process the command. If the client is still valid after
1764 * the processing and there is more data in the buffer
1765 * try to parse it. */
1766 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
1772 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1773 redisClient
*c
= (redisClient
*) privdata
;
1774 char buf
[REDIS_IOBUF_LEN
];
1777 REDIS_NOTUSED(mask
);
1779 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
1781 if (errno
== EAGAIN
) {
1784 redisLog(REDIS_DEBUG
, "Reading from client: %s",strerror(errno
));
1788 } else if (nread
== 0) {
1789 redisLog(REDIS_DEBUG
, "Client closed connection");
1794 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
1795 c
->lastinteraction
= time(NULL
);
1799 processInputBuffer(c
);
1802 static int selectDb(redisClient
*c
, int id
) {
1803 if (id
< 0 || id
>= server
.dbnum
)
1805 c
->db
= &server
.db
[id
];
1809 static void *dupClientReplyValue(void *o
) {
1810 incrRefCount((robj
*)o
);
1814 static redisClient
*createClient(int fd
) {
1815 redisClient
*c
= zmalloc(sizeof(*c
));
1817 anetNonBlock(NULL
,fd
);
1818 anetTcpNoDelay(NULL
,fd
);
1819 if (!c
) return NULL
;
1822 c
->querybuf
= sdsempty();
1831 c
->lastinteraction
= time(NULL
);
1832 c
->authenticated
= 0;
1833 c
->replstate
= REDIS_REPL_NONE
;
1834 c
->reply
= listCreate();
1835 listSetFreeMethod(c
->reply
,decrRefCount
);
1836 listSetDupMethod(c
->reply
,dupClientReplyValue
);
1837 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1838 readQueryFromClient
, c
, NULL
) == AE_ERR
) {
1842 listAddNodeTail(server
.clients
,c
);
1846 static void addReply(redisClient
*c
, robj
*obj
) {
1847 if (listLength(c
->reply
) == 0 &&
1848 (c
->replstate
== REDIS_REPL_NONE
||
1849 c
->replstate
== REDIS_REPL_ONLINE
) &&
1850 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
1851 sendReplyToClient
, c
, NULL
) == AE_ERR
) return;
1852 if (obj
->encoding
!= REDIS_ENCODING_RAW
) {
1853 obj
= getDecodedObject(obj
);
1857 listAddNodeTail(c
->reply
,obj
);
1860 static void addReplySds(redisClient
*c
, sds s
) {
1861 robj
*o
= createObject(REDIS_STRING
,s
);
1866 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
1869 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
1870 len
= sdslen(obj
->ptr
);
1872 long n
= (long)obj
->ptr
;
1879 while((n
= n
/10) != 0) {
1883 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",len
));
1886 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1891 REDIS_NOTUSED(mask
);
1892 REDIS_NOTUSED(privdata
);
1894 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
1895 if (cfd
== AE_ERR
) {
1896 redisLog(REDIS_DEBUG
,"Accepting client connection: %s", server
.neterr
);
1899 redisLog(REDIS_DEBUG
,"Accepted %s:%d", cip
, cport
);
1900 if ((c
= createClient(cfd
)) == NULL
) {
1901 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
1902 close(cfd
); /* May be already closed, just ingore errors */
1905 /* If maxclient directive is set and this is one client more... close the
1906 * connection. Note that we create the client instead to check before
1907 * for this condition, since now the socket is already set in nonblocking
1908 * mode and we can send an error for free using the Kernel I/O */
1909 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
1910 char *err
= "-ERR max number of clients reached\r\n";
1912 /* That's a best effort error message, don't check write errors */
1913 (void) write(c
->fd
,err
,strlen(err
));
1917 server
.stat_numconnections
++;
1920 /* ======================= Redis objects implementation ===================== */
1922 static robj
*createObject(int type
, void *ptr
) {
1925 if (listLength(server
.objfreelist
)) {
1926 listNode
*head
= listFirst(server
.objfreelist
);
1927 o
= listNodeValue(head
);
1928 listDelNode(server
.objfreelist
,head
);
1930 o
= zmalloc(sizeof(*o
));
1933 o
->encoding
= REDIS_ENCODING_RAW
;
1939 static robj
*createStringObject(char *ptr
, size_t len
) {
1940 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
1943 static robj
*createListObject(void) {
1944 list
*l
= listCreate();
1946 listSetFreeMethod(l
,decrRefCount
);
1947 return createObject(REDIS_LIST
,l
);
1950 static robj
*createSetObject(void) {
1951 dict
*d
= dictCreate(&setDictType
,NULL
);
1952 return createObject(REDIS_SET
,d
);
1955 static robj
*createZsetObject(void) {
1956 zset
*zs
= zmalloc(sizeof(*zs
));
1958 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
1959 zs
->zsl
= zslCreate();
1960 return createObject(REDIS_ZSET
,zs
);
1963 static void freeStringObject(robj
*o
) {
1964 if (o
->encoding
== REDIS_ENCODING_RAW
) {
1969 static void freeListObject(robj
*o
) {
1970 listRelease((list
*) o
->ptr
);
1973 static void freeSetObject(robj
*o
) {
1974 dictRelease((dict
*) o
->ptr
);
1977 static void freeZsetObject(robj
*o
) {
1980 dictRelease(zs
->dict
);
1985 static void freeHashObject(robj
*o
) {
1986 dictRelease((dict
*) o
->ptr
);
1989 static void incrRefCount(robj
*o
) {
1991 #ifdef DEBUG_REFCOUNT
1992 if (o
->type
== REDIS_STRING
)
1993 printf("Increment '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
);
1997 static void decrRefCount(void *obj
) {
2000 #ifdef DEBUG_REFCOUNT
2001 if (o
->type
== REDIS_STRING
)
2002 printf("Decrement '%s'(%p), now is: %d\n",o
->ptr
,o
,o
->refcount
-1);
2004 if (--(o
->refcount
) == 0) {
2006 case REDIS_STRING
: freeStringObject(o
); break;
2007 case REDIS_LIST
: freeListObject(o
); break;
2008 case REDIS_SET
: freeSetObject(o
); break;
2009 case REDIS_ZSET
: freeZsetObject(o
); break;
2010 case REDIS_HASH
: freeHashObject(o
); break;
2011 default: assert(0 != 0); break;
2013 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2014 !listAddNodeHead(server
.objfreelist
,o
))
2019 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2020 dictEntry
*de
= dictFind(db
->dict
,key
);
2021 return de
? dictGetEntryVal(de
) : NULL
;
2024 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2025 expireIfNeeded(db
,key
);
2026 return lookupKey(db
,key
);
2029 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2030 deleteIfVolatile(db
,key
);
2031 return lookupKey(db
,key
);
2034 static int deleteKey(redisDb
*db
, robj
*key
) {
2037 /* We need to protect key from destruction: after the first dictDelete()
2038 * it may happen that 'key' is no longer valid if we don't increment
2039 * it's count. This may happen when we get the object reference directly
2040 * from the hash table with dictRandomKey() or dict iterators */
2042 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2043 retval
= dictDelete(db
->dict
,key
);
2046 return retval
== DICT_OK
;
2049 /* Try to share an object against the shared objects pool */
2050 static robj
*tryObjectSharing(robj
*o
) {
2051 struct dictEntry
*de
;
2054 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2056 assert(o
->type
== REDIS_STRING
);
2057 de
= dictFind(server
.sharingpool
,o
);
2059 robj
*shared
= dictGetEntryKey(de
);
2061 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2062 dictGetEntryVal(de
) = (void*) c
;
2063 incrRefCount(shared
);
2067 /* Here we are using a stream algorihtm: Every time an object is
2068 * shared we increment its count, everytime there is a miss we
2069 * recrement the counter of a random object. If this object reaches
2070 * zero we remove the object and put the current object instead. */
2071 if (dictSize(server
.sharingpool
) >=
2072 server
.sharingpoolsize
) {
2073 de
= dictGetRandomKey(server
.sharingpool
);
2075 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2076 dictGetEntryVal(de
) = (void*) c
;
2078 dictDelete(server
.sharingpool
,de
->key
);
2081 c
= 0; /* If the pool is empty we want to add this object */
2086 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2087 assert(retval
== DICT_OK
);
2094 /* Check if the nul-terminated string 's' can be represented by a long
2095 * (that is, is a number that fits into long without any other space or
2096 * character before or after the digits).
2098 * If so, the function returns REDIS_OK and *longval is set to the value
2099 * of the number. Otherwise REDIS_ERR is returned */
2100 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2101 char buf
[32], *endptr
;
2105 value
= strtol(s
, &endptr
, 10);
2106 if (endptr
[0] != '\0') return REDIS_ERR
;
2107 slen
= snprintf(buf
,32,"%ld",value
);
2109 /* If the number converted back into a string is not identical
2110 * then it's not possible to encode the string as integer */
2111 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2112 if (longval
) *longval
= value
;
2116 /* Try to encode a string object in order to save space */
2117 static int tryObjectEncoding(robj
*o
) {
2121 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2122 return REDIS_ERR
; /* Already encoded */
2124 /* It's not save to encode shared objects: shared objects can be shared
2125 * everywhere in the "object space" of Redis. Encoded objects can only
2126 * appear as "values" (and not, for instance, as keys) */
2127 if (o
->refcount
> 1) return REDIS_ERR
;
2129 /* Currently we try to encode only strings */
2130 assert(o
->type
== REDIS_STRING
);
2132 /* Check if we can represent this string as a long integer */
2133 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2135 /* Ok, this object can be encoded */
2136 o
->encoding
= REDIS_ENCODING_INT
;
2138 o
->ptr
= (void*) value
;
2142 /* Get a decoded version of an encoded object (returned as a new object) */
2143 static robj
*getDecodedObject(const robj
*o
) {
2146 assert(o
->encoding
!= REDIS_ENCODING_RAW
);
2147 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2150 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2151 dec
= createStringObject(buf
,strlen(buf
));
2158 /* Compare two string objects via strcmp() or alike.
2159 * Note that the objects may be integer-encoded. In such a case we
2160 * use snprintf() to get a string representation of the numbers on the stack
2161 * and compare the strings, it's much faster than calling getDecodedObject(). */
2162 static int compareStringObjects(robj
*a
, robj
*b
) {
2163 assert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2164 char bufa
[128], bufb
[128], *astr
, *bstr
;
2167 if (a
== b
) return 0;
2168 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2169 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2175 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2176 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2182 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2185 static size_t stringObjectLen(robj
*o
) {
2186 assert(o
->type
== REDIS_STRING
);
2187 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2188 return sdslen(o
->ptr
);
2192 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2196 /*============================ DB saving/loading ============================ */
2198 static int rdbSaveType(FILE *fp
, unsigned char type
) {
2199 if (fwrite(&type
,1,1,fp
) == 0) return -1;
2203 static int rdbSaveTime(FILE *fp
, time_t t
) {
2204 int32_t t32
= (int32_t) t
;
2205 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
2209 /* check rdbLoadLen() comments for more info */
2210 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2211 unsigned char buf
[2];
2214 /* Save a 6 bit len */
2215 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2216 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2217 } else if (len
< (1<<14)) {
2218 /* Save a 14 bit len */
2219 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2221 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2223 /* Save a 32 bit len */
2224 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2225 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2227 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2232 /* String objects in the form "2391" "-100" without any space and with a
2233 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2234 * encoded as integers to save space */
2235 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2237 char *endptr
, buf
[32];
2239 /* Check if it's possible to encode this value as a number */
2240 value
= strtoll(s
, &endptr
, 10);
2241 if (endptr
[0] != '\0') return 0;
2242 snprintf(buf
,32,"%lld",value
);
2244 /* If the number converted back into a string is not identical
2245 * then it's not possible to encode the string as integer */
2246 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2248 /* Finally check if it fits in our ranges */
2249 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2250 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2251 enc
[1] = value
&0xFF;
2253 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2254 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2255 enc
[1] = value
&0xFF;
2256 enc
[2] = (value
>>8)&0xFF;
2258 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2259 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2260 enc
[1] = value
&0xFF;
2261 enc
[2] = (value
>>8)&0xFF;
2262 enc
[3] = (value
>>16)&0xFF;
2263 enc
[4] = (value
>>24)&0xFF;
2270 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2271 unsigned int comprlen
, outlen
;
2275 /* We require at least four bytes compression for this to be worth it */
2276 outlen
= sdslen(obj
->ptr
)-4;
2277 if (outlen
<= 0) return 0;
2278 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2279 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2280 if (comprlen
== 0) {
2284 /* Data compressed! Let's save it on disk */
2285 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2286 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2287 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2288 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2289 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2298 /* Save a string objet as [len][data] on disk. If the object is a string
2299 * representation of an integer value we try to safe it in a special form */
2300 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2304 len
= sdslen(obj
->ptr
);
2306 /* Try integer encoding */
2308 unsigned char buf
[5];
2309 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2310 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2315 /* Try LZF compression - under 20 bytes it's unable to compress even
2316 * aaaaaaaaaaaaaaaaaa so skip it */
2320 retval
= rdbSaveLzfStringObject(fp
,obj
);
2321 if (retval
== -1) return -1;
2322 if (retval
> 0) return 0;
2323 /* retval == 0 means data can't be compressed, save the old way */
2326 /* Store verbatim */
2327 if (rdbSaveLen(fp
,len
) == -1) return -1;
2328 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2332 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2333 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2337 if (obj
->encoding
!= REDIS_ENCODING_RAW
) {
2338 dec
= getDecodedObject(obj
);
2339 retval
= rdbSaveStringObjectRaw(fp
,dec
);
2343 return rdbSaveStringObjectRaw(fp
,obj
);
2347 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
2348 * 8 bit integer specifing the length of the representation.
2349 * This 8 bit integer has special values in order to specify the following
2355 static int rdbSaveDoubleValue(FILE *fp
, double val
) {
2356 unsigned char buf
[128];
2362 } else if (!isfinite(val
)) {
2364 buf
[0] = (val
< 0) ? 255 : 254;
2366 snprintf((char*)buf
+1,sizeof(buf
)-1,"%.16g",val
);
2367 buf
[0] = strlen((char*)buf
);
2370 if (fwrite(buf
,len
,1,fp
) == 0) return -1;
2374 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2375 static int rdbSave(char *filename
) {
2376 dictIterator
*di
= NULL
;
2381 time_t now
= time(NULL
);
2383 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2384 fp
= fopen(tmpfile
,"w");
2386 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2389 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2390 for (j
= 0; j
< server
.dbnum
; j
++) {
2391 redisDb
*db
= server
.db
+j
;
2393 if (dictSize(d
) == 0) continue;
2394 di
= dictGetIterator(d
);
2400 /* Write the SELECT DB opcode */
2401 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2402 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2404 /* Iterate this DB writing every entry */
2405 while((de
= dictNext(di
)) != NULL
) {
2406 robj
*key
= dictGetEntryKey(de
);
2407 robj
*o
= dictGetEntryVal(de
);
2408 time_t expiretime
= getExpire(db
,key
);
2410 /* Save the expire time */
2411 if (expiretime
!= -1) {
2412 /* If this key is already expired skip it */
2413 if (expiretime
< now
) continue;
2414 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2415 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2417 /* Save the key and associated value */
2418 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2419 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2420 if (o
->type
== REDIS_STRING
) {
2421 /* Save a string value */
2422 if (rdbSaveStringObject(fp
,o
) == -1) goto werr
;
2423 } else if (o
->type
== REDIS_LIST
) {
2424 /* Save a list value */
2425 list
*list
= o
->ptr
;
2429 if (rdbSaveLen(fp
,listLength(list
)) == -1) goto werr
;
2430 while((ln
= listYield(list
))) {
2431 robj
*eleobj
= listNodeValue(ln
);
2433 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2435 } else if (o
->type
== REDIS_SET
) {
2436 /* Save a set value */
2438 dictIterator
*di
= dictGetIterator(set
);
2441 if (rdbSaveLen(fp
,dictSize(set
)) == -1) goto werr
;
2442 while((de
= dictNext(di
)) != NULL
) {
2443 robj
*eleobj
= dictGetEntryKey(de
);
2445 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2447 dictReleaseIterator(di
);
2448 } else if (o
->type
== REDIS_ZSET
) {
2449 /* Save a set value */
2451 dictIterator
*di
= dictGetIterator(zs
->dict
);
2454 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) goto werr
;
2455 while((de
= dictNext(di
)) != NULL
) {
2456 robj
*eleobj
= dictGetEntryKey(de
);
2457 double *score
= dictGetEntryVal(de
);
2459 if (rdbSaveStringObject(fp
,eleobj
) == -1) goto werr
;
2460 if (rdbSaveDoubleValue(fp
,*score
) == -1) goto werr
;
2462 dictReleaseIterator(di
);
2467 dictReleaseIterator(di
);
2470 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2472 /* Make sure data will not remain on the OS's output buffers */
2477 /* Use RENAME to make sure the DB file is changed atomically only
2478 * if the generate DB file is ok. */
2479 if (rename(tmpfile
,filename
) == -1) {
2480 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
2484 redisLog(REDIS_NOTICE
,"DB saved on disk");
2486 server
.lastsave
= time(NULL
);
2492 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2493 if (di
) dictReleaseIterator(di
);
2497 static int rdbSaveBackground(char *filename
) {
2500 if (server
.bgsaveinprogress
) return REDIS_ERR
;
2501 if ((childpid
= fork()) == 0) {
2504 if (rdbSave(filename
) == REDIS_OK
) {
2511 if (childpid
== -1) {
2512 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
2516 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
2517 server
.bgsaveinprogress
= 1;
2518 server
.bgsavechildpid
= childpid
;
2521 return REDIS_OK
; /* unreached */
2524 static void rdbRemoveTempFile(pid_t childpid
) {
2527 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
2531 static int rdbLoadType(FILE *fp
) {
2533 if (fread(&type
,1,1,fp
) == 0) return -1;
2537 static time_t rdbLoadTime(FILE *fp
) {
2539 if (fread(&t32
,4,1,fp
) == 0) return -1;
2540 return (time_t) t32
;
2543 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2544 * of this file for a description of how this are stored on disk.
2546 * isencoded is set to 1 if the readed length is not actually a length but
2547 * an "encoding type", check the above comments for more info */
2548 static uint32_t rdbLoadLen(FILE *fp
, int rdbver
, int *isencoded
) {
2549 unsigned char buf
[2];
2552 if (isencoded
) *isencoded
= 0;
2554 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2559 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2560 type
= (buf
[0]&0xC0)>>6;
2561 if (type
== REDIS_RDB_6BITLEN
) {
2562 /* Read a 6 bit len */
2564 } else if (type
== REDIS_RDB_ENCVAL
) {
2565 /* Read a 6 bit len encoding type */
2566 if (isencoded
) *isencoded
= 1;
2568 } else if (type
== REDIS_RDB_14BITLEN
) {
2569 /* Read a 14 bit len */
2570 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
2571 return ((buf
[0]&0x3F)<<8)|buf
[1];
2573 /* Read a 32 bit len */
2574 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
2580 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
2581 unsigned char enc
[4];
2584 if (enctype
== REDIS_RDB_ENC_INT8
) {
2585 if (fread(enc
,1,1,fp
) == 0) return NULL
;
2586 val
= (signed char)enc
[0];
2587 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
2589 if (fread(enc
,2,1,fp
) == 0) return NULL
;
2590 v
= enc
[0]|(enc
[1]<<8);
2592 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
2594 if (fread(enc
,4,1,fp
) == 0) return NULL
;
2595 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
2598 val
= 0; /* anti-warning */
2601 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
2604 static robj
*rdbLoadLzfStringObject(FILE*fp
, int rdbver
) {
2605 unsigned int len
, clen
;
2606 unsigned char *c
= NULL
;
2609 if ((clen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2610 if ((len
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
2611 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
2612 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
2613 if (fread(c
,clen
,1,fp
) == 0) goto err
;
2614 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
2616 return createObject(REDIS_STRING
,val
);
2623 static robj
*rdbLoadStringObject(FILE*fp
, int rdbver
) {
2628 len
= rdbLoadLen(fp
,rdbver
,&isencoded
);
2631 case REDIS_RDB_ENC_INT8
:
2632 case REDIS_RDB_ENC_INT16
:
2633 case REDIS_RDB_ENC_INT32
:
2634 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
2635 case REDIS_RDB_ENC_LZF
:
2636 return tryObjectSharing(rdbLoadLzfStringObject(fp
,rdbver
));
2642 if (len
== REDIS_RDB_LENERR
) return NULL
;
2643 val
= sdsnewlen(NULL
,len
);
2644 if (len
&& fread(val
,len
,1,fp
) == 0) {
2648 return tryObjectSharing(createObject(REDIS_STRING
,val
));
2651 /* For information about double serialization check rdbSaveDoubleValue() */
2652 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
2656 if (fread(&len
,1,1,fp
) == 0) return -1;
2658 case 255: *val
= R_NegInf
; return 0;
2659 case 254: *val
= R_PosInf
; return 0;
2660 case 253: *val
= R_Nan
; return 0;
2662 if (fread(buf
,len
,1,fp
) == 0) return -1;
2663 sscanf(buf
, "%lg", val
);
2668 static int rdbLoad(char *filename
) {
2670 robj
*keyobj
= NULL
;
2672 int type
, retval
, rdbver
;
2673 dict
*d
= server
.db
[0].dict
;
2674 redisDb
*db
= server
.db
+0;
2676 time_t expiretime
= -1, now
= time(NULL
);
2678 fp
= fopen(filename
,"r");
2679 if (!fp
) return REDIS_ERR
;
2680 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
2682 if (memcmp(buf
,"REDIS",5) != 0) {
2684 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
2687 rdbver
= atoi(buf
+5);
2690 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
2697 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2698 if (type
== REDIS_EXPIRETIME
) {
2699 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
2700 /* We read the time so we need to read the object type again */
2701 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
2703 if (type
== REDIS_EOF
) break;
2704 /* Handle SELECT DB opcode as a special case */
2705 if (type
== REDIS_SELECTDB
) {
2706 if ((dbid
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2708 if (dbid
>= (unsigned)server
.dbnum
) {
2709 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
2712 db
= server
.db
+dbid
;
2717 if ((keyobj
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2719 if (type
== REDIS_STRING
) {
2720 /* Read string value */
2721 if ((o
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2722 tryObjectEncoding(o
);
2723 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
2724 /* Read list/set value */
2727 if ((listlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2729 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
2730 /* Load every single element of the list/set */
2734 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2735 tryObjectEncoding(ele
);
2736 if (type
== REDIS_LIST
) {
2737 listAddNodeTail((list
*)o
->ptr
,ele
);
2739 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
2742 } else if (type
== REDIS_ZSET
) {
2743 /* Read list/set value */
2747 if ((zsetlen
= rdbLoadLen(fp
,rdbver
,NULL
)) == REDIS_RDB_LENERR
)
2749 o
= createZsetObject();
2751 /* Load every single element of the list/set */
2754 double *score
= zmalloc(sizeof(double));
2756 if ((ele
= rdbLoadStringObject(fp
,rdbver
)) == NULL
) goto eoferr
;
2757 tryObjectEncoding(ele
);
2758 if (rdbLoadDoubleValue(fp
,score
) == -1) goto eoferr
;
2759 dictAdd(zs
->dict
,ele
,score
);
2760 zslInsert(zs
->zsl
,*score
,ele
);
2761 incrRefCount(ele
); /* added to skiplist */
2766 /* Add the new object in the hash table */
2767 retval
= dictAdd(d
,keyobj
,o
);
2768 if (retval
== DICT_ERR
) {
2769 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
2772 /* Set the expire time if needed */
2773 if (expiretime
!= -1) {
2774 setExpire(db
,keyobj
,expiretime
);
2775 /* Delete this key if already expired */
2776 if (expiretime
< now
) deleteKey(db
,keyobj
);
2784 eoferr
: /* unexpected end of file is handled here with a fatal exit */
2785 if (keyobj
) decrRefCount(keyobj
);
2786 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2788 return REDIS_ERR
; /* Just to avoid warning */
2791 /*================================== Commands =============================== */
2793 static void authCommand(redisClient
*c
) {
2794 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
2795 c
->authenticated
= 1;
2796 addReply(c
,shared
.ok
);
2798 c
->authenticated
= 0;
2799 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2803 static void pingCommand(redisClient
*c
) {
2804 addReply(c
,shared
.pong
);
2807 static void echoCommand(redisClient
*c
) {
2808 addReplyBulkLen(c
,c
->argv
[1]);
2809 addReply(c
,c
->argv
[1]);
2810 addReply(c
,shared
.crlf
);
2813 /*=================================== Strings =============================== */
2815 static void setGenericCommand(redisClient
*c
, int nx
) {
2818 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2819 if (retval
== DICT_ERR
) {
2821 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2822 incrRefCount(c
->argv
[2]);
2824 addReply(c
,shared
.czero
);
2828 incrRefCount(c
->argv
[1]);
2829 incrRefCount(c
->argv
[2]);
2832 removeExpire(c
->db
,c
->argv
[1]);
2833 addReply(c
, nx
? shared
.cone
: shared
.ok
);
2836 static void setCommand(redisClient
*c
) {
2837 setGenericCommand(c
,0);
2840 static void setnxCommand(redisClient
*c
) {
2841 setGenericCommand(c
,1);
2844 static void getCommand(redisClient
*c
) {
2845 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
2848 addReply(c
,shared
.nullbulk
);
2850 if (o
->type
!= REDIS_STRING
) {
2851 addReply(c
,shared
.wrongtypeerr
);
2853 addReplyBulkLen(c
,o
);
2855 addReply(c
,shared
.crlf
);
2860 static void getsetCommand(redisClient
*c
) {
2862 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
2863 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
2865 incrRefCount(c
->argv
[1]);
2867 incrRefCount(c
->argv
[2]);
2869 removeExpire(c
->db
,c
->argv
[1]);
2872 static void mgetCommand(redisClient
*c
) {
2875 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
2876 for (j
= 1; j
< c
->argc
; j
++) {
2877 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
2879 addReply(c
,shared
.nullbulk
);
2881 if (o
->type
!= REDIS_STRING
) {
2882 addReply(c
,shared
.nullbulk
);
2884 addReplyBulkLen(c
,o
);
2886 addReply(c
,shared
.crlf
);
2892 static void incrDecrCommand(redisClient
*c
, long long incr
) {
2897 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
2901 if (o
->type
!= REDIS_STRING
) {
2906 if (o
->encoding
== REDIS_ENCODING_RAW
)
2907 value
= strtoll(o
->ptr
, &eptr
, 10);
2908 else if (o
->encoding
== REDIS_ENCODING_INT
)
2909 value
= (long)o
->ptr
;
2916 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
2917 tryObjectEncoding(o
);
2918 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
2919 if (retval
== DICT_ERR
) {
2920 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
2921 removeExpire(c
->db
,c
->argv
[1]);
2923 incrRefCount(c
->argv
[1]);
2926 addReply(c
,shared
.colon
);
2928 addReply(c
,shared
.crlf
);
2931 static void incrCommand(redisClient
*c
) {
2932 incrDecrCommand(c
,1);
2935 static void decrCommand(redisClient
*c
) {
2936 incrDecrCommand(c
,-1);
2939 static void incrbyCommand(redisClient
*c
) {
2940 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2941 incrDecrCommand(c
,incr
);
2944 static void decrbyCommand(redisClient
*c
) {
2945 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
2946 incrDecrCommand(c
,-incr
);
2949 /* ========================= Type agnostic commands ========================= */
2951 static void delCommand(redisClient
*c
) {
2954 for (j
= 1; j
< c
->argc
; j
++) {
2955 if (deleteKey(c
->db
,c
->argv
[j
])) {
2962 addReply(c
,shared
.czero
);
2965 addReply(c
,shared
.cone
);
2968 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
2973 static void existsCommand(redisClient
*c
) {
2974 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
2977 static void selectCommand(redisClient
*c
) {
2978 int id
= atoi(c
->argv
[1]->ptr
);
2980 if (selectDb(c
,id
) == REDIS_ERR
) {
2981 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
2983 addReply(c
,shared
.ok
);
2987 static void randomkeyCommand(redisClient
*c
) {
2991 de
= dictGetRandomKey(c
->db
->dict
);
2992 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
2995 addReply(c
,shared
.plus
);
2996 addReply(c
,shared
.crlf
);
2998 addReply(c
,shared
.plus
);
2999 addReply(c
,dictGetEntryKey(de
));
3000 addReply(c
,shared
.crlf
);
3004 static void keysCommand(redisClient
*c
) {
3007 sds pattern
= c
->argv
[1]->ptr
;
3008 int plen
= sdslen(pattern
);
3009 int numkeys
= 0, keyslen
= 0;
3010 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3012 di
= dictGetIterator(c
->db
->dict
);
3014 decrRefCount(lenobj
);
3015 while((de
= dictNext(di
)) != NULL
) {
3016 robj
*keyobj
= dictGetEntryKey(de
);
3018 sds key
= keyobj
->ptr
;
3019 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3020 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3021 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3023 addReply(c
,shared
.space
);
3026 keyslen
+= sdslen(key
);
3030 dictReleaseIterator(di
);
3031 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3032 addReply(c
,shared
.crlf
);
3035 static void dbsizeCommand(redisClient
*c
) {
3037 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
3040 static void lastsaveCommand(redisClient
*c
) {
3042 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
3045 static void typeCommand(redisClient
*c
) {
3049 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3054 case REDIS_STRING
: type
= "+string"; break;
3055 case REDIS_LIST
: type
= "+list"; break;
3056 case REDIS_SET
: type
= "+set"; break;
3057 default: type
= "unknown"; break;
3060 addReplySds(c
,sdsnew(type
));
3061 addReply(c
,shared
.crlf
);
3064 static void saveCommand(redisClient
*c
) {
3065 if (server
.bgsaveinprogress
) {
3066 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3069 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3070 addReply(c
,shared
.ok
);
3072 addReply(c
,shared
.err
);
3076 static void bgsaveCommand(redisClient
*c
) {
3077 if (server
.bgsaveinprogress
) {
3078 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3081 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3082 addReply(c
,shared
.ok
);
3084 addReply(c
,shared
.err
);
3088 static void shutdownCommand(redisClient
*c
) {
3089 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
3090 /* Kill the saving child if there is a background saving in progress.
3091 We want to avoid race conditions, for instance our saving child may
3092 overwrite the synchronous saving did by SHUTDOWN. */
3093 if (server
.bgsaveinprogress
) {
3094 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3095 kill(server
.bgsavechildpid
,SIGKILL
);
3096 rdbRemoveTempFile(server
.bgsavechildpid
);
3099 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3100 if (server
.daemonize
)
3101 unlink(server
.pidfile
);
3102 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3103 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3106 /* Ooops.. error saving! The best we can do is to continue operating.
3107 * Note that if there was a background saving process, in the next
3108 * cron() Redis will be notified that the background saving aborted,
3109 * handling special stuff like slaves pending for synchronization... */
3110 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3111 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3115 static void renameGenericCommand(redisClient
*c
, int nx
) {
3118 /* To use the same key as src and dst is probably an error */
3119 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3120 addReply(c
,shared
.sameobjecterr
);
3124 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3126 addReply(c
,shared
.nokeyerr
);
3130 deleteIfVolatile(c
->db
,c
->argv
[2]);
3131 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3134 addReply(c
,shared
.czero
);
3137 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3139 incrRefCount(c
->argv
[2]);
3141 deleteKey(c
->db
,c
->argv
[1]);
3143 addReply(c
,nx
? shared
.cone
: shared
.ok
);
3146 static void renameCommand(redisClient
*c
) {
3147 renameGenericCommand(c
,0);
3150 static void renamenxCommand(redisClient
*c
) {
3151 renameGenericCommand(c
,1);
3154 static void moveCommand(redisClient
*c
) {
3159 /* Obtain source and target DB pointers */
3162 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3163 addReply(c
,shared
.outofrangeerr
);
3167 selectDb(c
,srcid
); /* Back to the source DB */
3169 /* If the user is moving using as target the same
3170 * DB as the source DB it is probably an error. */
3172 addReply(c
,shared
.sameobjecterr
);
3176 /* Check if the element exists and get a reference */
3177 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3179 addReply(c
,shared
.czero
);
3183 /* Try to add the element to the target DB */
3184 deleteIfVolatile(dst
,c
->argv
[1]);
3185 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3186 addReply(c
,shared
.czero
);
3189 incrRefCount(c
->argv
[1]);
3192 /* OK! key moved, free the entry in the source DB */
3193 deleteKey(src
,c
->argv
[1]);
3195 addReply(c
,shared
.cone
);
3198 /* =================================== Lists ================================ */
3199 static void pushGenericCommand(redisClient
*c
, int where
) {
3203 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3205 lobj
= createListObject();
3207 if (where
== REDIS_HEAD
) {
3208 listAddNodeHead(list
,c
->argv
[2]);
3210 listAddNodeTail(list
,c
->argv
[2]);
3212 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
3213 incrRefCount(c
->argv
[1]);
3214 incrRefCount(c
->argv
[2]);
3216 if (lobj
->type
!= REDIS_LIST
) {
3217 addReply(c
,shared
.wrongtypeerr
);
3221 if (where
== REDIS_HEAD
) {
3222 listAddNodeHead(list
,c
->argv
[2]);
3224 listAddNodeTail(list
,c
->argv
[2]);
3226 incrRefCount(c
->argv
[2]);
3229 addReply(c
,shared
.ok
);
3232 static void lpushCommand(redisClient
*c
) {
3233 pushGenericCommand(c
,REDIS_HEAD
);
3236 static void rpushCommand(redisClient
*c
) {
3237 pushGenericCommand(c
,REDIS_TAIL
);
3240 static void llenCommand(redisClient
*c
) {
3244 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3246 addReply(c
,shared
.czero
);
3249 if (o
->type
!= REDIS_LIST
) {
3250 addReply(c
,shared
.wrongtypeerr
);
3253 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3258 static void lindexCommand(redisClient
*c
) {
3260 int index
= atoi(c
->argv
[2]->ptr
);
3262 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3264 addReply(c
,shared
.nullbulk
);
3266 if (o
->type
!= REDIS_LIST
) {
3267 addReply(c
,shared
.wrongtypeerr
);
3269 list
*list
= o
->ptr
;
3272 ln
= listIndex(list
, index
);
3274 addReply(c
,shared
.nullbulk
);
3276 robj
*ele
= listNodeValue(ln
);
3277 addReplyBulkLen(c
,ele
);
3279 addReply(c
,shared
.crlf
);
3285 static void lsetCommand(redisClient
*c
) {
3287 int index
= atoi(c
->argv
[2]->ptr
);
3289 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3291 addReply(c
,shared
.nokeyerr
);
3293 if (o
->type
!= REDIS_LIST
) {
3294 addReply(c
,shared
.wrongtypeerr
);
3296 list
*list
= o
->ptr
;
3299 ln
= listIndex(list
, index
);
3301 addReply(c
,shared
.outofrangeerr
);
3303 robj
*ele
= listNodeValue(ln
);
3306 listNodeValue(ln
) = c
->argv
[3];
3307 incrRefCount(c
->argv
[3]);
3308 addReply(c
,shared
.ok
);
3315 static void popGenericCommand(redisClient
*c
, int where
) {
3318 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3320 addReply(c
,shared
.nullbulk
);
3322 if (o
->type
!= REDIS_LIST
) {
3323 addReply(c
,shared
.wrongtypeerr
);
3325 list
*list
= o
->ptr
;
3328 if (where
== REDIS_HEAD
)
3329 ln
= listFirst(list
);
3331 ln
= listLast(list
);
3334 addReply(c
,shared
.nullbulk
);
3336 robj
*ele
= listNodeValue(ln
);
3337 addReplyBulkLen(c
,ele
);
3339 addReply(c
,shared
.crlf
);
3340 listDelNode(list
,ln
);
3347 static void lpopCommand(redisClient
*c
) {
3348 popGenericCommand(c
,REDIS_HEAD
);
3351 static void rpopCommand(redisClient
*c
) {
3352 popGenericCommand(c
,REDIS_TAIL
);
3355 static void lrangeCommand(redisClient
*c
) {
3357 int start
= atoi(c
->argv
[2]->ptr
);
3358 int end
= atoi(c
->argv
[3]->ptr
);
3360 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3362 addReply(c
,shared
.nullmultibulk
);
3364 if (o
->type
!= REDIS_LIST
) {
3365 addReply(c
,shared
.wrongtypeerr
);
3367 list
*list
= o
->ptr
;
3369 int llen
= listLength(list
);
3373 /* convert negative indexes */
3374 if (start
< 0) start
= llen
+start
;
3375 if (end
< 0) end
= llen
+end
;
3376 if (start
< 0) start
= 0;
3377 if (end
< 0) end
= 0;
3379 /* indexes sanity checks */
3380 if (start
> end
|| start
>= llen
) {
3381 /* Out of range start or start > end result in empty list */
3382 addReply(c
,shared
.emptymultibulk
);
3385 if (end
>= llen
) end
= llen
-1;
3386 rangelen
= (end
-start
)+1;
3388 /* Return the result in form of a multi-bulk reply */
3389 ln
= listIndex(list
, start
);
3390 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3391 for (j
= 0; j
< rangelen
; j
++) {
3392 ele
= listNodeValue(ln
);
3393 addReplyBulkLen(c
,ele
);
3395 addReply(c
,shared
.crlf
);
3402 static void ltrimCommand(redisClient
*c
) {
3404 int start
= atoi(c
->argv
[2]->ptr
);
3405 int end
= atoi(c
->argv
[3]->ptr
);
3407 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3409 addReply(c
,shared
.nokeyerr
);
3411 if (o
->type
!= REDIS_LIST
) {
3412 addReply(c
,shared
.wrongtypeerr
);
3414 list
*list
= o
->ptr
;
3416 int llen
= listLength(list
);
3417 int j
, ltrim
, rtrim
;
3419 /* convert negative indexes */
3420 if (start
< 0) start
= llen
+start
;
3421 if (end
< 0) end
= llen
+end
;
3422 if (start
< 0) start
= 0;
3423 if (end
< 0) end
= 0;
3425 /* indexes sanity checks */
3426 if (start
> end
|| start
>= llen
) {
3427 /* Out of range start or start > end result in empty list */
3431 if (end
>= llen
) end
= llen
-1;
3436 /* Remove list elements to perform the trim */
3437 for (j
= 0; j
< ltrim
; j
++) {
3438 ln
= listFirst(list
);
3439 listDelNode(list
,ln
);
3441 for (j
= 0; j
< rtrim
; j
++) {
3442 ln
= listLast(list
);
3443 listDelNode(list
,ln
);
3446 addReply(c
,shared
.ok
);
3451 static void lremCommand(redisClient
*c
) {
3454 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3456 addReply(c
,shared
.czero
);
3458 if (o
->type
!= REDIS_LIST
) {
3459 addReply(c
,shared
.wrongtypeerr
);
3461 list
*list
= o
->ptr
;
3462 listNode
*ln
, *next
;
3463 int toremove
= atoi(c
->argv
[2]->ptr
);
3468 toremove
= -toremove
;
3471 ln
= fromtail
? list
->tail
: list
->head
;
3473 robj
*ele
= listNodeValue(ln
);
3475 next
= fromtail
? ln
->prev
: ln
->next
;
3476 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
3477 listDelNode(list
,ln
);
3480 if (toremove
&& removed
== toremove
) break;
3484 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
3489 /* ==================================== Sets ================================ */
3491 static void saddCommand(redisClient
*c
) {
3494 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3496 set
= createSetObject();
3497 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
3498 incrRefCount(c
->argv
[1]);
3500 if (set
->type
!= REDIS_SET
) {
3501 addReply(c
,shared
.wrongtypeerr
);
3505 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
3506 incrRefCount(c
->argv
[2]);
3508 addReply(c
,shared
.cone
);
3510 addReply(c
,shared
.czero
);
3514 static void sremCommand(redisClient
*c
) {
3517 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3519 addReply(c
,shared
.czero
);
3521 if (set
->type
!= REDIS_SET
) {
3522 addReply(c
,shared
.wrongtypeerr
);
3525 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
3527 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3528 addReply(c
,shared
.cone
);
3530 addReply(c
,shared
.czero
);
3535 static void smoveCommand(redisClient
*c
) {
3536 robj
*srcset
, *dstset
;
3538 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3539 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
3541 /* If the source key does not exist return 0, if it's of the wrong type
3543 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
3544 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
3547 /* Error if the destination key is not a set as well */
3548 if (dstset
&& dstset
->type
!= REDIS_SET
) {
3549 addReply(c
,shared
.wrongtypeerr
);
3552 /* Remove the element from the source set */
3553 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
3554 /* Key not found in the src set! return zero */
3555 addReply(c
,shared
.czero
);
3559 /* Add the element to the destination set */
3561 dstset
= createSetObject();
3562 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
3563 incrRefCount(c
->argv
[2]);
3565 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
3566 incrRefCount(c
->argv
[3]);
3567 addReply(c
,shared
.cone
);
3570 static void sismemberCommand(redisClient
*c
) {
3573 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3575 addReply(c
,shared
.czero
);
3577 if (set
->type
!= REDIS_SET
) {
3578 addReply(c
,shared
.wrongtypeerr
);
3581 if (dictFind(set
->ptr
,c
->argv
[2]))
3582 addReply(c
,shared
.cone
);
3584 addReply(c
,shared
.czero
);
3588 static void scardCommand(redisClient
*c
) {
3592 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3594 addReply(c
,shared
.czero
);
3597 if (o
->type
!= REDIS_SET
) {
3598 addReply(c
,shared
.wrongtypeerr
);
3601 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3607 static void spopCommand(redisClient
*c
) {
3611 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3613 addReply(c
,shared
.nullbulk
);
3615 if (set
->type
!= REDIS_SET
) {
3616 addReply(c
,shared
.wrongtypeerr
);
3619 de
= dictGetRandomKey(set
->ptr
);
3621 addReply(c
,shared
.nullbulk
);
3623 robj
*ele
= dictGetEntryKey(de
);
3625 addReplyBulkLen(c
,ele
);
3627 addReply(c
,shared
.crlf
);
3628 dictDelete(set
->ptr
,ele
);
3629 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
3635 static void srandmemberCommand(redisClient
*c
) {
3639 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
3641 addReply(c
,shared
.nullbulk
);
3643 if (set
->type
!= REDIS_SET
) {
3644 addReply(c
,shared
.wrongtypeerr
);
3647 de
= dictGetRandomKey(set
->ptr
);
3649 addReply(c
,shared
.nullbulk
);
3651 robj
*ele
= dictGetEntryKey(de
);
3653 addReplyBulkLen(c
,ele
);
3655 addReply(c
,shared
.crlf
);
3660 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
3661 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
3663 return dictSize(*d1
)-dictSize(*d2
);
3666 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
) {
3667 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3670 robj
*lenobj
= NULL
, *dstset
= NULL
;
3671 int j
, cardinality
= 0;
3673 for (j
= 0; j
< setsnum
; j
++) {
3677 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3678 lookupKeyRead(c
->db
,setskeys
[j
]);
3682 deleteKey(c
->db
,dstkey
);
3683 addReply(c
,shared
.ok
);
3685 addReply(c
,shared
.nullmultibulk
);
3689 if (setobj
->type
!= REDIS_SET
) {
3691 addReply(c
,shared
.wrongtypeerr
);
3694 dv
[j
] = setobj
->ptr
;
3696 /* Sort sets from the smallest to largest, this will improve our
3697 * algorithm's performace */
3698 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
3700 /* The first thing we should output is the total number of elements...
3701 * since this is a multi-bulk write, but at this stage we don't know
3702 * the intersection set size, so we use a trick, append an empty object
3703 * to the output list and save the pointer to later modify it with the
3706 lenobj
= createObject(REDIS_STRING
,NULL
);
3708 decrRefCount(lenobj
);
3710 /* If we have a target key where to store the resulting set
3711 * create this key with an empty set inside */
3712 dstset
= createSetObject();
3715 /* Iterate all the elements of the first (smallest) set, and test
3716 * the element against all the other sets, if at least one set does
3717 * not include the element it is discarded */
3718 di
= dictGetIterator(dv
[0]);
3720 while((de
= dictNext(di
)) != NULL
) {
3723 for (j
= 1; j
< setsnum
; j
++)
3724 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
3726 continue; /* at least one set does not contain the member */
3727 ele
= dictGetEntryKey(de
);
3729 addReplyBulkLen(c
,ele
);
3731 addReply(c
,shared
.crlf
);
3734 dictAdd(dstset
->ptr
,ele
,NULL
);
3738 dictReleaseIterator(di
);
3741 /* Store the resulting set into the target */
3742 deleteKey(c
->db
,dstkey
);
3743 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3744 incrRefCount(dstkey
);
3748 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",cardinality
);
3750 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3751 dictSize((dict
*)dstset
->ptr
)));
3757 static void sinterCommand(redisClient
*c
) {
3758 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
3761 static void sinterstoreCommand(redisClient
*c
) {
3762 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
3765 #define REDIS_OP_UNION 0
3766 #define REDIS_OP_DIFF 1
3768 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
3769 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
3772 robj
*dstset
= NULL
;
3773 int j
, cardinality
= 0;
3775 for (j
= 0; j
< setsnum
; j
++) {
3779 lookupKeyWrite(c
->db
,setskeys
[j
]) :
3780 lookupKeyRead(c
->db
,setskeys
[j
]);
3785 if (setobj
->type
!= REDIS_SET
) {
3787 addReply(c
,shared
.wrongtypeerr
);
3790 dv
[j
] = setobj
->ptr
;
3793 /* We need a temp set object to store our union. If the dstkey
3794 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3795 * this set object will be the resulting object to set into the target key*/
3796 dstset
= createSetObject();
3798 /* Iterate all the elements of all the sets, add every element a single
3799 * time to the result set */
3800 for (j
= 0; j
< setsnum
; j
++) {
3801 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
3802 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
3804 di
= dictGetIterator(dv
[j
]);
3806 while((de
= dictNext(di
)) != NULL
) {
3809 /* dictAdd will not add the same element multiple times */
3810 ele
= dictGetEntryKey(de
);
3811 if (op
== REDIS_OP_UNION
|| j
== 0) {
3812 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
3816 } else if (op
== REDIS_OP_DIFF
) {
3817 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
3822 dictReleaseIterator(di
);
3824 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
3827 /* Output the content of the resulting set, if not in STORE mode */
3829 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
3830 di
= dictGetIterator(dstset
->ptr
);
3831 while((de
= dictNext(di
)) != NULL
) {
3834 ele
= dictGetEntryKey(de
);
3835 addReplyBulkLen(c
,ele
);
3837 addReply(c
,shared
.crlf
);
3839 dictReleaseIterator(di
);
3841 /* If we have a target key where to store the resulting set
3842 * create this key with the result set inside */
3843 deleteKey(c
->db
,dstkey
);
3844 dictAdd(c
->db
->dict
,dstkey
,dstset
);
3845 incrRefCount(dstkey
);
3850 decrRefCount(dstset
);
3852 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",
3853 dictSize((dict
*)dstset
->ptr
)));
3859 static void sunionCommand(redisClient
*c
) {
3860 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
3863 static void sunionstoreCommand(redisClient
*c
) {
3864 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
3867 static void sdiffCommand(redisClient
*c
) {
3868 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
3871 static void sdiffstoreCommand(redisClient
*c
) {
3872 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
3875 /* ==================================== ZSets =============================== */
3877 /* ZSETs are ordered sets using two data structures to hold the same elements
3878 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
3881 * The elements are added to an hash table mapping Redis objects to scores.
3882 * At the same time the elements are added to a skip list mapping scores
3883 * to Redis objects (so objects are sorted by scores in this "view"). */
3885 /* This skiplist implementation is almost a C translation of the original
3886 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
3887 * Alternative to Balanced Trees", modified in three ways:
3888 * a) this implementation allows for repeated values.
3889 * b) the comparison is not just by key (our 'score') but by satellite data.
3890 * c) there is a back pointer, so it's a doubly linked list with the back
3891 * pointers being only at "level 1". This allows to traverse the list
3892 * from tail to head, useful for ZREVRANGE. */
3894 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
3895 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
3897 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
3903 static zskiplist
*zslCreate(void) {
3907 zsl
= zmalloc(sizeof(*zsl
));
3910 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
3911 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
3912 zsl
->header
->forward
[j
] = NULL
;
3913 zsl
->header
->backward
= NULL
;
3918 static void zslFreeNode(zskiplistNode
*node
) {
3919 decrRefCount(node
->obj
);
3920 zfree(node
->forward
);
3924 static void zslFree(zskiplist
*zsl
) {
3925 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
3927 zfree(zsl
->header
->forward
);
3930 next
= node
->forward
[0];
3937 static int zslRandomLevel(void) {
3939 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
3944 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
3945 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
3949 for (i
= zsl
->level
-1; i
>= 0; i
--) {
3950 while (x
->forward
[i
] &&
3951 (x
->forward
[i
]->score
< score
||
3952 (x
->forward
[i
]->score
== score
&&
3953 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
3957 /* we assume the key is not already inside, since we allow duplicated
3958 * scores, and the re-insertion of score and redis object should never
3959 * happpen since the caller of zslInsert() should test in the hash table
3960 * if the element is already inside or not. */
3961 level
= zslRandomLevel();
3962 if (level
> zsl
->level
) {
3963 for (i
= zsl
->level
; i
< level
; i
++)
3964 update
[i
] = zsl
->header
;
3967 x
= zslCreateNode(level
,score
,obj
);
3968 for (i
= 0; i
< level
; i
++) {
3969 x
->forward
[i
] = update
[i
]->forward
[i
];
3970 update
[i
]->forward
[i
] = x
;
3972 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
3974 x
->forward
[0]->backward
= x
;
3980 /* Delete an element with matching score/object from the skiplist. */
3981 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
3982 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
3986 for (i
= zsl
->level
-1; i
>= 0; i
--) {
3987 while (x
->forward
[i
] &&
3988 (x
->forward
[i
]->score
< score
||
3989 (x
->forward
[i
]->score
== score
&&
3990 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
3994 /* We may have multiple elements with the same score, what we need
3995 * is to find the element with both the right score and object. */
3997 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
3998 for (i
= 0; i
< zsl
->level
; i
++) {
3999 if (update
[i
]->forward
[i
] != x
) break;
4000 update
[i
]->forward
[i
] = x
->forward
[i
];
4002 if (x
->forward
[0]) {
4003 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4006 zsl
->tail
= x
->backward
;
4009 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4014 return 0; /* not found */
4016 return 0; /* not found */
4019 /* Delete all the elements with score between min and max from the skiplist.
4020 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4021 * Note that this function takes the reference to the hash table view of the
4022 * sorted set, in order to remove the elements from the hash table too. */
4023 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4024 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4025 unsigned long removed
= 0;
4029 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4030 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
4034 /* We may have multiple elements with the same score, what we need
4035 * is to find the element with both the right score and object. */
4037 while (x
&& x
->score
<= max
) {
4038 zskiplistNode
*next
;
4040 for (i
= 0; i
< zsl
->level
; i
++) {
4041 if (update
[i
]->forward
[i
] != x
) break;
4042 update
[i
]->forward
[i
] = x
->forward
[i
];
4044 if (x
->forward
[0]) {
4045 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4048 zsl
->tail
= x
->backward
;
4050 next
= x
->forward
[0];
4051 dictDelete(dict
,x
->obj
);
4053 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4059 return removed
; /* not found */
4062 /* Find the first node having a score equal or greater than the specified one.
4063 * Returns NULL if there is no match. */
4064 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4069 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4070 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
4073 /* We may have multiple elements with the same score, what we need
4074 * is to find the element with both the right score and object. */
4075 return x
->forward
[0];
4078 /* The actual Z-commands implementations */
4080 static void zaddCommand(redisClient
*c
) {
4085 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4086 if (zsetobj
== NULL
) {
4087 zsetobj
= createZsetObject();
4088 dictAdd(c
->db
->dict
,c
->argv
[1],zsetobj
);
4089 incrRefCount(c
->argv
[1]);
4091 if (zsetobj
->type
!= REDIS_ZSET
) {
4092 addReply(c
,shared
.wrongtypeerr
);
4096 score
= zmalloc(sizeof(double));
4097 *score
= strtod(c
->argv
[2]->ptr
,NULL
);
4099 if (dictAdd(zs
->dict
,c
->argv
[3],score
) == DICT_OK
) {
4100 /* case 1: New element */
4101 incrRefCount(c
->argv
[3]); /* added to hash */
4102 zslInsert(zs
->zsl
,*score
,c
->argv
[3]);
4103 incrRefCount(c
->argv
[3]); /* added to skiplist */
4105 addReply(c
,shared
.cone
);
4110 /* case 2: Score update operation */
4111 de
= dictFind(zs
->dict
,c
->argv
[3]);
4113 oldscore
= dictGetEntryVal(de
);
4114 if (*score
!= *oldscore
) {
4117 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[3]);
4118 assert(deleted
!= 0);
4119 zslInsert(zs
->zsl
,*score
,c
->argv
[3]);
4120 incrRefCount(c
->argv
[3]);
4121 dictReplace(zs
->dict
,c
->argv
[3],score
);
4126 addReply(c
,shared
.czero
);
4130 static void zremCommand(redisClient
*c
) {
4134 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4135 if (zsetobj
== NULL
) {
4136 addReply(c
,shared
.czero
);
4142 if (zsetobj
->type
!= REDIS_ZSET
) {
4143 addReply(c
,shared
.wrongtypeerr
);
4147 de
= dictFind(zs
->dict
,c
->argv
[2]);
4149 addReply(c
,shared
.czero
);
4152 /* Delete from the skiplist */
4153 oldscore
= dictGetEntryVal(de
);
4154 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
4155 assert(deleted
!= 0);
4157 /* Delete from the hash table */
4158 dictDelete(zs
->dict
,c
->argv
[2]);
4159 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4161 addReply(c
,shared
.cone
);
4165 static void zremrangebyscoreCommand(redisClient
*c
) {
4166 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4167 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4171 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4172 if (zsetobj
== NULL
) {
4173 addReply(c
,shared
.czero
);
4177 if (zsetobj
->type
!= REDIS_ZSET
) {
4178 addReply(c
,shared
.wrongtypeerr
);
4182 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
4183 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4184 server
.dirty
+= deleted
;
4185 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
4189 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
4191 int start
= atoi(c
->argv
[2]->ptr
);
4192 int end
= atoi(c
->argv
[3]->ptr
);
4194 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4196 addReply(c
,shared
.nullmultibulk
);
4198 if (o
->type
!= REDIS_ZSET
) {
4199 addReply(c
,shared
.wrongtypeerr
);
4201 zset
*zsetobj
= o
->ptr
;
4202 zskiplist
*zsl
= zsetobj
->zsl
;
4205 int llen
= zsl
->length
;
4209 /* convert negative indexes */
4210 if (start
< 0) start
= llen
+start
;
4211 if (end
< 0) end
= llen
+end
;
4212 if (start
< 0) start
= 0;
4213 if (end
< 0) end
= 0;
4215 /* indexes sanity checks */
4216 if (start
> end
|| start
>= llen
) {
4217 /* Out of range start or start > end result in empty list */
4218 addReply(c
,shared
.emptymultibulk
);
4221 if (end
>= llen
) end
= llen
-1;
4222 rangelen
= (end
-start
)+1;
4224 /* Return the result in form of a multi-bulk reply */
4230 ln
= zsl
->header
->forward
[0];
4232 ln
= ln
->forward
[0];
4235 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
4236 for (j
= 0; j
< rangelen
; j
++) {
4238 addReplyBulkLen(c
,ele
);
4240 addReply(c
,shared
.crlf
);
4241 ln
= reverse
? ln
->backward
: ln
->forward
[0];
4247 static void zrangeCommand(redisClient
*c
) {
4248 zrangeGenericCommand(c
,0);
4251 static void zrevrangeCommand(redisClient
*c
) {
4252 zrangeGenericCommand(c
,1);
4255 static void zrangebyscoreCommand(redisClient
*c
) {
4257 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4258 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4260 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4262 addReply(c
,shared
.nullmultibulk
);
4264 if (o
->type
!= REDIS_ZSET
) {
4265 addReply(c
,shared
.wrongtypeerr
);
4267 zset
*zsetobj
= o
->ptr
;
4268 zskiplist
*zsl
= zsetobj
->zsl
;
4271 unsigned int rangelen
= 0;
4273 /* Get the first node with the score >= min */
4274 ln
= zslFirstWithScore(zsl
,min
);
4276 /* No element matching the speciifed interval */
4277 addReply(c
,shared
.emptymultibulk
);
4281 /* We don't know in advance how many matching elements there
4282 * are in the list, so we push this object that will represent
4283 * the multi-bulk length in the output buffer, and will "fix"
4285 lenobj
= createObject(REDIS_STRING
,NULL
);
4288 while(ln
&& ln
->score
<= max
) {
4290 addReplyBulkLen(c
,ele
);
4292 addReply(c
,shared
.crlf
);
4293 ln
= ln
->forward
[0];
4296 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
4301 static void zcardCommand(redisClient
*c
) {
4305 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4307 addReply(c
,shared
.czero
);
4310 if (o
->type
!= REDIS_ZSET
) {
4311 addReply(c
,shared
.wrongtypeerr
);
4314 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",zs
->zsl
->length
));
4319 static void zscoreCommand(redisClient
*c
) {
4323 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4325 addReply(c
,shared
.czero
);
4328 if (o
->type
!= REDIS_ZSET
) {
4329 addReply(c
,shared
.wrongtypeerr
);
4334 de
= dictFind(zs
->dict
,c
->argv
[2]);
4336 addReply(c
,shared
.nullbulk
);
4339 double *score
= dictGetEntryVal(de
);
4341 snprintf(buf
,sizeof(buf
),"%.16g",*score
);
4342 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
4349 /* ========================= Non type-specific commands ==================== */
4351 static void flushdbCommand(redisClient
*c
) {
4352 server
.dirty
+= dictSize(c
->db
->dict
);
4353 dictEmpty(c
->db
->dict
);
4354 dictEmpty(c
->db
->expires
);
4355 addReply(c
,shared
.ok
);
4358 static void flushallCommand(redisClient
*c
) {
4359 server
.dirty
+= emptyDb();
4360 addReply(c
,shared
.ok
);
4361 rdbSave(server
.dbfilename
);
4365 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
4366 redisSortOperation
*so
= zmalloc(sizeof(*so
));
4368 so
->pattern
= pattern
;
4372 /* Return the value associated to the key with a name obtained
4373 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4374 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
4378 int prefixlen
, sublen
, postfixlen
;
4379 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4383 char buf
[REDIS_SORTKEY_MAX
+1];
4386 if (subst
->encoding
== REDIS_ENCODING_RAW
)
4387 incrRefCount(subst
);
4389 subst
= getDecodedObject(subst
);
4392 spat
= pattern
->ptr
;
4394 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
4395 p
= strchr(spat
,'*');
4396 if (!p
) return NULL
;
4399 sublen
= sdslen(ssub
);
4400 postfixlen
= sdslen(spat
)-(prefixlen
+1);
4401 memcpy(keyname
.buf
,spat
,prefixlen
);
4402 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
4403 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
4404 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
4405 keyname
.len
= prefixlen
+sublen
+postfixlen
;
4407 keyobj
.refcount
= 1;
4408 keyobj
.type
= REDIS_STRING
;
4409 keyobj
.ptr
= ((char*)&keyname
)+(sizeof(long)*2);
4411 decrRefCount(subst
);
4413 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4414 return lookupKeyRead(db
,&keyobj
);
4417 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4418 * the additional parameter is not standard but a BSD-specific we have to
4419 * pass sorting parameters via the global 'server' structure */
4420 static int sortCompare(const void *s1
, const void *s2
) {
4421 const redisSortObject
*so1
= s1
, *so2
= s2
;
4424 if (!server
.sort_alpha
) {
4425 /* Numeric sorting. Here it's trivial as we precomputed scores */
4426 if (so1
->u
.score
> so2
->u
.score
) {
4428 } else if (so1
->u
.score
< so2
->u
.score
) {
4434 /* Alphanumeric sorting */
4435 if (server
.sort_bypattern
) {
4436 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
4437 /* At least one compare object is NULL */
4438 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
4440 else if (so1
->u
.cmpobj
== NULL
)
4445 /* We have both the objects, use strcoll */
4446 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
4449 /* Compare elements directly */
4450 if (so1
->obj
->encoding
== REDIS_ENCODING_RAW
&&
4451 so2
->obj
->encoding
== REDIS_ENCODING_RAW
) {
4452 cmp
= strcoll(so1
->obj
->ptr
,so2
->obj
->ptr
);
4456 dec1
= so1
->obj
->encoding
== REDIS_ENCODING_RAW
?
4457 so1
->obj
: getDecodedObject(so1
->obj
);
4458 dec2
= so2
->obj
->encoding
== REDIS_ENCODING_RAW
?
4459 so2
->obj
: getDecodedObject(so2
->obj
);
4460 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
4461 if (dec1
!= so1
->obj
) decrRefCount(dec1
);
4462 if (dec2
!= so2
->obj
) decrRefCount(dec2
);
4466 return server
.sort_desc
? -cmp
: cmp
;
4469 /* The SORT command is the most complex command in Redis. Warning: this code
4470 * is optimized for speed and a bit less for readability */
4471 static void sortCommand(redisClient
*c
) {
4474 int desc
= 0, alpha
= 0;
4475 int limit_start
= 0, limit_count
= -1, start
, end
;
4476 int j
, dontsort
= 0, vectorlen
;
4477 int getop
= 0; /* GET operation counter */
4478 robj
*sortval
, *sortby
= NULL
;
4479 redisSortObject
*vector
; /* Resulting vector to sort */
4481 /* Lookup the key to sort. It must be of the right types */
4482 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
4483 if (sortval
== NULL
) {
4484 addReply(c
,shared
.nokeyerr
);
4487 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
) {
4488 addReply(c
,shared
.wrongtypeerr
);
4492 /* Create a list of operations to perform for every sorted element.
4493 * Operations can be GET/DEL/INCR/DECR */
4494 operations
= listCreate();
4495 listSetFreeMethod(operations
,zfree
);
4498 /* Now we need to protect sortval incrementing its count, in the future
4499 * SORT may have options able to overwrite/delete keys during the sorting
4500 * and the sorted key itself may get destroied */
4501 incrRefCount(sortval
);
4503 /* The SORT command has an SQL-alike syntax, parse it */
4504 while(j
< c
->argc
) {
4505 int leftargs
= c
->argc
-j
-1;
4506 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
4508 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
4510 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
4512 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
4513 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
4514 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
4516 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
4517 sortby
= c
->argv
[j
+1];
4518 /* If the BY pattern does not contain '*', i.e. it is constant,
4519 * we don't need to sort nor to lookup the weight keys. */
4520 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
4522 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
4523 listAddNodeTail(operations
,createSortOperation(
4524 REDIS_SORT_GET
,c
->argv
[j
+1]));
4527 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"del") && leftargs
>= 1) {
4528 listAddNodeTail(operations
,createSortOperation(
4529 REDIS_SORT_DEL
,c
->argv
[j
+1]));
4531 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"incr") && leftargs
>= 1) {
4532 listAddNodeTail(operations
,createSortOperation(
4533 REDIS_SORT_INCR
,c
->argv
[j
+1]));
4535 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
4536 listAddNodeTail(operations
,createSortOperation(
4537 REDIS_SORT_DECR
,c
->argv
[j
+1]));
4540 decrRefCount(sortval
);
4541 listRelease(operations
);
4542 addReply(c
,shared
.syntaxerr
);
4548 /* Load the sorting vector with all the objects to sort */
4549 vectorlen
= (sortval
->type
== REDIS_LIST
) ?
4550 listLength((list
*)sortval
->ptr
) :
4551 dictSize((dict
*)sortval
->ptr
);
4552 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
4554 if (sortval
->type
== REDIS_LIST
) {
4555 list
*list
= sortval
->ptr
;
4559 while((ln
= listYield(list
))) {
4560 robj
*ele
= ln
->value
;
4561 vector
[j
].obj
= ele
;
4562 vector
[j
].u
.score
= 0;
4563 vector
[j
].u
.cmpobj
= NULL
;
4567 dict
*set
= sortval
->ptr
;
4571 di
= dictGetIterator(set
);
4572 while((setele
= dictNext(di
)) != NULL
) {
4573 vector
[j
].obj
= dictGetEntryKey(setele
);
4574 vector
[j
].u
.score
= 0;
4575 vector
[j
].u
.cmpobj
= NULL
;
4578 dictReleaseIterator(di
);
4580 assert(j
== vectorlen
);
4582 /* Now it's time to load the right scores in the sorting vector */
4583 if (dontsort
== 0) {
4584 for (j
= 0; j
< vectorlen
; j
++) {
4588 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
4589 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
4591 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
4592 vector
[j
].u
.cmpobj
= byval
;
4593 incrRefCount(byval
);
4595 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
4598 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
4599 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
4601 if (byval
->encoding
== REDIS_ENCODING_INT
) {
4602 vector
[j
].u
.score
= (long)byval
->ptr
;
4609 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
4610 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
4612 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
4613 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
4622 /* We are ready to sort the vector... perform a bit of sanity check
4623 * on the LIMIT option too. We'll use a partial version of quicksort. */
4624 start
= (limit_start
< 0) ? 0 : limit_start
;
4625 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
4626 if (start
>= vectorlen
) {
4627 start
= vectorlen
-1;
4630 if (end
>= vectorlen
) end
= vectorlen
-1;
4632 if (dontsort
== 0) {
4633 server
.sort_desc
= desc
;
4634 server
.sort_alpha
= alpha
;
4635 server
.sort_bypattern
= sortby
? 1 : 0;
4636 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
4637 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
4639 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
4642 /* Send command output to the output buffer, performing the specified
4643 * GET/DEL/INCR/DECR operations if any. */
4644 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
4645 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
4646 for (j
= start
; j
<= end
; j
++) {
4649 addReplyBulkLen(c
,vector
[j
].obj
);
4650 addReply(c
,vector
[j
].obj
);
4651 addReply(c
,shared
.crlf
);
4653 listRewind(operations
);
4654 while((ln
= listYield(operations
))) {
4655 redisSortOperation
*sop
= ln
->value
;
4656 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
4659 if (sop
->type
== REDIS_SORT_GET
) {
4660 if (!val
|| val
->type
!= REDIS_STRING
) {
4661 addReply(c
,shared
.nullbulk
);
4663 addReplyBulkLen(c
,val
);
4665 addReply(c
,shared
.crlf
);
4667 } else if (sop
->type
== REDIS_SORT_DEL
) {
4674 decrRefCount(sortval
);
4675 listRelease(operations
);
4676 for (j
= 0; j
< vectorlen
; j
++) {
4677 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
4678 decrRefCount(vector
[j
].u
.cmpobj
);
4683 static void infoCommand(redisClient
*c
) {
4685 time_t uptime
= time(NULL
)-server
.stat_starttime
;
4688 info
= sdscatprintf(sdsempty(),
4689 "redis_version:%s\r\n"
4691 "uptime_in_seconds:%d\r\n"
4692 "uptime_in_days:%d\r\n"
4693 "connected_clients:%d\r\n"
4694 "connected_slaves:%d\r\n"
4695 "used_memory:%zu\r\n"
4696 "changes_since_last_save:%lld\r\n"
4697 "bgsave_in_progress:%d\r\n"
4698 "last_save_time:%d\r\n"
4699 "total_connections_received:%lld\r\n"
4700 "total_commands_processed:%lld\r\n"
4703 (sizeof(long) == 8) ? "64" : "32",
4706 listLength(server
.clients
)-listLength(server
.slaves
),
4707 listLength(server
.slaves
),
4710 server
.bgsaveinprogress
,
4712 server
.stat_numconnections
,
4713 server
.stat_numcommands
,
4714 server
.masterhost
== NULL
? "master" : "slave"
4716 if (server
.masterhost
) {
4717 info
= sdscatprintf(info
,
4718 "master_host:%s\r\n"
4719 "master_port:%d\r\n"
4720 "master_link_status:%s\r\n"
4721 "master_last_io_seconds_ago:%d\r\n"
4724 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
4726 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
4729 for (j
= 0; j
< server
.dbnum
; j
++) {
4730 long long keys
, vkeys
;
4732 keys
= dictSize(server
.db
[j
].dict
);
4733 vkeys
= dictSize(server
.db
[j
].expires
);
4734 if (keys
|| vkeys
) {
4735 info
= sdscatprintf(info
, "db%d: keys=%lld,expires=%lld\r\n",
4739 addReplySds(c
,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info
)));
4740 addReplySds(c
,info
);
4741 addReply(c
,shared
.crlf
);
4744 static void monitorCommand(redisClient
*c
) {
4745 /* ignore MONITOR if aleady slave or in monitor mode */
4746 if (c
->flags
& REDIS_SLAVE
) return;
4748 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
4750 listAddNodeTail(server
.monitors
,c
);
4751 addReply(c
,shared
.ok
);
4754 /* ================================= Expire ================================= */
4755 static int removeExpire(redisDb
*db
, robj
*key
) {
4756 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
4763 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
4764 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
4772 /* Return the expire time of the specified key, or -1 if no expire
4773 * is associated with this key (i.e. the key is non volatile) */
4774 static time_t getExpire(redisDb
*db
, robj
*key
) {
4777 /* No expire? return ASAP */
4778 if (dictSize(db
->expires
) == 0 ||
4779 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
4781 return (time_t) dictGetEntryVal(de
);
4784 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
4788 /* No expire? return ASAP */
4789 if (dictSize(db
->expires
) == 0 ||
4790 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
4792 /* Lookup the expire */
4793 when
= (time_t) dictGetEntryVal(de
);
4794 if (time(NULL
) <= when
) return 0;
4796 /* Delete the key */
4797 dictDelete(db
->expires
,key
);
4798 return dictDelete(db
->dict
,key
) == DICT_OK
;
4801 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
4804 /* No expire? return ASAP */
4805 if (dictSize(db
->expires
) == 0 ||
4806 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
4808 /* Delete the key */
4810 dictDelete(db
->expires
,key
);
4811 return dictDelete(db
->dict
,key
) == DICT_OK
;
4814 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
4817 de
= dictFind(c
->db
->dict
,key
);
4819 addReply(c
,shared
.czero
);
4823 if (deleteKey(c
->db
,key
)) server
.dirty
++;
4824 addReply(c
, shared
.cone
);
4827 time_t when
= time(NULL
)+seconds
;
4828 if (setExpire(c
->db
,key
,when
)) {
4829 addReply(c
,shared
.cone
);
4832 addReply(c
,shared
.czero
);
4838 static void expireCommand(redisClient
*c
) {
4839 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
4842 static void expireatCommand(redisClient
*c
) {
4843 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
4846 static void ttlCommand(redisClient
*c
) {
4850 expire
= getExpire(c
->db
,c
->argv
[1]);
4852 ttl
= (int) (expire
-time(NULL
));
4853 if (ttl
< 0) ttl
= -1;
4855 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
4858 static void msetGenericCommand(redisClient
*c
, int nx
) {
4861 if ((c
->argc
% 2) == 0) {
4862 addReplySds(c
,sdsnew("-ERR wrong number of arguments\r\n"));
4865 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4866 * set nothing at all if at least one already key exists. */
4868 for (j
= 1; j
< c
->argc
; j
+= 2) {
4869 if (dictFind(c
->db
->dict
,c
->argv
[j
]) != NULL
) {
4870 addReply(c
, shared
.czero
);
4876 for (j
= 1; j
< c
->argc
; j
+= 2) {
4879 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
4880 if (retval
== DICT_ERR
) {
4881 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
4882 incrRefCount(c
->argv
[j
+1]);
4884 incrRefCount(c
->argv
[j
]);
4885 incrRefCount(c
->argv
[j
+1]);
4887 removeExpire(c
->db
,c
->argv
[j
]);
4889 server
.dirty
+= (c
->argc
-1)/2;
4890 addReply(c
, nx
? shared
.cone
: shared
.ok
);
4893 static void msetCommand(redisClient
*c
) {
4894 msetGenericCommand(c
,0);
4897 static void msetnxCommand(redisClient
*c
) {
4898 msetGenericCommand(c
,1);
4901 /* =============================== Replication ============================= */
4903 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
4904 ssize_t nwritten
, ret
= size
;
4905 time_t start
= time(NULL
);
4909 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
4910 nwritten
= write(fd
,ptr
,size
);
4911 if (nwritten
== -1) return -1;
4915 if ((time(NULL
)-start
) > timeout
) {
4923 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
4924 ssize_t nread
, totread
= 0;
4925 time_t start
= time(NULL
);
4929 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
4930 nread
= read(fd
,ptr
,size
);
4931 if (nread
== -1) return -1;
4936 if ((time(NULL
)-start
) > timeout
) {
4944 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
4951 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
4954 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
4965 static void syncCommand(redisClient
*c
) {
4966 /* ignore SYNC if aleady slave or in monitor mode */
4967 if (c
->flags
& REDIS_SLAVE
) return;
4969 /* SYNC can't be issued when the server has pending data to send to
4970 * the client about already issued commands. We need a fresh reply
4971 * buffer registering the differences between the BGSAVE and the current
4972 * dataset, so that we can copy to other slaves if needed. */
4973 if (listLength(c
->reply
) != 0) {
4974 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4978 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
4979 /* Here we need to check if there is a background saving operation
4980 * in progress, or if it is required to start one */
4981 if (server
.bgsaveinprogress
) {
4982 /* Ok a background save is in progress. Let's check if it is a good
4983 * one for replication, i.e. if there is another slave that is
4984 * registering differences since the server forked to save */
4988 listRewind(server
.slaves
);
4989 while((ln
= listYield(server
.slaves
))) {
4991 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
4994 /* Perfect, the server is already registering differences for
4995 * another slave. Set the right state, and copy the buffer. */
4996 listRelease(c
->reply
);
4997 c
->reply
= listDup(slave
->reply
);
4998 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
4999 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
5001 /* No way, we need to wait for the next BGSAVE in order to
5002 * register differences */
5003 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
5004 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
5007 /* Ok we don't have a BGSAVE in progress, let's start one */
5008 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
5009 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5010 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
5011 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
5014 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5017 c
->flags
|= REDIS_SLAVE
;
5019 listAddNodeTail(server
.slaves
,c
);
5023 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
5024 redisClient
*slave
= privdata
;
5026 REDIS_NOTUSED(mask
);
5027 char buf
[REDIS_IOBUF_LEN
];
5028 ssize_t nwritten
, buflen
;
5030 if (slave
->repldboff
== 0) {
5031 /* Write the bulk write count before to transfer the DB. In theory here
5032 * we don't know how much room there is in the output buffer of the
5033 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5034 * operations) will never be smaller than the few bytes we need. */
5037 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5039 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
5047 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
5048 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
5050 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
5051 (buflen
== 0) ? "premature EOF" : strerror(errno
));
5055 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
5056 redisLog(REDIS_DEBUG
,"Write error sending DB to slave: %s",
5061 slave
->repldboff
+= nwritten
;
5062 if (slave
->repldboff
== slave
->repldbsize
) {
5063 close(slave
->repldbfd
);
5064 slave
->repldbfd
= -1;
5065 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5066 slave
->replstate
= REDIS_REPL_ONLINE
;
5067 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
5068 sendReplyToClient
, slave
, NULL
) == AE_ERR
) {
5072 addReplySds(slave
,sdsempty());
5073 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
5077 /* This function is called at the end of every backgrond saving.
5078 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5079 * otherwise REDIS_ERR is passed to the function.
5081 * The goal of this function is to handle slaves waiting for a successful
5082 * background saving in order to perform non-blocking synchronization. */
5083 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
5085 int startbgsave
= 0;
5087 listRewind(server
.slaves
);
5088 while((ln
= listYield(server
.slaves
))) {
5089 redisClient
*slave
= ln
->value
;
5091 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
5093 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
5094 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
5095 struct redis_stat buf
;
5097 if (bgsaveerr
!= REDIS_OK
) {
5099 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
5102 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
5103 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
5105 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
5108 slave
->repldboff
= 0;
5109 slave
->repldbsize
= buf
.st_size
;
5110 slave
->replstate
= REDIS_REPL_SEND_BULK
;
5111 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
5112 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
, NULL
) == AE_ERR
) {
5119 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
5120 listRewind(server
.slaves
);
5121 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
5122 while((ln
= listYield(server
.slaves
))) {
5123 redisClient
*slave
= ln
->value
;
5125 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
5132 static int syncWithMaster(void) {
5133 char buf
[1024], tmpfile
[256];
5135 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
5139 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
5143 /* Issue the SYNC command */
5144 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
5146 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
5150 /* Read the bulk write count */
5151 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
5153 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
5157 if (buf
[0] != '$') {
5159 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5162 dumpsize
= atoi(buf
+1);
5163 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
5164 /* Read the bulk write data on a temp file */
5165 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
5166 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
5169 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
5173 int nread
, nwritten
;
5175 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
5177 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
5183 nwritten
= write(dfd
,buf
,nread
);
5184 if (nwritten
== -1) {
5185 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
5193 if (rename(tmpfile
,server
.dbfilename
) == -1) {
5194 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
5200 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
5201 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
5205 server
.master
= createClient(fd
);
5206 server
.master
->flags
|= REDIS_MASTER
;
5207 server
.replstate
= REDIS_REPL_CONNECTED
;
5211 static void slaveofCommand(redisClient
*c
) {
5212 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
5213 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
5214 if (server
.masterhost
) {
5215 sdsfree(server
.masterhost
);
5216 server
.masterhost
= NULL
;
5217 if (server
.master
) freeClient(server
.master
);
5218 server
.replstate
= REDIS_REPL_NONE
;
5219 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
5222 sdsfree(server
.masterhost
);
5223 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
5224 server
.masterport
= atoi(c
->argv
[2]->ptr
);
5225 if (server
.master
) freeClient(server
.master
);
5226 server
.replstate
= REDIS_REPL_CONNECT
;
5227 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
5228 server
.masterhost
, server
.masterport
);
5230 addReply(c
,shared
.ok
);
5233 /* ============================ Maxmemory directive ======================== */
5235 /* This function gets called when 'maxmemory' is set on the config file to limit
5236 * the max memory used by the server, and we are out of memory.
5237 * This function will try to, in order:
5239 * - Free objects from the free list
5240 * - Try to remove keys with an EXPIRE set
5242 * It is not possible to free enough memory to reach used-memory < maxmemory
5243 * the server will start refusing commands that will enlarge even more the
5246 static void freeMemoryIfNeeded(void) {
5247 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
5248 if (listLength(server
.objfreelist
)) {
5251 listNode
*head
= listFirst(server
.objfreelist
);
5252 o
= listNodeValue(head
);
5253 listDelNode(server
.objfreelist
,head
);
5256 int j
, k
, freed
= 0;
5258 for (j
= 0; j
< server
.dbnum
; j
++) {
5260 robj
*minkey
= NULL
;
5261 struct dictEntry
*de
;
5263 if (dictSize(server
.db
[j
].expires
)) {
5265 /* From a sample of three keys drop the one nearest to
5266 * the natural expire */
5267 for (k
= 0; k
< 3; k
++) {
5270 de
= dictGetRandomKey(server
.db
[j
].expires
);
5271 t
= (time_t) dictGetEntryVal(de
);
5272 if (minttl
== -1 || t
< minttl
) {
5273 minkey
= dictGetEntryKey(de
);
5277 deleteKey(server
.db
+j
,minkey
);
5280 if (!freed
) return; /* nothing to free... */
5285 /* ================================= Debugging ============================== */
5287 static void debugCommand(redisClient
*c
) {
5288 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
5290 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
5291 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
5295 addReply(c
,shared
.nokeyerr
);
5298 key
= dictGetEntryKey(de
);
5299 val
= dictGetEntryVal(de
);
5300 addReplySds(c
,sdscatprintf(sdsempty(),
5301 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
5302 key
, key
->refcount
, val
, val
->refcount
, val
->encoding
));
5304 addReplySds(c
,sdsnew(
5305 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
5309 #ifdef HAVE_BACKTRACE
5310 static struct redisFunctionSym symsTable
[] = {
5311 {"compareStringObjects", (unsigned long)compareStringObjects
},
5312 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong
},
5313 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare
},
5314 {"dictEncObjHash", (unsigned long)dictEncObjHash
},
5315 {"incrDecrCommand", (unsigned long)incrDecrCommand
},
5316 {"freeStringObject", (unsigned long)freeStringObject
},
5317 {"freeListObject", (unsigned long)freeListObject
},
5318 {"freeSetObject", (unsigned long)freeSetObject
},
5319 {"decrRefCount", (unsigned long)decrRefCount
},
5320 {"createObject", (unsigned long)createObject
},
5321 {"freeClient", (unsigned long)freeClient
},
5322 {"rdbLoad", (unsigned long)rdbLoad
},
5323 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject
},
5324 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw
},
5325 {"addReply", (unsigned long)addReply
},
5326 {"addReplySds", (unsigned long)addReplySds
},
5327 {"incrRefCount", (unsigned long)incrRefCount
},
5328 {"rdbSaveBackground", (unsigned long)rdbSaveBackground
},
5329 {"createStringObject", (unsigned long)createStringObject
},
5330 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves
},
5331 {"syncWithMaster", (unsigned long)syncWithMaster
},
5332 {"tryObjectSharing", (unsigned long)tryObjectSharing
},
5333 {"tryObjectEncoding", (unsigned long)tryObjectEncoding
},
5334 {"getDecodedObject", (unsigned long)getDecodedObject
},
5335 {"removeExpire", (unsigned long)removeExpire
},
5336 {"expireIfNeeded", (unsigned long)expireIfNeeded
},
5337 {"deleteIfVolatile", (unsigned long)deleteIfVolatile
},
5338 {"deleteKey", (unsigned long)deleteKey
},
5339 {"getExpire", (unsigned long)getExpire
},
5340 {"setExpire", (unsigned long)setExpire
},
5341 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave
},
5342 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded
},
5343 {"authCommand", (unsigned long)authCommand
},
5344 {"pingCommand", (unsigned long)pingCommand
},
5345 {"echoCommand", (unsigned long)echoCommand
},
5346 {"setCommand", (unsigned long)setCommand
},
5347 {"setnxCommand", (unsigned long)setnxCommand
},
5348 {"getCommand", (unsigned long)getCommand
},
5349 {"delCommand", (unsigned long)delCommand
},
5350 {"existsCommand", (unsigned long)existsCommand
},
5351 {"incrCommand", (unsigned long)incrCommand
},
5352 {"decrCommand", (unsigned long)decrCommand
},
5353 {"incrbyCommand", (unsigned long)incrbyCommand
},
5354 {"decrbyCommand", (unsigned long)decrbyCommand
},
5355 {"selectCommand", (unsigned long)selectCommand
},
5356 {"randomkeyCommand", (unsigned long)randomkeyCommand
},
5357 {"keysCommand", (unsigned long)keysCommand
},
5358 {"dbsizeCommand", (unsigned long)dbsizeCommand
},
5359 {"lastsaveCommand", (unsigned long)lastsaveCommand
},
5360 {"saveCommand", (unsigned long)saveCommand
},
5361 {"bgsaveCommand", (unsigned long)bgsaveCommand
},
5362 {"shutdownCommand", (unsigned long)shutdownCommand
},
5363 {"moveCommand", (unsigned long)moveCommand
},
5364 {"renameCommand", (unsigned long)renameCommand
},
5365 {"renamenxCommand", (unsigned long)renamenxCommand
},
5366 {"lpushCommand", (unsigned long)lpushCommand
},
5367 {"rpushCommand", (unsigned long)rpushCommand
},
5368 {"lpopCommand", (unsigned long)lpopCommand
},
5369 {"rpopCommand", (unsigned long)rpopCommand
},
5370 {"llenCommand", (unsigned long)llenCommand
},
5371 {"lindexCommand", (unsigned long)lindexCommand
},
5372 {"lrangeCommand", (unsigned long)lrangeCommand
},
5373 {"ltrimCommand", (unsigned long)ltrimCommand
},
5374 {"typeCommand", (unsigned long)typeCommand
},
5375 {"lsetCommand", (unsigned long)lsetCommand
},
5376 {"saddCommand", (unsigned long)saddCommand
},
5377 {"sremCommand", (unsigned long)sremCommand
},
5378 {"smoveCommand", (unsigned long)smoveCommand
},
5379 {"sismemberCommand", (unsigned long)sismemberCommand
},
5380 {"scardCommand", (unsigned long)scardCommand
},
5381 {"spopCommand", (unsigned long)spopCommand
},
5382 {"srandmemberCommand", (unsigned long)srandmemberCommand
},
5383 {"sinterCommand", (unsigned long)sinterCommand
},
5384 {"sinterstoreCommand", (unsigned long)sinterstoreCommand
},
5385 {"sunionCommand", (unsigned long)sunionCommand
},
5386 {"sunionstoreCommand", (unsigned long)sunionstoreCommand
},
5387 {"sdiffCommand", (unsigned long)sdiffCommand
},
5388 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand
},
5389 {"syncCommand", (unsigned long)syncCommand
},
5390 {"flushdbCommand", (unsigned long)flushdbCommand
},
5391 {"flushallCommand", (unsigned long)flushallCommand
},
5392 {"sortCommand", (unsigned long)sortCommand
},
5393 {"lremCommand", (unsigned long)lremCommand
},
5394 {"infoCommand", (unsigned long)infoCommand
},
5395 {"mgetCommand", (unsigned long)mgetCommand
},
5396 {"monitorCommand", (unsigned long)monitorCommand
},
5397 {"expireCommand", (unsigned long)expireCommand
},
5398 {"expireatCommand", (unsigned long)expireatCommand
},
5399 {"getsetCommand", (unsigned long)getsetCommand
},
5400 {"ttlCommand", (unsigned long)ttlCommand
},
5401 {"slaveofCommand", (unsigned long)slaveofCommand
},
5402 {"debugCommand", (unsigned long)debugCommand
},
5403 {"processCommand", (unsigned long)processCommand
},
5404 {"setupSigSegvAction", (unsigned long)setupSigSegvAction
},
5405 {"readQueryFromClient", (unsigned long)readQueryFromClient
},
5406 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile
},
5407 {"msetGenericCommand", (unsigned long)msetGenericCommand
},
5408 {"msetCommand", (unsigned long)msetCommand
},
5409 {"msetnxCommand", (unsigned long)msetnxCommand
},
5410 {"zslCreateNode", (unsigned long)zslCreateNode
},
5411 {"zslCreate", (unsigned long)zslCreate
},
5412 {"zslFreeNode",(unsigned long)zslFreeNode
},
5413 {"zslFree",(unsigned long)zslFree
},
5414 {"zslRandomLevel",(unsigned long)zslRandomLevel
},
5415 {"zslInsert",(unsigned long)zslInsert
},
5416 {"zslDelete",(unsigned long)zslDelete
},
5417 {"createZsetObject",(unsigned long)createZsetObject
},
5418 {"zaddCommand",(unsigned long)zaddCommand
},
5419 {"zrangeGenericCommand",(unsigned long)zrangeGenericCommand
},
5420 {"zrangeCommand",(unsigned long)zrangeCommand
},
5421 {"zrevrangeCommand",(unsigned long)zrevrangeCommand
},
5422 {"zremCommand",(unsigned long)zremCommand
},
5423 {"rdbSaveDoubleValue",(unsigned long)rdbSaveDoubleValue
},
5424 {"rdbLoadDoubleValue",(unsigned long)rdbLoadDoubleValue
},
5428 /* This function try to convert a pointer into a function name. It's used in
5429 * oreder to provide a backtrace under segmentation fault that's able to
5430 * display functions declared as static (otherwise the backtrace is useless). */
5431 static char *findFuncName(void *pointer
, unsigned long *offset
){
5433 unsigned long off
, minoff
= 0;
5435 /* Try to match against the Symbol with the smallest offset */
5436 for (i
=0; symsTable
[i
].pointer
; i
++) {
5437 unsigned long lp
= (unsigned long) pointer
;
5439 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
5440 off
=lp
-symsTable
[i
].pointer
;
5441 if (ret
< 0 || off
< minoff
) {
5447 if (ret
== -1) return NULL
;
5449 return symsTable
[ret
].name
;
5452 static void *getMcontextEip(ucontext_t
*uc
) {
5453 #if defined(__FreeBSD__)
5454 return (void*) uc
->uc_mcontext
.mc_eip
;
5455 #elif defined(__dietlibc__)
5456 return (void*) uc
->uc_mcontext
.eip
;
5457 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
5458 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
5459 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
5460 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
5461 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
5463 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
5465 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
5466 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
];
5467 #elif defined(__ia64__) /* Linux IA64 */
5468 return (void*) uc
->uc_mcontext
.sc_ip
;
5474 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
5476 char **messages
= NULL
;
5477 int i
, trace_size
= 0;
5478 unsigned long offset
=0;
5479 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5480 ucontext_t
*uc
= (ucontext_t
*) secret
;
5481 REDIS_NOTUSED(info
);
5483 redisLog(REDIS_WARNING
,
5484 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
5485 redisLog(REDIS_WARNING
, "%s", sdscatprintf(sdsempty(),
5486 "redis_version:%s; "
5487 "uptime_in_seconds:%d; "
5488 "connected_clients:%d; "
5489 "connected_slaves:%d; "
5491 "changes_since_last_save:%lld; "
5492 "bgsave_in_progress:%d; "
5493 "last_save_time:%d; "
5494 "total_connections_received:%lld; "
5495 "total_commands_processed:%lld; "
5499 listLength(server
.clients
)-listLength(server
.slaves
),
5500 listLength(server
.slaves
),
5503 server
.bgsaveinprogress
,
5505 server
.stat_numconnections
,
5506 server
.stat_numcommands
,
5507 server
.masterhost
== NULL
? "master" : "slave"
5510 trace_size
= backtrace(trace
, 100);
5511 /* overwrite sigaction with caller's address */
5512 if (getMcontextEip(uc
) != NULL
) {
5513 trace
[1] = getMcontextEip(uc
);
5515 messages
= backtrace_symbols(trace
, trace_size
);
5517 for (i
=1; i
<trace_size
; ++i
) {
5518 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
5520 p
= strchr(messages
[i
],'+');
5521 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
5522 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
5524 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
5531 static void setupSigSegvAction(void) {
5532 struct sigaction act
;
5534 sigemptyset (&act
.sa_mask
);
5535 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5536 * is used. Otherwise, sa_handler is used */
5537 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
5538 act
.sa_sigaction
= segvHandler
;
5539 sigaction (SIGSEGV
, &act
, NULL
);
5540 sigaction (SIGBUS
, &act
, NULL
);
5541 sigaction (SIGFPE
, &act
, NULL
);
5542 sigaction (SIGILL
, &act
, NULL
);
5543 sigaction (SIGBUS
, &act
, NULL
);
5546 #else /* HAVE_BACKTRACE */
5547 static void setupSigSegvAction(void) {
5549 #endif /* HAVE_BACKTRACE */
5551 /* =================================== Main! ================================ */
5554 int linuxOvercommitMemoryValue(void) {
5555 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
5559 if (fgets(buf
,64,fp
) == NULL
) {
5568 void linuxOvercommitMemoryWarning(void) {
5569 if (linuxOvercommitMemoryValue() == 0) {
5570 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5573 #endif /* __linux__ */
5575 static void daemonize(void) {
5579 if (fork() != 0) exit(0); /* parent exits */
5580 setsid(); /* create a new session */
5582 /* Every output goes to /dev/null. If Redis is daemonized but
5583 * the 'logfile' is set to 'stdout' in the configuration file
5584 * it will not log at all. */
5585 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
5586 dup2(fd
, STDIN_FILENO
);
5587 dup2(fd
, STDOUT_FILENO
);
5588 dup2(fd
, STDERR_FILENO
);
5589 if (fd
> STDERR_FILENO
) close(fd
);
5591 /* Try to write the pid file */
5592 fp
= fopen(server
.pidfile
,"w");
5594 fprintf(fp
,"%d\n",getpid());
5599 int main(int argc
, char **argv
) {
5602 ResetServerSaveParams();
5603 loadServerConfig(argv
[1]);
5604 } else if (argc
> 2) {
5605 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
5608 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5611 if (server
.daemonize
) daemonize();
5612 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
5614 linuxOvercommitMemoryWarning();
5616 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
5617 redisLog(REDIS_NOTICE
,"DB loaded from disk");
5618 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
5619 acceptHandler
, NULL
, NULL
) == AE_ERR
) oom("creating file event");
5620 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
5622 aeDeleteEventLoop(server
.el
);