2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.3.2"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
65 #include "solarisfixes.h"
69 #include "ae.h" /* Event driven programming library */
70 #include "sds.h" /* Dynamic safe strings */
71 #include "anet.h" /* Networking the easy way */
72 #include "dict.h" /* Hash tables */
73 #include "adlist.h" /* Linked lists */
74 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
75 #include "lzf.h" /* LZF compression library */
76 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
/* Static server configuration */
#define REDIS_SERVERPORT        6379    /* TCP port */
#define REDIS_MAXIDLETIME       (60*5)  /* default client timeout */
#define REDIS_IOBUF_LEN         1024
#define REDIS_LOADBUF_LEN       1024
#define REDIS_STATIC_ARGS       4
#define REDIS_DEFAULT_DBNUM     16
#define REDIS_CONFIGLINE_MAX    1024
#define REDIS_OBJFREELIST_MAX   1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME     60      /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE  (1024*1024*256) /* max bytes in inline command */

/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD  3
/* Max number of iovecs used for each writev call */
#define REDIS_WRITEV_IOVEC_COUNT 256

/* Hash table parameters */
#define REDIS_HT_MINFILL        10      /* Minimal hash table fill 10% */

/* Command flags */
#define REDIS_CMD_BULK          1       /* Bulk write command */
#define REDIS_CMD_INLINE        2       /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
 * this flag will return an error when the 'maxmemory' option is set in the
 * config file and the server is using more than maxmemory bytes of memory.
 * In short, these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM       4

/* Object types */
#define REDIS_STRING 0

/* Objects encoding */
#define REDIS_ENCODING_RAW 0    /* Raw representation */
#define REDIS_ENCODING_INT 1    /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys requires a lot of space, so we check the most significant 2 bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * The two-bit prefixes correspond to the REDIS_RDB_*LEN / REDIS_RDB_ENCVAL
 * values below (00 = 0, 01 = 1, 10 = 2, 11 = 3).
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0        /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1       /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2       /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3         /* string compressed with LZF */

/* Virtual memory: values of the object->storage field. */
#define REDIS_VM_MEMORY 0       /* The object is in memory */
#define REDIS_VM_SWAPPED 1      /* The object is on disk */
#define REDIS_VM_SWAPPING 2     /* Redis is swapping this object on disk */
#define REDIS_VM_LOADING 3      /* Redis is loading this object from disk */

/* Virtual memory static configuration stuff.
 * Check vmFindContiguousPages() to know more about these magic numbers. */
#define REDIS_VM_MAX_NEAR_PAGES 65536
#define REDIS_VM_MAX_RANDOM_JUMP 4096
#define REDIS_VM_MAX_THREADS 32 /* also the size of server.io_threads[] */

/* Client flags */
#define REDIS_CLOSE 1       /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2       /* This client is a slave server */
#define REDIS_MASTER 4      /* This client is a master server */
#define REDIS_MONITOR 8     /* This client is a slave monitor, see MONITOR */
#define REDIS_MULTI 16      /* This client is in a MULTI context */
#define REDIS_BLOCKED 32    /* The client is waiting in a blocking operation */
#define REDIS_IO_WAIT 64    /* The client is waiting for Virtual Memory I/O */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0       /* No active replication */
#define REDIS_REPL_CONNECT 1    /* Must connect to master */
#define REDIS_REPL_CONNECTED 2  /* Connected to master */

/* Slave replication state - from the point of view of master.
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5  /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6     /* bulk DB already transmitted, receive updates */

/* List related stuff */

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_ASC 1
#define REDIS_SORT_DESC 2
#define REDIS_SORTKEY_MAX 1024

/* Log levels: redisLog() emits a message when level >= server.verbosity */
#define REDIS_DEBUG 0
#define REDIS_VERBOSE 1
#define REDIS_NOTICE 2
#define REDIS_WARNING 3

/* Anti-warning macro: marks a variable or parameter as intentionally unused.
 * The argument is parenthesized so that an expression argument such as
 * REDIS_NOTUSED(a+1) casts the whole expression, not just its first operand. */
#define REDIS_NOTUSED(V) ((void) (V))

#define ZSKIPLIST_MAXLEVEL 32   /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25        /* Skiplist P = 1/4 */

/* Append only defines */
#define APPENDFSYNC_NO 0
#define APPENDFSYNC_ALWAYS 1
#define APPENDFSYNC_EVERYSEC 2

/* Assertion macro: when _e is false it calls _redisAssert() with the
 * stringified expression, then terminates the process with exit(1).
 * Presumably _redisAssert() logs the failure, including a stack trace when
 * backtrace support is available (HAVE_BACKTRACE) -- confirm at its definition. */
#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
static void _redisAssert(char *estr);
222 /*================================= Data types ============================== */
224 /* A redis object, that is a type able to hold a string / list / set */
226 /* The VM object structure */
227 struct redisObjectVM
{
228 off_t page
; /* the page at witch the object is stored on disk */
229 off_t usedpages
; /* number of pages used on disk */
230 time_t atime
; /* Last access time */
233 /* The actual Redis Object */
234 typedef struct redisObject
{
237 unsigned char encoding
;
238 unsigned char storage
; /* If this object is a key, where is the value?
239 * REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
240 unsigned char vtype
; /* If this object is a key, and value is swapped out,
241 * this is the type of the swapped out object. */
243 /* VM fields: these are only allocated if VM is active, otherwise the
244 * object allocation function will just allocate
245 * sizeof(redisObject) minus sizeof(redisObjectVM), so using
246 * Redis without VM active will not have any overhead. */
247 struct redisObjectVM vm
;
250 /* Macro used to initialize a Redis object allocated on the stack.
251 * Note that this macro is placed near the structure definition to make sure
252 * we'll update it when the structure is changed, to avoid bugs like
253 * bug #85 introduced exactly in this way. */
254 #define initStaticStringObject(_var,_ptr) do { \
256 _var.type = REDIS_STRING; \
257 _var.encoding = REDIS_ENCODING_RAW; \
259 if (server.vm_enabled) _var.storage = REDIS_VM_MEMORY; \
262 typedef struct redisDb
{
263 dict
*dict
; /* The keyspace for this DB */
264 dict
*expires
; /* Timeout of keys with a timeout set */
265 dict
*blockingkeys
; /* Keys with clients waiting for data (BLPOP) */
269 /* Client MULTI/EXEC state */
270 typedef struct multiCmd
{
273 struct redisCommand
*cmd
;
276 typedef struct multiState
{
277 multiCmd
*commands
; /* Array of MULTI commands */
278 int count
; /* Total number of MULTI commands */
281 /* With multiplexing we need to keep per-client state.
282 * Clients are kept in a linked list. */
283 typedef struct redisClient
{
288 robj
**argv
, **mbargv
;
290 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
291 int multibulk
; /* multi bulk command format active */
294 time_t lastinteraction
; /* time of the last interaction, used for timeout */
295 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
297 int slaveseldb
; /* slave selected db, if this client is a slave */
298 int authenticated
; /* when requirepass is non-NULL */
299 int replstate
; /* replication state if this is a slave */
300 int repldbfd
; /* replication DB file descriptor */
301 long repldboff
; /* replication DB file offset */
302 off_t repldbsize
; /* replication DB file size */
303 multiState mstate
; /* MULTI/EXEC state */
304 robj
**blockingkeys
; /* The key we waiting to terminate a blocking
305 * operation such as BLPOP. Otherwise NULL. */
306 int blockingkeysnum
; /* Number of blocking keys */
307 time_t blockingto
; /* Blocking operation timeout. If UNIX current time
308 * is >= blockingto then the operation timed out. */
309 list
*io_keys
; /* Keys this client is waiting to be loaded from the
310 * swap file in order to continue. */
318 /* Global server state structure */
323 dict
*sharingpool
; /* Poll used for object sharing */
324 unsigned int sharingpoolsize
;
325 long long dirty
; /* changes to DB from the last save */
327 list
*slaves
, *monitors
;
328 char neterr
[ANET_ERR_LEN
];
330 int cronloops
; /* number of times the cron function run */
331 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
332 time_t lastsave
; /* Unix time of last save succeeede */
333 size_t usedmemory
; /* Used memory in megabytes */
334 /* Fields used only for stats */
335 time_t stat_starttime
; /* server start time */
336 long long stat_numcommands
; /* number of processed commands */
337 long long stat_numconnections
; /* number of connections received */
350 pid_t bgsavechildpid
;
351 pid_t bgrewritechildpid
;
352 sds bgrewritebuf
; /* buffer taken by parent during oppend only rewrite */
353 struct saveparam
*saveparams
;
358 char *appendfilename
;
362 /* Replication related */
367 redisClient
*master
; /* client that is master for this slave */
369 unsigned int maxclients
;
370 unsigned long long maxmemory
;
371 unsigned int blockedclients
;
372 /* Sort parameters - qsort_r() is only available under BSD so we
373 * have to keep this state global, in order to pass it to sortCompare() */
377 /* Virtual memory configuration */
381 unsigned long long vm_max_memory
;
382 /* Virtual memory state */
385 off_t vm_next_page
; /* Next probably empty page */
386 off_t vm_near_pages
; /* Number of pages allocated sequentially */
387 unsigned char *vm_bitmap
; /* Bitmap of free/used pages */
388 time_t unixtime
; /* Unix time sampled every second. */
389 /* Virtual memory I/O threads stuff */
390 pthread_t io_threads
[REDIS_VM_MAX_THREADS
];
391 /* An I/O thread processes an element taken from the io_jobs queue and
392 * puts the result of the operation in the io_done list. */
393 list
*io_jobs
; /* List of VM I/O jobs */
394 list
*io_done
; /* List of VM processed jobs */
395 list
*io_clients
; /* All the clients waiting for SWAP I/O operations */
396 pthread_mutex_t io_mutex
; /* lock to access io_jobs and io_done */
397 int io_active_threads
; /* Number of running I/O threads */
398 int vm_max_threads
; /* Max number of I/O threads running at the same time */
399 /* Virtual memory stats */
400 unsigned long long vm_stats_used_pages
;
401 unsigned long long vm_stats_swapped_objects
;
402 unsigned long long vm_stats_swapouts
;
403 unsigned long long vm_stats_swapins
;
406 typedef void redisCommandProc(redisClient
*c
);
407 struct redisCommand
{
409 redisCommandProc
*proc
;
414 struct redisFunctionSym
{
416 unsigned long pointer
;
419 typedef struct _redisSortObject
{
427 typedef struct _redisSortOperation
{
430 } redisSortOperation
;
432 /* ZSETs use a specialized version of Skiplists */
434 typedef struct zskiplistNode
{
435 struct zskiplistNode
**forward
;
436 struct zskiplistNode
*backward
;
441 typedef struct zskiplist
{
442 struct zskiplistNode
*header
, *tail
;
443 unsigned long length
;
447 typedef struct zset
{
452 /* Our shared "common" objects */
454 struct sharedObjectsStruct
{
455 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
456 *colon
, *nullbulk
, *nullmultibulk
, *queued
,
457 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
458 *outofrangeerr
, *plus
,
459 *select0
, *select1
, *select2
, *select3
, *select4
,
460 *select5
, *select6
, *select7
, *select8
, *select9
;
463 /* Global vars that are actally used as constants. The following double
464 * values are used for double on-disk serialization, and are initialized
465 * at runtime to avoid strange compiler optimizations. */
467 static double R_Zero
, R_PosInf
, R_NegInf
, R_Nan
;
469 /* VM threaded I/O request message */
470 #define REDIS_IOREQ_LOAD 0
471 #define REDIS_IOREQ_SWAP 1
472 typedef struct ioreq
{
473 int type
; /* Request type, REDIS_IOREQ_* */
474 int dbid
; /* Redis database ID */
475 robj
*key
; /* This I/O request is about swapping this key */
476 robj
*val
; /* the value to swap for REDIS_IOREQ_SWAP, otherwise this
477 * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */
478 off_t page
; /* Swap page where to read/write the object */
481 /*================================ Prototypes =============================== */
483 static void freeStringObject(robj
*o
);
484 static void freeListObject(robj
*o
);
485 static void freeSetObject(robj
*o
);
486 static void decrRefCount(void *o
);
487 static robj
*createObject(int type
, void *ptr
);
488 static void freeClient(redisClient
*c
);
489 static int rdbLoad(char *filename
);
490 static void addReply(redisClient
*c
, robj
*obj
);
491 static void addReplySds(redisClient
*c
, sds s
);
492 static void incrRefCount(robj
*o
);
493 static int rdbSaveBackground(char *filename
);
494 static robj
*createStringObject(char *ptr
, size_t len
);
495 static robj
*dupStringObject(robj
*o
);
496 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
497 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
498 static int syncWithMaster(void);
499 static robj
*tryObjectSharing(robj
*o
);
500 static int tryObjectEncoding(robj
*o
);
501 static robj
*getDecodedObject(robj
*o
);
502 static int removeExpire(redisDb
*db
, robj
*key
);
503 static int expireIfNeeded(redisDb
*db
, robj
*key
);
504 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
505 static int deleteIfSwapped(redisDb
*db
, robj
*key
);
506 static int deleteKey(redisDb
*db
, robj
*key
);
507 static time_t getExpire(redisDb
*db
, robj
*key
);
508 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
509 static void updateSlavesWaitingBgsave(int bgsaveerr
);
510 static void freeMemoryIfNeeded(void);
511 static int processCommand(redisClient
*c
);
512 static void setupSigSegvAction(void);
513 static void rdbRemoveTempFile(pid_t childpid
);
514 static void aofRemoveTempFile(pid_t childpid
);
515 static size_t stringObjectLen(robj
*o
);
516 static void processInputBuffer(redisClient
*c
);
517 static zskiplist
*zslCreate(void);
518 static void zslFree(zskiplist
*zsl
);
519 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
520 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
521 static void initClientMultiState(redisClient
*c
);
522 static void freeClientMultiState(redisClient
*c
);
523 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
);
524 static void unblockClient(redisClient
*c
);
525 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
);
526 static void vmInit(void);
527 static void vmMarkPagesFree(off_t page
, off_t count
);
528 static robj
*vmLoadObject(robj
*key
);
529 static robj
*vmPreviewObject(robj
*key
);
530 static int vmSwapOneObject(void);
531 static int vmCanSwapOut(void);
532 static void freeOneObjectFromFreelist(void);
534 static void authCommand(redisClient
*c
);
535 static void pingCommand(redisClient
*c
);
536 static void echoCommand(redisClient
*c
);
537 static void setCommand(redisClient
*c
);
538 static void setnxCommand(redisClient
*c
);
539 static void getCommand(redisClient
*c
);
540 static void delCommand(redisClient
*c
);
541 static void existsCommand(redisClient
*c
);
542 static void incrCommand(redisClient
*c
);
543 static void decrCommand(redisClient
*c
);
544 static void incrbyCommand(redisClient
*c
);
545 static void decrbyCommand(redisClient
*c
);
546 static void selectCommand(redisClient
*c
);
547 static void randomkeyCommand(redisClient
*c
);
548 static void keysCommand(redisClient
*c
);
549 static void dbsizeCommand(redisClient
*c
);
550 static void lastsaveCommand(redisClient
*c
);
551 static void saveCommand(redisClient
*c
);
552 static void bgsaveCommand(redisClient
*c
);
553 static void bgrewriteaofCommand(redisClient
*c
);
554 static void shutdownCommand(redisClient
*c
);
555 static void moveCommand(redisClient
*c
);
556 static void renameCommand(redisClient
*c
);
557 static void renamenxCommand(redisClient
*c
);
558 static void lpushCommand(redisClient
*c
);
559 static void rpushCommand(redisClient
*c
);
560 static void lpopCommand(redisClient
*c
);
561 static void rpopCommand(redisClient
*c
);
562 static void llenCommand(redisClient
*c
);
563 static void lindexCommand(redisClient
*c
);
564 static void lrangeCommand(redisClient
*c
);
565 static void ltrimCommand(redisClient
*c
);
566 static void typeCommand(redisClient
*c
);
567 static void lsetCommand(redisClient
*c
);
568 static void saddCommand(redisClient
*c
);
569 static void sremCommand(redisClient
*c
);
570 static void smoveCommand(redisClient
*c
);
571 static void sismemberCommand(redisClient
*c
);
572 static void scardCommand(redisClient
*c
);
573 static void spopCommand(redisClient
*c
);
574 static void srandmemberCommand(redisClient
*c
);
575 static void sinterCommand(redisClient
*c
);
576 static void sinterstoreCommand(redisClient
*c
);
577 static void sunionCommand(redisClient
*c
);
578 static void sunionstoreCommand(redisClient
*c
);
579 static void sdiffCommand(redisClient
*c
);
580 static void sdiffstoreCommand(redisClient
*c
);
581 static void syncCommand(redisClient
*c
);
582 static void flushdbCommand(redisClient
*c
);
583 static void flushallCommand(redisClient
*c
);
584 static void sortCommand(redisClient
*c
);
585 static void lremCommand(redisClient
*c
);
586 static void rpoplpushcommand(redisClient
*c
);
587 static void infoCommand(redisClient
*c
);
588 static void mgetCommand(redisClient
*c
);
589 static void monitorCommand(redisClient
*c
);
590 static void expireCommand(redisClient
*c
);
591 static void expireatCommand(redisClient
*c
);
592 static void getsetCommand(redisClient
*c
);
593 static void ttlCommand(redisClient
*c
);
594 static void slaveofCommand(redisClient
*c
);
595 static void debugCommand(redisClient
*c
);
596 static void msetCommand(redisClient
*c
);
597 static void msetnxCommand(redisClient
*c
);
598 static void zaddCommand(redisClient
*c
);
599 static void zincrbyCommand(redisClient
*c
);
600 static void zrangeCommand(redisClient
*c
);
601 static void zrangebyscoreCommand(redisClient
*c
);
602 static void zrevrangeCommand(redisClient
*c
);
603 static void zcardCommand(redisClient
*c
);
604 static void zremCommand(redisClient
*c
);
605 static void zscoreCommand(redisClient
*c
);
606 static void zremrangebyscoreCommand(redisClient
*c
);
607 static void multiCommand(redisClient
*c
);
608 static void execCommand(redisClient
*c
);
609 static void blpopCommand(redisClient
*c
);
610 static void brpopCommand(redisClient
*c
);
612 /*================================= Globals ================================= */
615 static struct redisServer server
; /* server global state */
616 static struct redisCommand cmdTable
[] = {
617 {"get",getCommand
,2,REDIS_CMD_INLINE
},
618 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
619 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
620 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
621 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
622 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
623 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
624 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
625 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
626 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
627 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
628 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
629 {"brpop",brpopCommand
,-3,REDIS_CMD_INLINE
},
630 {"blpop",blpopCommand
,-3,REDIS_CMD_INLINE
},
631 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
632 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
633 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
634 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
635 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
636 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
637 {"rpoplpush",rpoplpushcommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
638 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
639 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
640 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
641 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
642 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
643 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
644 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
645 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
646 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
647 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
648 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
649 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
650 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
651 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
652 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
653 {"zincrby",zincrbyCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
654 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
655 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
656 {"zrange",zrangeCommand
,-4,REDIS_CMD_INLINE
},
657 {"zrangebyscore",zrangebyscoreCommand
,-4,REDIS_CMD_INLINE
},
658 {"zrevrange",zrevrangeCommand
,-4,REDIS_CMD_INLINE
},
659 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
660 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
661 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
662 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
663 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
664 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
665 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
666 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
667 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
668 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
669 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
670 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
671 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
672 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
673 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
674 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
675 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
676 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
677 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
678 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
679 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
680 {"bgrewriteaof",bgrewriteaofCommand
,1,REDIS_CMD_INLINE
},
681 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
682 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
683 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
684 {"multi",multiCommand
,1,REDIS_CMD_INLINE
},
685 {"exec",execCommand
,1,REDIS_CMD_INLINE
},
686 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
687 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
688 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
689 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
690 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
691 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
692 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
693 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
694 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
698 /*============================ Utility functions ============================ */
700 /* Glob-style pattern matching. */
701 int stringmatchlen(const char *pattern
, int patternLen
,
702 const char *string
, int stringLen
, int nocase
)
707 while (pattern
[1] == '*') {
712 return 1; /* match */
714 if (stringmatchlen(pattern
+1, patternLen
-1,
715 string
, stringLen
, nocase
))
716 return 1; /* match */
720 return 0; /* no match */
724 return 0; /* no match */
734 not = pattern
[0] == '^';
741 if (pattern
[0] == '\\') {
744 if (pattern
[0] == string
[0])
746 } else if (pattern
[0] == ']') {
748 } else if (patternLen
== 0) {
752 } else if (pattern
[1] == '-' && patternLen
>= 3) {
753 int start
= pattern
[0];
754 int end
= pattern
[2];
762 start
= tolower(start
);
768 if (c
>= start
&& c
<= end
)
772 if (pattern
[0] == string
[0])
775 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
785 return 0; /* no match */
791 if (patternLen
>= 2) {
798 if (pattern
[0] != string
[0])
799 return 0; /* no match */
801 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
802 return 0; /* no match */
810 if (stringLen
== 0) {
811 while(*pattern
== '*') {
818 if (patternLen
== 0 && stringLen
== 0)
823 static void redisLog(int level
, const char *fmt
, ...) {
827 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
831 if (level
>= server
.verbosity
) {
837 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
838 fprintf(fp
,"%s %c ",buf
,c
[level
]);
839 vfprintf(fp
, fmt
, ap
);
845 if (server
.logfile
) fclose(fp
);
848 /*====================== Hash table type implementation ==================== */
850 /* This is a hash table type that uses the SDS dynamic strings library as
851 * keys and Redis objects as values (objects can hold SDS strings,
854 static void dictVanillaFree(void *privdata
, void *val
)
856 DICT_NOTUSED(privdata
);
860 static void dictListDestructor(void *privdata
, void *val
)
862 DICT_NOTUSED(privdata
);
863 listRelease((list
*)val
);
866 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
870 DICT_NOTUSED(privdata
);
872 l1
= sdslen((sds
)key1
);
873 l2
= sdslen((sds
)key2
);
874 if (l1
!= l2
) return 0;
875 return memcmp(key1
, key2
, l1
) == 0;
878 static void dictRedisObjectDestructor(void *privdata
, void *val
)
880 DICT_NOTUSED(privdata
);
882 if (val
== NULL
) return; /* Values of swapped out keys as set to NULL */
886 static int dictObjKeyCompare(void *privdata
, const void *key1
,
889 const robj
*o1
= key1
, *o2
= key2
;
890 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
893 static unsigned int dictObjHash(const void *key
) {
895 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
898 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
901 robj
*o1
= (robj
*) key1
, *o2
= (robj
*) key2
;
904 o1
= getDecodedObject(o1
);
905 o2
= getDecodedObject(o2
);
906 cmp
= sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
912 static unsigned int dictEncObjHash(const void *key
) {
913 robj
*o
= (robj
*) key
;
915 o
= getDecodedObject(o
);
916 unsigned int hash
= dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
921 static dictType setDictType
= {
922 dictEncObjHash
, /* hash function */
925 dictEncObjKeyCompare
, /* key compare */
926 dictRedisObjectDestructor
, /* key destructor */
927 NULL
/* val destructor */
930 static dictType zsetDictType
= {
931 dictEncObjHash
, /* hash function */
934 dictEncObjKeyCompare
, /* key compare */
935 dictRedisObjectDestructor
, /* key destructor */
936 dictVanillaFree
/* val destructor of malloc(sizeof(double)) */
939 static dictType hashDictType
= {
940 dictObjHash
, /* hash function */
943 dictObjKeyCompare
, /* key compare */
944 dictRedisObjectDestructor
, /* key destructor */
945 dictRedisObjectDestructor
/* val destructor */
948 /* Keylist hash table type has unencoded redis objects as keys and
949 * lists as values. It's used for blocking operations (BLPOP) */
950 static dictType keylistDictType
= {
951 dictObjHash
, /* hash function */
954 dictObjKeyCompare
, /* key compare */
955 dictRedisObjectDestructor
, /* key destructor */
956 dictListDestructor
/* val destructor */
959 /* ========================= Random utility functions ======================= */
961 /* Redis generally does not try to recover from out of memory conditions
962 * when allocating objects or strings, it is not clear if it will be possible
963 * to report this condition to the client since the networking layer itself
964 * is based on heap allocation for send buffers, so we simply abort.
965 * At least the code will be simpler to read... */
966 static void oom(const char *msg
) {
967 redisLog(REDIS_WARNING
, "%s: Out of memory\n",msg
);
972 /* ====================== Redis server networking stuff ===================== */
973 static void closeTimedoutClients(void) {
976 time_t now
= time(NULL
);
978 listRewind(server
.clients
);
979 while ((ln
= listYield(server
.clients
)) != NULL
) {
980 c
= listNodeValue(ln
);
981 if (server
.maxidletime
&&
982 !(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
983 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
984 (now
- c
->lastinteraction
> server
.maxidletime
))
986 redisLog(REDIS_VERBOSE
,"Closing idle client");
988 } else if (c
->flags
& REDIS_BLOCKED
) {
989 if (c
->blockingto
!= 0 && c
->blockingto
< now
) {
990 addReply(c
,shared
.nullmultibulk
);
997 static int htNeedsResize(dict
*dict
) {
998 long long size
, used
;
1000 size
= dictSlots(dict
);
1001 used
= dictSize(dict
);
1002 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
1003 (used
*100/size
< REDIS_HT_MINFILL
));
1006 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
1007 * we resize the hash table to save memory */
1008 static void tryResizeHashTables(void) {
1011 for (j
= 0; j
< server
.dbnum
; j
++) {
1012 if (htNeedsResize(server
.db
[j
].dict
)) {
1013 redisLog(REDIS_VERBOSE
,"The hash table %d is too sparse, resize it...",j
);
1014 dictResize(server
.db
[j
].dict
);
1015 redisLog(REDIS_VERBOSE
,"Hash table %d resized.",j
);
1017 if (htNeedsResize(server
.db
[j
].expires
))
1018 dictResize(server
.db
[j
].expires
);
1022 /* A background saving child (BGSAVE) terminated its work. Handle this. */
1023 void backgroundSaveDoneHandler(int statloc
) {
1024 int exitcode
= WEXITSTATUS(statloc
);
1025 int bysignal
= WIFSIGNALED(statloc
);
1027 if (!bysignal
&& exitcode
== 0) {
1028 redisLog(REDIS_NOTICE
,
1029 "Background saving terminated with success");
1031 server
.lastsave
= time(NULL
);
1032 } else if (!bysignal
&& exitcode
!= 0) {
1033 redisLog(REDIS_WARNING
, "Background saving error");
1035 redisLog(REDIS_WARNING
,
1036 "Background saving terminated by signal");
1037 rdbRemoveTempFile(server
.bgsavechildpid
);
1039 server
.bgsavechildpid
= -1;
1040 /* Possibly there are slaves waiting for a BGSAVE in order to be served
1041 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
1042 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
1045 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
1047 void backgroundRewriteDoneHandler(int statloc
) {
1048 int exitcode
= WEXITSTATUS(statloc
);
1049 int bysignal
= WIFSIGNALED(statloc
);
1051 if (!bysignal
&& exitcode
== 0) {
1055 redisLog(REDIS_NOTICE
,
1056 "Background append only file rewriting terminated with success");
1057 /* Now it's time to flush the differences accumulated by the parent */
1058 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) server
.bgrewritechildpid
);
1059 fd
= open(tmpfile
,O_WRONLY
|O_APPEND
);
1061 redisLog(REDIS_WARNING
, "Not able to open the temp append only file produced by the child: %s", strerror(errno
));
1064 /* Flush our data... */
1065 if (write(fd
,server
.bgrewritebuf
,sdslen(server
.bgrewritebuf
)) !=
1066 (signed) sdslen(server
.bgrewritebuf
)) {
1067 redisLog(REDIS_WARNING
, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno
));
1071 redisLog(REDIS_NOTICE
,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server
.bgrewritebuf
));
1072 /* Now our work is to rename the temp file into the stable file. And
1073 * switch the file descriptor used by the server for append only. */
1074 if (rename(tmpfile
,server
.appendfilename
) == -1) {
1075 redisLog(REDIS_WARNING
,"Can't rename the temp append only file into the stable one: %s", strerror(errno
));
1079 /* Mission completed... almost */
1080 redisLog(REDIS_NOTICE
,"Append only file successfully rewritten.");
1081 if (server
.appendfd
!= -1) {
1082 /* If append only is actually enabled... */
1083 close(server
.appendfd
);
1084 server
.appendfd
= fd
;
1086 server
.appendseldb
= -1; /* Make sure it will issue SELECT */
1087 redisLog(REDIS_NOTICE
,"The new append only file was selected for future appends.");
1089 /* If append only is disabled we just generate a dump in this
1090 * format. Why not? */
1093 } else if (!bysignal
&& exitcode
!= 0) {
1094 redisLog(REDIS_WARNING
, "Background append only file rewriting error");
1096 redisLog(REDIS_WARNING
,
1097 "Background append only file rewriting terminated by signal");
1100 sdsfree(server
.bgrewritebuf
);
1101 server
.bgrewritebuf
= sdsempty();
1102 aofRemoveTempFile(server
.bgrewritechildpid
);
1103 server
.bgrewritechildpid
= -1;
1106 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
1107 int j
, loops
= server
.cronloops
++;
1108 REDIS_NOTUSED(eventLoop
);
1110 REDIS_NOTUSED(clientData
);
1112 /* We take a cached value of the unix time in the global state because
1113 * with virtual memory and aging there is to store the current time
1114 * in objects at every object access, and accuracy is not needed.
1115 * To access a global var is faster than calling time(NULL) */
1116 server
.unixtime
= time(NULL
);
1118 /* Update the global state with the amount of used memory */
1119 server
.usedmemory
= zmalloc_used_memory();
1121 /* Show some info about non-empty databases */
1122 for (j
= 0; j
< server
.dbnum
; j
++) {
1123 long long size
, used
, vkeys
;
1125 size
= dictSlots(server
.db
[j
].dict
);
1126 used
= dictSize(server
.db
[j
].dict
);
1127 vkeys
= dictSize(server
.db
[j
].expires
);
1128 if (!(loops
% 5) && (used
|| vkeys
)) {
1129 redisLog(REDIS_VERBOSE
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
1130 /* dictPrintStats(server.dict); */
1134 /* We don't want to resize the hash tables while a bacground saving
1135 * is in progress: the saving child is created using fork() that is
1136 * implemented with a copy-on-write semantic in most modern systems, so
1137 * if we resize the HT while there is the saving child at work actually
1138 * a lot of memory movements in the parent will cause a lot of pages
1140 if (server
.bgsavechildpid
== -1) tryResizeHashTables();
1142 /* Show information about connected clients */
1144 redisLog(REDIS_VERBOSE
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1145 listLength(server
.clients
)-listLength(server
.slaves
),
1146 listLength(server
.slaves
),
1148 dictSize(server
.sharingpool
));
1151 /* Close connections of timedout clients */
1152 if ((server
.maxidletime
&& !(loops
% 10)) || server
.blockedclients
)
1153 closeTimedoutClients();
1155 /* Check if a background saving or AOF rewrite in progress terminated */
1156 if (server
.bgsavechildpid
!= -1 || server
.bgrewritechildpid
!= -1) {
1160 if ((pid
= wait3(&statloc
,WNOHANG
,NULL
)) != 0) {
1161 if (pid
== server
.bgsavechildpid
) {
1162 backgroundSaveDoneHandler(statloc
);
1164 backgroundRewriteDoneHandler(statloc
);
1168 /* If there is not a background saving in progress check if
1169 * we have to save now */
1170 time_t now
= time(NULL
);
1171 for (j
= 0; j
< server
.saveparamslen
; j
++) {
1172 struct saveparam
*sp
= server
.saveparams
+j
;
1174 if (server
.dirty
>= sp
->changes
&&
1175 now
-server
.lastsave
> sp
->seconds
) {
1176 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
1177 sp
->changes
, sp
->seconds
);
1178 rdbSaveBackground(server
.dbfilename
);
1184 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1185 * will use few CPU cycles if there are few expiring keys, otherwise
1186 * it will get more aggressive to avoid that too much memory is used by
1187 * keys that can be removed from the keyspace. */
1188 for (j
= 0; j
< server
.dbnum
; j
++) {
1190 redisDb
*db
= server
.db
+j
;
1192 /* Continue to expire if at the end of the cycle more than 25%
1193 * of the keys were expired. */
1195 long num
= dictSize(db
->expires
);
1196 time_t now
= time(NULL
);
1199 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
1200 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
1205 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
1206 t
= (time_t) dictGetEntryVal(de
);
1208 deleteKey(db
,dictGetEntryKey(de
));
1212 } while (expired
> REDIS_EXPIRELOOKUPS_PER_CRON
/4);
1215 /* Swap a few keys on disk if we are over the memory limit and VM
1216 * is enbled. Try to free objects from the free list first. */
1217 if (vmCanSwapOut()) {
1218 while (server
.vm_enabled
&& zmalloc_used_memory() >
1219 server
.vm_max_memory
)
1221 if (listLength(server
.objfreelist
)) {
1222 freeOneObjectFromFreelist();
1223 } else if (vmSwapOneObject() == REDIS_ERR
) {
1224 if ((loops
% 30) == 0 && zmalloc_used_memory() >
1225 (server
.vm_max_memory
+server
.vm_max_memory
/10)) {
1226 redisLog(REDIS_WARNING
,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");
1233 /* Check if we should connect to a MASTER */
1234 if (server
.replstate
== REDIS_REPL_CONNECT
) {
1235 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
1236 if (syncWithMaster() == REDIS_OK
) {
1237 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
1243 static void createSharedObjects(void) {
1244 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
1245 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
1246 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
1247 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
1248 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
1249 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
1250 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
1251 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
1252 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
1253 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
1254 shared
.queued
= createObject(REDIS_STRING
,sdsnew("+QUEUED\r\n"));
1255 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
1256 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1257 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
1258 "-ERR no such key\r\n"));
1259 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
1260 "-ERR syntax error\r\n"));
1261 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
1262 "-ERR source and destination objects are the same\r\n"));
1263 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
1264 "-ERR index out of range\r\n"));
1265 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
1266 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
1267 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
1268 shared
.select0
= createStringObject("select 0\r\n",10);
1269 shared
.select1
= createStringObject("select 1\r\n",10);
1270 shared
.select2
= createStringObject("select 2\r\n",10);
1271 shared
.select3
= createStringObject("select 3\r\n",10);
1272 shared
.select4
= createStringObject("select 4\r\n",10);
1273 shared
.select5
= createStringObject("select 5\r\n",10);
1274 shared
.select6
= createStringObject("select 6\r\n",10);
1275 shared
.select7
= createStringObject("select 7\r\n",10);
1276 shared
.select8
= createStringObject("select 8\r\n",10);
1277 shared
.select9
= createStringObject("select 9\r\n",10);
1280 static void appendServerSaveParams(time_t seconds
, int changes
) {
1281 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1282 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1283 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1284 server
.saveparamslen
++;
1287 static void resetServerSaveParams() {
1288 zfree(server
.saveparams
);
1289 server
.saveparams
= NULL
;
1290 server
.saveparamslen
= 0;
1293 static void initServerConfig() {
1294 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1295 server
.port
= REDIS_SERVERPORT
;
1296 server
.verbosity
= REDIS_VERBOSE
;
1297 server
.maxidletime
= REDIS_MAXIDLETIME
;
1298 server
.saveparams
= NULL
;
1299 server
.logfile
= NULL
; /* NULL = log on standard output */
1300 server
.bindaddr
= NULL
;
1301 server
.glueoutputbuf
= 1;
1302 server
.daemonize
= 0;
1303 server
.appendonly
= 0;
1304 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1305 server
.lastfsync
= time(NULL
);
1306 server
.appendfd
= -1;
1307 server
.appendseldb
= -1; /* Make sure the first time will not match */
1308 server
.pidfile
= "/var/run/redis.pid";
1309 server
.dbfilename
= "dump.rdb";
1310 server
.appendfilename
= "appendonly.aof";
1311 server
.requirepass
= NULL
;
1312 server
.shareobjects
= 0;
1313 server
.rdbcompression
= 1;
1314 server
.sharingpoolsize
= 1024;
1315 server
.maxclients
= 0;
1316 server
.blockedclients
= 0;
1317 server
.maxmemory
= 0;
1318 server
.vm_enabled
= 0;
1319 server
.vm_page_size
= 256; /* 256 bytes per page */
1320 server
.vm_pages
= 1024*1024*100; /* 104 millions of pages */
1321 server
.vm_max_memory
= 1024LL*1024*1024*1; /* 1 GB of RAM */
1322 server
.vm_max_threads
= 4;
1324 resetServerSaveParams();
1326 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1327 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1328 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1329 /* Replication related */
1331 server
.masterauth
= NULL
;
1332 server
.masterhost
= NULL
;
1333 server
.masterport
= 6379;
1334 server
.master
= NULL
;
1335 server
.replstate
= REDIS_REPL_NONE
;
1337 /* Double constants initialization */
1339 R_PosInf
= 1.0/R_Zero
;
1340 R_NegInf
= -1.0/R_Zero
;
1341 R_Nan
= R_Zero
/R_Zero
;
1344 static void initServer() {
1347 signal(SIGHUP
, SIG_IGN
);
1348 signal(SIGPIPE
, SIG_IGN
);
1349 setupSigSegvAction();
1351 server
.clients
= listCreate();
1352 server
.slaves
= listCreate();
1353 server
.monitors
= listCreate();
1354 server
.objfreelist
= listCreate();
1355 createSharedObjects();
1356 server
.el
= aeCreateEventLoop();
1357 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1358 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
1359 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1360 if (server
.fd
== -1) {
1361 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
1364 for (j
= 0; j
< server
.dbnum
; j
++) {
1365 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1366 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1367 server
.db
[j
].blockingkeys
= dictCreate(&keylistDictType
,NULL
);
1368 server
.db
[j
].id
= j
;
1370 server
.cronloops
= 0;
1371 server
.bgsavechildpid
= -1;
1372 server
.bgrewritechildpid
= -1;
1373 server
.bgrewritebuf
= sdsempty();
1374 server
.lastsave
= time(NULL
);
1376 server
.usedmemory
= 0;
1377 server
.stat_numcommands
= 0;
1378 server
.stat_numconnections
= 0;
1379 server
.stat_starttime
= time(NULL
);
1380 server
.unixtime
= time(NULL
);
1381 aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
);
1383 if (server
.appendonly
) {
1384 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644);
1385 if (server
.appendfd
== -1) {
1386 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1392 if (server
.vm_enabled
) vmInit();
1395 /* Empty the whole database */
1396 static long long emptyDb() {
1398 long long removed
= 0;
1400 for (j
= 0; j
< server
.dbnum
; j
++) {
1401 removed
+= dictSize(server
.db
[j
].dict
);
1402 dictEmpty(server
.db
[j
].dict
);
1403 dictEmpty(server
.db
[j
].expires
);
1408 static int yesnotoi(char *s
) {
1409 if (!strcasecmp(s
,"yes")) return 1;
1410 else if (!strcasecmp(s
,"no")) return 0;
1414 /* I agree, this is a very rudimental way to load a configuration...
1415 will improve later if the config gets more complex */
1416 static void loadServerConfig(char *filename
) {
1418 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1422 if (filename
[0] == '-' && filename
[1] == '\0')
1425 if ((fp
= fopen(filename
,"r")) == NULL
) {
1426 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1431 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1437 line
= sdstrim(line
," \t\r\n");
1439 /* Skip comments and blank lines*/
1440 if (line
[0] == '#' || line
[0] == '\0') {
1445 /* Split into arguments */
1446 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1447 sdstolower(argv
[0]);
1449 /* Execute config directives */
1450 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1451 server
.maxidletime
= atoi(argv
[1]);
1452 if (server
.maxidletime
< 0) {
1453 err
= "Invalid timeout value"; goto loaderr
;
1455 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1456 server
.port
= atoi(argv
[1]);
1457 if (server
.port
< 1 || server
.port
> 65535) {
1458 err
= "Invalid port"; goto loaderr
;
1460 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1461 server
.bindaddr
= zstrdup(argv
[1]);
1462 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1463 int seconds
= atoi(argv
[1]);
1464 int changes
= atoi(argv
[2]);
1465 if (seconds
< 1 || changes
< 0) {
1466 err
= "Invalid save parameters"; goto loaderr
;
1468 appendServerSaveParams(seconds
,changes
);
1469 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1470 if (chdir(argv
[1]) == -1) {
1471 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1472 argv
[1], strerror(errno
));
1475 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1476 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1477 else if (!strcasecmp(argv
[1],"verbose")) server
.verbosity
= REDIS_VERBOSE
;
1478 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1479 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1481 err
= "Invalid log level. Must be one of debug, notice, warning";
1484 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1487 server
.logfile
= zstrdup(argv
[1]);
1488 if (!strcasecmp(server
.logfile
,"stdout")) {
1489 zfree(server
.logfile
);
1490 server
.logfile
= NULL
;
1492 if (server
.logfile
) {
1493 /* Test if we are able to open the file. The server will not
1494 * be able to abort just for this problem later... */
1495 logfp
= fopen(server
.logfile
,"a");
1496 if (logfp
== NULL
) {
1497 err
= sdscatprintf(sdsempty(),
1498 "Can't open the log file: %s", strerror(errno
));
1503 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1504 server
.dbnum
= atoi(argv
[1]);
1505 if (server
.dbnum
< 1) {
1506 err
= "Invalid number of databases"; goto loaderr
;
1508 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1509 server
.maxclients
= atoi(argv
[1]);
1510 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1511 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1512 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1513 server
.masterhost
= sdsnew(argv
[1]);
1514 server
.masterport
= atoi(argv
[2]);
1515 server
.replstate
= REDIS_REPL_CONNECT
;
1516 } else if (!strcasecmp(argv
[0],"masterauth") && argc
== 2) {
1517 server
.masterauth
= zstrdup(argv
[1]);
1518 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1519 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1520 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1522 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1523 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1524 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1526 } else if (!strcasecmp(argv
[0],"rdbcompression") && argc
== 2) {
1527 if ((server
.rdbcompression
= yesnotoi(argv
[1])) == -1) {
1528 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1530 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1531 server
.sharingpoolsize
= atoi(argv
[1]);
1532 if (server
.sharingpoolsize
< 1) {
1533 err
= "invalid object sharing pool size"; goto loaderr
;
1535 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1536 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1537 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1539 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1540 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1541 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1543 } else if (!strcasecmp(argv
[0],"appendfsync") && argc
== 2) {
1544 if (!strcasecmp(argv
[1],"no")) {
1545 server
.appendfsync
= APPENDFSYNC_NO
;
1546 } else if (!strcasecmp(argv
[1],"always")) {
1547 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1548 } else if (!strcasecmp(argv
[1],"everysec")) {
1549 server
.appendfsync
= APPENDFSYNC_EVERYSEC
;
1551 err
= "argument must be 'no', 'always' or 'everysec'";
1554 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1555 server
.requirepass
= zstrdup(argv
[1]);
1556 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1557 server
.pidfile
= zstrdup(argv
[1]);
1558 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1559 server
.dbfilename
= zstrdup(argv
[1]);
1560 } else if (!strcasecmp(argv
[0],"vm-enabled") && argc
== 2) {
1561 if ((server
.vm_enabled
= yesnotoi(argv
[1])) == -1) {
1562 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1564 } else if (!strcasecmp(argv
[0],"vm-max-memory") && argc
== 2) {
1565 server
.vm_max_memory
= strtoll(argv
[1], NULL
, 10);
1566 } else if (!strcasecmp(argv
[0],"vm-page-size") && argc
== 2) {
1567 server
.vm_page_size
= strtoll(argv
[1], NULL
, 10);
1568 } else if (!strcasecmp(argv
[0],"vm-pages") && argc
== 2) {
1569 server
.vm_pages
= strtoll(argv
[1], NULL
, 10);
1570 } else if (!strcasecmp(argv
[0],"vm-max-threads") && argc
== 2) {
1571 server
.vm_max_threads
= strtoll(argv
[1], NULL
, 10);
1573 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1575 for (j
= 0; j
< argc
; j
++)
1580 if (fp
!= stdin
) fclose(fp
);
1584 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1585 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1586 fprintf(stderr
, ">>> '%s'\n", line
);
1587 fprintf(stderr
, "%s\n", err
);
1591 static void freeClientArgv(redisClient
*c
) {
1594 for (j
= 0; j
< c
->argc
; j
++)
1595 decrRefCount(c
->argv
[j
]);
1596 for (j
= 0; j
< c
->mbargc
; j
++)
1597 decrRefCount(c
->mbargv
[j
]);
1602 static void freeClient(redisClient
*c
) {
1605 /* Note that if the client we are freeing is blocked into a blocking
1606 * call, we have to set querybuf to NULL *before* to call unblockClient()
1607 * to avoid processInputBuffer() will get called. Also it is important
1608 * to remove the file events after this, because this call adds
1609 * the READABLE event. */
1610 sdsfree(c
->querybuf
);
1612 if (c
->flags
& REDIS_BLOCKED
)
1615 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1616 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1617 listRelease(c
->reply
);
1620 /* Remove from the list of clients */
1621 ln
= listSearchKey(server
.clients
,c
);
1622 redisAssert(ln
!= NULL
);
1623 listDelNode(server
.clients
,ln
);
1624 /* Remove from the list of clients waiting for VM operations */
1625 if (server
.vm_enabled
&& listLength(c
->io_keys
)) {
1626 ln
= listSearchKey(server
.io_clients
,c
);
1627 if (ln
) listDelNode(server
.io_clients
,ln
);
1628 listRelease(c
->io_keys
);
1631 if (c
->flags
& REDIS_SLAVE
) {
1632 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1634 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1635 ln
= listSearchKey(l
,c
);
1636 redisAssert(ln
!= NULL
);
1639 if (c
->flags
& REDIS_MASTER
) {
1640 server
.master
= NULL
;
1641 server
.replstate
= REDIS_REPL_CONNECT
;
1645 freeClientMultiState(c
);
1649 #define GLUEREPLY_UP_TO (1024)
1650 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1652 char buf
[GLUEREPLY_UP_TO
];
1656 listRewind(c
->reply
);
1657 while((ln
= listYield(c
->reply
))) {
1661 objlen
= sdslen(o
->ptr
);
1662 if (copylen
+ objlen
<= GLUEREPLY_UP_TO
) {
1663 memcpy(buf
+copylen
,o
->ptr
,objlen
);
1665 listDelNode(c
->reply
,ln
);
1667 if (copylen
== 0) return;
1671 /* Now the output buffer is empty, add the new single element */
1672 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,copylen
));
1673 listAddNodeHead(c
->reply
,o
);
1676 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1677 redisClient
*c
= privdata
;
1678 int nwritten
= 0, totwritten
= 0, objlen
;
1681 REDIS_NOTUSED(mask
);
1683 /* Use writev() if we have enough buffers to send */
1684 if (!server
.glueoutputbuf
&&
1685 listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD
&&
1686 !(c
->flags
& REDIS_MASTER
))
1688 sendReplyToClientWritev(el
, fd
, privdata
, mask
);
1692 while(listLength(c
->reply
)) {
1693 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1694 glueReplyBuffersIfNeeded(c
);
1696 o
= listNodeValue(listFirst(c
->reply
));
1697 objlen
= sdslen(o
->ptr
);
1700 listDelNode(c
->reply
,listFirst(c
->reply
));
1704 if (c
->flags
& REDIS_MASTER
) {
1705 /* Don't reply to a master */
1706 nwritten
= objlen
- c
->sentlen
;
1708 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1709 if (nwritten
<= 0) break;
1711 c
->sentlen
+= nwritten
;
1712 totwritten
+= nwritten
;
1713 /* If we fully sent the object on head go to the next one */
1714 if (c
->sentlen
== objlen
) {
1715 listDelNode(c
->reply
,listFirst(c
->reply
));
1718 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1719 * bytes, in a single threaded server it's a good idea to serve
1720 * other clients as well, even if a very large request comes from
1721 * super fast link that is always able to accept data (in real world
1722 * scenario think about 'KEYS *' against the loopback interfae) */
1723 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1725 if (nwritten
== -1) {
1726 if (errno
== EAGAIN
) {
1729 redisLog(REDIS_VERBOSE
,
1730 "Error writing to client: %s", strerror(errno
));
1735 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1736 if (listLength(c
->reply
) == 0) {
1738 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1742 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
)
1744 redisClient
*c
= privdata
;
1745 int nwritten
= 0, totwritten
= 0, objlen
, willwrite
;
1747 struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
];
1748 int offset
, ion
= 0;
1750 REDIS_NOTUSED(mask
);
1753 while (listLength(c
->reply
)) {
1754 offset
= c
->sentlen
;
1758 /* fill-in the iov[] array */
1759 for(node
= listFirst(c
->reply
); node
; node
= listNextNode(node
)) {
1760 o
= listNodeValue(node
);
1761 objlen
= sdslen(o
->ptr
);
1763 if (totwritten
+ objlen
- offset
> REDIS_MAX_WRITE_PER_EVENT
)
1766 if(ion
== REDIS_WRITEV_IOVEC_COUNT
)
1767 break; /* no more iovecs */
1769 iov
[ion
].iov_base
= ((char*)o
->ptr
) + offset
;
1770 iov
[ion
].iov_len
= objlen
- offset
;
1771 willwrite
+= objlen
- offset
;
1772 offset
= 0; /* just for the first item */
1779 /* write all collected blocks at once */
1780 if((nwritten
= writev(fd
, iov
, ion
)) < 0) {
1781 if (errno
!= EAGAIN
) {
1782 redisLog(REDIS_VERBOSE
,
1783 "Error writing to client: %s", strerror(errno
));
1790 totwritten
+= nwritten
;
1791 offset
= c
->sentlen
;
1793 /* remove written robjs from c->reply */
1794 while (nwritten
&& listLength(c
->reply
)) {
1795 o
= listNodeValue(listFirst(c
->reply
));
1796 objlen
= sdslen(o
->ptr
);
1798 if(nwritten
>= objlen
- offset
) {
1799 listDelNode(c
->reply
, listFirst(c
->reply
));
1800 nwritten
-= objlen
- offset
;
1804 c
->sentlen
+= nwritten
;
1812 c
->lastinteraction
= time(NULL
);
1814 if (listLength(c
->reply
) == 0) {
1816 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1820 static struct redisCommand
*lookupCommand(char *name
) {
1822 while(cmdTable
[j
].name
!= NULL
) {
1823 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1829 /* resetClient prepare the client to process the next command */
1830 static void resetClient(redisClient
*c
) {
1836 /* Call() is the core of Redis execution of a command */
1837 static void call(redisClient
*c
, struct redisCommand
*cmd
) {
1840 dirty
= server
.dirty
;
1842 if (server
.appendonly
&& server
.dirty
-dirty
)
1843 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1844 if (server
.dirty
-dirty
&& listLength(server
.slaves
))
1845 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1846 if (listLength(server
.monitors
))
1847 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1848 server
.stat_numcommands
++;
1851 /* If this function gets called we already read a whole
1852 * command, argments are in the client argv/argc fields.
1853 * processCommand() execute the command or prepare the
1854 * server for a bulk read from the client.
1856 * If 1 is returned the client is still alive and valid and
1857 * and other operations can be performed by the caller. Otherwise
1858 * if 0 is returned the client was destroied (i.e. after QUIT). */
1859 static int processCommand(redisClient
*c
) {
1860 struct redisCommand
*cmd
;
1862 /* Free some memory if needed (maxmemory setting) */
1863 if (server
.maxmemory
) freeMemoryIfNeeded();
1865 /* Handle the multi bulk command type. This is an alternative protocol
1866 * supported by Redis in order to receive commands that are composed of
1867 * multiple binary-safe "bulk" arguments. The latency of processing is
1868 * a bit higher but this allows things like multi-sets, so if this
1869 * protocol is used only for MSET and similar commands this is a big win. */
1870 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1871 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1872 if (c
->multibulk
<= 0) {
1876 decrRefCount(c
->argv
[c
->argc
-1]);
1880 } else if (c
->multibulk
) {
1881 if (c
->bulklen
== -1) {
1882 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1883 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1887 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1888 decrRefCount(c
->argv
[0]);
1889 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1891 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1896 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1900 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1901 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1905 if (c
->multibulk
== 0) {
1909 /* Here we need to swap the multi-bulk argc/argv with the
1910 * normal argc/argv of the client structure. */
1912 c
->argv
= c
->mbargv
;
1913 c
->mbargv
= auxargv
;
1916 c
->argc
= c
->mbargc
;
1917 c
->mbargc
= auxargc
;
1919 /* We need to set bulklen to something different than -1
1920 * in order for the code below to process the command without
1921 * to try to read the last argument of a bulk command as
1922 * a special argument. */
1924 /* continue below and process the command */
1931 /* -- end of multi bulk commands processing -- */
1933 /* The QUIT command is handled as a special case. Normal command
1934 * procs are unable to close the client connection safely */
1935 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1939 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1942 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1943 (char*)c
->argv
[0]->ptr
));
1946 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1947 (c
->argc
< -cmd
->arity
)) {
1949 sdscatprintf(sdsempty(),
1950 "-ERR wrong number of arguments for '%s' command\r\n",
1954 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1955 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1958 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1959 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1961 decrRefCount(c
->argv
[c
->argc
-1]);
1962 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1964 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1969 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1970 /* It is possible that the bulk read is already in the
1971 * buffer. Check this condition and handle it accordingly.
1972 * This is just a fast path, alternative to call processInputBuffer().
1973 * It's a good idea since the code is small and this condition
1974 * happens most of the times. */
1975 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1976 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1978 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
1983 /* Let's try to share objects on the command arguments vector */
1984 if (server
.shareobjects
) {
1986 for(j
= 1; j
< c
->argc
; j
++)
1987 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
1989 /* Let's try to encode the bulk object to save space. */
1990 if (cmd
->flags
& REDIS_CMD_BULK
)
1991 tryObjectEncoding(c
->argv
[c
->argc
-1]);
1993 /* Check if the user is authenticated */
1994 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
1995 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
2000 /* Exec the command */
2001 if (c
->flags
& REDIS_MULTI
&& cmd
->proc
!= execCommand
) {
2002 queueMultiCommand(c
,cmd
);
2003 addReply(c
,shared
.queued
);
2008 /* Prepare the client for the next command */
2009 if (c
->flags
& REDIS_CLOSE
) {
2017 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
2021 /* (args*2)+1 is enough room for args, spaces, newlines */
2022 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
2024 if (argc
<= REDIS_STATIC_ARGS
) {
2027 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
2030 for (j
= 0; j
< argc
; j
++) {
2031 if (j
!= 0) outv
[outc
++] = shared
.space
;
2032 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
2035 lenobj
= createObject(REDIS_STRING
,
2036 sdscatprintf(sdsempty(),"%lu\r\n",
2037 (unsigned long) stringObjectLen(argv
[j
])));
2038 lenobj
->refcount
= 0;
2039 outv
[outc
++] = lenobj
;
2041 outv
[outc
++] = argv
[j
];
2043 outv
[outc
++] = shared
.crlf
;
2045 /* Increment all the refcounts at start and decrement at end in order to
2046 * be sure to free objects if there is no slave in a replication state
2047 * able to be feed with commands */
2048 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
2050 while((ln
= listYield(slaves
))) {
2051 redisClient
*slave
= ln
->value
;
2053 /* Don't feed slaves that are still waiting for BGSAVE to start */
2054 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
2056 /* Feed all the other slaves, MONITORs and so on */
2057 if (slave
->slaveseldb
!= dictid
) {
2061 case 0: selectcmd
= shared
.select0
; break;
2062 case 1: selectcmd
= shared
.select1
; break;
2063 case 2: selectcmd
= shared
.select2
; break;
2064 case 3: selectcmd
= shared
.select3
; break;
2065 case 4: selectcmd
= shared
.select4
; break;
2066 case 5: selectcmd
= shared
.select5
; break;
2067 case 6: selectcmd
= shared
.select6
; break;
2068 case 7: selectcmd
= shared
.select7
; break;
2069 case 8: selectcmd
= shared
.select8
; break;
2070 case 9: selectcmd
= shared
.select9
; break;
2072 selectcmd
= createObject(REDIS_STRING
,
2073 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
2074 selectcmd
->refcount
= 0;
2077 addReply(slave
,selectcmd
);
2078 slave
->slaveseldb
= dictid
;
2080 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
2082 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
2083 if (outv
!= static_outv
) zfree(outv
);
2086 static void processInputBuffer(redisClient
*c
) {
2088 /* Before to process the input buffer, make sure the client is not
2089 * waitig for a blocking operation such as BLPOP. Note that the first
2090 * iteration the client is never blocked, otherwise the processInputBuffer
2091 * would not be called at all, but after the execution of the first commands
2092 * in the input buffer the client may be blocked, and the "goto again"
2093 * will try to reiterate. The following line will make it return asap. */
2094 if (c
->flags
& REDIS_BLOCKED
|| c
->flags
& REDIS_IO_WAIT
) return;
2095 if (c
->bulklen
== -1) {
2096 /* Read the first line of the query */
2097 char *p
= strchr(c
->querybuf
,'\n');
2104 query
= c
->querybuf
;
2105 c
->querybuf
= sdsempty();
2106 querylen
= 1+(p
-(query
));
2107 if (sdslen(query
) > querylen
) {
2108 /* leave data after the first line of the query in the buffer */
2109 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
2111 *p
= '\0'; /* remove "\n" */
2112 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
2113 sdsupdatelen(query
);
2115 /* Now we can split the query in arguments */
2116 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
2119 if (c
->argv
) zfree(c
->argv
);
2120 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
2122 for (j
= 0; j
< argc
; j
++) {
2123 if (sdslen(argv
[j
])) {
2124 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
2132 /* Execute the command. If the client is still valid
2133 * after processCommand() return and there is something
2134 * on the query buffer try to process the next command. */
2135 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
2137 /* Nothing to process, argc == 0. Just process the query
2138 * buffer if it's not empty or return to the caller */
2139 if (sdslen(c
->querybuf
)) goto again
;
2142 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
2143 redisLog(REDIS_VERBOSE
, "Client protocol error");
2148 /* Bulk read handling. Note that if we are at this point
2149 the client already sent a command terminated with a newline,
2150 we are reading the bulk data that is actually the last
2151 argument of the command. */
2152 int qbl
= sdslen(c
->querybuf
);
2154 if (c
->bulklen
<= qbl
) {
2155 /* Copy everything but the final CRLF as final argument */
2156 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
2158 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
2159 /* Process the command. If the client is still valid after
2160 * the processing and there is more data in the buffer
2161 * try to parse it. */
2162 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
2168 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2169 redisClient
*c
= (redisClient
*) privdata
;
2170 char buf
[REDIS_IOBUF_LEN
];
2173 REDIS_NOTUSED(mask
);
2175 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
2177 if (errno
== EAGAIN
) {
2180 redisLog(REDIS_VERBOSE
, "Reading from client: %s",strerror(errno
));
2184 } else if (nread
== 0) {
2185 redisLog(REDIS_VERBOSE
, "Client closed connection");
2190 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
2191 c
->lastinteraction
= time(NULL
);
2195 processInputBuffer(c
);
2198 static int selectDb(redisClient
*c
, int id
) {
2199 if (id
< 0 || id
>= server
.dbnum
)
2201 c
->db
= &server
.db
[id
];
2205 static void *dupClientReplyValue(void *o
) {
2206 incrRefCount((robj
*)o
);
2210 static redisClient
*createClient(int fd
) {
2211 redisClient
*c
= zmalloc(sizeof(*c
));
2213 anetNonBlock(NULL
,fd
);
2214 anetTcpNoDelay(NULL
,fd
);
2215 if (!c
) return NULL
;
2218 c
->querybuf
= sdsempty();
2227 c
->lastinteraction
= time(NULL
);
2228 c
->authenticated
= 0;
2229 c
->replstate
= REDIS_REPL_NONE
;
2230 c
->reply
= listCreate();
2231 listSetFreeMethod(c
->reply
,decrRefCount
);
2232 listSetDupMethod(c
->reply
,dupClientReplyValue
);
2233 c
->blockingkeys
= NULL
;
2234 c
->blockingkeysnum
= 0;
2235 c
->io_keys
= listCreate();
2236 listSetFreeMethod(c
->io_keys
,decrRefCount
);
2237 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
2238 readQueryFromClient
, c
) == AE_ERR
) {
2242 listAddNodeTail(server
.clients
,c
);
2243 initClientMultiState(c
);
2247 static void addReply(redisClient
*c
, robj
*obj
) {
2248 if (listLength(c
->reply
) == 0 &&
2249 (c
->replstate
== REDIS_REPL_NONE
||
2250 c
->replstate
== REDIS_REPL_ONLINE
) &&
2251 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
2252 sendReplyToClient
, c
) == AE_ERR
) return;
2254 if (server
.vm_enabled
&& obj
->storage
!= REDIS_VM_MEMORY
) {
2255 obj
= dupStringObject(obj
);
2256 obj
->refcount
= 0; /* getDecodedObject() will increment the refcount */
2258 listAddNodeTail(c
->reply
,getDecodedObject(obj
));
2261 static void addReplySds(redisClient
*c
, sds s
) {
2262 robj
*o
= createObject(REDIS_STRING
,s
);
2267 static void addReplyDouble(redisClient
*c
, double d
) {
2270 snprintf(buf
,sizeof(buf
),"%.17g",d
);
2271 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2272 (unsigned long) strlen(buf
),buf
));
2275 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
2278 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
2279 len
= sdslen(obj
->ptr
);
2281 long n
= (long)obj
->ptr
;
2283 /* Compute how many bytes will take this integer as a radix 10 string */
2289 while((n
= n
/10) != 0) {
2293 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len
));
2296 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2301 REDIS_NOTUSED(mask
);
2302 REDIS_NOTUSED(privdata
);
2304 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
2305 if (cfd
== AE_ERR
) {
2306 redisLog(REDIS_VERBOSE
,"Accepting client connection: %s", server
.neterr
);
2309 redisLog(REDIS_VERBOSE
,"Accepted %s:%d", cip
, cport
);
2310 if ((c
= createClient(cfd
)) == NULL
) {
2311 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
2312 close(cfd
); /* May be already closed, just ingore errors */
2315 /* If maxclient directive is set and this is one client more... close the
2316 * connection. Note that we create the client instead to check before
2317 * for this condition, since now the socket is already set in nonblocking
2318 * mode and we can send an error for free using the Kernel I/O */
2319 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
2320 char *err
= "-ERR max number of clients reached\r\n";
2322 /* That's a best effort error message, don't check write errors */
2323 if (write(c
->fd
,err
,strlen(err
)) == -1) {
2324 /* Nothing to do, Just to avoid the warning... */
2329 server
.stat_numconnections
++;
2332 /* ======================= Redis objects implementation ===================== */
2334 static robj
*createObject(int type
, void *ptr
) {
2337 if (listLength(server
.objfreelist
)) {
2338 listNode
*head
= listFirst(server
.objfreelist
);
2339 o
= listNodeValue(head
);
2340 listDelNode(server
.objfreelist
,head
);
2342 if (server
.vm_enabled
) {
2343 o
= zmalloc(sizeof(*o
));
2345 o
= zmalloc(sizeof(*o
)-sizeof(struct redisObjectVM
));
2349 o
->encoding
= REDIS_ENCODING_RAW
;
2352 if (server
.vm_enabled
) {
2353 o
->vm
.atime
= server
.unixtime
;
2354 o
->storage
= REDIS_VM_MEMORY
;
2359 static robj
*createStringObject(char *ptr
, size_t len
) {
2360 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
2363 static robj
*dupStringObject(robj
*o
) {
2364 return createStringObject(o
->ptr
,sdslen(o
->ptr
));
2367 static robj
*createListObject(void) {
2368 list
*l
= listCreate();
2370 listSetFreeMethod(l
,decrRefCount
);
2371 return createObject(REDIS_LIST
,l
);
2374 static robj
*createSetObject(void) {
2375 dict
*d
= dictCreate(&setDictType
,NULL
);
2376 return createObject(REDIS_SET
,d
);
2379 static robj
*createZsetObject(void) {
2380 zset
*zs
= zmalloc(sizeof(*zs
));
2382 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
2383 zs
->zsl
= zslCreate();
2384 return createObject(REDIS_ZSET
,zs
);
2387 static void freeStringObject(robj
*o
) {
2388 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2393 static void freeListObject(robj
*o
) {
2394 listRelease((list
*) o
->ptr
);
2397 static void freeSetObject(robj
*o
) {
2398 dictRelease((dict
*) o
->ptr
);
2401 static void freeZsetObject(robj
*o
) {
2404 dictRelease(zs
->dict
);
2409 static void freeHashObject(robj
*o
) {
2410 dictRelease((dict
*) o
->ptr
);
2413 static void incrRefCount(robj
*o
) {
2414 redisAssert(!server
.vm_enabled
|| o
->storage
== REDIS_VM_MEMORY
);
2418 static void decrRefCount(void *obj
) {
2421 /* REDIS_VM_SWAPPED */
2422 if (server
.vm_enabled
&& o
->storage
== REDIS_VM_SWAPPED
) {
2423 redisAssert(o
->refcount
== 1);
2424 redisAssert(o
->type
== REDIS_STRING
);
2425 freeStringObject(o
);
2426 vmMarkPagesFree(o
->vm
.page
,o
->vm
.usedpages
);
2427 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2428 !listAddNodeHead(server
.objfreelist
,o
))
2430 server
.vm_stats_swapped_objects
--;
2433 /* REDIS_VM_MEMORY */
2434 if (--(o
->refcount
) == 0) {
2436 case REDIS_STRING
: freeStringObject(o
); break;
2437 case REDIS_LIST
: freeListObject(o
); break;
2438 case REDIS_SET
: freeSetObject(o
); break;
2439 case REDIS_ZSET
: freeZsetObject(o
); break;
2440 case REDIS_HASH
: freeHashObject(o
); break;
2441 default: redisAssert(0 != 0); break;
2443 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2444 !listAddNodeHead(server
.objfreelist
,o
))
2449 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2450 dictEntry
*de
= dictFind(db
->dict
,key
);
2452 robj
*key
= dictGetEntryKey(de
);
2453 robj
*val
= dictGetEntryVal(de
);
2455 if (server
.vm_enabled
) {
2456 if (key
->storage
== REDIS_VM_MEMORY
) {
2457 /* Update the access time of the key for the aging algorithm. */
2458 key
->vm
.atime
= server
.unixtime
;
2460 /* Our value was swapped on disk. Bring it at home. */
2461 redisAssert(val
== NULL
);
2462 val
= vmLoadObject(key
);
2463 dictGetEntryVal(de
) = val
;
2472 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2473 expireIfNeeded(db
,key
);
2474 return lookupKey(db
,key
);
2477 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2478 deleteIfVolatile(db
,key
);
2479 return lookupKey(db
,key
);
2482 static int deleteKey(redisDb
*db
, robj
*key
) {
2485 /* We need to protect key from destruction: after the first dictDelete()
2486 * it may happen that 'key' is no longer valid if we don't increment
2487 * it's count. This may happen when we get the object reference directly
2488 * from the hash table with dictRandomKey() or dict iterators */
2490 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2491 retval
= dictDelete(db
->dict
,key
);
2494 return retval
== DICT_OK
;
2497 /* Try to share an object against the shared objects pool */
2498 static robj
*tryObjectSharing(robj
*o
) {
2499 struct dictEntry
*de
;
2502 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2504 redisAssert(o
->type
== REDIS_STRING
);
2505 de
= dictFind(server
.sharingpool
,o
);
2507 robj
*shared
= dictGetEntryKey(de
);
2509 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2510 dictGetEntryVal(de
) = (void*) c
;
2511 incrRefCount(shared
);
2515 /* Here we are using a stream algorihtm: Every time an object is
2516 * shared we increment its count, everytime there is a miss we
2517 * recrement the counter of a random object. If this object reaches
2518 * zero we remove the object and put the current object instead. */
2519 if (dictSize(server
.sharingpool
) >=
2520 server
.sharingpoolsize
) {
2521 de
= dictGetRandomKey(server
.sharingpool
);
2522 redisAssert(de
!= NULL
);
2523 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2524 dictGetEntryVal(de
) = (void*) c
;
2526 dictDelete(server
.sharingpool
,de
->key
);
2529 c
= 0; /* If the pool is empty we want to add this object */
2534 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2535 redisAssert(retval
== DICT_OK
);
2542 /* Check if the nul-terminated string 's' can be represented by a long
2543 * (that is, is a number that fits into long without any other space or
2544 * character before or after the digits).
2546 * If so, the function returns REDIS_OK and *longval is set to the value
2547 * of the number. Otherwise REDIS_ERR is returned */
2548 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2549 char buf
[32], *endptr
;
2553 value
= strtol(s
, &endptr
, 10);
2554 if (endptr
[0] != '\0') return REDIS_ERR
;
2555 slen
= snprintf(buf
,32,"%ld",value
);
2557 /* If the number converted back into a string is not identical
2558 * then it's not possible to encode the string as integer */
2559 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2560 if (longval
) *longval
= value
;
2564 /* Try to encode a string object in order to save space */
2565 static int tryObjectEncoding(robj
*o
) {
2569 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2570 return REDIS_ERR
; /* Already encoded */
2572 /* It's not save to encode shared objects: shared objects can be shared
2573 * everywhere in the "object space" of Redis. Encoded objects can only
2574 * appear as "values" (and not, for instance, as keys) */
2575 if (o
->refcount
> 1) return REDIS_ERR
;
2577 /* Currently we try to encode only strings */
2578 redisAssert(o
->type
== REDIS_STRING
);
2580 /* Check if we can represent this string as a long integer */
2581 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2583 /* Ok, this object can be encoded */
2584 o
->encoding
= REDIS_ENCODING_INT
;
2586 o
->ptr
= (void*) value
;
2590 /* Get a decoded version of an encoded object (returned as a new object).
2591 * If the object is already raw-encoded just increment the ref count. */
2592 static robj
*getDecodedObject(robj
*o
) {
2595 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2599 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2602 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2603 dec
= createStringObject(buf
,strlen(buf
));
2606 redisAssert(1 != 1);
2610 /* Compare two string objects via strcmp() or alike.
2611 * Note that the objects may be integer-encoded. In such a case we
2612 * use snprintf() to get a string representation of the numbers on the stack
2613 * and compare the strings, it's much faster than calling getDecodedObject().
2615 * Important note: if objects are not integer encoded, but binary-safe strings,
2616 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2618 static int compareStringObjects(robj
*a
, robj
*b
) {
2619 redisAssert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2620 char bufa
[128], bufb
[128], *astr
, *bstr
;
2623 if (a
== b
) return 0;
2624 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2625 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2631 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2632 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2638 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2641 static size_t stringObjectLen(robj
*o
) {
2642 redisAssert(o
->type
== REDIS_STRING
);
2643 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2644 return sdslen(o
->ptr
);
2648 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2652 /*============================ RDB saving/loading =========================== */
2654 static int rdbSaveType(FILE *fp
, unsigned char type
) {
2655 if (fwrite(&type
,1,1,fp
) == 0) return -1;
2659 static int rdbSaveTime(FILE *fp
, time_t t
) {
2660 int32_t t32
= (int32_t) t
;
2661 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
2665 /* check rdbLoadLen() comments for more info */
2666 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2667 unsigned char buf
[2];
2670 /* Save a 6 bit len */
2671 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2672 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2673 } else if (len
< (1<<14)) {
2674 /* Save a 14 bit len */
2675 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2677 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2679 /* Save a 32 bit len */
2680 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2681 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2683 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2688 /* String objects in the form "2391" "-100" without any space and with a
2689 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2690 * encoded as integers to save space */
2691 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2693 char *endptr
, buf
[32];
2695 /* Check if it's possible to encode this value as a number */
2696 value
= strtoll(s
, &endptr
, 10);
2697 if (endptr
[0] != '\0') return 0;
2698 snprintf(buf
,32,"%lld",value
);
2700 /* If the number converted back into a string is not identical
2701 * then it's not possible to encode the string as integer */
2702 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2704 /* Finally check if it fits in our ranges */
2705 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2706 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2707 enc
[1] = value
&0xFF;
2709 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2710 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2711 enc
[1] = value
&0xFF;
2712 enc
[2] = (value
>>8)&0xFF;
2714 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2715 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2716 enc
[1] = value
&0xFF;
2717 enc
[2] = (value
>>8)&0xFF;
2718 enc
[3] = (value
>>16)&0xFF;
2719 enc
[4] = (value
>>24)&0xFF;
2726 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2727 unsigned int comprlen
, outlen
;
2731 /* We require at least four bytes compression for this to be worth it */
2732 outlen
= sdslen(obj
->ptr
)-4;
2733 if (outlen
<= 0) return 0;
2734 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2735 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2736 if (comprlen
== 0) {
2740 /* Data compressed! Let's save it on disk */
2741 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2742 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2743 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2744 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2745 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2754 /* Save a string objet as [len][data] on disk. If the object is a string
2755 * representation of an integer value we try to safe it in a special form */
2756 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2760 len
= sdslen(obj
->ptr
);
2762 /* Try integer encoding */
2764 unsigned char buf
[5];
2765 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2766 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2771 /* Try LZF compression - under 20 bytes it's unable to compress even
2772 * aaaaaaaaaaaaaaaaaa so skip it */
2773 if (server
.rdbcompression
&& len
> 20) {
2776 retval
= rdbSaveLzfStringObject(fp
,obj
);
2777 if (retval
== -1) return -1;
2778 if (retval
> 0) return 0;
2779 /* retval == 0 means data can't be compressed, save the old way */
2782 /* Store verbatim */
2783 if (rdbSaveLen(fp
,len
) == -1) return -1;
2784 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2788 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2789 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2792 obj
= getDecodedObject(obj
);
2793 retval
= rdbSaveStringObjectRaw(fp
,obj
);
2798 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
2799 * 8 bit integer specifing the length of the representation.
2800 * This 8 bit integer has special values in order to specify the following
2806 static int rdbSaveDoubleValue(FILE *fp
, double val
) {
2807 unsigned char buf
[128];
2813 } else if (!isfinite(val
)) {
2815 buf
[0] = (val
< 0) ? 255 : 254;
2817 snprintf((char*)buf
+1,sizeof(buf
)-1,"%.17g",val
);
2818 buf
[0] = strlen((char*)buf
+1);
2821 if (fwrite(buf
,len
,1,fp
) == 0) return -1;
2825 /* Save a Redis object. */
2826 static int rdbSaveObject(FILE *fp
, robj
*o
) {
2827 if (o
->type
== REDIS_STRING
) {
2828 /* Save a string value */
2829 if (rdbSaveStringObject(fp
,o
) == -1) return -1;
2830 } else if (o
->type
== REDIS_LIST
) {
2831 /* Save a list value */
2832 list
*list
= o
->ptr
;
2836 if (rdbSaveLen(fp
,listLength(list
)) == -1) return -1;
2837 while((ln
= listYield(list
))) {
2838 robj
*eleobj
= listNodeValue(ln
);
2840 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2842 } else if (o
->type
== REDIS_SET
) {
2843 /* Save a set value */
2845 dictIterator
*di
= dictGetIterator(set
);
2848 if (rdbSaveLen(fp
,dictSize(set
)) == -1) return -1;
2849 while((de
= dictNext(di
)) != NULL
) {
2850 robj
*eleobj
= dictGetEntryKey(de
);
2852 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2854 dictReleaseIterator(di
);
2855 } else if (o
->type
== REDIS_ZSET
) {
2856 /* Save a set value */
2858 dictIterator
*di
= dictGetIterator(zs
->dict
);
2861 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) return -1;
2862 while((de
= dictNext(di
)) != NULL
) {
2863 robj
*eleobj
= dictGetEntryKey(de
);
2864 double *score
= dictGetEntryVal(de
);
2866 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2867 if (rdbSaveDoubleValue(fp
,*score
) == -1) return -1;
2869 dictReleaseIterator(di
);
2871 redisAssert(0 != 0);
2876 /* Return the length the object will have on disk if saved with
2877 * the rdbSaveObject() function. Currently we use a trick to get
2878 * this length with very little changes to the code. In the future
2879 * we could switch to a faster solution. */
2880 static off_t
rdbSavedObjectLen(robj
*o
) {
2881 static FILE *fp
= NULL
;
2883 if (fp
== NULL
) fp
= fopen("/dev/null","w");
2887 assert(rdbSaveObject(fp
,o
) != 1);
2891 /* Return the number of pages required to save this object in the swap file */
2892 static off_t
rdbSavedObjectPages(robj
*o
) {
2893 off_t bytes
= rdbSavedObjectLen(o
);
2895 return (bytes
+(server
.vm_page_size
-1))/server
.vm_page_size
;
2898 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2899 static int rdbSave(char *filename
) {
2900 dictIterator
*di
= NULL
;
2905 time_t now
= time(NULL
);
2907 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2908 fp
= fopen(tmpfile
,"w");
2910 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2913 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2914 for (j
= 0; j
< server
.dbnum
; j
++) {
2915 redisDb
*db
= server
.db
+j
;
2917 if (dictSize(d
) == 0) continue;
2918 di
= dictGetIterator(d
);
2924 /* Write the SELECT DB opcode */
2925 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2926 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2928 /* Iterate this DB writing every entry */
2929 while((de
= dictNext(di
)) != NULL
) {
2930 robj
*key
= dictGetEntryKey(de
);
2931 robj
*o
= dictGetEntryVal(de
);
2932 time_t expiretime
= getExpire(db
,key
);
2934 /* Save the expire time */
2935 if (expiretime
!= -1) {
2936 /* If this key is already expired skip it */
2937 if (expiretime
< now
) continue;
2938 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2939 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2941 /* Save the key and associated value. This requires special
2942 * handling if the value is swapped out. */
2943 if (!server
.vm_enabled
|| key
->storage
== REDIS_VM_MEMORY
) {
2944 /* Save type, key, value */
2945 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2946 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2947 if (rdbSaveObject(fp
,o
) == -1) goto werr
;
2950 /* Get a preview of the object in memory */
2951 po
= vmPreviewObject(key
);
2952 /* Also duplicate the key object, to pass around a standard
2954 newkey
= dupStringObject(key
);
2955 /* Save type, key, value */
2956 if (rdbSaveType(fp
,key
->vtype
) == -1) goto werr
;
2957 if (rdbSaveStringObject(fp
,newkey
) == -1) goto werr
;
2958 if (rdbSaveObject(fp
,po
) == -1) goto werr
;
2959 /* Remove the loaded object from memory */
2961 decrRefCount(newkey
);
2964 dictReleaseIterator(di
);
2967 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
2969 /* Make sure data will not remain on the OS's output buffers */
2974 /* Use RENAME to make sure the DB file is changed atomically only
2975 * if the generate DB file is ok. */
2976 if (rename(tmpfile
,filename
) == -1) {
2977 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
2981 redisLog(REDIS_NOTICE
,"DB saved on disk");
2983 server
.lastsave
= time(NULL
);
2989 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
2990 if (di
) dictReleaseIterator(di
);
2994 static int rdbSaveBackground(char *filename
) {
2997 if (server
.bgsavechildpid
!= -1) return REDIS_ERR
;
2998 if ((childpid
= fork()) == 0) {
3001 if (rdbSave(filename
) == REDIS_OK
) {
3008 if (childpid
== -1) {
3009 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
3013 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
3014 server
.bgsavechildpid
= childpid
;
3017 return REDIS_OK
; /* unreached */
3020 static void rdbRemoveTempFile(pid_t childpid
) {
3023 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
3027 static int rdbLoadType(FILE *fp
) {
3029 if (fread(&type
,1,1,fp
) == 0) return -1;
3033 static time_t rdbLoadTime(FILE *fp
) {
3035 if (fread(&t32
,4,1,fp
) == 0) return -1;
3036 return (time_t) t32
;
3039 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
3040 * of this file for a description of how this are stored on disk.
3042 * isencoded is set to 1 if the readed length is not actually a length but
3043 * an "encoding type", check the above comments for more info */
3044 static uint32_t rdbLoadLen(FILE *fp
, int *isencoded
) {
3045 unsigned char buf
[2];
3049 if (isencoded
) *isencoded
= 0;
3050 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
3051 type
= (buf
[0]&0xC0)>>6;
3052 if (type
== REDIS_RDB_6BITLEN
) {
3053 /* Read a 6 bit len */
3055 } else if (type
== REDIS_RDB_ENCVAL
) {
3056 /* Read a 6 bit len encoding type */
3057 if (isencoded
) *isencoded
= 1;
3059 } else if (type
== REDIS_RDB_14BITLEN
) {
3060 /* Read a 14 bit len */
3061 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
3062 return ((buf
[0]&0x3F)<<8)|buf
[1];
3064 /* Read a 32 bit len */
3065 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
3070 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
3071 unsigned char enc
[4];
3074 if (enctype
== REDIS_RDB_ENC_INT8
) {
3075 if (fread(enc
,1,1,fp
) == 0) return NULL
;
3076 val
= (signed char)enc
[0];
3077 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
3079 if (fread(enc
,2,1,fp
) == 0) return NULL
;
3080 v
= enc
[0]|(enc
[1]<<8);
3082 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
3084 if (fread(enc
,4,1,fp
) == 0) return NULL
;
3085 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
3088 val
= 0; /* anti-warning */
3091 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
3094 static robj
*rdbLoadLzfStringObject(FILE*fp
) {
3095 unsigned int len
, clen
;
3096 unsigned char *c
= NULL
;
3099 if ((clen
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3100 if ((len
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3101 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
3102 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
3103 if (fread(c
,clen
,1,fp
) == 0) goto err
;
3104 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
3106 return createObject(REDIS_STRING
,val
);
3113 static robj
*rdbLoadStringObject(FILE*fp
) {
3118 len
= rdbLoadLen(fp
,&isencoded
);
3121 case REDIS_RDB_ENC_INT8
:
3122 case REDIS_RDB_ENC_INT16
:
3123 case REDIS_RDB_ENC_INT32
:
3124 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
3125 case REDIS_RDB_ENC_LZF
:
3126 return tryObjectSharing(rdbLoadLzfStringObject(fp
));
3132 if (len
== REDIS_RDB_LENERR
) return NULL
;
3133 val
= sdsnewlen(NULL
,len
);
3134 if (len
&& fread(val
,len
,1,fp
) == 0) {
3138 return tryObjectSharing(createObject(REDIS_STRING
,val
));
3141 /* For information about double serialization check rdbSaveDoubleValue() */
3142 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
3146 if (fread(&len
,1,1,fp
) == 0) return -1;
3148 case 255: *val
= R_NegInf
; return 0;
3149 case 254: *val
= R_PosInf
; return 0;
3150 case 253: *val
= R_Nan
; return 0;
3152 if (fread(buf
,len
,1,fp
) == 0) return -1;
3154 sscanf(buf
, "%lg", val
);
3159 /* Load a Redis object of the specified type from the specified file.
3160 * On success a newly allocated object is returned, otherwise NULL. */
3161 static robj
*rdbLoadObject(int type
, FILE *fp
) {
3164 if (type
== REDIS_STRING
) {
3165 /* Read string value */
3166 if ((o
= rdbLoadStringObject(fp
)) == NULL
) return NULL
;
3167 tryObjectEncoding(o
);
3168 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
3169 /* Read list/set value */
3172 if ((listlen
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3173 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
3174 /* Load every single element of the list/set */
3178 if ((ele
= rdbLoadStringObject(fp
)) == NULL
) return NULL
;
3179 tryObjectEncoding(ele
);
3180 if (type
== REDIS_LIST
) {
3181 listAddNodeTail((list
*)o
->ptr
,ele
);
3183 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
3186 } else if (type
== REDIS_ZSET
) {
3187 /* Read list/set value */
3191 if ((zsetlen
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3192 o
= createZsetObject();
3194 /* Load every single element of the list/set */
3197 double *score
= zmalloc(sizeof(double));
3199 if ((ele
= rdbLoadStringObject(fp
)) == NULL
) return NULL
;
3200 tryObjectEncoding(ele
);
3201 if (rdbLoadDoubleValue(fp
,score
) == -1) return NULL
;
3202 dictAdd(zs
->dict
,ele
,score
);
3203 zslInsert(zs
->zsl
,*score
,ele
);
3204 incrRefCount(ele
); /* added to skiplist */
3207 redisAssert(0 != 0);
3212 static int rdbLoad(char *filename
) {
3214 robj
*keyobj
= NULL
;
3216 int type
, retval
, rdbver
;
3217 dict
*d
= server
.db
[0].dict
;
3218 redisDb
*db
= server
.db
+0;
3220 time_t expiretime
= -1, now
= time(NULL
);
3221 long long loadedkeys
= 0;
3223 fp
= fopen(filename
,"r");
3224 if (!fp
) return REDIS_ERR
;
3225 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
3227 if (memcmp(buf
,"REDIS",5) != 0) {
3229 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
3232 rdbver
= atoi(buf
+5);
3235 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
3242 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
3243 if (type
== REDIS_EXPIRETIME
) {
3244 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
3245 /* We read the time so we need to read the object type again */
3246 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
3248 if (type
== REDIS_EOF
) break;
3249 /* Handle SELECT DB opcode as a special case */
3250 if (type
== REDIS_SELECTDB
) {
3251 if ((dbid
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
)
3253 if (dbid
>= (unsigned)server
.dbnum
) {
3254 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
3257 db
= server
.db
+dbid
;
3262 if ((keyobj
= rdbLoadStringObject(fp
)) == NULL
) goto eoferr
;
3264 if ((o
= rdbLoadObject(type
,fp
)) == NULL
) goto eoferr
;
3265 /* Add the new object in the hash table */
3266 retval
= dictAdd(d
,keyobj
,o
);
3267 if (retval
== DICT_ERR
) {
3268 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
3271 /* Set the expire time if needed */
3272 if (expiretime
!= -1) {
3273 setExpire(db
,keyobj
,expiretime
);
3274 /* Delete this key if already expired */
3275 if (expiretime
< now
) deleteKey(db
,keyobj
);
3279 /* Handle swapping while loading big datasets when VM is on */
3281 if (server
.vm_enabled
&& (loadedkeys
% 5000) == 0) {
3282 while (zmalloc_used_memory() > server
.vm_max_memory
) {
3283 if (vmSwapOneObject() == REDIS_ERR
) break;
3290 eoferr
: /* unexpected end of file is handled here with a fatal exit */
3291 if (keyobj
) decrRefCount(keyobj
);
3292 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
3294 return REDIS_ERR
; /* Just to avoid warning */
3297 /*================================== Commands =============================== */
3299 static void authCommand(redisClient
*c
) {
3300 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
3301 c
->authenticated
= 1;
3302 addReply(c
,shared
.ok
);
3304 c
->authenticated
= 0;
3305 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3309 static void pingCommand(redisClient
*c
) {
3310 addReply(c
,shared
.pong
);
3313 static void echoCommand(redisClient
*c
) {
3314 addReplyBulkLen(c
,c
->argv
[1]);
3315 addReply(c
,c
->argv
[1]);
3316 addReply(c
,shared
.crlf
);
3319 /*=================================== Strings =============================== */
3321 static void setGenericCommand(redisClient
*c
, int nx
) {
3324 if (nx
) deleteIfVolatile(c
->db
,c
->argv
[1]);
3325 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3326 if (retval
== DICT_ERR
) {
3328 /* If the key is about a swapped value, we want a new key object
3329 * to overwrite the old. So we delete the old key in the database.
3330 * This will also make sure that swap pages about the old object
3331 * will be marked as free. */
3332 if (deleteIfSwapped(c
->db
,c
->argv
[1]))
3333 incrRefCount(c
->argv
[1]);
3334 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3335 incrRefCount(c
->argv
[2]);
3337 addReply(c
,shared
.czero
);
3341 incrRefCount(c
->argv
[1]);
3342 incrRefCount(c
->argv
[2]);
3345 removeExpire(c
->db
,c
->argv
[1]);
3346 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3349 static void setCommand(redisClient
*c
) {
3350 setGenericCommand(c
,0);
3353 static void setnxCommand(redisClient
*c
) {
3354 setGenericCommand(c
,1);
3357 static int getGenericCommand(redisClient
*c
) {
3358 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3361 addReply(c
,shared
.nullbulk
);
3364 if (o
->type
!= REDIS_STRING
) {
3365 addReply(c
,shared
.wrongtypeerr
);
3368 addReplyBulkLen(c
,o
);
3370 addReply(c
,shared
.crlf
);
3376 static void getCommand(redisClient
*c
) {
3377 getGenericCommand(c
);
3380 static void getsetCommand(redisClient
*c
) {
3381 if (getGenericCommand(c
) == REDIS_ERR
) return;
3382 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
3383 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3385 incrRefCount(c
->argv
[1]);
3387 incrRefCount(c
->argv
[2]);
3389 removeExpire(c
->db
,c
->argv
[1]);
3392 static void mgetCommand(redisClient
*c
) {
3395 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
3396 for (j
= 1; j
< c
->argc
; j
++) {
3397 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
3399 addReply(c
,shared
.nullbulk
);
3401 if (o
->type
!= REDIS_STRING
) {
3402 addReply(c
,shared
.nullbulk
);
3404 addReplyBulkLen(c
,o
);
3406 addReply(c
,shared
.crlf
);
3412 static void msetGenericCommand(redisClient
*c
, int nx
) {
3413 int j
, busykeys
= 0;
3415 if ((c
->argc
% 2) == 0) {
3416 addReplySds(c
,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3419 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3420 * set nothing at all if at least one already key exists. */
3422 for (j
= 1; j
< c
->argc
; j
+= 2) {
3423 if (lookupKeyWrite(c
->db
,c
->argv
[j
]) != NULL
) {
3429 addReply(c
, shared
.czero
);
3433 for (j
= 1; j
< c
->argc
; j
+= 2) {
3436 tryObjectEncoding(c
->argv
[j
+1]);
3437 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3438 if (retval
== DICT_ERR
) {
3439 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3440 incrRefCount(c
->argv
[j
+1]);
3442 incrRefCount(c
->argv
[j
]);
3443 incrRefCount(c
->argv
[j
+1]);
3445 removeExpire(c
->db
,c
->argv
[j
]);
3447 server
.dirty
+= (c
->argc
-1)/2;
3448 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3451 static void msetCommand(redisClient
*c
) {
3452 msetGenericCommand(c
,0);
3455 static void msetnxCommand(redisClient
*c
) {
3456 msetGenericCommand(c
,1);
3459 static void incrDecrCommand(redisClient
*c
, long long incr
) {
3464 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3468 if (o
->type
!= REDIS_STRING
) {
3473 if (o
->encoding
== REDIS_ENCODING_RAW
)
3474 value
= strtoll(o
->ptr
, &eptr
, 10);
3475 else if (o
->encoding
== REDIS_ENCODING_INT
)
3476 value
= (long)o
->ptr
;
3478 redisAssert(1 != 1);
3483 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
3484 tryObjectEncoding(o
);
3485 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
3486 if (retval
== DICT_ERR
) {
3487 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
3488 removeExpire(c
->db
,c
->argv
[1]);
3490 incrRefCount(c
->argv
[1]);
3493 addReply(c
,shared
.colon
);
3495 addReply(c
,shared
.crlf
);
3498 static void incrCommand(redisClient
*c
) {
3499 incrDecrCommand(c
,1);
3502 static void decrCommand(redisClient
*c
) {
3503 incrDecrCommand(c
,-1);
3506 static void incrbyCommand(redisClient
*c
) {
3507 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3508 incrDecrCommand(c
,incr
);
3511 static void decrbyCommand(redisClient
*c
) {
3512 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3513 incrDecrCommand(c
,-incr
);
3516 /* ========================= Type agnostic commands ========================= */
3518 static void delCommand(redisClient
*c
) {
3521 for (j
= 1; j
< c
->argc
; j
++) {
3522 if (deleteKey(c
->db
,c
->argv
[j
])) {
3529 addReply(c
,shared
.czero
);
3532 addReply(c
,shared
.cone
);
3535 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
3540 static void existsCommand(redisClient
*c
) {
3541 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
3544 static void selectCommand(redisClient
*c
) {
3545 int id
= atoi(c
->argv
[1]->ptr
);
3547 if (selectDb(c
,id
) == REDIS_ERR
) {
3548 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
3550 addReply(c
,shared
.ok
);
3554 static void randomkeyCommand(redisClient
*c
) {
3558 de
= dictGetRandomKey(c
->db
->dict
);
3559 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
3562 addReply(c
,shared
.plus
);
3563 addReply(c
,shared
.crlf
);
3565 addReply(c
,shared
.plus
);
3566 addReply(c
,dictGetEntryKey(de
));
3567 addReply(c
,shared
.crlf
);
3571 static void keysCommand(redisClient
*c
) {
3574 sds pattern
= c
->argv
[1]->ptr
;
3575 int plen
= sdslen(pattern
);
3576 unsigned long numkeys
= 0, keyslen
= 0;
3577 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3579 di
= dictGetIterator(c
->db
->dict
);
3581 decrRefCount(lenobj
);
3582 while((de
= dictNext(di
)) != NULL
) {
3583 robj
*keyobj
= dictGetEntryKey(de
);
3585 sds key
= keyobj
->ptr
;
3586 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3587 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3588 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3590 addReply(c
,shared
.space
);
3593 keyslen
+= sdslen(key
);
3597 dictReleaseIterator(di
);
3598 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3599 addReply(c
,shared
.crlf
);
3602 static void dbsizeCommand(redisClient
*c
) {
3604 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
3607 static void lastsaveCommand(redisClient
*c
) {
3609 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
3612 static void typeCommand(redisClient
*c
) {
3616 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3621 case REDIS_STRING
: type
= "+string"; break;
3622 case REDIS_LIST
: type
= "+list"; break;
3623 case REDIS_SET
: type
= "+set"; break;
3624 case REDIS_ZSET
: type
= "+zset"; break;
3625 default: type
= "unknown"; break;
3628 addReplySds(c
,sdsnew(type
));
3629 addReply(c
,shared
.crlf
);
3632 static void saveCommand(redisClient
*c
) {
3633 if (server
.bgsavechildpid
!= -1) {
3634 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3637 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3638 addReply(c
,shared
.ok
);
3640 addReply(c
,shared
.err
);
3644 static void bgsaveCommand(redisClient
*c
) {
3645 if (server
.bgsavechildpid
!= -1) {
3646 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3649 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3650 char *status
= "+Background saving started\r\n";
3651 addReplySds(c
,sdsnew(status
));
3653 addReply(c
,shared
.err
);
3657 static void shutdownCommand(redisClient
*c
) {
3658 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
3659 /* Kill the saving child if there is a background saving in progress.
3660 We want to avoid race conditions, for instance our saving child may
3661 overwrite the synchronous saving did by SHUTDOWN. */
3662 if (server
.bgsavechildpid
!= -1) {
3663 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3664 kill(server
.bgsavechildpid
,SIGKILL
);
3665 rdbRemoveTempFile(server
.bgsavechildpid
);
3667 if (server
.appendonly
) {
3668 /* Append only file: fsync() the AOF and exit */
3669 fsync(server
.appendfd
);
3672 /* Snapshotting. Perform a SYNC SAVE and exit */
3673 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3674 if (server
.daemonize
)
3675 unlink(server
.pidfile
);
3676 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3677 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3680 /* Ooops.. error saving! The best we can do is to continue operating.
3681 * Note that if there was a background saving process, in the next
3682 * cron() Redis will be notified that the background saving aborted,
3683 * handling special stuff like slaves pending for synchronization... */
3684 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3685 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3690 static void renameGenericCommand(redisClient
*c
, int nx
) {
3693 /* To use the same key as src and dst is probably an error */
3694 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3695 addReply(c
,shared
.sameobjecterr
);
3699 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3701 addReply(c
,shared
.nokeyerr
);
3705 deleteIfVolatile(c
->db
,c
->argv
[2]);
3706 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3709 addReply(c
,shared
.czero
);
3712 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3714 incrRefCount(c
->argv
[2]);
3716 deleteKey(c
->db
,c
->argv
[1]);
3718 addReply(c
,nx
? shared
.cone
: shared
.ok
);
3721 static void renameCommand(redisClient
*c
) {
3722 renameGenericCommand(c
,0);
3725 static void renamenxCommand(redisClient
*c
) {
3726 renameGenericCommand(c
,1);
3729 static void moveCommand(redisClient
*c
) {
3734 /* Obtain source and target DB pointers */
3737 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3738 addReply(c
,shared
.outofrangeerr
);
3742 selectDb(c
,srcid
); /* Back to the source DB */
3744 /* If the user is moving using as target the same
3745 * DB as the source DB it is probably an error. */
3747 addReply(c
,shared
.sameobjecterr
);
3751 /* Check if the element exists and get a reference */
3752 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3754 addReply(c
,shared
.czero
);
3758 /* Try to add the element to the target DB */
3759 deleteIfVolatile(dst
,c
->argv
[1]);
3760 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3761 addReply(c
,shared
.czero
);
3764 incrRefCount(c
->argv
[1]);
3767 /* OK! key moved, free the entry in the source DB */
3768 deleteKey(src
,c
->argv
[1]);
3770 addReply(c
,shared
.cone
);
3773 /* =================================== Lists ================================ */
3774 static void pushGenericCommand(redisClient
*c
, int where
) {
3778 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3780 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3781 addReply(c
,shared
.ok
);
3784 lobj
= createListObject();
3786 if (where
== REDIS_HEAD
) {
3787 listAddNodeHead(list
,c
->argv
[2]);
3789 listAddNodeTail(list
,c
->argv
[2]);
3791 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
3792 incrRefCount(c
->argv
[1]);
3793 incrRefCount(c
->argv
[2]);
3795 if (lobj
->type
!= REDIS_LIST
) {
3796 addReply(c
,shared
.wrongtypeerr
);
3799 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3800 addReply(c
,shared
.ok
);
3804 if (where
== REDIS_HEAD
) {
3805 listAddNodeHead(list
,c
->argv
[2]);
3807 listAddNodeTail(list
,c
->argv
[2]);
3809 incrRefCount(c
->argv
[2]);
3812 addReply(c
,shared
.ok
);
3815 static void lpushCommand(redisClient
*c
) {
3816 pushGenericCommand(c
,REDIS_HEAD
);
3819 static void rpushCommand(redisClient
*c
) {
3820 pushGenericCommand(c
,REDIS_TAIL
);
3823 static void llenCommand(redisClient
*c
) {
3827 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3829 addReply(c
,shared
.czero
);
3832 if (o
->type
!= REDIS_LIST
) {
3833 addReply(c
,shared
.wrongtypeerr
);
3836 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3841 static void lindexCommand(redisClient
*c
) {
3843 int index
= atoi(c
->argv
[2]->ptr
);
3845 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3847 addReply(c
,shared
.nullbulk
);
3849 if (o
->type
!= REDIS_LIST
) {
3850 addReply(c
,shared
.wrongtypeerr
);
3852 list
*list
= o
->ptr
;
3855 ln
= listIndex(list
, index
);
3857 addReply(c
,shared
.nullbulk
);
3859 robj
*ele
= listNodeValue(ln
);
3860 addReplyBulkLen(c
,ele
);
3862 addReply(c
,shared
.crlf
);
3868 static void lsetCommand(redisClient
*c
) {
3870 int index
= atoi(c
->argv
[2]->ptr
);
3872 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3874 addReply(c
,shared
.nokeyerr
);
3876 if (o
->type
!= REDIS_LIST
) {
3877 addReply(c
,shared
.wrongtypeerr
);
3879 list
*list
= o
->ptr
;
3882 ln
= listIndex(list
, index
);
3884 addReply(c
,shared
.outofrangeerr
);
3886 robj
*ele
= listNodeValue(ln
);
3889 listNodeValue(ln
) = c
->argv
[3];
3890 incrRefCount(c
->argv
[3]);
3891 addReply(c
,shared
.ok
);
3898 static void popGenericCommand(redisClient
*c
, int where
) {
3901 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3903 addReply(c
,shared
.nullbulk
);
3905 if (o
->type
!= REDIS_LIST
) {
3906 addReply(c
,shared
.wrongtypeerr
);
3908 list
*list
= o
->ptr
;
3911 if (where
== REDIS_HEAD
)
3912 ln
= listFirst(list
);
3914 ln
= listLast(list
);
3917 addReply(c
,shared
.nullbulk
);
3919 robj
*ele
= listNodeValue(ln
);
3920 addReplyBulkLen(c
,ele
);
3922 addReply(c
,shared
.crlf
);
3923 listDelNode(list
,ln
);
3930 static void lpopCommand(redisClient
*c
) {
3931 popGenericCommand(c
,REDIS_HEAD
);
3934 static void rpopCommand(redisClient
*c
) {
3935 popGenericCommand(c
,REDIS_TAIL
);
3938 static void lrangeCommand(redisClient
*c
) {
3940 int start
= atoi(c
->argv
[2]->ptr
);
3941 int end
= atoi(c
->argv
[3]->ptr
);
3943 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3945 addReply(c
,shared
.nullmultibulk
);
3947 if (o
->type
!= REDIS_LIST
) {
3948 addReply(c
,shared
.wrongtypeerr
);
3950 list
*list
= o
->ptr
;
3952 int llen
= listLength(list
);
3956 /* convert negative indexes */
3957 if (start
< 0) start
= llen
+start
;
3958 if (end
< 0) end
= llen
+end
;
3959 if (start
< 0) start
= 0;
3960 if (end
< 0) end
= 0;
3962 /* indexes sanity checks */
3963 if (start
> end
|| start
>= llen
) {
3964 /* Out of range start or start > end result in empty list */
3965 addReply(c
,shared
.emptymultibulk
);
3968 if (end
>= llen
) end
= llen
-1;
3969 rangelen
= (end
-start
)+1;
3971 /* Return the result in form of a multi-bulk reply */
3972 ln
= listIndex(list
, start
);
3973 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
3974 for (j
= 0; j
< rangelen
; j
++) {
3975 ele
= listNodeValue(ln
);
3976 addReplyBulkLen(c
,ele
);
3978 addReply(c
,shared
.crlf
);
3985 static void ltrimCommand(redisClient
*c
) {
3987 int start
= atoi(c
->argv
[2]->ptr
);
3988 int end
= atoi(c
->argv
[3]->ptr
);
3990 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3992 addReply(c
,shared
.ok
);
3994 if (o
->type
!= REDIS_LIST
) {
3995 addReply(c
,shared
.wrongtypeerr
);
3997 list
*list
= o
->ptr
;
3999 int llen
= listLength(list
);
4000 int j
, ltrim
, rtrim
;
4002 /* convert negative indexes */
4003 if (start
< 0) start
= llen
+start
;
4004 if (end
< 0) end
= llen
+end
;
4005 if (start
< 0) start
= 0;
4006 if (end
< 0) end
= 0;
4008 /* indexes sanity checks */
4009 if (start
> end
|| start
>= llen
) {
4010 /* Out of range start or start > end result in empty list */
4014 if (end
>= llen
) end
= llen
-1;
4019 /* Remove list elements to perform the trim */
4020 for (j
= 0; j
< ltrim
; j
++) {
4021 ln
= listFirst(list
);
4022 listDelNode(list
,ln
);
4024 for (j
= 0; j
< rtrim
; j
++) {
4025 ln
= listLast(list
);
4026 listDelNode(list
,ln
);
4029 addReply(c
,shared
.ok
);
4034 static void lremCommand(redisClient
*c
) {
4037 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4039 addReply(c
,shared
.czero
);
4041 if (o
->type
!= REDIS_LIST
) {
4042 addReply(c
,shared
.wrongtypeerr
);
4044 list
*list
= o
->ptr
;
4045 listNode
*ln
, *next
;
4046 int toremove
= atoi(c
->argv
[2]->ptr
);
4051 toremove
= -toremove
;
4054 ln
= fromtail
? list
->tail
: list
->head
;
4056 robj
*ele
= listNodeValue(ln
);
4058 next
= fromtail
? ln
->prev
: ln
->next
;
4059 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
4060 listDelNode(list
,ln
);
4063 if (toremove
&& removed
== toremove
) break;
4067 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
4072 /* This is the semantic of this command:
4073 * RPOPLPUSH srclist dstlist:
4074 * IF LLEN(srclist) > 0
4075 * element = RPOP srclist
4076 * LPUSH dstlist element
4083 * The idea is to be able to get an element from a list in a reliable way
4084 * since the element is not just returned but pushed against another list
4085 * as well. This command was originally proposed by Ezra Zygmuntowicz.
4087 static void rpoplpushcommand(redisClient
*c
) {
4090 sobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4092 addReply(c
,shared
.nullbulk
);
4094 if (sobj
->type
!= REDIS_LIST
) {
4095 addReply(c
,shared
.wrongtypeerr
);
4097 list
*srclist
= sobj
->ptr
;
4098 listNode
*ln
= listLast(srclist
);
4101 addReply(c
,shared
.nullbulk
);
4103 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
4104 robj
*ele
= listNodeValue(ln
);
4107 if (dobj
&& dobj
->type
!= REDIS_LIST
) {
4108 addReply(c
,shared
.wrongtypeerr
);
4112 /* Add the element to the target list (unless it's directly
4113 * passed to some BLPOP-ing client */
4114 if (!handleClientsWaitingListPush(c
,c
->argv
[2],ele
)) {
4116 /* Create the list if the key does not exist */
4117 dobj
= createListObject();
4118 dictAdd(c
->db
->dict
,c
->argv
[2],dobj
);
4119 incrRefCount(c
->argv
[2]);
4121 dstlist
= dobj
->ptr
;
4122 listAddNodeHead(dstlist
,ele
);
4126 /* Send the element to the client as reply as well */
4127 addReplyBulkLen(c
,ele
);
4129 addReply(c
,shared
.crlf
);
4131 /* Finally remove the element from the source list */
4132 listDelNode(srclist
,ln
);
4140 /* ==================================== Sets ================================ */
4142 static void saddCommand(redisClient
*c
) {
4145 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4147 set
= createSetObject();
4148 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
4149 incrRefCount(c
->argv
[1]);
4151 if (set
->type
!= REDIS_SET
) {
4152 addReply(c
,shared
.wrongtypeerr
);
4156 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
4157 incrRefCount(c
->argv
[2]);
4159 addReply(c
,shared
.cone
);
4161 addReply(c
,shared
.czero
);
4165 static void sremCommand(redisClient
*c
) {
4168 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4170 addReply(c
,shared
.czero
);
4172 if (set
->type
!= REDIS_SET
) {
4173 addReply(c
,shared
.wrongtypeerr
);
4176 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
4178 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
4179 addReply(c
,shared
.cone
);
4181 addReply(c
,shared
.czero
);
4186 static void smoveCommand(redisClient
*c
) {
4187 robj
*srcset
, *dstset
;
4189 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4190 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
4192 /* If the source key does not exist return 0, if it's of the wrong type
4194 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
4195 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
4198 /* Error if the destination key is not a set as well */
4199 if (dstset
&& dstset
->type
!= REDIS_SET
) {
4200 addReply(c
,shared
.wrongtypeerr
);
4203 /* Remove the element from the source set */
4204 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
4205 /* Key not found in the src set! return zero */
4206 addReply(c
,shared
.czero
);
4210 /* Add the element to the destination set */
4212 dstset
= createSetObject();
4213 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
4214 incrRefCount(c
->argv
[2]);
4216 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
4217 incrRefCount(c
->argv
[3]);
4218 addReply(c
,shared
.cone
);
4221 static void sismemberCommand(redisClient
*c
) {
4224 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4226 addReply(c
,shared
.czero
);
4228 if (set
->type
!= REDIS_SET
) {
4229 addReply(c
,shared
.wrongtypeerr
);
4232 if (dictFind(set
->ptr
,c
->argv
[2]))
4233 addReply(c
,shared
.cone
);
4235 addReply(c
,shared
.czero
);
4239 static void scardCommand(redisClient
*c
) {
4243 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4245 addReply(c
,shared
.czero
);
4248 if (o
->type
!= REDIS_SET
) {
4249 addReply(c
,shared
.wrongtypeerr
);
4252 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4258 static void spopCommand(redisClient
*c
) {
4262 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4264 addReply(c
,shared
.nullbulk
);
4266 if (set
->type
!= REDIS_SET
) {
4267 addReply(c
,shared
.wrongtypeerr
);
4270 de
= dictGetRandomKey(set
->ptr
);
4272 addReply(c
,shared
.nullbulk
);
4274 robj
*ele
= dictGetEntryKey(de
);
4276 addReplyBulkLen(c
,ele
);
4278 addReply(c
,shared
.crlf
);
4279 dictDelete(set
->ptr
,ele
);
4280 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
4286 static void srandmemberCommand(redisClient
*c
) {
4290 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4292 addReply(c
,shared
.nullbulk
);
4294 if (set
->type
!= REDIS_SET
) {
4295 addReply(c
,shared
.wrongtypeerr
);
4298 de
= dictGetRandomKey(set
->ptr
);
4300 addReply(c
,shared
.nullbulk
);
4302 robj
*ele
= dictGetEntryKey(de
);
4304 addReplyBulkLen(c
,ele
);
4306 addReply(c
,shared
.crlf
);
4311 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
4312 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
4314 return dictSize(*d1
)-dictSize(*d2
);
4317 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, unsigned long setsnum
, robj
*dstkey
) {
4318 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4321 robj
*lenobj
= NULL
, *dstset
= NULL
;
4322 unsigned long j
, cardinality
= 0;
4324 for (j
= 0; j
< setsnum
; j
++) {
4328 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4329 lookupKeyRead(c
->db
,setskeys
[j
]);
4333 if (deleteKey(c
->db
,dstkey
))
4335 addReply(c
,shared
.czero
);
4337 addReply(c
,shared
.nullmultibulk
);
4341 if (setobj
->type
!= REDIS_SET
) {
4343 addReply(c
,shared
.wrongtypeerr
);
4346 dv
[j
] = setobj
->ptr
;
4348 /* Sort sets from the smallest to largest, this will improve our
4349 * algorithm's performace */
4350 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
4352 /* The first thing we should output is the total number of elements...
4353 * since this is a multi-bulk write, but at this stage we don't know
4354 * the intersection set size, so we use a trick, append an empty object
4355 * to the output list and save the pointer to later modify it with the
4358 lenobj
= createObject(REDIS_STRING
,NULL
);
4360 decrRefCount(lenobj
);
4362 /* If we have a target key where to store the resulting set
4363 * create this key with an empty set inside */
4364 dstset
= createSetObject();
4367 /* Iterate all the elements of the first (smallest) set, and test
4368 * the element against all the other sets, if at least one set does
4369 * not include the element it is discarded */
4370 di
= dictGetIterator(dv
[0]);
4372 while((de
= dictNext(di
)) != NULL
) {
4375 for (j
= 1; j
< setsnum
; j
++)
4376 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
4378 continue; /* at least one set does not contain the member */
4379 ele
= dictGetEntryKey(de
);
4381 addReplyBulkLen(c
,ele
);
4383 addReply(c
,shared
.crlf
);
4386 dictAdd(dstset
->ptr
,ele
,NULL
);
4390 dictReleaseIterator(di
);
4393 /* Store the resulting set into the target */
4394 deleteKey(c
->db
,dstkey
);
4395 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4396 incrRefCount(dstkey
);
4400 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%lu\r\n",cardinality
);
4402 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4403 dictSize((dict
*)dstset
->ptr
)));
4409 static void sinterCommand(redisClient
*c
) {
4410 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
4413 static void sinterstoreCommand(redisClient
*c
) {
4414 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
4417 #define REDIS_OP_UNION 0
4418 #define REDIS_OP_DIFF 1
4420 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
4421 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4424 robj
*dstset
= NULL
;
4425 int j
, cardinality
= 0;
4427 for (j
= 0; j
< setsnum
; j
++) {
4431 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4432 lookupKeyRead(c
->db
,setskeys
[j
]);
4437 if (setobj
->type
!= REDIS_SET
) {
4439 addReply(c
,shared
.wrongtypeerr
);
4442 dv
[j
] = setobj
->ptr
;
4445 /* We need a temp set object to store our union. If the dstkey
4446 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4447 * this set object will be the resulting object to set into the target key*/
4448 dstset
= createSetObject();
4450 /* Iterate all the elements of all the sets, add every element a single
4451 * time to the result set */
4452 for (j
= 0; j
< setsnum
; j
++) {
4453 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
4454 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
4456 di
= dictGetIterator(dv
[j
]);
4458 while((de
= dictNext(di
)) != NULL
) {
4461 /* dictAdd will not add the same element multiple times */
4462 ele
= dictGetEntryKey(de
);
4463 if (op
== REDIS_OP_UNION
|| j
== 0) {
4464 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
4468 } else if (op
== REDIS_OP_DIFF
) {
4469 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
4474 dictReleaseIterator(di
);
4476 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
4479 /* Output the content of the resulting set, if not in STORE mode */
4481 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
4482 di
= dictGetIterator(dstset
->ptr
);
4483 while((de
= dictNext(di
)) != NULL
) {
4486 ele
= dictGetEntryKey(de
);
4487 addReplyBulkLen(c
,ele
);
4489 addReply(c
,shared
.crlf
);
4491 dictReleaseIterator(di
);
4493 /* If we have a target key where to store the resulting set
4494 * create this key with the result set inside */
4495 deleteKey(c
->db
,dstkey
);
4496 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4497 incrRefCount(dstkey
);
4502 decrRefCount(dstset
);
4504 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4505 dictSize((dict
*)dstset
->ptr
)));
4511 static void sunionCommand(redisClient
*c
) {
4512 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
4515 static void sunionstoreCommand(redisClient
*c
) {
4516 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
4519 static void sdiffCommand(redisClient
*c
) {
4520 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
4523 static void sdiffstoreCommand(redisClient
*c
) {
4524 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
4527 /* ==================================== ZSets =============================== */
4529 /* ZSETs are ordered sets using two data structures to hold the same elements
4530 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4533 * The elements are added to an hash table mapping Redis objects to scores.
4534 * At the same time the elements are added to a skip list mapping scores
4535 * to Redis objects (so objects are sorted by scores in this "view"). */
4537 /* This skiplist implementation is almost a C translation of the original
4538 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4539 * Alternative to Balanced Trees", modified in three ways:
4540 * a) this implementation allows for repeated values.
4541 * b) the comparison is not just by key (our 'score') but by satellite data.
4542 * c) there is a back pointer, so it's a doubly linked list with the back
4543 * pointers being only at "level 1". This allows to traverse the list
4544 * from tail to head, useful for ZREVRANGE. */
4546 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
4547 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
4549 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
4555 static zskiplist
*zslCreate(void) {
4559 zsl
= zmalloc(sizeof(*zsl
));
4562 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
4563 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
4564 zsl
->header
->forward
[j
] = NULL
;
4565 zsl
->header
->backward
= NULL
;
4570 static void zslFreeNode(zskiplistNode
*node
) {
4571 decrRefCount(node
->obj
);
4572 zfree(node
->forward
);
4576 static void zslFree(zskiplist
*zsl
) {
4577 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
4579 zfree(zsl
->header
->forward
);
4582 next
= node
->forward
[0];
4589 static int zslRandomLevel(void) {
4591 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
4596 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
4597 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4601 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4602 while (x
->forward
[i
] &&
4603 (x
->forward
[i
]->score
< score
||
4604 (x
->forward
[i
]->score
== score
&&
4605 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4609 /* we assume the key is not already inside, since we allow duplicated
4610 * scores, and the re-insertion of score and redis object should never
4611 * happen since the caller of zslInsert() should test in the hash table
4612 * if the element is already inside or not. */
4613 level
= zslRandomLevel();
4614 if (level
> zsl
->level
) {
4615 for (i
= zsl
->level
; i
< level
; i
++)
4616 update
[i
] = zsl
->header
;
4619 x
= zslCreateNode(level
,score
,obj
);
4620 for (i
= 0; i
< level
; i
++) {
4621 x
->forward
[i
] = update
[i
]->forward
[i
];
4622 update
[i
]->forward
[i
] = x
;
4624 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
4626 x
->forward
[0]->backward
= x
;
4632 /* Delete an element with matching score/object from the skiplist. */
4633 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
4634 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4638 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4639 while (x
->forward
[i
] &&
4640 (x
->forward
[i
]->score
< score
||
4641 (x
->forward
[i
]->score
== score
&&
4642 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4646 /* We may have multiple elements with the same score, what we need
4647 * is to find the element with both the right score and object. */
4649 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
4650 for (i
= 0; i
< zsl
->level
; i
++) {
4651 if (update
[i
]->forward
[i
] != x
) break;
4652 update
[i
]->forward
[i
] = x
->forward
[i
];
4654 if (x
->forward
[0]) {
4655 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4658 zsl
->tail
= x
->backward
;
4661 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4666 return 0; /* not found */
4668 return 0; /* not found */
4671 /* Delete all the elements with score between min and max from the skiplist.
4672 * Min and max are inclusive, so a score >= min && score <= max is deleted.
4673 * Note that this function takes the reference to the hash table view of the
4674 * sorted set, in order to remove the elements from the hash table too. */
4675 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4676 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4677 unsigned long removed
= 0;
4681 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4682 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
4686 /* Unlike zslDelete() only scores are compared here: every element with
4687 * min <= score <= max is removed, whatever object it holds. */
4689 while (x
&& x
->score
<= max
) {
4690 zskiplistNode
*next
;
4692 for (i
= 0; i
< zsl
->level
; i
++) {
4693 if (update
[i
]->forward
[i
] != x
) break;
4694 update
[i
]->forward
[i
] = x
->forward
[i
];
4696 if (x
->forward
[0]) {
4697 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4700 zsl
->tail
= x
->backward
;
4702 next
= x
->forward
[0];
4703 dictDelete(dict
,x
->obj
);
4705 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4711 return removed
; /* not found */
4714 /* Find the first node having a score equal or greater than the specified one.
4715 * Returns NULL if there is no match. */
4716 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4721 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4722 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
4725 /* Only scores are compared here: the node right after x is the first one
4726 * with a score >= 'score', or NULL if there is none. */
4727 return x
->forward
[0];
4730 /* The actual Z-commands implementations */
4732 /* This generic command implements both ZADD and ZINCRBY.
4733 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4734 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4735 static void zaddGenericCommand(redisClient
*c
, robj
*key
, robj
*ele
, double scoreval
, int doincrement
) {
4740 zsetobj
= lookupKeyWrite(c
->db
,key
);
4741 if (zsetobj
== NULL
) {
4742 zsetobj
= createZsetObject();
4743 dictAdd(c
->db
->dict
,key
,zsetobj
);
4746 if (zsetobj
->type
!= REDIS_ZSET
) {
4747 addReply(c
,shared
.wrongtypeerr
);
4753 /* Ok now since we implement both ZADD and ZINCRBY here the code
4754 * needs to handle the two different conditions. It's all about setting
4755 * '*score', that is, the new score to set, to the right value. */
4756 score
= zmalloc(sizeof(double));
4760 /* Read the old score. If the element was not present starts from 0 */
4761 de
= dictFind(zs
->dict
,ele
);
4763 double *oldscore
= dictGetEntryVal(de
);
4764 *score
= *oldscore
+ scoreval
;
4772 /* What follows is a simple remove and re-insert operation that is common
4773 * to both ZADD and ZINCRBY... */
4774 if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) {
4775 /* case 1: New element */
4776 incrRefCount(ele
); /* added to hash */
4777 zslInsert(zs
->zsl
,*score
,ele
);
4778 incrRefCount(ele
); /* added to skiplist */
4781 addReplyDouble(c
,*score
);
4783 addReply(c
,shared
.cone
);
4788 /* case 2: Score update operation */
4789 de
= dictFind(zs
->dict
,ele
);
4790 redisAssert(de
!= NULL
);
4791 oldscore
= dictGetEntryVal(de
);
4792 if (*score
!= *oldscore
) {
4795 /* Remove and insert the element in the skip list with new score */
4796 deleted
= zslDelete(zs
->zsl
,*oldscore
,ele
);
4797 redisAssert(deleted
!= 0);
4798 zslInsert(zs
->zsl
,*score
,ele
);
4800 /* Update the score in the hash table */
4801 dictReplace(zs
->dict
,ele
,score
);
4807 addReplyDouble(c
,*score
);
4809 addReply(c
,shared
.czero
);
4813 static void zaddCommand(redisClient
*c
) {
4816 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4817 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0);
4820 static void zincrbyCommand(redisClient
*c
) {
4823 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4824 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1);
4827 static void zremCommand(redisClient
*c
) {
4831 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4832 if (zsetobj
== NULL
) {
4833 addReply(c
,shared
.czero
);
4839 if (zsetobj
->type
!= REDIS_ZSET
) {
4840 addReply(c
,shared
.wrongtypeerr
);
4844 de
= dictFind(zs
->dict
,c
->argv
[2]);
4846 addReply(c
,shared
.czero
);
4849 /* Delete from the skiplist */
4850 oldscore
= dictGetEntryVal(de
);
4851 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
4852 redisAssert(deleted
!= 0);
4854 /* Delete from the hash table */
4855 dictDelete(zs
->dict
,c
->argv
[2]);
4856 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4858 addReply(c
,shared
.cone
);
4862 static void zremrangebyscoreCommand(redisClient
*c
) {
4863 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4864 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4868 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4869 if (zsetobj
== NULL
) {
4870 addReply(c
,shared
.czero
);
4874 if (zsetobj
->type
!= REDIS_ZSET
) {
4875 addReply(c
,shared
.wrongtypeerr
);
4879 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
4880 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4881 server
.dirty
+= deleted
;
4882 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
4886 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
4888 int start
= atoi(c
->argv
[2]->ptr
);
4889 int end
= atoi(c
->argv
[3]->ptr
);
4892 if (c
->argc
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) {
4894 } else if (c
->argc
>= 5) {
4895 addReply(c
,shared
.syntaxerr
);
4899 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4901 addReply(c
,shared
.nullmultibulk
);
4903 if (o
->type
!= REDIS_ZSET
) {
4904 addReply(c
,shared
.wrongtypeerr
);
4906 zset
*zsetobj
= o
->ptr
;
4907 zskiplist
*zsl
= zsetobj
->zsl
;
4910 int llen
= zsl
->length
;
4914 /* convert negative indexes */
4915 if (start
< 0) start
= llen
+start
;
4916 if (end
< 0) end
= llen
+end
;
4917 if (start
< 0) start
= 0;
4918 if (end
< 0) end
= 0;
4920 /* indexes sanity checks */
4921 if (start
> end
|| start
>= llen
) {
4922 /* Out of range start or start > end result in empty list */
4923 addReply(c
,shared
.emptymultibulk
);
4926 if (end
>= llen
) end
= llen
-1;
4927 rangelen
= (end
-start
)+1;
4929 /* Return the result in form of a multi-bulk reply */
4935 ln
= zsl
->header
->forward
[0];
4937 ln
= ln
->forward
[0];
4940 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",
4941 withscores
? (rangelen
*2) : rangelen
));
4942 for (j
= 0; j
< rangelen
; j
++) {
4944 addReplyBulkLen(c
,ele
);
4946 addReply(c
,shared
.crlf
);
4948 addReplyDouble(c
,ln
->score
);
4949 ln
= reverse
? ln
->backward
: ln
->forward
[0];
4955 static void zrangeCommand(redisClient
*c
) {
4956 zrangeGenericCommand(c
,0);
4959 static void zrevrangeCommand(redisClient
*c
) {
4960 zrangeGenericCommand(c
,1);
4963 static void zrangebyscoreCommand(redisClient
*c
) {
4965 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4966 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4967 int offset
= 0, limit
= -1;
4969 if (c
->argc
!= 4 && c
->argc
!= 7) {
4971 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4973 } else if (c
->argc
== 7 && strcasecmp(c
->argv
[4]->ptr
,"limit")) {
4974 addReply(c
,shared
.syntaxerr
);
4976 } else if (c
->argc
== 7) {
4977 offset
= atoi(c
->argv
[5]->ptr
);
4978 limit
= atoi(c
->argv
[6]->ptr
);
4979 if (offset
< 0) offset
= 0;
4982 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4984 addReply(c
,shared
.nullmultibulk
);
4986 if (o
->type
!= REDIS_ZSET
) {
4987 addReply(c
,shared
.wrongtypeerr
);
4989 zset
*zsetobj
= o
->ptr
;
4990 zskiplist
*zsl
= zsetobj
->zsl
;
4993 unsigned int rangelen
= 0;
4995 /* Get the first node with the score >= min */
4996 ln
= zslFirstWithScore(zsl
,min
);
4998 /* No element matching the specified interval */
4999 addReply(c
,shared
.emptymultibulk
);
5003 /* We don't know in advance how many matching elements there
5004 * are in the list, so we push this object that will represent
5005 * the multi-bulk length in the output buffer, and will "fix"
5007 lenobj
= createObject(REDIS_STRING
,NULL
);
5009 decrRefCount(lenobj
);
5011 while(ln
&& ln
->score
<= max
) {
5014 ln
= ln
->forward
[0];
5017 if (limit
== 0) break;
5019 addReplyBulkLen(c
,ele
);
5021 addReply(c
,shared
.crlf
);
5022 ln
= ln
->forward
[0];
5024 if (limit
> 0) limit
--;
5026 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
5031 static void zcardCommand(redisClient
*c
) {
5035 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
5037 addReply(c
,shared
.czero
);
5040 if (o
->type
!= REDIS_ZSET
) {
5041 addReply(c
,shared
.wrongtypeerr
);
5044 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",zs
->zsl
->length
));
5049 static void zscoreCommand(redisClient
*c
) {
5053 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
5055 addReply(c
,shared
.nullbulk
);
5058 if (o
->type
!= REDIS_ZSET
) {
5059 addReply(c
,shared
.wrongtypeerr
);
5064 de
= dictFind(zs
->dict
,c
->argv
[2]);
5066 addReply(c
,shared
.nullbulk
);
5068 double *score
= dictGetEntryVal(de
);
5070 addReplyDouble(c
,*score
);
5076 /* ========================= Non type-specific commands ==================== */
5078 static void flushdbCommand(redisClient
*c
) {
5079 server
.dirty
+= dictSize(c
->db
->dict
);
5080 dictEmpty(c
->db
->dict
);
5081 dictEmpty(c
->db
->expires
);
5082 addReply(c
,shared
.ok
);
5085 static void flushallCommand(redisClient
*c
) {
5086 server
.dirty
+= emptyDb();
5087 addReply(c
,shared
.ok
);
5088 rdbSave(server
.dbfilename
);
5092 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
5093 redisSortOperation
*so
= zmalloc(sizeof(*so
));
5095 so
->pattern
= pattern
;
5099 /* Return the value associated with the key whose name is obtained by
5100 * substituting the first occurrence of '*' in 'pattern' with 'subst' */
5101 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
5105 int prefixlen
, sublen
, postfixlen
;
5106 /* Exploit the internal sds representation to create an sds string allocated on the stack in order to make this function faster */
5110 char buf
[REDIS_SORTKEY_MAX
+1];
5113 /* If the pattern is "#" return the substitution object itself in order
5114 * to implement the "SORT ... GET #" feature. */
5115 spat
= pattern
->ptr
;
5116 if (spat
[0] == '#' && spat
[1] == '\0') {
5120 /* The substitution object may be specially encoded. If so we create
5121 * a decoded object on the fly. Otherwise getDecodedObject will just
5122 * increment the ref count, that we'll decrement later. */
5123 subst
= getDecodedObject(subst
);
5126 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
5127 p
= strchr(spat
,'*');
5129 decrRefCount(subst
);
5134 sublen
= sdslen(ssub
);
5135 postfixlen
= sdslen(spat
)-(prefixlen
+1);
5136 memcpy(keyname
.buf
,spat
,prefixlen
);
5137 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
5138 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
5139 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
5140 keyname
.len
= prefixlen
+sublen
+postfixlen
;
5142 initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(long)*2))
5143 decrRefCount(subst
);
5145 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
5146 return lookupKeyRead(db
,&keyobj
);
5149 /* sortCompare() is used by qsort()/pqsort() in sortCommand(). Since qsort_r()
5150 * with the additional parameter is not standard but BSD-specific, we have to
5151 * pass the sorting parameters via the global 'server' structure. */
5152 static int sortCompare(const void *s1
, const void *s2
) {
5153 const redisSortObject
*so1
= s1
, *so2
= s2
;
5156 if (!server
.sort_alpha
) {
5157 /* Numeric sorting. Here it's trivial as we precomputed scores */
5158 if (so1
->u
.score
> so2
->u
.score
) {
5160 } else if (so1
->u
.score
< so2
->u
.score
) {
5166 /* Alphanumeric sorting */
5167 if (server
.sort_bypattern
) {
5168 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
5169 /* At least one compare object is NULL */
5170 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
5172 else if (so1
->u
.cmpobj
== NULL
)
5177 /* We have both the objects, use strcoll */
5178 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
5181 /* Compare elements directly */
5184 dec1
= getDecodedObject(so1
->obj
);
5185 dec2
= getDecodedObject(so2
->obj
);
5186 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
5191 return server
.sort_desc
? -cmp
: cmp
;
5194 /* The SORT command is the most complex command in Redis. Warning: this code
5195 * is optimized for speed and a bit less for readability */
5196 static void sortCommand(redisClient
*c
) {
5199 int desc
= 0, alpha
= 0;
5200 int limit_start
= 0, limit_count
= -1, start
, end
;
5201 int j
, dontsort
= 0, vectorlen
;
5202 int getop
= 0; /* GET operation counter */
5203 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
5204 redisSortObject
*vector
; /* Resulting vector to sort */
5206 /* Lookup the key to sort. It must be of the right types */
5207 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
5208 if (sortval
== NULL
) {
5209 addReply(c
,shared
.nullmultibulk
);
5212 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
5213 sortval
->type
!= REDIS_ZSET
)
5215 addReply(c
,shared
.wrongtypeerr
);
5219 /* Create a list of operations to perform for every sorted element.
5220 * Operations can be GET/DEL/INCR/DECR */
5221 operations
= listCreate();
5222 listSetFreeMethod(operations
,zfree
);
5225 /* Now we need to protect sortval incrementing its count, in the future
5226 * SORT may have options able to overwrite/delete keys during the sorting
5227 * and the sorted key itself may get destroied */
5228 incrRefCount(sortval
);
5230 /* The SORT command has an SQL-alike syntax, parse it */
5231 while(j
< c
->argc
) {
5232 int leftargs
= c
->argc
-j
-1;
5233 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
5235 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
5237 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
5239 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
5240 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
5241 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
5243 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
5244 storekey
= c
->argv
[j
+1];
5246 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
5247 sortby
= c
->argv
[j
+1];
5248 /* If the BY pattern does not contain '*', i.e. it is constant,
5249 * we don't need to sort nor to lookup the weight keys. */
5250 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
5252 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
5253 listAddNodeTail(operations
,createSortOperation(
5254 REDIS_SORT_GET
,c
->argv
[j
+1]));
5258 decrRefCount(sortval
);
5259 listRelease(operations
);
5260 addReply(c
,shared
.syntaxerr
);
5266 /* Load the sorting vector with all the objects to sort */
5267 switch(sortval
->type
) {
5268 case REDIS_LIST
: vectorlen
= listLength((list
*)sortval
->ptr
); break;
5269 case REDIS_SET
: vectorlen
= dictSize((dict
*)sortval
->ptr
); break;
5270 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
5271 default: vectorlen
= 0; redisAssert(0); /* Avoid GCC warning */
5273 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
5276 if (sortval
->type
== REDIS_LIST
) {
5277 list
*list
= sortval
->ptr
;
5281 while((ln
= listYield(list
))) {
5282 robj
*ele
= ln
->value
;
5283 vector
[j
].obj
= ele
;
5284 vector
[j
].u
.score
= 0;
5285 vector
[j
].u
.cmpobj
= NULL
;
5293 if (sortval
->type
== REDIS_SET
) {
5296 zset
*zs
= sortval
->ptr
;
5300 di
= dictGetIterator(set
);
5301 while((setele
= dictNext(di
)) != NULL
) {
5302 vector
[j
].obj
= dictGetEntryKey(setele
);
5303 vector
[j
].u
.score
= 0;
5304 vector
[j
].u
.cmpobj
= NULL
;
5307 dictReleaseIterator(di
);
5309 redisAssert(j
== vectorlen
);
5311 /* Now it's time to load the right scores in the sorting vector */
5312 if (dontsort
== 0) {
5313 for (j
= 0; j
< vectorlen
; j
++) {
5317 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
5318 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
5320 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
5322 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
5323 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
5325 /* Don't need to decode the object if it's
5326 * integer-encoded (the only encoding supported) so
5327 * far. We can just cast it */
5328 if (byval
->encoding
== REDIS_ENCODING_INT
) {
5329 vector
[j
].u
.score
= (long)byval
->ptr
;
5331 redisAssert(1 != 1);
5336 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
5337 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
5339 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
5340 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
5342 redisAssert(1 != 1);
5349 /* We are ready to sort the vector... perform a bit of sanity check
5350 * on the LIMIT option too. We'll use a partial version of quicksort. */
5351 start
= (limit_start
< 0) ? 0 : limit_start
;
5352 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
5353 if (start
>= vectorlen
) {
5354 start
= vectorlen
-1;
5357 if (end
>= vectorlen
) end
= vectorlen
-1;
5359 if (dontsort
== 0) {
5360 server
.sort_desc
= desc
;
5361 server
.sort_alpha
= alpha
;
5362 server
.sort_bypattern
= sortby
? 1 : 0;
5363 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
5364 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
5366 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
5369 /* Send command output to the output buffer, performing the specified
5370 * GET/DEL/INCR/DECR operations if any. */
5371 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
5372 if (storekey
== NULL
) {
5373 /* STORE option not specified, sent the sorting result to client */
5374 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
5375 for (j
= start
; j
<= end
; j
++) {
5378 addReplyBulkLen(c
,vector
[j
].obj
);
5379 addReply(c
,vector
[j
].obj
);
5380 addReply(c
,shared
.crlf
);
5382 listRewind(operations
);
5383 while((ln
= listYield(operations
))) {
5384 redisSortOperation
*sop
= ln
->value
;
5385 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5388 if (sop
->type
== REDIS_SORT_GET
) {
5389 if (!val
|| val
->type
!= REDIS_STRING
) {
5390 addReply(c
,shared
.nullbulk
);
5392 addReplyBulkLen(c
,val
);
5394 addReply(c
,shared
.crlf
);
5397 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5402 robj
*listObject
= createListObject();
5403 list
*listPtr
= (list
*) listObject
->ptr
;
5405 /* STORE option specified, set the sorting result as a List object */
5406 for (j
= start
; j
<= end
; j
++) {
5409 listAddNodeTail(listPtr
,vector
[j
].obj
);
5410 incrRefCount(vector
[j
].obj
);
5412 listRewind(operations
);
5413 while((ln
= listYield(operations
))) {
5414 redisSortOperation
*sop
= ln
->value
;
5415 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5418 if (sop
->type
== REDIS_SORT_GET
) {
5419 if (!val
|| val
->type
!= REDIS_STRING
) {
5420 listAddNodeTail(listPtr
,createStringObject("",0));
5422 listAddNodeTail(listPtr
,val
);
5426 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5430 if (dictReplace(c
->db
->dict
,storekey
,listObject
)) {
5431 incrRefCount(storekey
);
5433 /* Note: we add 1 because the DB is dirty anyway since even if the
5434 * SORT result is empty a new key is set and maybe the old content
5436 server
.dirty
+= 1+outputlen
;
5437 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",outputlen
));
5441 decrRefCount(sortval
);
5442 listRelease(operations
);
5443 for (j
= 0; j
< vectorlen
; j
++) {
5444 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
5445 decrRefCount(vector
[j
].u
.cmpobj
);
5450 /* Convert an amount of bytes into a human readable string in the form
5451 * of 100B, 2G, 100M, 4K, and so forth. */
5452 static void bytesToHuman(char *s
, unsigned long long n
) {
5457 sprintf(s
,"%lluB",n
);
5459 } else if (n
< (1024*1024)) {
5460 d
= (double)n
/(1024);
5461 sprintf(s
,"%.2fK",d
);
5462 } else if (n
< (1024LL*1024*1024)) {
5463 d
= (double)n
/(1024*1024);
5464 sprintf(s
,"%.2fM",d
);
5465 } else if (n
< (1024LL*1024*1024*1024)) {
5466 d
= (double)n
/(1024LL*1024*1024);
5467 sprintf(s
,"%.2fM",d
);
5471 /* Create the string returned by the INFO command. This is decoupled
5472 * from the INFO command itself as we need to report the same information
5473 * on memory corruption problems. */
5474 static sds
genRedisInfoString(void) {
5476 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5480 bytesToHuman(hmem
,server
.usedmemory
);
5481 info
= sdscatprintf(sdsempty(),
5482 "redis_version:%s\r\n"
5484 "multiplexing_api:%s\r\n"
5485 "process_id:%ld\r\n"
5486 "uptime_in_seconds:%ld\r\n"
5487 "uptime_in_days:%ld\r\n"
5488 "connected_clients:%d\r\n"
5489 "connected_slaves:%d\r\n"
5490 "blocked_clients:%d\r\n"
5491 "used_memory:%zu\r\n"
5492 "used_memory_human:%s\r\n"
5493 "changes_since_last_save:%lld\r\n"
5494 "bgsave_in_progress:%d\r\n"
5495 "last_save_time:%ld\r\n"
5496 "bgrewriteaof_in_progress:%d\r\n"
5497 "total_connections_received:%lld\r\n"
5498 "total_commands_processed:%lld\r\n"
5502 (sizeof(long) == 8) ? "64" : "32",
5507 listLength(server
.clients
)-listLength(server
.slaves
),
5508 listLength(server
.slaves
),
5509 server
.blockedclients
,
5513 server
.bgsavechildpid
!= -1,
5515 server
.bgrewritechildpid
!= -1,
5516 server
.stat_numconnections
,
5517 server
.stat_numcommands
,
5518 server
.vm_enabled
!= 0,
5519 server
.masterhost
== NULL
? "master" : "slave"
5521 if (server
.masterhost
) {
5522 info
= sdscatprintf(info
,
5523 "master_host:%s\r\n"
5524 "master_port:%d\r\n"
5525 "master_link_status:%s\r\n"
5526 "master_last_io_seconds_ago:%d\r\n"
5529 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
5531 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
5534 if (server
.vm_enabled
) {
5535 info
= sdscatprintf(info
,
5536 "vm_conf_max_memory:%llu\r\n"
5537 "vm_conf_page_size:%llu\r\n"
5538 "vm_conf_pages:%llu\r\n"
5539 "vm_stats_used_pages:%llu\r\n"
5540 "vm_stats_swapped_objects:%llu\r\n"
5541 "vm_stats_swappin_count:%llu\r\n"
5542 "vm_stats_swappout_count:%llu\r\n"
5543 ,(unsigned long long) server
.vm_max_memory
,
5544 (unsigned long long) server
.vm_page_size
,
5545 (unsigned long long) server
.vm_pages
,
5546 (unsigned long long) server
.vm_stats_used_pages
,
5547 (unsigned long long) server
.vm_stats_swapped_objects
,
5548 (unsigned long long) server
.vm_stats_swapins
,
5549 (unsigned long long) server
.vm_stats_swapouts
5552 for (j
= 0; j
< server
.dbnum
; j
++) {
5553 long long keys
, vkeys
;
5555 keys
= dictSize(server
.db
[j
].dict
);
5556 vkeys
= dictSize(server
.db
[j
].expires
);
5557 if (keys
|| vkeys
) {
5558 info
= sdscatprintf(info
, "db%d:keys=%lld,expires=%lld\r\n",
5565 static void infoCommand(redisClient
*c
) {
5566 sds info
= genRedisInfoString();
5567 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
5568 (unsigned long)sdslen(info
)));
5569 addReplySds(c
,info
);
5570 addReply(c
,shared
.crlf
);
5573 static void monitorCommand(redisClient
*c
) {
5574 /* ignore MONITOR if already slave or in monitor mode */
5575 if (c
->flags
& REDIS_SLAVE
) return;
5577 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
5579 listAddNodeTail(server
.monitors
,c
);
5580 addReply(c
,shared
.ok
);
5583 /* ================================= Expire ================================= */
5584 static int removeExpire(redisDb
*db
, robj
*key
) {
5585 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
5592 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
5593 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
5601 /* Return the expire time of the specified key, or -1 if no expire
5602 * is associated with this key (i.e. the key is non volatile) */
5603 static time_t getExpire(redisDb
*db
, robj
*key
) {
5606 /* No expire? return ASAP */
5607 if (dictSize(db
->expires
) == 0 ||
5608 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
5610 return (time_t) dictGetEntryVal(de
);
5613 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
5617 /* No expire? return ASAP */
5618 if (dictSize(db
->expires
) == 0 ||
5619 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5621 /* Lookup the expire */
5622 when
= (time_t) dictGetEntryVal(de
);
5623 if (time(NULL
) <= when
) return 0;
5625 /* Delete the key */
5626 dictDelete(db
->expires
,key
);
5627 return dictDelete(db
->dict
,key
) == DICT_OK
;
5630 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
5633 /* No expire? return ASAP */
5634 if (dictSize(db
->expires
) == 0 ||
5635 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5637 /* Delete the key */
5639 dictDelete(db
->expires
,key
);
5640 return dictDelete(db
->dict
,key
) == DICT_OK
;
5643 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
5646 de
= dictFind(c
->db
->dict
,key
);
5648 addReply(c
,shared
.czero
);
5652 if (deleteKey(c
->db
,key
)) server
.dirty
++;
5653 addReply(c
, shared
.cone
);
5656 time_t when
= time(NULL
)+seconds
;
5657 if (setExpire(c
->db
,key
,when
)) {
5658 addReply(c
,shared
.cone
);
5661 addReply(c
,shared
.czero
);
5667 static void expireCommand(redisClient
*c
) {
5668 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
5671 static void expireatCommand(redisClient
*c
) {
5672 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
5675 static void ttlCommand(redisClient
*c
) {
5679 expire
= getExpire(c
->db
,c
->argv
[1]);
5681 ttl
= (int) (expire
-time(NULL
));
5682 if (ttl
< 0) ttl
= -1;
5684 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
5687 /* ================================ MULTI/EXEC ============================== */
5689 /* Client state initialization for MULTI/EXEC */
5690 static void initClientMultiState(redisClient
*c
) {
5691 c
->mstate
.commands
= NULL
;
5692 c
->mstate
.count
= 0;
5695 /* Release all the resources associated with MULTI/EXEC state */
5696 static void freeClientMultiState(redisClient
*c
) {
5699 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5701 multiCmd
*mc
= c
->mstate
.commands
+j
;
5703 for (i
= 0; i
< mc
->argc
; i
++)
5704 decrRefCount(mc
->argv
[i
]);
5707 zfree(c
->mstate
.commands
);
5710 /* Add a new command into the MULTI commands queue */
5711 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
) {
5715 c
->mstate
.commands
= zrealloc(c
->mstate
.commands
,
5716 sizeof(multiCmd
)*(c
->mstate
.count
+1));
5717 mc
= c
->mstate
.commands
+c
->mstate
.count
;
5720 mc
->argv
= zmalloc(sizeof(robj
*)*c
->argc
);
5721 memcpy(mc
->argv
,c
->argv
,sizeof(robj
*)*c
->argc
);
5722 for (j
= 0; j
< c
->argc
; j
++)
5723 incrRefCount(mc
->argv
[j
]);
5727 static void multiCommand(redisClient
*c
) {
5728 c
->flags
|= REDIS_MULTI
;
5729 addReply(c
,shared
.ok
);
5732 static void execCommand(redisClient
*c
) {
5737 if (!(c
->flags
& REDIS_MULTI
)) {
5738 addReplySds(c
,sdsnew("-ERR EXEC without MULTI\r\n"));
5742 orig_argv
= c
->argv
;
5743 orig_argc
= c
->argc
;
5744 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->mstate
.count
));
5745 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5746 c
->argc
= c
->mstate
.commands
[j
].argc
;
5747 c
->argv
= c
->mstate
.commands
[j
].argv
;
5748 call(c
,c
->mstate
.commands
[j
].cmd
);
5750 c
->argv
= orig_argv
;
5751 c
->argc
= orig_argc
;
5752 freeClientMultiState(c
);
5753 initClientMultiState(c
);
5754 c
->flags
&= (~REDIS_MULTI
);
5757 /* =========================== Blocking Operations ========================= */
5759 /* Currently Redis blocking operations support is limited to list POP ops,
5760 * so the current implementation is not fully generic, but it is also not
5761 * completely specific so it will not require a rewrite to support new
5762 * kind of blocking operations in the future.
5764 * Still it's important to note that list blocking operations can be already
5765 * used as a notification mechanism in order to implement other blocking
5766 * operations at application level, so there must be a very strong evidence
5767 * of usefulness and generality before new blocking operations are implemented.
5769 * This is how the current blocking POP works, we use BLPOP as example:
5770 * - If the user calls BLPOP and the key exists and contains a non empty list
5771 * then LPOP is called instead. So BLPOP is semantically the same as LPOP
5772 * if there is no need to block.
5773 * - If instead BLPOP is called and the key does not exist or the list is
5774 * empty we need to block. In order to do so we remove the notification for
5775 * new data to read in the client socket (so that we'll not serve new
5776 * requests if the blocking request is not served). Also we put the client
5777 * in a dictionary (db->blockingkeys) mapping keys to a list of clients
5778 * blocked on these keys.
5779 * - If a PUSH operation against a key with blocked clients waiting is
5780 * performed, we serve the first in the list: basically, instead of pushing
5781 * the new element inside the list, we return it to the (first / oldest)
5782 * blocking client, unblock the client, and remove it from the list.
5784 * The above comment and the source code should be enough in order to understand
5785 * the implementation and modify / fix it later.
5788 /* Set a client in blocking mode for the specified key, with the specified
5790 static void blockForKeys(redisClient
*c
, robj
**keys
, int numkeys
, time_t timeout
) {
5795 c
->blockingkeys
= zmalloc(sizeof(robj
*)*numkeys
);
5796 c
->blockingkeysnum
= numkeys
;
5797 c
->blockingto
= timeout
;
5798 for (j
= 0; j
< numkeys
; j
++) {
5799 /* Add the key in the client structure, to map clients -> keys */
5800 c
->blockingkeys
[j
] = keys
[j
];
5801 incrRefCount(keys
[j
]);
5803 /* And in the other "side", to map keys -> clients */
5804 de
= dictFind(c
->db
->blockingkeys
,keys
[j
]);
5808 /* For every key we take a list of clients blocked for it */
5810 retval
= dictAdd(c
->db
->blockingkeys
,keys
[j
],l
);
5811 incrRefCount(keys
[j
]);
5812 assert(retval
== DICT_OK
);
5814 l
= dictGetEntryVal(de
);
5816 listAddNodeTail(l
,c
);
5818 /* Mark the client as a blocked client */
5819 c
->flags
|= REDIS_BLOCKED
;
5820 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
5821 server
.blockedclients
++;
5824 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
5825 static void unblockClient(redisClient
*c
) {
5830 assert(c
->blockingkeys
!= NULL
);
5831 /* The client may wait for multiple keys, so unblock it for every key. */
5832 for (j
= 0; j
< c
->blockingkeysnum
; j
++) {
5833 /* Remove this client from the list of clients waiting for this key. */
5834 de
= dictFind(c
->db
->blockingkeys
,c
->blockingkeys
[j
]);
5836 l
= dictGetEntryVal(de
);
5837 listDelNode(l
,listSearchKey(l
,c
));
5838 /* If the list is empty we need to remove it to avoid wasting memory */
5839 if (listLength(l
) == 0)
5840 dictDelete(c
->db
->blockingkeys
,c
->blockingkeys
[j
]);
5841 decrRefCount(c
->blockingkeys
[j
]);
5843 /* Cleanup the client structure */
5844 zfree(c
->blockingkeys
);
5845 c
->blockingkeys
= NULL
;
5846 c
->flags
&= (~REDIS_BLOCKED
);
5847 server
.blockedclients
--;
5848 /* Ok now we are ready to get read events from socket, note that we
5849 * can't trap errors here as it's possible that unblockClients() is
5850 * called from freeClient() itself, and the only thing we can do
5851 * if we failed to register the READABLE event is to kill the client.
5852 * Still the following function should never fail in the real world as
5853 * we are sure the file descriptor is sane, and we exit on out of mem. */
5854 aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
, readQueryFromClient
, c
);
5855 /* As a final step we want to process data if there is some command waiting
5856 * in the input buffer. Note that this is safe even if unblockClient()
5857 * gets called from freeClient() because freeClient() will be smart
5858 * enough to call this function *after* c->querybuf was set to NULL. */
5859 if (c
->querybuf
&& sdslen(c
->querybuf
) > 0) processInputBuffer(c
);
5862 /* This should be called from any function PUSHing into lists.
5863 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5864 * 'ele' is the element pushed.
5866 * If the function returns 0 there was no client waiting for a list push
5869 * If the function returns 1 there was a client waiting for a list push
5870 * against this key, the element was passed to this client thus it's not
5871 * needed to actually add it to the list and the caller should return asap. */
5872 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
) {
5873 struct dictEntry
*de
;
5874 redisClient
*receiver
;
5878 de
= dictFind(c
->db
->blockingkeys
,key
);
5879 if (de
== NULL
) return 0;
5880 l
= dictGetEntryVal(de
);
5883 receiver
= ln
->value
;
5885 addReplySds(receiver
,sdsnew("*2\r\n"));
5886 addReplyBulkLen(receiver
,key
);
5887 addReply(receiver
,key
);
5888 addReply(receiver
,shared
.crlf
);
5889 addReplyBulkLen(receiver
,ele
);
5890 addReply(receiver
,ele
);
5891 addReply(receiver
,shared
.crlf
);
5892 unblockClient(receiver
);
5896 /* Blocking RPOP/LPOP */
5897 static void blockingPopGenericCommand(redisClient
*c
, int where
) {
5902 for (j
= 1; j
< c
->argc
-1; j
++) {
5903 o
= lookupKeyWrite(c
->db
,c
->argv
[j
]);
5905 if (o
->type
!= REDIS_LIST
) {
5906 addReply(c
,shared
.wrongtypeerr
);
5909 list
*list
= o
->ptr
;
5910 if (listLength(list
) != 0) {
5911 /* If the list contains elements fall back to the usual
5912 * non-blocking POP operation */
5913 robj
*argv
[2], **orig_argv
;
5916 /* We need to alter the command arguments before to call
5917 * popGenericCommand() as the command takes a single key. */
5918 orig_argv
= c
->argv
;
5919 orig_argc
= c
->argc
;
5920 argv
[1] = c
->argv
[j
];
5924 /* Also the return value is different, we need to output
5925 * the multi bulk reply header and the key name. The
5926 * "real" command will add the last element (the value)
5927 * for us. If this souds like an hack to you it's just
5928 * because it is... */
5929 addReplySds(c
,sdsnew("*2\r\n"));
5930 addReplyBulkLen(c
,argv
[1]);
5931 addReply(c
,argv
[1]);
5932 addReply(c
,shared
.crlf
);
5933 popGenericCommand(c
,where
);
5935 /* Fix the client structure with the original stuff */
5936 c
->argv
= orig_argv
;
5937 c
->argc
= orig_argc
;
5943 /* If the list is empty or the key does not exists we must block */
5944 timeout
= strtol(c
->argv
[c
->argc
-1]->ptr
,NULL
,10);
5945 if (timeout
> 0) timeout
+= time(NULL
);
5946 blockForKeys(c
,c
->argv
+1,c
->argc
-2,timeout
);
5949 static void blpopCommand(redisClient
*c
) {
5950 blockingPopGenericCommand(c
,REDIS_HEAD
);
5953 static void brpopCommand(redisClient
*c
) {
5954 blockingPopGenericCommand(c
,REDIS_TAIL
);
5957 /* =============================== Replication ============================= */
5959 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5960 ssize_t nwritten
, ret
= size
;
5961 time_t start
= time(NULL
);
5965 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
5966 nwritten
= write(fd
,ptr
,size
);
5967 if (nwritten
== -1) return -1;
5971 if ((time(NULL
)-start
) > timeout
) {
5979 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
5980 ssize_t nread
, totread
= 0;
5981 time_t start
= time(NULL
);
5985 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
5986 nread
= read(fd
,ptr
,size
);
5987 if (nread
== -1) return -1;
5992 if ((time(NULL
)-start
) > timeout
) {
6000 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
6007 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
6010 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
6021 static void syncCommand(redisClient
*c
) {
6022 /* ignore SYNC if aleady slave or in monitor mode */
6023 if (c
->flags
& REDIS_SLAVE
) return;
6025 /* SYNC can't be issued when the server has pending data to send to
6026 * the client about already issued commands. We need a fresh reply
6027 * buffer registering the differences between the BGSAVE and the current
6028 * dataset, so that we can copy to other slaves if needed. */
6029 if (listLength(c
->reply
) != 0) {
6030 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
6034 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
6035 /* Here we need to check if there is a background saving operation
6036 * in progress, or if it is required to start one */
6037 if (server
.bgsavechildpid
!= -1) {
6038 /* Ok a background save is in progress. Let's check if it is a good
6039 * one for replication, i.e. if there is another slave that is
6040 * registering differences since the server forked to save */
6044 listRewind(server
.slaves
);
6045 while((ln
= listYield(server
.slaves
))) {
6047 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
6050 /* Perfect, the server is already registering differences for
6051 * another slave. Set the right state, and copy the buffer. */
6052 listRelease(c
->reply
);
6053 c
->reply
= listDup(slave
->reply
);
6054 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
6055 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
6057 /* No way, we need to wait for the next BGSAVE in order to
6058 * register differences */
6059 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
6060 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
6063 /* Ok we don't have a BGSAVE in progress, let's start one */
6064 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
6065 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
6066 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
6067 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
6070 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
6073 c
->flags
|= REDIS_SLAVE
;
6075 listAddNodeTail(server
.slaves
,c
);
6079 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
6080 redisClient
*slave
= privdata
;
6082 REDIS_NOTUSED(mask
);
6083 char buf
[REDIS_IOBUF_LEN
];
6084 ssize_t nwritten
, buflen
;
6086 if (slave
->repldboff
== 0) {
6087 /* Write the bulk write count before to transfer the DB. In theory here
6088 * we don't know how much room there is in the output buffer of the
6089 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
6090 * operations) will never be smaller than the few bytes we need. */
6093 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
6095 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
6103 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
6104 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
6106 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
6107 (buflen
== 0) ? "premature EOF" : strerror(errno
));
6111 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
6112 redisLog(REDIS_VERBOSE
,"Write error sending DB to slave: %s",
6117 slave
->repldboff
+= nwritten
;
6118 if (slave
->repldboff
== slave
->repldbsize
) {
6119 close(slave
->repldbfd
);
6120 slave
->repldbfd
= -1;
6121 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
6122 slave
->replstate
= REDIS_REPL_ONLINE
;
6123 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
6124 sendReplyToClient
, slave
) == AE_ERR
) {
6128 addReplySds(slave
,sdsempty());
6129 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
6133 /* This function is called at the end of every background saving.
6134 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
6135 * otherwise REDIS_ERR is passed to the function.
6137 * The goal of this function is to handle slaves waiting for a successful
6138 * background saving in order to perform non-blocking synchronization. */
6139 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
6141 int startbgsave
= 0;
6143 listRewind(server
.slaves
);
6144 while((ln
= listYield(server
.slaves
))) {
6145 redisClient
*slave
= ln
->value
;
6147 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
6149 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
6150 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
6151 struct redis_stat buf
;
6153 if (bgsaveerr
!= REDIS_OK
) {
6155 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
6158 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
6159 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
6161 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
6164 slave
->repldboff
= 0;
6165 slave
->repldbsize
= buf
.st_size
;
6166 slave
->replstate
= REDIS_REPL_SEND_BULK
;
6167 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
6168 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
6175 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
6176 listRewind(server
.slaves
);
6177 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
6178 while((ln
= listYield(server
.slaves
))) {
6179 redisClient
*slave
= ln
->value
;
6181 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
6188 static int syncWithMaster(void) {
6189 char buf
[1024], tmpfile
[256], authcmd
[1024];
6191 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
6195 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
6200 /* AUTH with the master if required. */
6201 if(server
.masterauth
) {
6202 snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
);
6203 if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) {
6205 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",
6209 /* Read the AUTH result. */
6210 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
6212 redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s",
6216 if (buf
[0] != '+') {
6218 redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?");
6223 /* Issue the SYNC command */
6224 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
6226 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
6230 /* Read the bulk write count */
6231 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
6233 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
6237 if (buf
[0] != '$') {
6239 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
6242 dumpsize
= atoi(buf
+1);
6243 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
6244 /* Read the bulk write data on a temp file */
6245 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
6246 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
6249 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
6253 int nread
, nwritten
;
6255 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
6257 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
6263 nwritten
= write(dfd
,buf
,nread
);
6264 if (nwritten
== -1) {
6265 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
6273 if (rename(tmpfile
,server
.dbfilename
) == -1) {
6274 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
6280 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6281 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
6285 server
.master
= createClient(fd
);
6286 server
.master
->flags
|= REDIS_MASTER
;
6287 server
.master
->authenticated
= 1;
6288 server
.replstate
= REDIS_REPL_CONNECTED
;
6292 static void slaveofCommand(redisClient
*c
) {
6293 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
6294 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
6295 if (server
.masterhost
) {
6296 sdsfree(server
.masterhost
);
6297 server
.masterhost
= NULL
;
6298 if (server
.master
) freeClient(server
.master
);
6299 server
.replstate
= REDIS_REPL_NONE
;
6300 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
6303 sdsfree(server
.masterhost
);
6304 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
6305 server
.masterport
= atoi(c
->argv
[2]->ptr
);
6306 if (server
.master
) freeClient(server
.master
);
6307 server
.replstate
= REDIS_REPL_CONNECT
;
6308 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
6309 server
.masterhost
, server
.masterport
);
6311 addReply(c
,shared
.ok
);
6314 /* ============================ Maxmemory directive ======================== */
6316 /* Free one object from the pre-allocated objects free list. This is useful
6317 * under low mem conditions as by default we take 1 million free objects
6319 static void freeOneObjectFromFreelist(void) {
6322 listNode
*head
= listFirst(server
.objfreelist
);
6323 o
= listNodeValue(head
);
6324 listDelNode(server
.objfreelist
,head
);
6328 /* This function gets called when 'maxmemory' is set in the config file to limit
6329 * the max memory used by the server, and we are out of memory.
6330 * This function will try to, in order:
6332 * - Free objects from the free list
6333 * - Try to remove keys with an EXPIRE set
6335 * If it is not possible to free enough memory to reach used-memory < maxmemory
6336 * the server will start refusing commands that will enlarge even more the
6339 static void freeMemoryIfNeeded(void) {
6340 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
6341 if (listLength(server
.objfreelist
)) {
6342 freeOneObjectFromFreelist();
6344 int j
, k
, freed
= 0;
6346 for (j
= 0; j
< server
.dbnum
; j
++) {
6348 robj
*minkey
= NULL
;
6349 struct dictEntry
*de
;
6351 if (dictSize(server
.db
[j
].expires
)) {
6353 /* From a sample of three keys drop the one nearest to
6354 * the natural expire */
6355 for (k
= 0; k
< 3; k
++) {
6358 de
= dictGetRandomKey(server
.db
[j
].expires
);
6359 t
= (time_t) dictGetEntryVal(de
);
6360 if (minttl
== -1 || t
< minttl
) {
6361 minkey
= dictGetEntryKey(de
);
6365 deleteKey(server
.db
+j
,minkey
);
6368 if (!freed
) return; /* nothing to free... */
6373 /* ============================== Append Only file ========================== */
6375 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
6376 sds buf
= sdsempty();
6382 /* The DB this command was targetting is not the same as the last command
6383 * we appendend. To issue a SELECT command is needed. */
6384 if (dictid
!= server
.appendseldb
) {
6387 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
6388 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
6389 (unsigned long)strlen(seldb
),seldb
);
6390 server
.appendseldb
= dictid
;
6393 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6394 * EXPIREs into EXPIREATs calls */
6395 if (cmd
->proc
== expireCommand
) {
6398 tmpargv
[0] = createStringObject("EXPIREAT",8);
6399 tmpargv
[1] = argv
[1];
6400 incrRefCount(argv
[1]);
6401 when
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10);
6402 tmpargv
[2] = createObject(REDIS_STRING
,
6403 sdscatprintf(sdsempty(),"%ld",when
));
6407 /* Append the actual command */
6408 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
6409 for (j
= 0; j
< argc
; j
++) {
6412 o
= getDecodedObject(o
);
6413 buf
= sdscatprintf(buf
,"$%lu\r\n",(unsigned long)sdslen(o
->ptr
));
6414 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
6415 buf
= sdscatlen(buf
,"\r\n",2);
6419 /* Free the objects from the modified argv for EXPIREAT */
6420 if (cmd
->proc
== expireCommand
) {
6421 for (j
= 0; j
< 3; j
++)
6422 decrRefCount(argv
[j
]);
6425 /* We want to perform a single write. This should be guaranteed atomic
6426 * at least if the filesystem we are writing is a real physical one.
6427 * While this will save us against the server being killed I don't think
6428 * there is much to do about the whole server stopping for power problems
6430 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
6431 if (nwritten
!= (signed)sdslen(buf
)) {
6432 /* Ooops, we are in troubles. The best thing to do for now is
6433 * to simply exit instead to give the illusion that everything is
6434 * working as expected. */
6435 if (nwritten
== -1) {
6436 redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
));
6438 redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
));
6442 /* If a background append only file rewriting is in progress we want to
6443 * accumulate the differences between the child DB and the current one
6444 * in a buffer, so that when the child process will do its work we
6445 * can append the differences to the new append only file. */
6446 if (server
.bgrewritechildpid
!= -1)
6447 server
.bgrewritebuf
= sdscatlen(server
.bgrewritebuf
,buf
,sdslen(buf
));
6451 if (server
.appendfsync
== APPENDFSYNC_ALWAYS
||
6452 (server
.appendfsync
== APPENDFSYNC_EVERYSEC
&&
6453 now
-server
.lastfsync
> 1))
6455 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
6456 server
.lastfsync
= now
;
6460 /* In Redis commands are always executed in the context of a client, so in
6461 * order to load the append only file we need to create a fake client. */
6462 static struct redisClient
*createFakeClient(void) {
6463 struct redisClient
*c
= zmalloc(sizeof(*c
));
6467 c
->querybuf
= sdsempty();
6471 /* We set the fake client as a slave waiting for the synchronization
6472 * so that Redis will not try to send replies to this client. */
6473 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
6474 c
->reply
= listCreate();
6475 listSetFreeMethod(c
->reply
,decrRefCount
);
6476 listSetDupMethod(c
->reply
,dupClientReplyValue
);
6480 static void freeFakeClient(struct redisClient
*c
) {
6481 sdsfree(c
->querybuf
);
6482 listRelease(c
->reply
);
6486 /* Replay the append log file. On success REDIS_OK is returned. On non fatal
6487 * error (the append only file is zero-length) REDIS_ERR is returned. On
6488 * fatal error an error message is logged and the program exits. */
6489 int loadAppendOnlyFile(char *filename
) {
6490 struct redisClient
*fakeClient
;
6491 FILE *fp
= fopen(filename
,"r");
6492 struct redis_stat sb
;
6493 unsigned long long loadedkeys
= 0;
6495 if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size
== 0)
6499 redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
));
6503 fakeClient
= createFakeClient();
6510 struct redisCommand
*cmd
;
6512 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) {
6518 if (buf
[0] != '*') goto fmterr
;
6520 argv
= zmalloc(sizeof(robj
*)*argc
);
6521 for (j
= 0; j
< argc
; j
++) {
6522 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
;
6523 if (buf
[0] != '$') goto fmterr
;
6524 len
= strtol(buf
+1,NULL
,10);
6525 argsds
= sdsnewlen(NULL
,len
);
6526 if (len
&& fread(argsds
,len
,1,fp
) == 0) goto fmterr
;
6527 argv
[j
] = createObject(REDIS_STRING
,argsds
);
6528 if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */
6531 /* Command lookup */
6532 cmd
= lookupCommand(argv
[0]->ptr
);
6534 redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
);
6537 /* Try object sharing and encoding */
6538 if (server
.shareobjects
) {
6540 for(j
= 1; j
< argc
; j
++)
6541 argv
[j
] = tryObjectSharing(argv
[j
]);
6543 if (cmd
->flags
& REDIS_CMD_BULK
)
6544 tryObjectEncoding(argv
[argc
-1]);
6545 /* Run the command in the context of a fake client */
6546 fakeClient
->argc
= argc
;
6547 fakeClient
->argv
= argv
;
6548 cmd
->proc(fakeClient
);
6549 /* Discard the reply objects list from the fake client */
6550 while(listLength(fakeClient
->reply
))
6551 listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
));
6552 /* Clean up, ready for the next command */
6553 for (j
= 0; j
< argc
; j
++) decrRefCount(argv
[j
]);
6555 /* Handle swapping while loading big datasets when VM is on */
6557 if (server
.vm_enabled
&& (loadedkeys
% 5000) == 0) {
6558 while (zmalloc_used_memory() > server
.vm_max_memory
) {
6559 if (vmSwapOneObject() == REDIS_ERR
) break;
6564 freeFakeClient(fakeClient
);
6569 redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file");
6571 redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
));
6575 redisLog(REDIS_WARNING
,"Bad file format reading the append only file");
6579 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6580 static int fwriteBulk(FILE *fp
, robj
*obj
) {
6582 obj
= getDecodedObject(obj
);
6583 snprintf(buf
,sizeof(buf
),"$%ld\r\n",(long)sdslen(obj
->ptr
));
6584 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) goto err
;
6585 if (sdslen(obj
->ptr
) && fwrite(obj
->ptr
,sdslen(obj
->ptr
),1,fp
) == 0)
6587 if (fwrite("\r\n",2,1,fp
) == 0) goto err
;
6595 /* Write a double value in bulk format $<count>\r\n<payload>\r\n */
6596 static int fwriteBulkDouble(FILE *fp
, double d
) {
6597 char buf
[128], dbuf
[128];
6599 snprintf(dbuf
,sizeof(dbuf
),"%.17g\r\n",d
);
6600 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(dbuf
)-2);
6601 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
6602 if (fwrite(dbuf
,strlen(dbuf
),1,fp
) == 0) return 0;
6606 /* Write a long value in bulk format $<count>\r\n<payload>\r\n */
6607 static int fwriteBulkLong(FILE *fp
, long l
) {
6608 char buf
[128], lbuf
[128];
6610 snprintf(lbuf
,sizeof(lbuf
),"%ld\r\n",l
);
6611 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(lbuf
)-2);
6612 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
6613 if (fwrite(lbuf
,strlen(lbuf
),1,fp
) == 0) return 0;
6617 /* Write a sequence of commands able to fully rebuild the dataset into
6618 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6619 static int rewriteAppendOnlyFile(char *filename
) {
6620 dictIterator
*di
= NULL
;
6625 time_t now
= time(NULL
);
6627 /* Note that we have to use a different temp name here compared to the
6628 * one used by rewriteAppendOnlyFileBackground() function. */
6629 snprintf(tmpfile
,256,"temp-rewriteaof-%d.aof", (int) getpid());
6630 fp
= fopen(tmpfile
,"w");
6632 redisLog(REDIS_WARNING
, "Failed rewriting the append only file: %s", strerror(errno
));
6635 for (j
= 0; j
< server
.dbnum
; j
++) {
6636 char selectcmd
[] = "*2\r\n$6\r\nSELECT\r\n";
6637 redisDb
*db
= server
.db
+j
;
6639 if (dictSize(d
) == 0) continue;
6640 di
= dictGetIterator(d
);
6646 /* SELECT the new DB */
6647 if (fwrite(selectcmd
,sizeof(selectcmd
)-1,1,fp
) == 0) goto werr
;
6648 if (fwriteBulkLong(fp
,j
) == 0) goto werr
;
6650 /* Iterate this DB writing every entry */
6651 while((de
= dictNext(di
)) != NULL
) {
6656 key
= dictGetEntryKey(de
);
6657 if (!server
.vm_enabled
|| key
->storage
== REDIS_VM_MEMORY
) {
6658 o
= dictGetEntryVal(de
);
6661 o
= vmPreviewObject(key
);
6662 key
= dupStringObject(key
);
6665 expiretime
= getExpire(db
,key
);
6667 /* Save the key and associated value */
6668 if (o
->type
== REDIS_STRING
) {
6669 /* Emit a SET command */
6670 char cmd
[]="*3\r\n$3\r\nSET\r\n";
6671 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6673 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6674 if (fwriteBulk(fp
,o
) == 0) goto werr
;
6675 } else if (o
->type
== REDIS_LIST
) {
6676 /* Emit the RPUSHes needed to rebuild the list */
6677 list
*list
= o
->ptr
;
6681 while((ln
= listYield(list
))) {
6682 char cmd
[]="*3\r\n$5\r\nRPUSH\r\n";
6683 robj
*eleobj
= listNodeValue(ln
);
6685 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6686 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6687 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6689 } else if (o
->type
== REDIS_SET
) {
6690 /* Emit the SADDs needed to rebuild the set */
6692 dictIterator
*di
= dictGetIterator(set
);
6695 while((de
= dictNext(di
)) != NULL
) {
6696 char cmd
[]="*3\r\n$4\r\nSADD\r\n";
6697 robj
*eleobj
= dictGetEntryKey(de
);
6699 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6700 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6701 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6703 dictReleaseIterator(di
);
6704 } else if (o
->type
== REDIS_ZSET
) {
6705 /* Emit the ZADDs needed to rebuild the sorted set */
6707 dictIterator
*di
= dictGetIterator(zs
->dict
);
6710 while((de
= dictNext(di
)) != NULL
) {
6711 char cmd
[]="*4\r\n$4\r\nZADD\r\n";
6712 robj
*eleobj
= dictGetEntryKey(de
);
6713 double *score
= dictGetEntryVal(de
);
6715 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6716 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6717 if (fwriteBulkDouble(fp
,*score
) == 0) goto werr
;
6718 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6720 dictReleaseIterator(di
);
6722 redisAssert(0 != 0);
6724 /* Save the expire time */
6725 if (expiretime
!= -1) {
6726 char cmd
[]="*3\r\n$8\r\nEXPIREAT\r\n";
6727 /* If this key is already expired skip it */
6728 if (expiretime
< now
) continue;
6729 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6730 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6731 if (fwriteBulkLong(fp
,expiretime
) == 0) goto werr
;
6733 /* We created a few temp objects if the key->value pair
6734 * was about a swapped out object. Free both. */
6740 dictReleaseIterator(di
);
6743 /* Make sure data will not remain on the OS's output buffers */
6748 /* Use RENAME to make sure the DB file is changed atomically only
6749 * if the generate DB file is ok. */
6750 if (rename(tmpfile
,filename
) == -1) {
6751 redisLog(REDIS_WARNING
,"Error moving temp append only file on the final destination: %s", strerror(errno
));
6755 redisLog(REDIS_NOTICE
,"SYNC append only file rewrite performed");
6761 redisLog(REDIS_WARNING
,"Write error writing append only file on disk: %s", strerror(errno
));
6762 if (di
) dictReleaseIterator(di
);
6766 /* This is how rewriting of the append only file in background works:
6768 * 1) The user calls BGREWRITEAOF
6769 * 2) Redis calls this function, that forks():
6770 * 2a) the child rewrites the append only file to a temp file.
6771 * 2b) the parent accumulates differences in server.bgrewritebuf.
6772 * 3) When the child has finished '2a' it exits.
6773 * 4) The parent will trap the exit code and, if it's OK, will append the
6774 * data accumulated into server.bgrewritebuf into the temp file, and
6775 * finally will rename(2) the temp file to the actual file name.
6776 * Then the new file is reopened as the new append only file. Profit!
6778 static int rewriteAppendOnlyFileBackground(void) {
6781 if (server
.bgrewritechildpid
!= -1) return REDIS_ERR
;
6782 if ((childpid
= fork()) == 0) {
6787 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6788 if (rewriteAppendOnlyFile(tmpfile
) == REDIS_OK
) {
6795 if (childpid
== -1) {
6796 redisLog(REDIS_WARNING
,
6797 "Can't rewrite append only file in background: fork: %s",
6801 redisLog(REDIS_NOTICE
,
6802 "Background append only file rewriting started by pid %d",childpid
);
6803 server
.bgrewritechildpid
= childpid
;
6804 /* We set appendseldb to -1 in order to force the next call to the
6805 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6806 * accumulated by the parent into server.bgrewritebuf will start
6807 * with a SELECT statement and it will be safe to merge. */
6808 server
.appendseldb
= -1;
6811 return REDIS_OK
; /* unreached */
6814 static void bgrewriteaofCommand(redisClient
*c
) {
6815 if (server
.bgrewritechildpid
!= -1) {
6816 addReplySds(c
,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6819 if (rewriteAppendOnlyFileBackground() == REDIS_OK
) {
6820 char *status
= "+Background append only file rewriting started\r\n";
6821 addReplySds(c
,sdsnew(status
));
6823 addReply(c
,shared
.err
);
6827 static void aofRemoveTempFile(pid_t childpid
) {
6830 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) childpid
);
6834 /* =============================== Virtual Memory =========================== */
6835 static void vmInit(void) {
6838 server
.vm_fp
= fopen("/tmp/redisvm","w+b");
6839 if (server
.vm_fp
== NULL
) {
6840 redisLog(REDIS_WARNING
,"Impossible to open the swap file. Exiting.");
6843 server
.vm_fd
= fileno(server
.vm_fp
);
6844 server
.vm_next_page
= 0;
6845 server
.vm_near_pages
= 0;
6846 server
.vm_stats_used_pages
= 0;
6847 server
.vm_stats_swapped_objects
= 0;
6848 server
.vm_stats_swapouts
= 0;
6849 server
.vm_stats_swapins
= 0;
6850 totsize
= server
.vm_pages
*server
.vm_page_size
;
6851 redisLog(REDIS_NOTICE
,"Allocating %lld bytes of swap file",totsize
);
6852 if (ftruncate(server
.vm_fd
,totsize
) == -1) {
6853 redisLog(REDIS_WARNING
,"Can't ftruncate swap file: %s. Exiting.",
6857 redisLog(REDIS_NOTICE
,"Swap file allocated with success");
6859 server
.vm_bitmap
= zmalloc((server
.vm_pages
+7)/8);
6860 redisLog(REDIS_VERBOSE
,"Allocated %lld bytes page table for %lld pages",
6861 (long long) (server
.vm_pages
+7)/8, server
.vm_pages
);
6862 memset(server
.vm_bitmap
,0,(server
.vm_pages
+7)/8);
6863 /* Try to remove the swap file, so the OS will really delete it from the
6864 * file system when Redis exists. */
6865 unlink("/tmp/redisvm");
6867 /* Initialize threaded I/O */
6868 server
.io_jobs
= listCreate();
6869 server
.io_done
= listCreate();
6870 server
.io_clients
= listCreate();
6871 pthread_mutex_init(&server
.io_mutex
,NULL
);
6872 server
.io_active_threads
= 0;
6875 /* Mark the page as used */
6876 static void vmMarkPageUsed(off_t page
) {
6877 off_t byte
= page
/8;
6879 server
.vm_bitmap
[byte
] |= 1<<bit
;
6880 redisLog(REDIS_DEBUG
,"Mark used: %lld (byte:%lld bit:%d)\n",
6881 (long long)page
, (long long)byte
, bit
);
6884 /* Mark N contiguous pages as used, with 'page' being the first. */
6885 static void vmMarkPagesUsed(off_t page
, off_t count
) {
6888 for (j
= 0; j
< count
; j
++)
6889 vmMarkPageUsed(page
+j
);
6890 server
.vm_stats_used_pages
+= count
;
6893 /* Mark the page as free */
6894 static void vmMarkPageFree(off_t page
) {
6895 off_t byte
= page
/8;
6897 server
.vm_bitmap
[byte
] &= ~(1<<bit
);
6900 /* Mark N contiguous pages as free, with 'page' being the first. */
6901 static void vmMarkPagesFree(off_t page
, off_t count
) {
6904 for (j
= 0; j
< count
; j
++)
6905 vmMarkPageFree(page
+j
);
6906 server
.vm_stats_used_pages
-= count
;
6909 /* Test if the page is free */
6910 static int vmFreePage(off_t page
) {
6911 off_t byte
= page
/8;
6913 return (server
.vm_bitmap
[byte
] & (1<<bit
)) == 0;
6916 /* Find N contiguous free pages storing the first page of the cluster in *first.
6917 * Returns REDIS_OK if it was able to find N contiguous pages, otherwise
6918 * REDIS_ERR is returned.
6920 * This function uses a simple algorithm: we try to allocate
6921 * REDIS_VM_MAX_NEAR_PAGES sequentially, when we reach this limit we start
6922 * again from the start of the swap file searching for free spaces.
6924 * If it looks pretty clear that there are no free pages near our offset
6925 * we try to find less populated places doing a forward jump of
6926 * REDIS_VM_MAX_RANDOM_JUMP, then we start scanning again a few pages
6927 * without hurry, and then we jump again and so forth...
6929 * This function can be improved using a free list to avoid to guess
6930 * too much, since we could collect data about freed pages.
6932 * note: I implemented this function just after watching an episode of
6933 * Battlestar Galactica, where the hybrid was continuing to say "JUMP!"
6935 static int vmFindContiguousPages(off_t
*first
, int n
) {
6936 off_t base
, offset
= 0, since_jump
= 0, numfree
= 0;
6938 if (server
.vm_near_pages
== REDIS_VM_MAX_NEAR_PAGES
) {
6939 server
.vm_near_pages
= 0;
6940 server
.vm_next_page
= 0;
6942 server
.vm_near_pages
++; /* Yet another try for pages near to the old ones */
6943 base
= server
.vm_next_page
;
6945 while(offset
< server
.vm_pages
) {
6946 off_t
this = base
+offset
;
6948 redisLog(REDIS_DEBUG
, "THIS: %lld (%c)\n", (long long) this, vmFreePage(this) ? 'F' : 'X');
6949 /* If we overflow, restart from page zero */
6950 if (this >= server
.vm_pages
) {
6951 this -= server
.vm_pages
;
6953 /* Just overflowed, what we found on tail is no longer
6954 * interesting, as it's no longer contiguous. */
6958 if (vmFreePage(this)) {
6959 /* This is a free page */
6961 /* Already got N free pages? Return to the caller, with success */
6963 *first
= this-(n
-1);
6964 server
.vm_next_page
= this+1;
6968 /* The current one is not a free page */
6972 /* Fast-forward if the current page is not free and we already
6973 * searched enough near this place. */
6975 if (!numfree
&& since_jump
>= REDIS_VM_MAX_RANDOM_JUMP
/4) {
6976 offset
+= random() % REDIS_VM_MAX_RANDOM_JUMP
;
6978 /* Note that even if we rewind after the jump, we are don't need
6979 * to make sure numfree is set to zero as we only jump *if* it
6980 * is set to zero. */
6982 /* Otherwise just check the next page */
6989 /* Swap the 'val' object relative to 'key' into disk. Store all the information
6990 * needed to later retrieve the object into the key object.
6991 * If we can't find enough contiguous empty pages to swap the object on disk
6992 * REDIS_ERR is returned. */
6993 static int vmSwapObject(robj
*key
, robj
*val
) {
6994 off_t pages
= rdbSavedObjectPages(val
);
6997 assert(key
->storage
== REDIS_VM_MEMORY
);
6998 assert(key
->refcount
== 1);
6999 if (vmFindContiguousPages(&page
,pages
) == REDIS_ERR
) return REDIS_ERR
;
7000 if (fseeko(server
.vm_fp
,page
*server
.vm_page_size
,SEEK_SET
) == -1) {
7001 redisLog(REDIS_WARNING
,
7002 "Critical VM problem in vmSwapObject(): can't seek: %s",
7006 rdbSaveObject(server
.vm_fp
,val
);
7007 key
->vm
.page
= page
;
7008 key
->vm
.usedpages
= pages
;
7009 key
->storage
= REDIS_VM_SWAPPED
;
7010 key
->vtype
= val
->type
;
7011 decrRefCount(val
); /* Deallocate the object from memory. */
7012 vmMarkPagesUsed(page
,pages
);
7013 redisLog(REDIS_DEBUG
,"VM: object %s swapped out at %lld (%lld pages)",
7014 (unsigned char*) key
->ptr
,
7015 (unsigned long long) page
, (unsigned long long) pages
);
7016 server
.vm_stats_swapped_objects
++;
7017 server
.vm_stats_swapouts
++;
7018 fflush(server
.vm_fp
);
7022 /* Load the value object relative to the 'key' object from swap to memory.
7023 * The newly allocated object is returned.
7025 * If preview is true the unserialized object is returned to the caller but
7026 * no changes are made to the key object, nor the pages are marked as freed */
7027 static robj
*vmGenericLoadObject(robj
*key
, int preview
) {
7030 redisAssert(key
->storage
== REDIS_VM_SWAPPED
);
7031 if (fseeko(server
.vm_fp
,key
->vm
.page
*server
.vm_page_size
,SEEK_SET
) == -1) {
7032 redisLog(REDIS_WARNING
,
7033 "Unrecoverable VM problem in vmLoadObject(): can't seek: %s",
7037 val
= rdbLoadObject(key
->vtype
,server
.vm_fp
);
7039 redisLog(REDIS_WARNING
, "Unrecoverable VM problem in vmLoadObject(): can't load object from swap file: %s", strerror(errno
));
7043 key
->storage
= REDIS_VM_MEMORY
;
7044 key
->vm
.atime
= server
.unixtime
;
7045 vmMarkPagesFree(key
->vm
.page
,key
->vm
.usedpages
);
7046 redisLog(REDIS_DEBUG
, "VM: object %s loaded from disk",
7047 (unsigned char*) key
->ptr
);
7048 server
.vm_stats_swapped_objects
--;
7050 redisLog(REDIS_DEBUG
, "VM: object %s previewed from disk",
7051 (unsigned char*) key
->ptr
);
7053 server
.vm_stats_swapins
++;
7057 /* Plain object loading, from swap to memory */
7058 static robj
*vmLoadObject(robj
*key
) {
7059 return vmGenericLoadObject(key
,0);
7062 /* Just load the value on disk, without to modify the key.
7063 * This is useful when we want to perform some operation on the value
7064 * without to really bring it from swap to memory, like while saving the
7065 * dataset or rewriting the append only log. */
7066 static robj
*vmPreviewObject(robj
*key
) {
7067 return vmGenericLoadObject(key
,1);
7070 /* How a good candidate is this object for swapping?
7071 * The better candidate it is, the greater the returned value.
7073 * Currently we try to perform a fast estimation of the object size in
7074 * memory, and combine it with aging informations.
7076 * Basically swappability = idle-time * log(estimated size)
7078 * Bigger objects are preferred over smaller objects, but not
7079 * proportionally, this is why we use the logarithm. This algorithm is
7080 * just a first try and will probably be tuned later. */
7081 static double computeObjectSwappability(robj
*o
) {
7082 time_t age
= server
.unixtime
- o
->vm
.atime
;
7086 struct dictEntry
*de
;
7089 if (age
<= 0) return 0;
7092 if (o
->encoding
!= REDIS_ENCODING_RAW
) {
7095 asize
= sdslen(o
->ptr
)+sizeof(*o
)+sizeof(long)*2;
7100 listNode
*ln
= listFirst(l
);
7102 asize
= sizeof(list
);
7104 robj
*ele
= ln
->value
;
7107 elesize
= (ele
->encoding
== REDIS_ENCODING_RAW
) ?
7108 (sizeof(*o
)+sdslen(ele
->ptr
)) :
7110 asize
+= (sizeof(listNode
)+elesize
)*listLength(l
);
7115 z
= (o
->type
== REDIS_ZSET
);
7116 d
= z
? ((zset
*)o
->ptr
)->dict
: o
->ptr
;
7118 asize
= sizeof(dict
)+(sizeof(struct dictEntry
*)*dictSlots(d
));
7119 if (z
) asize
+= sizeof(zset
)-sizeof(dict
);
7124 de
= dictGetRandomKey(d
);
7125 ele
= dictGetEntryKey(de
);
7126 elesize
= (ele
->encoding
== REDIS_ENCODING_RAW
) ?
7127 (sizeof(*o
)+sdslen(ele
->ptr
)) :
7129 asize
+= (sizeof(struct dictEntry
)+elesize
)*dictSize(d
);
7130 if (z
) asize
+= sizeof(zskiplistNode
)*dictSize(d
);
7134 return (double)asize
*log(1+asize
);
7137 /* Try to swap an object that's a good candidate for swapping.
7138 * Returns REDIS_OK if the object was swapped, REDIS_ERR if it's not possible
7139 * to swap any object at all. */
7140 static int vmSwapOneObject(void) {
7142 struct dictEntry
*best
= NULL
;
7143 double best_swappability
= 0;
7146 for (j
= 0; j
< server
.dbnum
; j
++) {
7147 redisDb
*db
= server
.db
+j
;
7148 int maxtries
= 1000;
7150 if (dictSize(db
->dict
) == 0) continue;
7151 for (i
= 0; i
< 5; i
++) {
7153 double swappability
;
7155 if (maxtries
) maxtries
--;
7156 de
= dictGetRandomKey(db
->dict
);
7157 key
= dictGetEntryKey(de
);
7158 val
= dictGetEntryVal(de
);
7159 if (key
->storage
!= REDIS_VM_MEMORY
) {
7160 if (maxtries
) i
--; /* don't count this try */
7163 swappability
= computeObjectSwappability(val
);
7164 if (!best
|| swappability
> best_swappability
) {
7166 best_swappability
= swappability
;
7171 redisLog(REDIS_DEBUG
,"No swappable key found!");
7174 key
= dictGetEntryKey(best
);
7175 val
= dictGetEntryVal(best
);
7177 redisLog(REDIS_DEBUG
,"Key with best swappability: %s, %f",
7178 key
->ptr
, best_swappability
);
7180 /* Unshare the key if needed */
7181 if (key
->refcount
> 1) {
7182 robj
*newkey
= dupStringObject(key
);
7184 key
= dictGetEntryKey(best
) = newkey
;
7187 if (vmSwapObject(key
,val
) == REDIS_OK
) {
7188 dictGetEntryVal(best
) = NULL
;
7195 /* Return true if it's safe to swap out objects in a given moment.
7196 * Basically we don't want to swap objects out while there is a BGSAVE
7197 * or a BGAEOREWRITE running in backgroud. */
7198 static int vmCanSwapOut(void) {
7199 return (server
.bgsavechildpid
== -1 && server
.bgrewritechildpid
== -1);
7202 /* Delete a key if swapped. Returns 1 if the key was found, was swapped
7203 * and was deleted. Otherwise 0 is returned. */
7204 static int deleteIfSwapped(redisDb
*db
, robj
*key
) {
7208 if ((de
= dictFind(db
->dict
,key
)) == NULL
) return 0;
7209 foundkey
= dictGetEntryKey(de
);
7210 if (foundkey
->storage
== REDIS_VM_MEMORY
) return 0;
7215 /* ================================= Debugging ============================== */
7217 static void debugCommand(redisClient
*c
) {
7218 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
7220 } else if (!strcasecmp(c
->argv
[1]->ptr
,"reload")) {
7221 if (rdbSave(server
.dbfilename
) != REDIS_OK
) {
7222 addReply(c
,shared
.err
);
7226 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
7227 addReply(c
,shared
.err
);
7230 redisLog(REDIS_WARNING
,"DB reloaded by DEBUG RELOAD");
7231 addReply(c
,shared
.ok
);
7232 } else if (!strcasecmp(c
->argv
[1]->ptr
,"loadaof")) {
7234 if (loadAppendOnlyFile(server
.appendfilename
) != REDIS_OK
) {
7235 addReply(c
,shared
.err
);
7238 redisLog(REDIS_WARNING
,"Append Only File loaded by DEBUG LOADAOF");
7239 addReply(c
,shared
.ok
);
7240 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
7241 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
7245 addReply(c
,shared
.nokeyerr
);
7248 key
= dictGetEntryKey(de
);
7249 val
= dictGetEntryVal(de
);
7250 if (server
.vm_enabled
&& key
->storage
== REDIS_VM_MEMORY
) {
7251 addReplySds(c
,sdscatprintf(sdsempty(),
7252 "+Key at:%p refcount:%d, value at:%p refcount:%d "
7253 "encoding:%d serializedlength:%lld\r\n",
7254 (void*)key
, key
->refcount
, (void*)val
, val
->refcount
,
7255 val
->encoding
, rdbSavedObjectLen(val
)));
7257 addReplySds(c
,sdscatprintf(sdsempty(),
7258 "+Key at:%p refcount:%d, value swapped at: page %llu "
7259 "using %llu pages\r\n",
7260 (void*)key
, key
->refcount
, (unsigned long long) key
->vm
.page
,
7261 (unsigned long long) key
->vm
.usedpages
));
7263 } else if (!strcasecmp(c
->argv
[1]->ptr
,"swapout") && c
->argc
== 3) {
7264 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
7267 if (!server
.vm_enabled
) {
7268 addReplySds(c
,sdsnew("-ERR Virtual Memory is disabled\r\n"));
7272 addReply(c
,shared
.nokeyerr
);
7275 key
= dictGetEntryKey(de
);
7276 val
= dictGetEntryVal(de
);
7277 /* If the key is shared we want to create a copy */
7278 if (key
->refcount
> 1) {
7279 robj
*newkey
= dupStringObject(key
);
7281 key
= dictGetEntryKey(de
) = newkey
;
7284 if (key
->storage
!= REDIS_VM_MEMORY
) {
7285 addReplySds(c
,sdsnew("-ERR This key is not in memory\r\n"));
7286 } else if (vmSwapObject(key
,val
) == REDIS_OK
) {
7287 dictGetEntryVal(de
) = NULL
;
7288 addReply(c
,shared
.ok
);
7290 addReply(c
,shared
.err
);
7293 addReplySds(c
,sdsnew(
7294 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPOUT <key>|RELOAD]\r\n"));
7298 static void _redisAssert(char *estr
) {
7299 redisLog(REDIS_WARNING
,"=== ASSERTION FAILED ===");
7300 redisLog(REDIS_WARNING
,"==> %s\n",estr
);
7301 #ifdef HAVE_BACKTRACE
7302 redisLog(REDIS_WARNING
,"(forcing SIGSEGV in order to print the stack trace)");
7307 /* =================================== Main! ================================ */
7310 int linuxOvercommitMemoryValue(void) {
7311 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
7315 if (fgets(buf
,64,fp
) == NULL
) {
7324 void linuxOvercommitMemoryWarning(void) {
7325 if (linuxOvercommitMemoryValue() == 0) {
7326 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
7329 #endif /* __linux__ */
7331 static void daemonize(void) {
7335 if (fork() != 0) exit(0); /* parent exits */
7336 printf("New pid: %d\n", getpid());
7337 setsid(); /* create a new session */
7339 /* Every output goes to /dev/null. If Redis is daemonized but
7340 * the 'logfile' is set to 'stdout' in the configuration file
7341 * it will not log at all. */
7342 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
7343 dup2(fd
, STDIN_FILENO
);
7344 dup2(fd
, STDOUT_FILENO
);
7345 dup2(fd
, STDERR_FILENO
);
7346 if (fd
> STDERR_FILENO
) close(fd
);
7348 /* Try to write the pid file */
7349 fp
= fopen(server
.pidfile
,"w");
7351 fprintf(fp
,"%d\n",getpid());
7356 int main(int argc
, char **argv
) {
7359 resetServerSaveParams();
7360 loadServerConfig(argv
[1]);
7361 } else if (argc
> 2) {
7362 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
7365 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
7367 if (server
.daemonize
) daemonize();
7369 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
7371 linuxOvercommitMemoryWarning();
7373 if (server
.appendonly
) {
7374 if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
)
7375 redisLog(REDIS_NOTICE
,"DB loaded from append only file");
7377 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
7378 redisLog(REDIS_NOTICE
,"DB loaded from disk");
7380 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
7381 acceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
7382 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
7384 aeDeleteEventLoop(server
.el
);
7388 /* ============================= Backtrace support ========================= */
7390 #ifdef HAVE_BACKTRACE
7391 static char *findFuncName(void *pointer
, unsigned long *offset
);
7393 static void *getMcontextEip(ucontext_t
*uc
) {
7394 #if defined(__FreeBSD__)
7395 return (void*) uc
->uc_mcontext
.mc_eip
;
7396 #elif defined(__dietlibc__)
7397 return (void*) uc
->uc_mcontext
.eip
;
7398 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
7400 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
7402 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
7404 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
7405 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
7406 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
7408 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
7410 #elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
7411 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
]; /* Linux 32/64 bit */
7412 #elif defined(__ia64__) /* Linux IA64 */
7413 return (void*) uc
->uc_mcontext
.sc_ip
;
7419 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
7421 char **messages
= NULL
;
7422 int i
, trace_size
= 0;
7423 unsigned long offset
=0;
7424 ucontext_t
*uc
= (ucontext_t
*) secret
;
7426 REDIS_NOTUSED(info
);
7428 redisLog(REDIS_WARNING
,
7429 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
7430 infostring
= genRedisInfoString();
7431 redisLog(REDIS_WARNING
, "%s",infostring
);
7432 /* It's not safe to sdsfree() the returned string under memory
7433 * corruption conditions. Let it leak as we are going to abort */
7435 trace_size
= backtrace(trace
, 100);
7436 /* overwrite sigaction with caller's address */
7437 if (getMcontextEip(uc
) != NULL
) {
7438 trace
[1] = getMcontextEip(uc
);
7440 messages
= backtrace_symbols(trace
, trace_size
);
7442 for (i
=1; i
<trace_size
; ++i
) {
7443 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
7445 p
= strchr(messages
[i
],'+');
7446 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
7447 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
7449 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
7452 /* free(messages); Don't call free() with possibly corrupted memory. */
7456 static void setupSigSegvAction(void) {
7457 struct sigaction act
;
7459 sigemptyset (&act
.sa_mask
);
7460 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
7461 * is used. Otherwise, sa_handler is used */
7462 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
7463 act
.sa_sigaction
= segvHandler
;
7464 sigaction (SIGSEGV
, &act
, NULL
);
7465 sigaction (SIGBUS
, &act
, NULL
);
7466 sigaction (SIGFPE
, &act
, NULL
);
7467 sigaction (SIGILL
, &act
, NULL
);
7468 sigaction (SIGBUS
, &act
, NULL
);
7472 #include "staticsymbols.h"
7473 /* This function try to convert a pointer into a function name. It's used in
7474 * oreder to provide a backtrace under segmentation fault that's able to
7475 * display functions declared as static (otherwise the backtrace is useless). */
7476 static char *findFuncName(void *pointer
, unsigned long *offset
){
7478 unsigned long off
, minoff
= 0;
7480 /* Try to match against the Symbol with the smallest offset */
7481 for (i
=0; symsTable
[i
].pointer
; i
++) {
7482 unsigned long lp
= (unsigned long) pointer
;
7484 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
7485 off
=lp
-symsTable
[i
].pointer
;
7486 if (ret
< 0 || off
< minoff
) {
7492 if (ret
== -1) return NULL
;
7494 return symsTable
[ret
].name
;
7496 #else /* HAVE_BACKTRACE */
7497 static void setupSigSegvAction(void) {
7499 #endif /* HAVE_BACKTRACE */