2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.3.2"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
65 #include "solarisfixes.h"
69 #include "ae.h" /* Event driven programming library */
70 #include "sds.h" /* Dynamic safe strings */
71 #include "anet.h" /* Networking the easy way */
72 #include "dict.h" /* Hash tables */
73 #include "adlist.h" /* Linked lists */
74 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
75 #include "lzf.h" /* LZF compression library */
76 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
82 /* Static server configuration */
83 #define REDIS_SERVERPORT 6379 /* TCP port */
84 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
85 #define REDIS_IOBUF_LEN 1024
86 #define REDIS_LOADBUF_LEN 1024
87 #define REDIS_STATIC_ARGS 4
88 #define REDIS_DEFAULT_DBNUM 16
89 #define REDIS_CONFIGLINE_MAX 1024
90 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
91 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
92 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
93 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
94 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
96 /* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
97 #define REDIS_WRITEV_THRESHOLD 3
98 /* Max number of iovecs used for each writev call */
99 #define REDIS_WRITEV_IOVEC_COUNT 256
101 /* Hash table parameters */
102 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
105 #define REDIS_CMD_BULK 1 /* Bulk write command */
106 #define REDIS_CMD_INLINE 2 /* Inline command */
107 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
108 this flag will return an error when the 'maxmemory' option is set in the
109 config file and the server is using more than maxmemory bytes of memory.
110 In short these commands are denied on low memory conditions. */
111 #define REDIS_CMD_DENYOOM 4
114 #define REDIS_STRING 0
120 /* Objects encoding */
121 #define REDIS_ENCODING_RAW 0 /* Raw representation */
122 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
124 /* Object types only used for dumping to disk */
125 #define REDIS_EXPIRETIME 253
126 #define REDIS_SELECTDB 254
127 #define REDIS_EOF 255
129 /* Defines related to the dump file format. To store 32 bits lengths for short
130 * keys requires a lot of space, so we check the most significant 2 bits of
131 * the first byte to interpret the length:
133 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
134 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
135 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
136 * 11|000000 this means: specially encoded object will follow. The six bits
137 * number specify the kind of object that follows.
138 * See the REDIS_RDB_ENC_* defines.
140 * Lengths up to 63 are stored using a single byte, most DB keys, and many
141 * values, will fit inside. */
/* Values of the two most significant bits of the first length byte. */
142 #define REDIS_RDB_6BITLEN 0 /* 00: length is in the low 6 bits of this byte */
143 #define REDIS_RDB_14BITLEN 1 /* 01: length is 6 bits + 8 bits of the next byte */
144 #define REDIS_RDB_32BITLEN 2 /* 10: a full 32 bit length follows */
145 #define REDIS_RDB_ENCVAL 3 /* 11: a specially encoded object follows */
146 #define REDIS_RDB_LENERR UINT_MAX /* presumably the "length could not be read" sentinel - TODO confirm against the loader */
148 /* When a length of a string object stored on disk has the first two bits
149 * set, the remaining six bits specify a special encoding for the object
150 * accordingly to the following defines: */
151 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
152 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
153 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
154 #define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
156 /* Virtual memory object->where field. */
157 #define REDIS_VM_MEMORY 0 /* The object is on memory */
158 #define REDIS_VM_SWAPPED 1 /* The object is on disk */
159 #define REDIS_VM_SWAPPING 2 /* Redis is swapping this object on disk */
160 #define REDIS_VM_LOADING 3 /* Redis is loading this object from disk */
162 /* Virtual memory static configuration stuff.
163 * Check vmFindContiguousPages() to know more about these magic numbers. */
164 #define REDIS_VM_MAX_NEAR_PAGES 65536
165 #define REDIS_VM_MAX_RANDOM_JUMP 4096
166 #define REDIS_VM_MAX_THREADS 32
169 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
170 #define REDIS_SLAVE 2 /* This client is a slave server */
171 #define REDIS_MASTER 4 /* This client is a master server */
172 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
173 #define REDIS_MULTI 16 /* This client is in a MULTI context */
174 #define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */
175 #define REDIS_IO_WAIT 64 /* The client is waiting for Virtual Memory I/O */
177 /* Slave replication state - slave side */
178 #define REDIS_REPL_NONE 0 /* No active replication */
179 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
180 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
182 /* Slave replication state - from the point of view of master
183 * Note that in SEND_BULK and ONLINE state the slave receives new updates
184 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
185 * to start the next background saving in order to send updates to it. */
186 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
187 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
188 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
189 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
191 /* List related stuff */
195 /* Sort operations */
196 #define REDIS_SORT_GET 0
197 #define REDIS_SORT_ASC 1
198 #define REDIS_SORT_DESC 2
199 #define REDIS_SORTKEY_MAX 1024
202 #define REDIS_DEBUG 0
203 #define REDIS_VERBOSE 1
204 #define REDIS_NOTICE 2
205 #define REDIS_WARNING 3
207 /* Anti-warning macro... */
208 #define REDIS_NOTUSED(V) ((void) V)
210 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
211 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
213 /* Append only defines */
214 #define APPENDFSYNC_NO 0
215 #define APPENDFSYNC_ALWAYS 1
216 #define APPENDFSYNC_EVERYSEC 2
218 /* We can print the stacktrace, so our assert is defined this way: */
/* Evaluates _e; when it is false, passes the stringified expression to
 * _redisAssert() and then terminates the process with exit(1). */
219 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
/* estr receives the source text of the failed expression (see redisAssert). */
220 static void _redisAssert(char *estr
);
222 /*================================= Data types ============================== */
224 /* A redis object, that is a type able to hold a string / list / set */
226 /* The VM object structure */
227 struct redisObjectVM
{
228 off_t page
; /* the page at which the object is stored on disk */
229 off_t usedpages
; /* number of pages used on disk */
230 time_t atime
; /* Last access time */
233 /* The actual Redis Object */
234 typedef struct redisObject
{
237 unsigned char encoding
;
238 unsigned char storage
; /* If this object is a key, where is the value?
239 * REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
240 unsigned char vtype
; /* If this object is a key, and value is swapped out,
241 * this is the type of the swapped out object. */
243 /* VM fields, these are only allocated if VM is active, otherwise the
244 * object allocation function will just allocate
245 * sizeof(redisObject) minus sizeof(redisObjectVM), so using
246 * Redis without VM active will not have any overhead. */
247 struct redisObjectVM vm
;
250 /* Macro used to initialize a Redis object allocated on the stack.
251 * Note that this macro is taken near the structure definition to make sure
252 * we'll update it when the structure is changed, to avoid bugs like
253 * bug #85 introduced exactly in this way. */
254 #define initStaticStringObject(_var,_ptr) do { \
256 _var.type = REDIS_STRING; \
257 _var.encoding = REDIS_ENCODING_RAW; \
259 if (server.vm_enabled) _var.storage = REDIS_VM_MEMORY; \
262 typedef struct redisDb
{
263 dict
*dict
; /* The keyspace for this DB */
264 dict
*expires
; /* Timeout of keys with a timeout set */
265 dict
*blockingkeys
; /* Keys with clients waiting for data (BLPOP) */
269 /* Client MULTI/EXEC state */
270 typedef struct multiCmd
{
273 struct redisCommand
*cmd
;
276 typedef struct multiState
{
277 multiCmd
*commands
; /* Array of MULTI commands */
278 int count
; /* Total number of MULTI commands */
281 /* With multiplexing we need to take per-client state.
282 * Clients are taken in a linked list. */
283 typedef struct redisClient
{
288 robj
**argv
, **mbargv
;
290 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
291 int multibulk
; /* multi bulk command format active */
294 time_t lastinteraction
; /* time of the last interaction, used for timeout */
295 int flags
; /* Bitmask of client flags: REDIS_CLOSE | REDIS_SLAVE |
    * REDIS_MASTER | REDIS_MONITOR | REDIS_BLOCKED ... */
297 int slaveseldb
; /* slave selected db, if this client is a slave */
298 int authenticated
; /* when requirepass is non-NULL */
299 int replstate
; /* replication state if this is a slave */
300 int repldbfd
; /* replication DB file descriptor */
301 long repldboff
; /* replication DB file offset */
302 off_t repldbsize
; /* replication DB file size */
303 multiState mstate
; /* MULTI/EXEC state */
304 robj
**blockingkeys
; /* The keys we are waiting for in order to terminate a
305 * blocking operation such as BLPOP. Otherwise NULL. */
306 int blockingkeysnum
; /* Number of blocking keys */
307 time_t blockingto
; /* Blocking operation timeout. If UNIX current time
308 * is >= blockingto then the operation timed out.
    * A value of 0 means no timeout. */
309 list
*io_keys
; /* Keys this client is waiting to be loaded from the
310 * swap file in order to continue. */
318 /* Global server state structure */
323 dict
*sharingpool
; /* Pool used for object sharing */
324 unsigned int sharingpoolsize
;
325 long long dirty
; /* changes to DB from the last save */
327 list
*slaves
, *monitors
;
328 char neterr
[ANET_ERR_LEN
];
330 int cronloops
; /* number of times the cron function run */
331 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
332 time_t lastsave
; /* Unix time of last save succeeded */
333 size_t usedmemory
; /* Used memory in megabytes */
334 /* Fields used only for stats */
335 time_t stat_starttime
; /* server start time */
336 long long stat_numcommands
; /* number of processed commands */
337 long long stat_numconnections
; /* number of connections received */
350 pid_t bgsavechildpid
;
351 pid_t bgrewritechildpid
;
352 sds bgrewritebuf
; /* buffer taken by parent during append only rewrite */
353 struct saveparam
*saveparams
;
358 char *appendfilename
;
362 /* Replication related */
367 redisClient
*master
; /* client that is master for this slave */
369 unsigned int maxclients
;
370 unsigned long long maxmemory
;
371 unsigned int blockedclients
;
372 /* Sort parameters - qsort_r() is only available under BSD so we
373 * have to take this state global, in order to pass it to sortCompare() */
377 /* Virtual memory configuration */
381 unsigned long long vm_max_memory
;
382 /* Virtual memory state */
385 off_t vm_next_page
; /* Next probably empty page */
386 off_t vm_near_pages
; /* Number of pages allocated sequentially */
387 unsigned char *vm_bitmap
; /* Bitmap of free/used pages */
388 time_t unixtime
; /* Unix time sampled every second. */
389 /* Virtual memory I/O threads stuff */
390 /* An I/O thread processes an element taken from the io_newjobs queue and
391 * puts the result of the operation in the io_processed list. While the
392 * job is being processed, it's put on the io_processing queue. */
393 list
*io_newjobs
; /* List of VM I/O jobs yet to be processed */
394 list
*io_processing
; /* List of VM I/O jobs being processed */
395 list
*io_processed
; /* List of VM I/O jobs already processed */
396 list
*io_clients
; /* All the clients waiting for SWAP I/O operations */
397 pthread_mutex_t io_mutex
; /* lock to access io_jobs/io_done/io_thread_job */
398 int io_active_threads
; /* Number of running I/O threads */
399 int vm_max_threads
; /* Max number of I/O threads running at the same time */
400 /* Our main thread is blocked on the event loop, looking for sockets ready
401 * to be read or written, so when a threaded I/O operation is ready to be
402 * processed by the main thread, the I/O thread will use a unix pipe to
403 * awake the main thread. The followings are the two pipe FDs. */
404 int io_ready_pipe_read
;
405 int io_ready_pipe_write
;
406 /* Virtual memory stats */
407 unsigned long long vm_stats_used_pages
;
408 unsigned long long vm_stats_swapped_objects
;
409 unsigned long long vm_stats_swapouts
;
410 unsigned long long vm_stats_swapins
;
413 typedef void redisCommandProc(redisClient
*c
);
414 struct redisCommand
{
416 redisCommandProc
*proc
;
421 struct redisFunctionSym
{
423 unsigned long pointer
;
426 typedef struct _redisSortObject
{
434 typedef struct _redisSortOperation
{
437 } redisSortOperation
;
439 /* ZSETs use a specialized version of Skiplists */
441 typedef struct zskiplistNode
{
442 struct zskiplistNode
**forward
;
443 struct zskiplistNode
*backward
;
448 typedef struct zskiplist
{
449 struct zskiplistNode
*header
, *tail
;
450 unsigned long length
;
454 typedef struct zset
{
459 /* Our shared "common" objects */
461 struct sharedObjectsStruct
{
462 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
463 *colon
, *nullbulk
, *nullmultibulk
, *queued
,
464 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
465 *outofrangeerr
, *plus
,
466 *select0
, *select1
, *select2
, *select3
, *select4
,
467 *select5
, *select6
, *select7
, *select8
, *select9
;
470 /* Global vars that are actually used as constants. The following double
471 * values are used for double on-disk serialization, and are initialized
472 * at runtime to avoid strange compiler optimizations. */
474 static double R_Zero
, R_PosInf
, R_NegInf
, R_Nan
;
476 /* VM threaded I/O request message */
477 #define REDIS_IOJOB_LOAD 0
478 #define REDIS_IOJOB_SWAP 1
479 typedef struct iojon
{
480 int type
; /* Request type, REDIS_IOJOB_* */
481 int dbid
; /* Redis database ID */
482 robj
*key
; /* This I/O request is about swapping this key */
483 robj
*val
; /* the value to swap for REDIS_IOJOB_SWAP, otherwise this
484 * field is populated by the I/O thread for REDIS_IOJOB_LOAD. */
485 off_t page
; /* Swap page where to read/write the object */
486 int canceled
; /* True if this command was canceled by blocking side of VM */
487 pthread_t thread
; /* ID of the thread processing this entry */
490 /*================================ Prototypes =============================== */
492 static void freeStringObject(robj
*o
);
493 static void freeListObject(robj
*o
);
494 static void freeSetObject(robj
*o
);
495 static void decrRefCount(void *o
);
496 static robj
*createObject(int type
, void *ptr
);
497 static void freeClient(redisClient
*c
);
498 static int rdbLoad(char *filename
);
499 static void addReply(redisClient
*c
, robj
*obj
);
500 static void addReplySds(redisClient
*c
, sds s
);
501 static void incrRefCount(robj
*o
);
502 static int rdbSaveBackground(char *filename
);
503 static robj
*createStringObject(char *ptr
, size_t len
);
504 static robj
*dupStringObject(robj
*o
);
505 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
506 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
507 static int syncWithMaster(void);
508 static robj
*tryObjectSharing(robj
*o
);
509 static int tryObjectEncoding(robj
*o
);
510 static robj
*getDecodedObject(robj
*o
);
511 static int removeExpire(redisDb
*db
, robj
*key
);
512 static int expireIfNeeded(redisDb
*db
, robj
*key
);
513 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
514 static int deleteIfSwapped(redisDb
*db
, robj
*key
);
515 static int deleteKey(redisDb
*db
, robj
*key
);
516 static time_t getExpire(redisDb
*db
, robj
*key
);
517 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
518 static void updateSlavesWaitingBgsave(int bgsaveerr
);
519 static void freeMemoryIfNeeded(void);
520 static int processCommand(redisClient
*c
);
521 static void setupSigSegvAction(void);
522 static void rdbRemoveTempFile(pid_t childpid
);
523 static void aofRemoveTempFile(pid_t childpid
);
524 static size_t stringObjectLen(robj
*o
);
525 static void processInputBuffer(redisClient
*c
);
526 static zskiplist
*zslCreate(void);
527 static void zslFree(zskiplist
*zsl
);
528 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
529 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
530 static void initClientMultiState(redisClient
*c
);
531 static void freeClientMultiState(redisClient
*c
);
532 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
);
533 static void unblockClient(redisClient
*c
);
534 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
);
535 static void vmInit(void);
536 static void vmMarkPagesFree(off_t page
, off_t count
);
537 static robj
*vmLoadObject(robj
*key
);
538 static robj
*vmPreviewObject(robj
*key
);
539 static int vmSwapOneObject(void);
540 static int vmCanSwapOut(void);
541 static void freeOneObjectFromFreelist(void);
542 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
543 static void vmThreadedIOCompletedJob(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
544 static void vmCancelThreadedIOJob(robj
*o
);
546 static void authCommand(redisClient
*c
);
547 static void pingCommand(redisClient
*c
);
548 static void echoCommand(redisClient
*c
);
549 static void setCommand(redisClient
*c
);
550 static void setnxCommand(redisClient
*c
);
551 static void getCommand(redisClient
*c
);
552 static void delCommand(redisClient
*c
);
553 static void existsCommand(redisClient
*c
);
554 static void incrCommand(redisClient
*c
);
555 static void decrCommand(redisClient
*c
);
556 static void incrbyCommand(redisClient
*c
);
557 static void decrbyCommand(redisClient
*c
);
558 static void selectCommand(redisClient
*c
);
559 static void randomkeyCommand(redisClient
*c
);
560 static void keysCommand(redisClient
*c
);
561 static void dbsizeCommand(redisClient
*c
);
562 static void lastsaveCommand(redisClient
*c
);
563 static void saveCommand(redisClient
*c
);
564 static void bgsaveCommand(redisClient
*c
);
565 static void bgrewriteaofCommand(redisClient
*c
);
566 static void shutdownCommand(redisClient
*c
);
567 static void moveCommand(redisClient
*c
);
568 static void renameCommand(redisClient
*c
);
569 static void renamenxCommand(redisClient
*c
);
570 static void lpushCommand(redisClient
*c
);
571 static void rpushCommand(redisClient
*c
);
572 static void lpopCommand(redisClient
*c
);
573 static void rpopCommand(redisClient
*c
);
574 static void llenCommand(redisClient
*c
);
575 static void lindexCommand(redisClient
*c
);
576 static void lrangeCommand(redisClient
*c
);
577 static void ltrimCommand(redisClient
*c
);
578 static void typeCommand(redisClient
*c
);
579 static void lsetCommand(redisClient
*c
);
580 static void saddCommand(redisClient
*c
);
581 static void sremCommand(redisClient
*c
);
582 static void smoveCommand(redisClient
*c
);
583 static void sismemberCommand(redisClient
*c
);
584 static void scardCommand(redisClient
*c
);
585 static void spopCommand(redisClient
*c
);
586 static void srandmemberCommand(redisClient
*c
);
587 static void sinterCommand(redisClient
*c
);
588 static void sinterstoreCommand(redisClient
*c
);
589 static void sunionCommand(redisClient
*c
);
590 static void sunionstoreCommand(redisClient
*c
);
591 static void sdiffCommand(redisClient
*c
);
592 static void sdiffstoreCommand(redisClient
*c
);
593 static void syncCommand(redisClient
*c
);
594 static void flushdbCommand(redisClient
*c
);
595 static void flushallCommand(redisClient
*c
);
596 static void sortCommand(redisClient
*c
);
597 static void lremCommand(redisClient
*c
);
598 static void rpoplpushcommand(redisClient
*c
);
599 static void infoCommand(redisClient
*c
);
600 static void mgetCommand(redisClient
*c
);
601 static void monitorCommand(redisClient
*c
);
602 static void expireCommand(redisClient
*c
);
603 static void expireatCommand(redisClient
*c
);
604 static void getsetCommand(redisClient
*c
);
605 static void ttlCommand(redisClient
*c
);
606 static void slaveofCommand(redisClient
*c
);
607 static void debugCommand(redisClient
*c
);
608 static void msetCommand(redisClient
*c
);
609 static void msetnxCommand(redisClient
*c
);
610 static void zaddCommand(redisClient
*c
);
611 static void zincrbyCommand(redisClient
*c
);
612 static void zrangeCommand(redisClient
*c
);
613 static void zrangebyscoreCommand(redisClient
*c
);
614 static void zrevrangeCommand(redisClient
*c
);
615 static void zcardCommand(redisClient
*c
);
616 static void zremCommand(redisClient
*c
);
617 static void zscoreCommand(redisClient
*c
);
618 static void zremrangebyscoreCommand(redisClient
*c
);
619 static void multiCommand(redisClient
*c
);
620 static void execCommand(redisClient
*c
);
621 static void blpopCommand(redisClient
*c
);
622 static void brpopCommand(redisClient
*c
);
624 /*================================= Globals ================================= */
627 static struct redisServer server
; /* server global state */
628 static struct redisCommand cmdTable
[] = {
629 {"get",getCommand
,2,REDIS_CMD_INLINE
},
630 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
631 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
632 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
633 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
634 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
635 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
636 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
637 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
638 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
639 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
640 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
641 {"brpop",brpopCommand
,-3,REDIS_CMD_INLINE
},
642 {"blpop",blpopCommand
,-3,REDIS_CMD_INLINE
},
643 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
644 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
645 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
646 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
647 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
648 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
649 {"rpoplpush",rpoplpushcommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
650 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
651 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
652 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
653 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
654 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
655 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
656 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
657 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
658 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
659 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
660 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
661 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
662 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
663 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
664 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
665 {"zincrby",zincrbyCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
666 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
667 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
668 {"zrange",zrangeCommand
,-4,REDIS_CMD_INLINE
},
669 {"zrangebyscore",zrangebyscoreCommand
,-4,REDIS_CMD_INLINE
},
670 {"zrevrange",zrevrangeCommand
,-4,REDIS_CMD_INLINE
},
671 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
672 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
673 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
674 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
675 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
676 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
677 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
678 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
679 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
680 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
681 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
682 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
683 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
684 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
685 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
686 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
687 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
688 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
689 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
690 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
691 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
692 {"bgrewriteaof",bgrewriteaofCommand
,1,REDIS_CMD_INLINE
},
693 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
694 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
695 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
696 {"multi",multiCommand
,1,REDIS_CMD_INLINE
},
697 {"exec",execCommand
,1,REDIS_CMD_INLINE
},
698 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
699 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
700 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
701 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
702 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
703 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
704 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
705 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
706 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
710 /*============================ Utility functions ============================ */
712 /* Glob-style pattern matching. */
713 int stringmatchlen(const char *pattern
, int patternLen
,
714 const char *string
, int stringLen
, int nocase
)
719 while (pattern
[1] == '*') {
724 return 1; /* match */
726 if (stringmatchlen(pattern
+1, patternLen
-1,
727 string
, stringLen
, nocase
))
728 return 1; /* match */
732 return 0; /* no match */
736 return 0; /* no match */
746 not = pattern
[0] == '^';
753 if (pattern
[0] == '\\') {
756 if (pattern
[0] == string
[0])
758 } else if (pattern
[0] == ']') {
760 } else if (patternLen
== 0) {
764 } else if (pattern
[1] == '-' && patternLen
>= 3) {
765 int start
= pattern
[0];
766 int end
= pattern
[2];
774 start
= tolower(start
);
780 if (c
>= start
&& c
<= end
)
784 if (pattern
[0] == string
[0])
787 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
797 return 0; /* no match */
803 if (patternLen
>= 2) {
810 if (pattern
[0] != string
[0])
811 return 0; /* no match */
813 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
814 return 0; /* no match */
822 if (stringLen
== 0) {
823 while(*pattern
== '*') {
830 if (patternLen
== 0 && stringLen
== 0)
835 static void redisLog(int level
, const char *fmt
, ...) {
839 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
843 if (level
>= server
.verbosity
) {
849 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
850 fprintf(fp
,"%s %c ",buf
,c
[level
]);
851 vfprintf(fp
, fmt
, ap
);
857 if (server
.logfile
) fclose(fp
);
860 /*====================== Hash table type implementation ==================== */
862 /* This is a hash table type that uses the SDS dynamic strings library as
863 * keys and redis objects as values (objects can hold SDS strings,
866 static void dictVanillaFree(void *privdata
, void *val
)
868 DICT_NOTUSED(privdata
);
872 static void dictListDestructor(void *privdata
, void *val
)
874 DICT_NOTUSED(privdata
);
875 listRelease((list
*)val
);
878 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
882 DICT_NOTUSED(privdata
);
884 l1
= sdslen((sds
)key1
);
885 l2
= sdslen((sds
)key2
);
886 if (l1
!= l2
) return 0;
887 return memcmp(key1
, key2
, l1
) == 0;
890 static void dictRedisObjectDestructor(void *privdata
, void *val
)
892 DICT_NOTUSED(privdata
);
894 if (val
== NULL
) return; /* Values of swapped out keys are set to NULL */
898 static int dictObjKeyCompare(void *privdata
, const void *key1
,
901 const robj
*o1
= key1
, *o2
= key2
;
902 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
905 static unsigned int dictObjHash(const void *key
) {
907 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
910 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
913 robj
*o1
= (robj
*) key1
, *o2
= (robj
*) key2
;
916 o1
= getDecodedObject(o1
);
917 o2
= getDecodedObject(o2
);
918 cmp
= sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
924 static unsigned int dictEncObjHash(const void *key
) {
925 robj
*o
= (robj
*) key
;
927 o
= getDecodedObject(o
);
928 unsigned int hash
= dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
933 static dictType setDictType
= {
934 dictEncObjHash
, /* hash function */
937 dictEncObjKeyCompare
, /* key compare */
938 dictRedisObjectDestructor
, /* key destructor */
939 NULL
/* val destructor */
942 static dictType zsetDictType
= {
943 dictEncObjHash
, /* hash function */
946 dictEncObjKeyCompare
, /* key compare */
947 dictRedisObjectDestructor
, /* key destructor */
948 dictVanillaFree
/* val destructor of malloc(sizeof(double)) */
951 static dictType hashDictType
= {
952 dictObjHash
, /* hash function */
955 dictObjKeyCompare
, /* key compare */
956 dictRedisObjectDestructor
, /* key destructor */
957 dictRedisObjectDestructor
/* val destructor */
960 /* Keylist hash table type has unencoded redis objects as keys and
961 * lists as values. It's used for blocking operations (BLPOP) */
962 static dictType keylistDictType
= {
963 dictObjHash
, /* hash function */
966 dictObjKeyCompare
, /* key compare */
967 dictRedisObjectDestructor
, /* key destructor */
968 dictListDestructor
/* val destructor */
971 /* ========================= Random utility functions ======================= */
973 /* Redis generally does not try to recover from out of memory conditions
974 * when allocating objects or strings, it is not clear if it will be possible
975 * to report this condition to the client since the networking layer itself
976 * is based on heap allocation for send buffers, so we simply abort.
977 * At least the code will be simpler to read... */
978 static void oom(const char *msg
) {
979 redisLog(REDIS_WARNING
, "%s: Out of memory\n",msg
);
984 /* ====================== Redis server networking stuff ===================== */
985 static void closeTimedoutClients(void) {
988 time_t now
= time(NULL
);
990 listRewind(server
.clients
);
991 while ((ln
= listYield(server
.clients
)) != NULL
) {
992 c
= listNodeValue(ln
);
993 if (server
.maxidletime
&&
994 !(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
995 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
996 (now
- c
->lastinteraction
> server
.maxidletime
))
998 redisLog(REDIS_VERBOSE
,"Closing idle client");
1000 } else if (c
->flags
& REDIS_BLOCKED
) {
1001 if (c
->blockingto
!= 0 && c
->blockingto
< now
) {
1002 addReply(c
,shared
.nullmultibulk
);
1009 static int htNeedsResize(dict
*dict
) {
1010 long long size
, used
;
1012 size
= dictSlots(dict
);
1013 used
= dictSize(dict
);
1014 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
1015 (used
*100/size
< REDIS_HT_MINFILL
));
1018 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
1019 * we resize the hash table to save memory */
1020 static void tryResizeHashTables(void) {
1023 for (j
= 0; j
< server
.dbnum
; j
++) {
1024 if (htNeedsResize(server
.db
[j
].dict
)) {
1025 redisLog(REDIS_VERBOSE
,"The hash table %d is too sparse, resize it...",j
);
1026 dictResize(server
.db
[j
].dict
);
1027 redisLog(REDIS_VERBOSE
,"Hash table %d resized.",j
);
1029 if (htNeedsResize(server
.db
[j
].expires
))
1030 dictResize(server
.db
[j
].expires
);
1034 /* A background saving child (BGSAVE) terminated its work. Handle this. */
1035 void backgroundSaveDoneHandler(int statloc
) {
1036 int exitcode
= WEXITSTATUS(statloc
);
1037 int bysignal
= WIFSIGNALED(statloc
);
1039 if (!bysignal
&& exitcode
== 0) {
1040 redisLog(REDIS_NOTICE
,
1041 "Background saving terminated with success");
1043 server
.lastsave
= time(NULL
);
1044 } else if (!bysignal
&& exitcode
!= 0) {
1045 redisLog(REDIS_WARNING
, "Background saving error");
1047 redisLog(REDIS_WARNING
,
1048 "Background saving terminated by signal");
1049 rdbRemoveTempFile(server
.bgsavechildpid
);
1051 server
.bgsavechildpid
= -1;
1052 /* Possibly there are slaves waiting for a BGSAVE in order to be served
1053 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
1054 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
1057 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
1059 void backgroundRewriteDoneHandler(int statloc
) {
1060 int exitcode
= WEXITSTATUS(statloc
);
1061 int bysignal
= WIFSIGNALED(statloc
);
1063 if (!bysignal
&& exitcode
== 0) {
1067 redisLog(REDIS_NOTICE
,
1068 "Background append only file rewriting terminated with success");
1069 /* Now it's time to flush the differences accumulated by the parent */
1070 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) server
.bgrewritechildpid
);
1071 fd
= open(tmpfile
,O_WRONLY
|O_APPEND
);
1073 redisLog(REDIS_WARNING
, "Not able to open the temp append only file produced by the child: %s", strerror(errno
));
1076 /* Flush our data... */
1077 if (write(fd
,server
.bgrewritebuf
,sdslen(server
.bgrewritebuf
)) !=
1078 (signed) sdslen(server
.bgrewritebuf
)) {
1079 redisLog(REDIS_WARNING
, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno
));
1083 redisLog(REDIS_NOTICE
,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server
.bgrewritebuf
));
1084 /* Now our work is to rename the temp file into the stable file. And
1085 * switch the file descriptor used by the server for append only. */
1086 if (rename(tmpfile
,server
.appendfilename
) == -1) {
1087 redisLog(REDIS_WARNING
,"Can't rename the temp append only file into the stable one: %s", strerror(errno
));
1091 /* Mission completed... almost */
1092 redisLog(REDIS_NOTICE
,"Append only file successfully rewritten.");
1093 if (server
.appendfd
!= -1) {
1094 /* If append only is actually enabled... */
1095 close(server
.appendfd
);
1096 server
.appendfd
= fd
;
1098 server
.appendseldb
= -1; /* Make sure it will issue SELECT */
1099 redisLog(REDIS_NOTICE
,"The new append only file was selected for future appends.");
1101 /* If append only is disabled we just generate a dump in this
1102 * format. Why not? */
1105 } else if (!bysignal
&& exitcode
!= 0) {
1106 redisLog(REDIS_WARNING
, "Background append only file rewriting error");
1108 redisLog(REDIS_WARNING
,
1109 "Background append only file rewriting terminated by signal");
1112 sdsfree(server
.bgrewritebuf
);
1113 server
.bgrewritebuf
= sdsempty();
1114 aofRemoveTempFile(server
.bgrewritechildpid
);
1115 server
.bgrewritechildpid
= -1;
1118 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
1119 int j
, loops
= server
.cronloops
++;
1120 REDIS_NOTUSED(eventLoop
);
1122 REDIS_NOTUSED(clientData
);
1124 /* We take a cached value of the unix time in the global state because
1125 * with virtual memory and aging there is the need to store the current time
1126 * in objects at every object access, and accuracy is not needed.
1127 * To access a global var is faster than calling time(NULL) */
1128 server
.unixtime
= time(NULL
);
1130 /* Update the global state with the amount of used memory */
1131 server
.usedmemory
= zmalloc_used_memory();
1133 /* Show some info about non-empty databases */
1134 for (j
= 0; j
< server
.dbnum
; j
++) {
1135 long long size
, used
, vkeys
;
1137 size
= dictSlots(server
.db
[j
].dict
);
1138 used
= dictSize(server
.db
[j
].dict
);
1139 vkeys
= dictSize(server
.db
[j
].expires
);
1140 if (!(loops
% 5) && (used
|| vkeys
)) {
1141 redisLog(REDIS_VERBOSE
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
1142 /* dictPrintStats(server.dict); */
1146 /* We don't want to resize the hash tables while a background saving
1147 * is in progress: the saving child is created using fork() that is
1148 * implemented with a copy-on-write semantic in most modern systems, so
1149 * if we resize the HT while there is the saving child at work actually
1150 * a lot of memory movements in the parent will cause a lot of pages
1152 if (server
.bgsavechildpid
== -1) tryResizeHashTables();
1154 /* Show information about connected clients */
1156 redisLog(REDIS_VERBOSE
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1157 listLength(server
.clients
)-listLength(server
.slaves
),
1158 listLength(server
.slaves
),
1160 dictSize(server
.sharingpool
));
1163 /* Close connections of timedout clients */
1164 if ((server
.maxidletime
&& !(loops
% 10)) || server
.blockedclients
)
1165 closeTimedoutClients();
1167 /* Check if a background saving or AOF rewrite in progress terminated */
1168 if (server
.bgsavechildpid
!= -1 || server
.bgrewritechildpid
!= -1) {
1172 if ((pid
= wait3(&statloc
,WNOHANG
,NULL
)) != 0) {
1173 if (pid
== server
.bgsavechildpid
) {
1174 backgroundSaveDoneHandler(statloc
);
1176 backgroundRewriteDoneHandler(statloc
);
1180 /* If there is not a background saving in progress check if
1181 * we have to save now */
1182 time_t now
= time(NULL
);
1183 for (j
= 0; j
< server
.saveparamslen
; j
++) {
1184 struct saveparam
*sp
= server
.saveparams
+j
;
1186 if (server
.dirty
>= sp
->changes
&&
1187 now
-server
.lastsave
> sp
->seconds
) {
1188 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
1189 sp
->changes
, sp
->seconds
);
1190 rdbSaveBackground(server
.dbfilename
);
1196 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1197 * will use few CPU cycles if there are few expiring keys, otherwise
1198 * it will get more aggressive to avoid that too much memory is used by
1199 * keys that can be removed from the keyspace. */
1200 for (j
= 0; j
< server
.dbnum
; j
++) {
1202 redisDb
*db
= server
.db
+j
;
1204 /* Continue to expire if at the end of the cycle more than 25%
1205 * of the keys were expired. */
1207 long num
= dictSize(db
->expires
);
1208 time_t now
= time(NULL
);
1211 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
1212 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
1217 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
1218 t
= (time_t) dictGetEntryVal(de
);
1220 deleteKey(db
,dictGetEntryKey(de
));
1224 } while (expired
> REDIS_EXPIRELOOKUPS_PER_CRON
/4);
1227 /* Swap a few keys on disk if we are over the memory limit and VM
1228 * is enabled. Try to free objects from the free list first. */
1229 if (vmCanSwapOut()) {
1230 while (server
.vm_enabled
&& zmalloc_used_memory() >
1231 server
.vm_max_memory
)
1233 if (listLength(server
.objfreelist
)) {
1234 freeOneObjectFromFreelist();
1235 } else if (vmSwapOneObject() == REDIS_ERR
) {
1236 if ((loops
% 30) == 0 && zmalloc_used_memory() >
1237 (server
.vm_max_memory
+server
.vm_max_memory
/10)) {
1238 redisLog(REDIS_WARNING
,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");
1245 /* Check if we should connect to a MASTER */
1246 if (server
.replstate
== REDIS_REPL_CONNECT
) {
1247 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
1248 if (syncWithMaster() == REDIS_OK
) {
1249 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
1255 static void createSharedObjects(void) {
1256 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
1257 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
1258 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
1259 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
1260 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
1261 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
1262 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
1263 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
1264 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
1265 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
1266 shared
.queued
= createObject(REDIS_STRING
,sdsnew("+QUEUED\r\n"));
1267 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
1268 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1269 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
1270 "-ERR no such key\r\n"));
1271 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
1272 "-ERR syntax error\r\n"));
1273 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
1274 "-ERR source and destination objects are the same\r\n"));
1275 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
1276 "-ERR index out of range\r\n"));
1277 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
1278 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
1279 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
1280 shared
.select0
= createStringObject("select 0\r\n",10);
1281 shared
.select1
= createStringObject("select 1\r\n",10);
1282 shared
.select2
= createStringObject("select 2\r\n",10);
1283 shared
.select3
= createStringObject("select 3\r\n",10);
1284 shared
.select4
= createStringObject("select 4\r\n",10);
1285 shared
.select5
= createStringObject("select 5\r\n",10);
1286 shared
.select6
= createStringObject("select 6\r\n",10);
1287 shared
.select7
= createStringObject("select 7\r\n",10);
1288 shared
.select8
= createStringObject("select 8\r\n",10);
1289 shared
.select9
= createStringObject("select 9\r\n",10);
1292 static void appendServerSaveParams(time_t seconds
, int changes
) {
1293 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1294 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1295 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1296 server
.saveparamslen
++;
1299 static void resetServerSaveParams() {
1300 zfree(server
.saveparams
);
1301 server
.saveparams
= NULL
;
1302 server
.saveparamslen
= 0;
1305 static void initServerConfig() {
1306 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1307 server
.port
= REDIS_SERVERPORT
;
1308 server
.verbosity
= REDIS_VERBOSE
;
1309 server
.maxidletime
= REDIS_MAXIDLETIME
;
1310 server
.saveparams
= NULL
;
1311 server
.logfile
= NULL
; /* NULL = log on standard output */
1312 server
.bindaddr
= NULL
;
1313 server
.glueoutputbuf
= 1;
1314 server
.daemonize
= 0;
1315 server
.appendonly
= 0;
1316 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1317 server
.lastfsync
= time(NULL
);
1318 server
.appendfd
= -1;
1319 server
.appendseldb
= -1; /* Make sure the first time will not match */
1320 server
.pidfile
= "/var/run/redis.pid";
1321 server
.dbfilename
= "dump.rdb";
1322 server
.appendfilename
= "appendonly.aof";
1323 server
.requirepass
= NULL
;
1324 server
.shareobjects
= 0;
1325 server
.rdbcompression
= 1;
1326 server
.sharingpoolsize
= 1024;
1327 server
.maxclients
= 0;
1328 server
.blockedclients
= 0;
1329 server
.maxmemory
= 0;
1330 server
.vm_enabled
= 0;
1331 server
.vm_page_size
= 256; /* 256 bytes per page */
1332 server
.vm_pages
= 1024*1024*100; /* 104 millions of pages */
1333 server
.vm_max_memory
= 1024LL*1024*1024*1; /* 1 GB of RAM */
1334 server
.vm_max_threads
= 4;
1336 resetServerSaveParams();
1338 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1339 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1340 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1341 /* Replication related */
1343 server
.masterauth
= NULL
;
1344 server
.masterhost
= NULL
;
1345 server
.masterport
= 6379;
1346 server
.master
= NULL
;
1347 server
.replstate
= REDIS_REPL_NONE
;
1349 /* Double constants initialization */
1351 R_PosInf
= 1.0/R_Zero
;
1352 R_NegInf
= -1.0/R_Zero
;
1353 R_Nan
= R_Zero
/R_Zero
;
1356 static void initServer() {
1359 signal(SIGHUP
, SIG_IGN
);
1360 signal(SIGPIPE
, SIG_IGN
);
1361 setupSigSegvAction();
1363 server
.clients
= listCreate();
1364 server
.slaves
= listCreate();
1365 server
.monitors
= listCreate();
1366 server
.objfreelist
= listCreate();
1367 createSharedObjects();
1368 server
.el
= aeCreateEventLoop();
1369 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1370 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
1371 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1372 if (server
.fd
== -1) {
1373 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
1376 for (j
= 0; j
< server
.dbnum
; j
++) {
1377 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1378 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1379 server
.db
[j
].blockingkeys
= dictCreate(&keylistDictType
,NULL
);
1380 server
.db
[j
].id
= j
;
1382 server
.cronloops
= 0;
1383 server
.bgsavechildpid
= -1;
1384 server
.bgrewritechildpid
= -1;
1385 server
.bgrewritebuf
= sdsempty();
1386 server
.lastsave
= time(NULL
);
1388 server
.usedmemory
= 0;
1389 server
.stat_numcommands
= 0;
1390 server
.stat_numconnections
= 0;
1391 server
.stat_starttime
= time(NULL
);
1392 server
.unixtime
= time(NULL
);
1393 aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
);
1394 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
1395 acceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
1396 if (server
.vm_enabled
) {
1397 /* Listen for events in the threaded I/O pipe */
1398 if (aeCreateFileEvent(server
.el
, server
.io_ready_pipe_read
, AE_READABLE
,
1399 vmThreadedIOCompletedJob
, NULL
) == AE_ERR
)
1400 oom("creating file event");
1403 if (server
.appendonly
) {
1404 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644);
1405 if (server
.appendfd
== -1) {
1406 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1412 if (server
.vm_enabled
) vmInit();
1415 /* Empty the whole database */
1416 static long long emptyDb() {
1418 long long removed
= 0;
1420 for (j
= 0; j
< server
.dbnum
; j
++) {
1421 removed
+= dictSize(server
.db
[j
].dict
);
1422 dictEmpty(server
.db
[j
].dict
);
1423 dictEmpty(server
.db
[j
].expires
);
/* Convert a yes/no configuration argument into an integer.
 *
 * Returns 1 for "yes" and 0 for "no", both compared case-insensitively.
 * Any other string yields -1, which callers treat as a syntax error. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1434 /* I agree, this is a very rudimentary way to load a configuration...
1435 will improve later if the config gets more complex */
1436 static void loadServerConfig(char *filename
) {
1438 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1442 if (filename
[0] == '-' && filename
[1] == '\0')
1445 if ((fp
= fopen(filename
,"r")) == NULL
) {
1446 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1451 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1457 line
= sdstrim(line
," \t\r\n");
1459 /* Skip comments and blank lines*/
1460 if (line
[0] == '#' || line
[0] == '\0') {
1465 /* Split into arguments */
1466 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1467 sdstolower(argv
[0]);
1469 /* Execute config directives */
1470 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1471 server
.maxidletime
= atoi(argv
[1]);
1472 if (server
.maxidletime
< 0) {
1473 err
= "Invalid timeout value"; goto loaderr
;
1475 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1476 server
.port
= atoi(argv
[1]);
1477 if (server
.port
< 1 || server
.port
> 65535) {
1478 err
= "Invalid port"; goto loaderr
;
1480 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1481 server
.bindaddr
= zstrdup(argv
[1]);
1482 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1483 int seconds
= atoi(argv
[1]);
1484 int changes
= atoi(argv
[2]);
1485 if (seconds
< 1 || changes
< 0) {
1486 err
= "Invalid save parameters"; goto loaderr
;
1488 appendServerSaveParams(seconds
,changes
);
1489 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1490 if (chdir(argv
[1]) == -1) {
1491 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1492 argv
[1], strerror(errno
));
1495 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1496 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1497 else if (!strcasecmp(argv
[1],"verbose")) server
.verbosity
= REDIS_VERBOSE
;
1498 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1499 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1501 err
= "Invalid log level. Must be one of debug, notice, warning";
1504 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1507 server
.logfile
= zstrdup(argv
[1]);
1508 if (!strcasecmp(server
.logfile
,"stdout")) {
1509 zfree(server
.logfile
);
1510 server
.logfile
= NULL
;
1512 if (server
.logfile
) {
1513 /* Test if we are able to open the file. The server will not
1514 * be able to abort just for this problem later... */
1515 logfp
= fopen(server
.logfile
,"a");
1516 if (logfp
== NULL
) {
1517 err
= sdscatprintf(sdsempty(),
1518 "Can't open the log file: %s", strerror(errno
));
1523 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1524 server
.dbnum
= atoi(argv
[1]);
1525 if (server
.dbnum
< 1) {
1526 err
= "Invalid number of databases"; goto loaderr
;
1528 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1529 server
.maxclients
= atoi(argv
[1]);
1530 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1531 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1532 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1533 server
.masterhost
= sdsnew(argv
[1]);
1534 server
.masterport
= atoi(argv
[2]);
1535 server
.replstate
= REDIS_REPL_CONNECT
;
1536 } else if (!strcasecmp(argv
[0],"masterauth") && argc
== 2) {
1537 server
.masterauth
= zstrdup(argv
[1]);
1538 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1539 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1540 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1542 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1543 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1544 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1546 } else if (!strcasecmp(argv
[0],"rdbcompression") && argc
== 2) {
1547 if ((server
.rdbcompression
= yesnotoi(argv
[1])) == -1) {
1548 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1550 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1551 server
.sharingpoolsize
= atoi(argv
[1]);
1552 if (server
.sharingpoolsize
< 1) {
1553 err
= "invalid object sharing pool size"; goto loaderr
;
1555 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1556 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1557 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1559 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1560 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1561 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1563 } else if (!strcasecmp(argv
[0],"appendfsync") && argc
== 2) {
1564 if (!strcasecmp(argv
[1],"no")) {
1565 server
.appendfsync
= APPENDFSYNC_NO
;
1566 } else if (!strcasecmp(argv
[1],"always")) {
1567 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1568 } else if (!strcasecmp(argv
[1],"everysec")) {
1569 server
.appendfsync
= APPENDFSYNC_EVERYSEC
;
1571 err
= "argument must be 'no', 'always' or 'everysec'";
1574 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1575 server
.requirepass
= zstrdup(argv
[1]);
1576 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1577 server
.pidfile
= zstrdup(argv
[1]);
1578 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1579 server
.dbfilename
= zstrdup(argv
[1]);
1580 } else if (!strcasecmp(argv
[0],"vm-enabled") && argc
== 2) {
1581 if ((server
.vm_enabled
= yesnotoi(argv
[1])) == -1) {
1582 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1584 } else if (!strcasecmp(argv
[0],"vm-max-memory") && argc
== 2) {
1585 server
.vm_max_memory
= strtoll(argv
[1], NULL
, 10);
1586 } else if (!strcasecmp(argv
[0],"vm-page-size") && argc
== 2) {
1587 server
.vm_page_size
= strtoll(argv
[1], NULL
, 10);
1588 } else if (!strcasecmp(argv
[0],"vm-pages") && argc
== 2) {
1589 server
.vm_pages
= strtoll(argv
[1], NULL
, 10);
1590 } else if (!strcasecmp(argv
[0],"vm-max-threads") && argc
== 2) {
1591 server
.vm_max_threads
= strtoll(argv
[1], NULL
, 10);
1593 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1595 for (j
= 0; j
< argc
; j
++)
1600 if (fp
!= stdin
) fclose(fp
);
1604 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1605 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1606 fprintf(stderr
, ">>> '%s'\n", line
);
1607 fprintf(stderr
, "%s\n", err
);
1611 static void freeClientArgv(redisClient
*c
) {
1614 for (j
= 0; j
< c
->argc
; j
++)
1615 decrRefCount(c
->argv
[j
]);
1616 for (j
= 0; j
< c
->mbargc
; j
++)
1617 decrRefCount(c
->mbargv
[j
]);
1622 static void freeClient(redisClient
*c
) {
1625 /* Note that if the client we are freeing is blocked into a blocking
1626 * call, we have to set querybuf to NULL *before* to call unblockClient()
1627 * to avoid processInputBuffer() will get called. Also it is important
1628 * to remove the file events after this, because this call adds
1629 * the READABLE event. */
1630 sdsfree(c
->querybuf
);
1632 if (c
->flags
& REDIS_BLOCKED
)
1635 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1636 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1637 listRelease(c
->reply
);
1640 /* Remove from the list of clients */
1641 ln
= listSearchKey(server
.clients
,c
);
1642 redisAssert(ln
!= NULL
);
1643 listDelNode(server
.clients
,ln
);
1644 /* Remove from the list of clients waiting for VM operations */
1645 if (server
.vm_enabled
&& listLength(c
->io_keys
)) {
1646 ln
= listSearchKey(server
.io_clients
,c
);
1647 if (ln
) listDelNode(server
.io_clients
,ln
);
1648 listRelease(c
->io_keys
);
1651 if (c
->flags
& REDIS_SLAVE
) {
1652 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1654 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1655 ln
= listSearchKey(l
,c
);
1656 redisAssert(ln
!= NULL
);
1659 if (c
->flags
& REDIS_MASTER
) {
1660 server
.master
= NULL
;
1661 server
.replstate
= REDIS_REPL_CONNECT
;
1665 freeClientMultiState(c
);
1669 #define GLUEREPLY_UP_TO (1024)
1670 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1672 char buf
[GLUEREPLY_UP_TO
];
1676 listRewind(c
->reply
);
1677 while((ln
= listYield(c
->reply
))) {
1681 objlen
= sdslen(o
->ptr
);
1682 if (copylen
+ objlen
<= GLUEREPLY_UP_TO
) {
1683 memcpy(buf
+copylen
,o
->ptr
,objlen
);
1685 listDelNode(c
->reply
,ln
);
1687 if (copylen
== 0) return;
1691 /* Now the output buffer is empty, add the new single element */
1692 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,copylen
));
1693 listAddNodeHead(c
->reply
,o
);
1696 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1697 redisClient
*c
= privdata
;
1698 int nwritten
= 0, totwritten
= 0, objlen
;
1701 REDIS_NOTUSED(mask
);
1703 /* Use writev() if we have enough buffers to send */
1704 if (!server
.glueoutputbuf
&&
1705 listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD
&&
1706 !(c
->flags
& REDIS_MASTER
))
1708 sendReplyToClientWritev(el
, fd
, privdata
, mask
);
1712 while(listLength(c
->reply
)) {
1713 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1714 glueReplyBuffersIfNeeded(c
);
1716 o
= listNodeValue(listFirst(c
->reply
));
1717 objlen
= sdslen(o
->ptr
);
1720 listDelNode(c
->reply
,listFirst(c
->reply
));
1724 if (c
->flags
& REDIS_MASTER
) {
1725 /* Don't reply to a master */
1726 nwritten
= objlen
- c
->sentlen
;
1728 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1729 if (nwritten
<= 0) break;
1731 c
->sentlen
+= nwritten
;
1732 totwritten
+= nwritten
;
1733 /* If we fully sent the object on head go to the next one */
1734 if (c
->sentlen
== objlen
) {
1735 listDelNode(c
->reply
,listFirst(c
->reply
));
1738 /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT
1739 * bytes, in a single threaded server it's a good idea to serve
1740 * other clients as well, even if a very large request comes from
1741 * super fast link that is always able to accept data (in real world
1742 * scenario think about 'KEYS *' against the loopback interface) */
1743 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1745 if (nwritten
== -1) {
1746 if (errno
== EAGAIN
) {
1749 redisLog(REDIS_VERBOSE
,
1750 "Error writing to client: %s", strerror(errno
));
1755 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1756 if (listLength(c
->reply
) == 0) {
1758 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1762 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
)
1764 redisClient
*c
= privdata
;
1765 int nwritten
= 0, totwritten
= 0, objlen
, willwrite
;
1767 struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
];
1768 int offset
, ion
= 0;
1770 REDIS_NOTUSED(mask
);
1773 while (listLength(c
->reply
)) {
1774 offset
= c
->sentlen
;
1778 /* fill-in the iov[] array */
1779 for(node
= listFirst(c
->reply
); node
; node
= listNextNode(node
)) {
1780 o
= listNodeValue(node
);
1781 objlen
= sdslen(o
->ptr
);
1783 if (totwritten
+ objlen
- offset
> REDIS_MAX_WRITE_PER_EVENT
)
1786 if(ion
== REDIS_WRITEV_IOVEC_COUNT
)
1787 break; /* no more iovecs */
1789 iov
[ion
].iov_base
= ((char*)o
->ptr
) + offset
;
1790 iov
[ion
].iov_len
= objlen
- offset
;
1791 willwrite
+= objlen
- offset
;
1792 offset
= 0; /* just for the first item */
1799 /* write all collected blocks at once */
1800 if((nwritten
= writev(fd
, iov
, ion
)) < 0) {
1801 if (errno
!= EAGAIN
) {
1802 redisLog(REDIS_VERBOSE
,
1803 "Error writing to client: %s", strerror(errno
));
1810 totwritten
+= nwritten
;
1811 offset
= c
->sentlen
;
1813 /* remove written robjs from c->reply */
1814 while (nwritten
&& listLength(c
->reply
)) {
1815 o
= listNodeValue(listFirst(c
->reply
));
1816 objlen
= sdslen(o
->ptr
);
1818 if(nwritten
>= objlen
- offset
) {
1819 listDelNode(c
->reply
, listFirst(c
->reply
));
1820 nwritten
-= objlen
- offset
;
1824 c
->sentlen
+= nwritten
;
1832 c
->lastinteraction
= time(NULL
);
1834 if (listLength(c
->reply
) == 0) {
1836 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1840 static struct redisCommand
*lookupCommand(char *name
) {
1842 while(cmdTable
[j
].name
!= NULL
) {
1843 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1849 /* resetClient prepare the client to process the next command */
1850 static void resetClient(redisClient
*c
) {
1856 /* Call() is the core of Redis execution of a command */
1857 static void call(redisClient
*c
, struct redisCommand
*cmd
) {
1860 dirty
= server
.dirty
;
1862 if (server
.appendonly
&& server
.dirty
-dirty
)
1863 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1864 if (server
.dirty
-dirty
&& listLength(server
.slaves
))
1865 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1866 if (listLength(server
.monitors
))
1867 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1868 server
.stat_numcommands
++;
1871 /* If this function gets called we already read a whole
1872 * command, arguments are in the client argv/argc fields.
1873 * processCommand() execute the command or prepare the
1874 * server for a bulk read from the client.
1876 * If 1 is returned the client is still alive and valid and
1877 * other operations can be performed by the caller. Otherwise
1878 * if 0 is returned the client was destroyed (i.e. after QUIT). */
1879 static int processCommand(redisClient
*c
) {
1880 struct redisCommand
*cmd
;
1882 /* Free some memory if needed (maxmemory setting) */
1883 if (server
.maxmemory
) freeMemoryIfNeeded();
1885 /* Handle the multi bulk command type. This is an alternative protocol
1886 * supported by Redis in order to receive commands that are composed of
1887 * multiple binary-safe "bulk" arguments. The latency of processing is
1888 * a bit higher but this allows things like multi-sets, so if this
1889 * protocol is used only for MSET and similar commands this is a big win. */
1890 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1891 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1892 if (c
->multibulk
<= 0) {
1896 decrRefCount(c
->argv
[c
->argc
-1]);
1900 } else if (c
->multibulk
) {
1901 if (c
->bulklen
== -1) {
1902 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1903 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1907 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1908 decrRefCount(c
->argv
[0]);
1909 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1911 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1916 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1920 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1921 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1925 if (c
->multibulk
== 0) {
1929 /* Here we need to swap the multi-bulk argc/argv with the
1930 * normal argc/argv of the client structure. */
1932 c
->argv
= c
->mbargv
;
1933 c
->mbargv
= auxargv
;
1936 c
->argc
= c
->mbargc
;
1937 c
->mbargc
= auxargc
;
1939 /* We need to set bulklen to something different than -1
1940 * in order for the code below to process the command without
1941 * to try to read the last argument of a bulk command as
1942 * a special argument. */
1944 /* continue below and process the command */
1951 /* -- end of multi bulk commands processing -- */
1953 /* The QUIT command is handled as a special case. Normal command
1954 * procs are unable to close the client connection safely */
1955 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1959 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1962 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1963 (char*)c
->argv
[0]->ptr
));
1966 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1967 (c
->argc
< -cmd
->arity
)) {
1969 sdscatprintf(sdsempty(),
1970 "-ERR wrong number of arguments for '%s' command\r\n",
1974 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1975 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1978 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1979 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1981 decrRefCount(c
->argv
[c
->argc
-1]);
1982 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1984 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1989 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1990 /* It is possible that the bulk read is already in the
1991 * buffer. Check this condition and handle it accordingly.
1992 * This is just a fast path, alternative to call processInputBuffer().
1993 * It's a good idea since the code is small and this condition
1994 * happens most of the times. */
1995 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
1996 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
1998 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
2003 /* Let's try to share objects on the command arguments vector */
2004 if (server
.shareobjects
) {
2006 for(j
= 1; j
< c
->argc
; j
++)
2007 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
2009 /* Let's try to encode the bulk object to save space. */
2010 if (cmd
->flags
& REDIS_CMD_BULK
)
2011 tryObjectEncoding(c
->argv
[c
->argc
-1]);
2013 /* Check if the user is authenticated */
2014 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
2015 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
2020 /* Exec the command */
2021 if (c
->flags
& REDIS_MULTI
&& cmd
->proc
!= execCommand
) {
2022 queueMultiCommand(c
,cmd
);
2023 addReply(c
,shared
.queued
);
2028 /* Prepare the client for the next command */
2029 if (c
->flags
& REDIS_CLOSE
) {
2037 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
2041 /* (args*2)+1 is enough room for args, spaces, newlines */
2042 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
2044 if (argc
<= REDIS_STATIC_ARGS
) {
2047 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
2050 for (j
= 0; j
< argc
; j
++) {
2051 if (j
!= 0) outv
[outc
++] = shared
.space
;
2052 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
2055 lenobj
= createObject(REDIS_STRING
,
2056 sdscatprintf(sdsempty(),"%lu\r\n",
2057 (unsigned long) stringObjectLen(argv
[j
])));
2058 lenobj
->refcount
= 0;
2059 outv
[outc
++] = lenobj
;
2061 outv
[outc
++] = argv
[j
];
2063 outv
[outc
++] = shared
.crlf
;
2065 /* Increment all the refcounts at start and decrement at end in order to
2066 * be sure to free objects if there is no slave in a replication state
2067 * able to be feed with commands */
2068 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
2070 while((ln
= listYield(slaves
))) {
2071 redisClient
*slave
= ln
->value
;
2073 /* Don't feed slaves that are still waiting for BGSAVE to start */
2074 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
2076 /* Feed all the other slaves, MONITORs and so on */
2077 if (slave
->slaveseldb
!= dictid
) {
2081 case 0: selectcmd
= shared
.select0
; break;
2082 case 1: selectcmd
= shared
.select1
; break;
2083 case 2: selectcmd
= shared
.select2
; break;
2084 case 3: selectcmd
= shared
.select3
; break;
2085 case 4: selectcmd
= shared
.select4
; break;
2086 case 5: selectcmd
= shared
.select5
; break;
2087 case 6: selectcmd
= shared
.select6
; break;
2088 case 7: selectcmd
= shared
.select7
; break;
2089 case 8: selectcmd
= shared
.select8
; break;
2090 case 9: selectcmd
= shared
.select9
; break;
2092 selectcmd
= createObject(REDIS_STRING
,
2093 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
2094 selectcmd
->refcount
= 0;
2097 addReply(slave
,selectcmd
);
2098 slave
->slaveseldb
= dictid
;
2100 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
2102 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
2103 if (outv
!= static_outv
) zfree(outv
);
2106 static void processInputBuffer(redisClient
*c
) {
2108 /* Before to process the input buffer, make sure the client is not
2109 * waitig for a blocking operation such as BLPOP. Note that the first
2110 * iteration the client is never blocked, otherwise the processInputBuffer
2111 * would not be called at all, but after the execution of the first commands
2112 * in the input buffer the client may be blocked, and the "goto again"
2113 * will try to reiterate. The following line will make it return asap. */
2114 if (c
->flags
& REDIS_BLOCKED
|| c
->flags
& REDIS_IO_WAIT
) return;
2115 if (c
->bulklen
== -1) {
2116 /* Read the first line of the query */
2117 char *p
= strchr(c
->querybuf
,'\n');
2124 query
= c
->querybuf
;
2125 c
->querybuf
= sdsempty();
2126 querylen
= 1+(p
-(query
));
2127 if (sdslen(query
) > querylen
) {
2128 /* leave data after the first line of the query in the buffer */
2129 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
2131 *p
= '\0'; /* remove "\n" */
2132 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
2133 sdsupdatelen(query
);
2135 /* Now we can split the query in arguments */
2136 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
2139 if (c
->argv
) zfree(c
->argv
);
2140 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
2142 for (j
= 0; j
< argc
; j
++) {
2143 if (sdslen(argv
[j
])) {
2144 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
2152 /* Execute the command. If the client is still valid
2153 * after processCommand() return and there is something
2154 * on the query buffer try to process the next command. */
2155 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
2157 /* Nothing to process, argc == 0. Just process the query
2158 * buffer if it's not empty or return to the caller */
2159 if (sdslen(c
->querybuf
)) goto again
;
2162 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
2163 redisLog(REDIS_VERBOSE
, "Client protocol error");
2168 /* Bulk read handling. Note that if we are at this point
2169 the client already sent a command terminated with a newline,
2170 we are reading the bulk data that is actually the last
2171 argument of the command. */
2172 int qbl
= sdslen(c
->querybuf
);
2174 if (c
->bulklen
<= qbl
) {
2175 /* Copy everything but the final CRLF as final argument */
2176 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
2178 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
2179 /* Process the command. If the client is still valid after
2180 * the processing and there is more data in the buffer
2181 * try to parse it. */
2182 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
2188 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2189 redisClient
*c
= (redisClient
*) privdata
;
2190 char buf
[REDIS_IOBUF_LEN
];
2193 REDIS_NOTUSED(mask
);
2195 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
2197 if (errno
== EAGAIN
) {
2200 redisLog(REDIS_VERBOSE
, "Reading from client: %s",strerror(errno
));
2204 } else if (nread
== 0) {
2205 redisLog(REDIS_VERBOSE
, "Client closed connection");
2210 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
2211 c
->lastinteraction
= time(NULL
);
2215 processInputBuffer(c
);
2218 static int selectDb(redisClient
*c
, int id
) {
2219 if (id
< 0 || id
>= server
.dbnum
)
2221 c
->db
= &server
.db
[id
];
2225 static void *dupClientReplyValue(void *o
) {
2226 incrRefCount((robj
*)o
);
2230 static redisClient
*createClient(int fd
) {
2231 redisClient
*c
= zmalloc(sizeof(*c
));
2233 anetNonBlock(NULL
,fd
);
2234 anetTcpNoDelay(NULL
,fd
);
2235 if (!c
) return NULL
;
2238 c
->querybuf
= sdsempty();
2247 c
->lastinteraction
= time(NULL
);
2248 c
->authenticated
= 0;
2249 c
->replstate
= REDIS_REPL_NONE
;
2250 c
->reply
= listCreate();
2251 listSetFreeMethod(c
->reply
,decrRefCount
);
2252 listSetDupMethod(c
->reply
,dupClientReplyValue
);
2253 c
->blockingkeys
= NULL
;
2254 c
->blockingkeysnum
= 0;
2255 c
->io_keys
= listCreate();
2256 listSetFreeMethod(c
->io_keys
,decrRefCount
);
2257 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
2258 readQueryFromClient
, c
) == AE_ERR
) {
2262 listAddNodeTail(server
.clients
,c
);
2263 initClientMultiState(c
);
2267 static void addReply(redisClient
*c
, robj
*obj
) {
2268 if (listLength(c
->reply
) == 0 &&
2269 (c
->replstate
== REDIS_REPL_NONE
||
2270 c
->replstate
== REDIS_REPL_ONLINE
) &&
2271 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
2272 sendReplyToClient
, c
) == AE_ERR
) return;
2274 if (server
.vm_enabled
&& obj
->storage
!= REDIS_VM_MEMORY
) {
2275 obj
= dupStringObject(obj
);
2276 obj
->refcount
= 0; /* getDecodedObject() will increment the refcount */
2278 listAddNodeTail(c
->reply
,getDecodedObject(obj
));
2281 static void addReplySds(redisClient
*c
, sds s
) {
2282 robj
*o
= createObject(REDIS_STRING
,s
);
2287 static void addReplyDouble(redisClient
*c
, double d
) {
2290 snprintf(buf
,sizeof(buf
),"%.17g",d
);
2291 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2292 (unsigned long) strlen(buf
),buf
));
2295 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
2298 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
2299 len
= sdslen(obj
->ptr
);
2301 long n
= (long)obj
->ptr
;
2303 /* Compute how many bytes will take this integer as a radix 10 string */
2309 while((n
= n
/10) != 0) {
2313 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len
));
2316 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2321 REDIS_NOTUSED(mask
);
2322 REDIS_NOTUSED(privdata
);
2324 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
2325 if (cfd
== AE_ERR
) {
2326 redisLog(REDIS_VERBOSE
,"Accepting client connection: %s", server
.neterr
);
2329 redisLog(REDIS_VERBOSE
,"Accepted %s:%d", cip
, cport
);
2330 if ((c
= createClient(cfd
)) == NULL
) {
2331 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
2332 close(cfd
); /* May be already closed, just ingore errors */
2335 /* If maxclient directive is set and this is one client more... close the
2336 * connection. Note that we create the client instead to check before
2337 * for this condition, since now the socket is already set in nonblocking
2338 * mode and we can send an error for free using the Kernel I/O */
2339 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
2340 char *err
= "-ERR max number of clients reached\r\n";
2342 /* That's a best effort error message, don't check write errors */
2343 if (write(c
->fd
,err
,strlen(err
)) == -1) {
2344 /* Nothing to do, Just to avoid the warning... */
2349 server
.stat_numconnections
++;
2352 /* ======================= Redis objects implementation ===================== */
2354 static robj
*createObject(int type
, void *ptr
) {
2357 if (listLength(server
.objfreelist
)) {
2358 listNode
*head
= listFirst(server
.objfreelist
);
2359 o
= listNodeValue(head
);
2360 listDelNode(server
.objfreelist
,head
);
2362 if (server
.vm_enabled
) {
2363 o
= zmalloc(sizeof(*o
));
2365 o
= zmalloc(sizeof(*o
)-sizeof(struct redisObjectVM
));
2369 o
->encoding
= REDIS_ENCODING_RAW
;
2372 if (server
.vm_enabled
) {
2373 o
->vm
.atime
= server
.unixtime
;
2374 o
->storage
= REDIS_VM_MEMORY
;
2379 static robj
*createStringObject(char *ptr
, size_t len
) {
2380 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
2383 static robj
*dupStringObject(robj
*o
) {
2384 return createStringObject(o
->ptr
,sdslen(o
->ptr
));
2387 static robj
*createListObject(void) {
2388 list
*l
= listCreate();
2390 listSetFreeMethod(l
,decrRefCount
);
2391 return createObject(REDIS_LIST
,l
);
2394 static robj
*createSetObject(void) {
2395 dict
*d
= dictCreate(&setDictType
,NULL
);
2396 return createObject(REDIS_SET
,d
);
2399 static robj
*createZsetObject(void) {
2400 zset
*zs
= zmalloc(sizeof(*zs
));
2402 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
2403 zs
->zsl
= zslCreate();
2404 return createObject(REDIS_ZSET
,zs
);
2407 static void freeStringObject(robj
*o
) {
2408 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2413 static void freeListObject(robj
*o
) {
2414 listRelease((list
*) o
->ptr
);
2417 static void freeSetObject(robj
*o
) {
2418 dictRelease((dict
*) o
->ptr
);
2421 static void freeZsetObject(robj
*o
) {
2424 dictRelease(zs
->dict
);
2429 static void freeHashObject(robj
*o
) {
2430 dictRelease((dict
*) o
->ptr
);
2433 static void incrRefCount(robj
*o
) {
2434 redisAssert(!server
.vm_enabled
|| o
->storage
== REDIS_VM_MEMORY
);
2438 static void decrRefCount(void *obj
) {
2441 /* Object is swapped out, or in the process of being loaded. */
2442 if (server
.vm_enabled
&&
2443 (o
->storage
== REDIS_VM_SWAPPED
|| o
->storage
== REDIS_VM_LOADING
))
2445 if (o
->storage
== REDIS_VM_SWAPPED
|| o
->storage
== REDIS_VM_LOADING
) {
2446 redisAssert(o
->refcount
== 1);
2448 if (o
->storage
== REDIS_VM_LOADING
) vmCancelThreadedIOJob(obj
);
2449 redisAssert(o
->type
== REDIS_STRING
);
2450 freeStringObject(o
);
2451 vmMarkPagesFree(o
->vm
.page
,o
->vm
.usedpages
);
2452 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2453 !listAddNodeHead(server
.objfreelist
,o
))
2455 server
.vm_stats_swapped_objects
--;
2458 /* Object is in memory, or in the process of being swapped out. */
2459 if (--(o
->refcount
) == 0) {
2460 if (server
.vm_enabled
&& o
->storage
== REDIS_VM_SWAPPING
)
2461 vmCancelThreadedIOJob(obj
);
2463 case REDIS_STRING
: freeStringObject(o
); break;
2464 case REDIS_LIST
: freeListObject(o
); break;
2465 case REDIS_SET
: freeSetObject(o
); break;
2466 case REDIS_ZSET
: freeZsetObject(o
); break;
2467 case REDIS_HASH
: freeHashObject(o
); break;
2468 default: redisAssert(0 != 0); break;
2470 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2471 !listAddNodeHead(server
.objfreelist
,o
))
2476 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2477 dictEntry
*de
= dictFind(db
->dict
,key
);
2479 robj
*key
= dictGetEntryKey(de
);
2480 robj
*val
= dictGetEntryVal(de
);
2482 if (server
.vm_enabled
) {
2483 if (key
->storage
== REDIS_VM_MEMORY
||
2484 key
->storage
== REDIS_VM_SWAPPING
)
2486 /* If we were swapping the object out, stop it, this key
2488 if (key
->storage
== REDIS_VM_SWAPPING
)
2489 vmCancelThreadedIOJob(key
);
2490 /* Update the access time of the key for the aging algorithm. */
2491 key
->vm
.atime
= server
.unixtime
;
2493 /* Our value was swapped on disk. Bring it at home. */
2494 redisAssert(val
== NULL
);
2495 val
= vmLoadObject(key
);
2496 dictGetEntryVal(de
) = val
;
2505 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2506 expireIfNeeded(db
,key
);
2507 return lookupKey(db
,key
);
2510 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2511 deleteIfVolatile(db
,key
);
2512 return lookupKey(db
,key
);
2515 static int deleteKey(redisDb
*db
, robj
*key
) {
2518 /* We need to protect key from destruction: after the first dictDelete()
2519 * it may happen that 'key' is no longer valid if we don't increment
2520 * it's count. This may happen when we get the object reference directly
2521 * from the hash table with dictRandomKey() or dict iterators */
2523 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2524 retval
= dictDelete(db
->dict
,key
);
2527 return retval
== DICT_OK
;
2530 /* Try to share an object against the shared objects pool */
2531 static robj
*tryObjectSharing(robj
*o
) {
2532 struct dictEntry
*de
;
2535 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2537 redisAssert(o
->type
== REDIS_STRING
);
2538 de
= dictFind(server
.sharingpool
,o
);
2540 robj
*shared
= dictGetEntryKey(de
);
2542 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2543 dictGetEntryVal(de
) = (void*) c
;
2544 incrRefCount(shared
);
2548 /* Here we are using a stream algorihtm: Every time an object is
2549 * shared we increment its count, everytime there is a miss we
2550 * recrement the counter of a random object. If this object reaches
2551 * zero we remove the object and put the current object instead. */
2552 if (dictSize(server
.sharingpool
) >=
2553 server
.sharingpoolsize
) {
2554 de
= dictGetRandomKey(server
.sharingpool
);
2555 redisAssert(de
!= NULL
);
2556 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2557 dictGetEntryVal(de
) = (void*) c
;
2559 dictDelete(server
.sharingpool
,de
->key
);
2562 c
= 0; /* If the pool is empty we want to add this object */
2567 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2568 redisAssert(retval
== DICT_OK
);
2575 /* Check if the nul-terminated string 's' can be represented by a long
2576 * (that is, is a number that fits into long without any other space or
2577 * character before or after the digits).
2579 * If so, the function returns REDIS_OK and *longval is set to the value
2580 * of the number. Otherwise REDIS_ERR is returned */
2581 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2582 char buf
[32], *endptr
;
2586 value
= strtol(s
, &endptr
, 10);
2587 if (endptr
[0] != '\0') return REDIS_ERR
;
2588 slen
= snprintf(buf
,32,"%ld",value
);
2590 /* If the number converted back into a string is not identical
2591 * then it's not possible to encode the string as integer */
2592 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2593 if (longval
) *longval
= value
;
2597 /* Try to encode a string object in order to save space */
2598 static int tryObjectEncoding(robj
*o
) {
2602 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2603 return REDIS_ERR
; /* Already encoded */
2605 /* It's not save to encode shared objects: shared objects can be shared
2606 * everywhere in the "object space" of Redis. Encoded objects can only
2607 * appear as "values" (and not, for instance, as keys) */
2608 if (o
->refcount
> 1) return REDIS_ERR
;
2610 /* Currently we try to encode only strings */
2611 redisAssert(o
->type
== REDIS_STRING
);
2613 /* Check if we can represent this string as a long integer */
2614 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2616 /* Ok, this object can be encoded */
2617 o
->encoding
= REDIS_ENCODING_INT
;
2619 o
->ptr
= (void*) value
;
2623 /* Get a decoded version of an encoded object (returned as a new object).
2624 * If the object is already raw-encoded just increment the ref count. */
2625 static robj
*getDecodedObject(robj
*o
) {
2628 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2632 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2635 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2636 dec
= createStringObject(buf
,strlen(buf
));
2639 redisAssert(1 != 1);
2643 /* Compare two string objects via strcmp() or alike.
2644 * Note that the objects may be integer-encoded. In such a case we
2645 * use snprintf() to get a string representation of the numbers on the stack
2646 * and compare the strings, it's much faster than calling getDecodedObject().
2648 * Important note: if objects are not integer encoded, but binary-safe strings,
2649 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2651 static int compareStringObjects(robj
*a
, robj
*b
) {
2652 redisAssert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2653 char bufa
[128], bufb
[128], *astr
, *bstr
;
2656 if (a
== b
) return 0;
2657 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2658 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2664 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2665 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2671 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2674 static size_t stringObjectLen(robj
*o
) {
2675 redisAssert(o
->type
== REDIS_STRING
);
2676 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2677 return sdslen(o
->ptr
);
2681 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2685 /*============================ RDB saving/loading =========================== */
/* Write the one-byte 'type' to 'fp'. Returns 0 on success, -1 on write
 * error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
/* Write 't' to 'fp' as a 32 bit signed integer in host byte order.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;
    size_t written = fwrite(&t32,4,1,fp);

    return (written == 0) ? -1 : 0;
}
2698 /* check rdbLoadLen() comments for more info */
2699 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2700 unsigned char buf
[2];
2703 /* Save a 6 bit len */
2704 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2705 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2706 } else if (len
< (1<<14)) {
2707 /* Save a 14 bit len */
2708 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2710 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2712 /* Save a 32 bit len */
2713 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2714 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2716 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2721 /* String objects in the form "2391" "-100" without any space and with a
2722 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2723 * encoded as integers to save space */
2724 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2726 char *endptr
, buf
[32];
2728 /* Check if it's possible to encode this value as a number */
2729 value
= strtoll(s
, &endptr
, 10);
2730 if (endptr
[0] != '\0') return 0;
2731 snprintf(buf
,32,"%lld",value
);
2733 /* If the number converted back into a string is not identical
2734 * then it's not possible to encode the string as integer */
2735 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2737 /* Finally check if it fits in our ranges */
2738 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2739 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2740 enc
[1] = value
&0xFF;
2742 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2743 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2744 enc
[1] = value
&0xFF;
2745 enc
[2] = (value
>>8)&0xFF;
2747 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2748 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2749 enc
[1] = value
&0xFF;
2750 enc
[2] = (value
>>8)&0xFF;
2751 enc
[3] = (value
>>16)&0xFF;
2752 enc
[4] = (value
>>24)&0xFF;
2759 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2760 unsigned int comprlen
, outlen
;
2764 /* We require at least four bytes compression for this to be worth it */
2765 outlen
= sdslen(obj
->ptr
)-4;
2766 if (outlen
<= 0) return 0;
2767 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2768 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2769 if (comprlen
== 0) {
2773 /* Data compressed! Let's save it on disk */
2774 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2775 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2776 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2777 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2778 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2787 /* Save a string objet as [len][data] on disk. If the object is a string
2788 * representation of an integer value we try to safe it in a special form */
2789 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2793 len
= sdslen(obj
->ptr
);
2795 /* Try integer encoding */
2797 unsigned char buf
[5];
2798 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2799 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2804 /* Try LZF compression - under 20 bytes it's unable to compress even
2805 * aaaaaaaaaaaaaaaaaa so skip it */
2806 if (server
.rdbcompression
&& len
> 20) {
2809 retval
= rdbSaveLzfStringObject(fp
,obj
);
2810 if (retval
== -1) return -1;
2811 if (retval
> 0) return 0;
2812 /* retval == 0 means data can't be compressed, save the old way */
2815 /* Store verbatim */
2816 if (rdbSaveLen(fp
,len
) == -1) return -1;
2817 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2821 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2822 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2825 if (obj
->storage
== REDIS_VM_MEMORY
&&
2826 obj
->encoding
!= REDIS_ENCODING_RAW
)
2828 obj
= getDecodedObject(obj
);
2829 retval
= rdbSaveStringObjectRaw(fp
,obj
);
2832 /* This is a fast path when we are sure the object is not encoded.
2833 * Note that's any *faster* actually as we needed to add the conditional
2834 * but because this may happen in a background process we don't want
2835 * to touch the object fields with incr/decrRefCount in order to
2836 * preveny copy on write of pages.
2838 * Also incrRefCount() will have a failing assert() if we try to call
2839 * it against an object with storage != REDIS_VM_MEMORY. */
2840 retval
= rdbSaveStringObjectRaw(fp
,obj
);
/* Save a double value. Doubles are saved as strings prefixed by an unsigned
 * 8 bit integer specifying the length of the representation.
 * This 8 bit integer has special values in order to specify the following
 * conditions:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    char *digits = (char*)buf+1;
    int len = 1;

    if (isnan(val)) {
        buf[0] = 253;
    } else if (!isfinite(val)) {
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        snprintf(digits,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen(digits);
        len = buf[0]+1;
    }
    return (fwrite(buf,len,1,fp) == 0) ? -1 : 0;
}
2872 /* Save a Redis object. */
2873 static int rdbSaveObject(FILE *fp
, robj
*o
) {
2874 if (o
->type
== REDIS_STRING
) {
2875 /* Save a string value */
2876 if (rdbSaveStringObject(fp
,o
) == -1) return -1;
2877 } else if (o
->type
== REDIS_LIST
) {
2878 /* Save a list value */
2879 list
*list
= o
->ptr
;
2883 if (rdbSaveLen(fp
,listLength(list
)) == -1) return -1;
2884 while((ln
= listYield(list
))) {
2885 robj
*eleobj
= listNodeValue(ln
);
2887 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2889 } else if (o
->type
== REDIS_SET
) {
2890 /* Save a set value */
2892 dictIterator
*di
= dictGetIterator(set
);
2895 if (rdbSaveLen(fp
,dictSize(set
)) == -1) return -1;
2896 while((de
= dictNext(di
)) != NULL
) {
2897 robj
*eleobj
= dictGetEntryKey(de
);
2899 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2901 dictReleaseIterator(di
);
2902 } else if (o
->type
== REDIS_ZSET
) {
2903 /* Save a set value */
2905 dictIterator
*di
= dictGetIterator(zs
->dict
);
2908 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) return -1;
2909 while((de
= dictNext(di
)) != NULL
) {
2910 robj
*eleobj
= dictGetEntryKey(de
);
2911 double *score
= dictGetEntryVal(de
);
2913 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2914 if (rdbSaveDoubleValue(fp
,*score
) == -1) return -1;
2916 dictReleaseIterator(di
);
2918 redisAssert(0 != 0);
2923 /* Return the length the object will have on disk if saved with
2924 * the rdbSaveObject() function. Currently we use a trick to get
2925 * this length with very little changes to the code. In the future
2926 * we could switch to a faster solution. */
2927 static off_t
rdbSavedObjectLen(robj
*o
) {
2928 static FILE *fp
= NULL
;
2930 if (fp
== NULL
) fp
= fopen("/dev/null","w");
2934 assert(rdbSaveObject(fp
,o
) != 1);
2938 /* Return the number of pages required to save this object in the swap file */
2939 static off_t
rdbSavedObjectPages(robj
*o
) {
2940 off_t bytes
= rdbSavedObjectLen(o
);
2942 return (bytes
+(server
.vm_page_size
-1))/server
.vm_page_size
;
2945 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2946 static int rdbSave(char *filename
) {
2947 dictIterator
*di
= NULL
;
2952 time_t now
= time(NULL
);
2954 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2955 fp
= fopen(tmpfile
,"w");
2957 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2960 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2961 for (j
= 0; j
< server
.dbnum
; j
++) {
2962 redisDb
*db
= server
.db
+j
;
2964 if (dictSize(d
) == 0) continue;
2965 di
= dictGetIterator(d
);
2971 /* Write the SELECT DB opcode */
2972 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2973 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2975 /* Iterate this DB writing every entry */
2976 while((de
= dictNext(di
)) != NULL
) {
2977 robj
*key
= dictGetEntryKey(de
);
2978 robj
*o
= dictGetEntryVal(de
);
2979 time_t expiretime
= getExpire(db
,key
);
2981 /* Save the expire time */
2982 if (expiretime
!= -1) {
2983 /* If this key is already expired skip it */
2984 if (expiretime
< now
) continue;
2985 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
2986 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
2988 /* Save the key and associated value. This requires special
2989 * handling if the value is swapped out. */
2990 if (!server
.vm_enabled
|| key
->storage
== REDIS_VM_MEMORY
||
2991 key
->storage
== REDIS_VM_SWAPPING
) {
2992 /* Save type, key, value */
2993 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
2994 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
2995 if (rdbSaveObject(fp
,o
) == -1) goto werr
;
2997 /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */
2999 /* Get a preview of the object in memory */
3000 po
= vmPreviewObject(key
);
3001 /* Also duplicate the key object, to pass around a standard
3003 newkey
= dupStringObject(key
);
3004 /* Save type, key, value */
3005 if (rdbSaveType(fp
,key
->vtype
) == -1) goto werr
;
3006 if (rdbSaveStringObject(fp
,newkey
) == -1) goto werr
;
3007 if (rdbSaveObject(fp
,po
) == -1) goto werr
;
3008 /* Remove the loaded object from memory */
3010 decrRefCount(newkey
);
3013 dictReleaseIterator(di
);
3016 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
3018 /* Make sure data will not remain on the OS's output buffers */
3023 /* Use RENAME to make sure the DB file is changed atomically only
3024 * if the generate DB file is ok. */
3025 if (rename(tmpfile
,filename
) == -1) {
3026 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
3030 redisLog(REDIS_NOTICE
,"DB saved on disk");
3032 server
.lastsave
= time(NULL
);
3038 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
3039 if (di
) dictReleaseIterator(di
);
3043 static int rdbSaveBackground(char *filename
) {
3046 if (server
.bgsavechildpid
!= -1) return REDIS_ERR
;
3047 if ((childpid
= fork()) == 0) {
3050 if (rdbSave(filename
) == REDIS_OK
) {
3057 if (childpid
== -1) {
3058 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
3062 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
3063 server
.bgsavechildpid
= childpid
;
3066 return REDIS_OK
; /* unreached */
/* Remove the temp file "temp-<childpid>.rdb", named the same way rdbSave()
 * names its temp file. */
static void rdbRemoveTempFile(pid_t childpid) {
    char tmpfile[256];
    int pid = (int) childpid;

    snprintf(tmpfile,256,"temp-%d.rdb", pid);
    unlink(tmpfile);
}
/* Read a one-byte type from 'fp'. Returns the byte value, or -1 if nothing
 * could be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;
    size_t got = fread(&type,1,1,fp);

    return (got == 0) ? -1 : type;
}
/* Read a 32 bit signed time value (host byte order) from 'fp'. Returns -1
 * if the four bytes could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t t32;
    size_t got = fread(&t32,4,1,fp);

    if (got == 0) return -1;
    return (time_t) t32;
}
3088 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
3089 * of this file for a description of how this are stored on disk.
3091 * isencoded is set to 1 if the readed length is not actually a length but
3092 * an "encoding type", check the above comments for more info */
3093 static uint32_t rdbLoadLen(FILE *fp
, int *isencoded
) {
3094 unsigned char buf
[2];
3098 if (isencoded
) *isencoded
= 0;
3099 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
3100 type
= (buf
[0]&0xC0)>>6;
3101 if (type
== REDIS_RDB_6BITLEN
) {
3102 /* Read a 6 bit len */
3104 } else if (type
== REDIS_RDB_ENCVAL
) {
3105 /* Read a 6 bit len encoding type */
3106 if (isencoded
) *isencoded
= 1;
3108 } else if (type
== REDIS_RDB_14BITLEN
) {
3109 /* Read a 14 bit len */
3110 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
3111 return ((buf
[0]&0x3F)<<8)|buf
[1];
3113 /* Read a 32 bit len */
3114 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
/* Load an integer stored in 1, 2 or 4 bytes (selected by enctype) and return
 * it as a REDIS_STRING object whose payload is the decimal text of the value
 * (formatted with "%lld"). Multi-byte values are assembled with the first
 * byte as the least significant one. Returns NULL on a short read.
 * Several lines of the body (declarations, sign conversions) are missing
 * from this extract. */
3119 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
3120 unsigned char enc
[4];
3123 if (enctype
== REDIS_RDB_ENC_INT8
) {
3124 if (fread(enc
,1,1,fp
) == 0) return NULL
;
3125 val
= (signed char)enc
[0];
3126 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
3128 if (fread(enc
,2,1,fp
) == 0) return NULL
;
3129 v
= enc
[0]|(enc
[1]<<8);
3131 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
3133 if (fread(enc
,4,1,fp
) == 0) return NULL
;
/* NOTE(review): enc[3] is promoted to int before the shift, so enc[3]<<24
 * overflows a signed int when enc[3] >= 0x80; casting the operand to an
 * unsigned 32 bit type first would avoid relying on that. */
3134 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
3137 val
= 0; /* anti-warning */
3140 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
/* Load an LZF compressed string. The compressed length (clen) is read first,
 * then the uncompressed length (len); clen bytes are read into a temporary
 * buffer and decompressed into a freshly created sds of len bytes, which is
 * returned wrapped in a REDIS_STRING object.
 * Returns NULL if either length cannot be read. Later failures jump to the
 * 'err' label, which is not visible in this extract. */
3143 static robj
*rdbLoadLzfStringObject(FILE*fp
) {
3144 unsigned int len
, clen
;
3145 unsigned char *c
= NULL
;
/* Nothing has been allocated yet, so returning directly is safe here. */
3148 if ((clen
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3149 if ((len
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3150 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
3151 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
3152 if (fread(c
,clen
,1,fp
) == 0) goto err
;
/* NOTE(review): only a zero result is treated as failure; the decompressed
 * size is not compared against len. */
3153 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
3155 return createObject(REDIS_STRING
,val
);
/* Load a string object from fp. The length prefix is read with rdbLoadLen();
 * for the integer and LZF encodings listed below the work is delegated to
 * the specialised loaders, otherwise len raw bytes are read into a new sds.
 * Every result goes through tryObjectSharing(). Returns NULL when the length
 * cannot be read. The statements selecting between the encoded and the plain
 * path are not visible in this extract. */
3162 static robj
*rdbLoadStringObject(FILE*fp
) {
3167 len
= rdbLoadLen(fp
,&isencoded
);
3170 case REDIS_RDB_ENC_INT8
:
3171 case REDIS_RDB_ENC_INT16
:
3172 case REDIS_RDB_ENC_INT32
:
/* Here 'len' carries the encoding type rather than a length. */
3173 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
3174 case REDIS_RDB_ENC_LZF
:
3175 return tryObjectSharing(rdbLoadLzfStringObject(fp
));
3181 if (len
== REDIS_RDB_LENERR
) return NULL
;
3182 val
= sdsnewlen(NULL
,len
);
/* A zero length string needs no read at all. */
3183 if (len
&& fread(val
,len
,1,fp
) == 0) {
3187 return tryObjectSharing(createObject(REDIS_STRING
,val
));
3190 /* For information about double serialization check rdbSaveDoubleValue() */
/* Read a double into *val. The first byte is a length: the values 255, 254
 * and 253 stand for negative infinity, positive infinity and NaN; any other
 * value is the number of bytes of a textual representation that is read
 * into buf and parsed with sscanf("%lg").
 * Returns 0 for the special values and -1 on a short read; the return after
 * the sscanf() call is not visible in this extract. */
3191 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
3195 if (fread(&len
,1,1,fp
) == 0) return -1;
3197 case 255: *val
= R_NegInf
; return 0;
3198 case 254: *val
= R_PosInf
; return 0;
3199 case 253: *val
= R_Nan
; return 0;
3201 if (fread(buf
,len
,1,fp
) == 0) return -1;
/* NOTE(review): the sscanf() result is not checked, so malformed text would
 * leave *val unchanged without any error being reported. */
3203 sscanf(buf
, "%lg", val
);
3208 /* Load a Redis object of the specified type from the specified file.
3209 * On success a newly allocated object is returned, otherwise NULL.
 *
 * Strings are loaded directly; lists and sets are stored as a length
 * followed by that many string elements; sorted sets as a length followed
 * by (element, score) pairs.
 * NOTE(review): the error returns inside the element loops leave the
 * partially built object (and, for sorted sets, the score just allocated)
 * unreleased. The caller visible in this file treats a NULL result as an
 * unrecoverable load error, so this may be deliberate -- confirm. */
3210 static robj
*rdbLoadObject(int type
, FILE *fp
) {
3213 if (type
== REDIS_STRING
) {
3214 /* Read string value */
3215 if ((o
= rdbLoadStringObject(fp
)) == NULL
) return NULL
;
3216 tryObjectEncoding(o
);
3217 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
3218 /* Read list/set value */
3221 if ((listlen
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3222 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
3223 /* Load every single element of the list/set */
3227 if ((ele
= rdbLoadStringObject(fp
)) == NULL
) return NULL
;
3228 tryObjectEncoding(ele
);
3229 if (type
== REDIS_LIST
) {
3230 listAddNodeTail((list
*)o
->ptr
,ele
);
3232 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
3235 } else if (type
== REDIS_ZSET
) {
3236 /* Read sorted set value */
3240 if ((zsetlen
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3241 o
= createZsetObject();
3243 /* Load every single element of the sorted set */
/* The score is heap allocated because the dict entry stores the pointer. */
3246 double *score
= zmalloc(sizeof(double));
3248 if ((ele
= rdbLoadStringObject(fp
)) == NULL
) return NULL
;
3249 tryObjectEncoding(ele
);
3250 if (rdbLoadDoubleValue(fp
,score
) == -1) return NULL
;
3251 dictAdd(zs
->dict
,ele
,score
);
3252 zslInsert(zs
->zsl
,*score
,ele
);
3253 incrRefCount(ele
); /* added to skiplist */
/* Unknown object type: always-false assertion. */
3256 redisAssert(0 != 0);
/* Load the dataset stored in 'filename' into the server databases.
 * Returns REDIS_ERR when the file cannot be opened. The file must start with
 * the 5 byte signature "REDIS" followed by a numeric format version.
 * After the header the file is a sequence of records: an optional
 * REDIS_EXPIRETIME prefix, a type byte, then either a SELECTDB opcode with a
 * database index, or a key followed by its value. REDIS_EOF ends the file.
 * A short read anywhere jumps to 'eoferr'. Large parts of the control flow
 * (loop header, exits, cleanup) are missing from this extract. */
3261 static int rdbLoad(char *filename
) {
3263 robj
*keyobj
= NULL
;
3265 int type
, retval
, rdbver
;
3266 dict
*d
= server
.db
[0].dict
;
3267 redisDb
*db
= server
.db
+0;
/* NOTE(review): expiretime is only visibly initialised here; verify that it
 * is reset to -1 for every key, otherwise an expire read for one key would
 * carry over to the keys loaded after it. */
3269 time_t expiretime
= -1, now
= time(NULL
);
3270 long long loadedkeys
= 0;
/* NOTE(review): the file holds binary data but is opened in text mode ("r");
 * equivalent on POSIX, "rb" would be needed where the two modes differ. */
3272 fp
= fopen(filename
,"r");
3273 if (!fp
) return REDIS_ERR
;
/* Header: 5 bytes of signature plus 4 characters of version. */
3274 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
3276 if (memcmp(buf
,"REDIS",5) != 0) {
3278 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
3281 rdbver
= atoi(buf
+5);
3284 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
3291 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
3292 if (type
== REDIS_EXPIRETIME
) {
3293 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
3294 /* We read the time so we need to read the object type again */
3295 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
3297 if (type
== REDIS_EOF
) break;
3298 /* Handle SELECT DB opcode as a special case */
3299 if (type
== REDIS_SELECTDB
) {
3300 if ((dbid
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
)
3302 if (dbid
>= (unsigned)server
.dbnum
) {
3303 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
3306 db
= server
.db
+dbid
;
3311 if ((keyobj
= rdbLoadStringObject(fp
)) == NULL
) goto eoferr
;
3313 if ((o
= rdbLoadObject(type
,fp
)) == NULL
) goto eoferr
;
3314 /* Add the new object in the hash table */
3315 retval
= dictAdd(d
,keyobj
,o
);
3316 if (retval
== DICT_ERR
) {
3317 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
3320 /* Set the expire time if needed */
3321 if (expiretime
!= -1) {
3322 setExpire(db
,keyobj
,expiretime
);
3323 /* Delete this key if already expired */
3324 if (expiretime
< now
) deleteKey(db
,keyobj
);
3328 /* Handle swapping while loading big datasets when VM is on */
/* Checked once every 5000 loaded keys: swap objects out until memory usage
 * is back under vm_max_memory or nothing more can be swapped. */
3330 if (server
.vm_enabled
&& (loadedkeys
% 5000) == 0) {
3331 while (zmalloc_used_memory() > server
.vm_max_memory
) {
3332 if (vmSwapOneObject() == REDIS_ERR
) break;
3339 eoferr
: /* unexpected end of file is handled here with a fatal exit */
3340 if (keyobj
) decrRefCount(keyobj
);
3341 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
3343 return REDIS_ERR
; /* Just to avoid warning */
3346 /*================================== Commands =============================== */
/* Authenticate the client. The client is marked authenticated and gets
 * shared.ok when no password is configured (server.requirepass is NULL) or
 * when argv[1] equals it; the other replies visible here clear the flag and
 * send "-ERR invalid password".
 * NOTE(review): strcmp() stops at the first NUL byte, so only the part of
 * the argument before an embedded NUL takes part in the comparison. */
3348 static void authCommand(redisClient
*c
) {
3349 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
3350 c
->authenticated
= 1;
3351 addReply(c
,shared
.ok
);
3353 c
->authenticated
= 0;
3354 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
/* Reply with the shared "pong" object; no arguments are read. */
3358 static void pingCommand(redisClient
*c
) {
3359 addReply(c
,shared
.pong
);
/* Send argv[1] back to the client as a bulk reply: the bulk length header,
 * the argument object itself, then the trailing CRLF. */
3362 static void echoCommand(redisClient
*c
) {
3363 addReplyBulkLen(c
,c
->argv
[1]);
3364 addReply(c
,c
->argv
[1]);
3365 addReply(c
,shared
.crlf
);
3368 /*=================================== Strings =============================== */
/* Shared implementation behind setCommand() (nx = 0) and setnxCommand()
 * (nx = 1). argv[1] is the key and argv[2] the value.
 * dictAdd() is tried first. When the key already exists (DICT_ERR) the
 * visible code either overwrites the value with dictReplace() or replies
 * shared.czero; the condition choosing between the two is not visible in
 * this extract. Objects stored in the dict get an extra reference, any
 * expire on the key is removed, and the final reply is shared.cone when nx
 * is set and shared.ok otherwise. */
3370 static void setGenericCommand(redisClient
*c
, int nx
) {
/* With nx set, an already expired key must not count as existing. */
3373 if (nx
) deleteIfVolatile(c
->db
,c
->argv
[1]);
3374 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3375 if (retval
== DICT_ERR
) {
3377 /* If the key is about a swapped value, we want a new key object
3378 * to overwrite the old. So we delete the old key in the database.
3379 * This will also make sure that swap pages about the old object
3380 * will be marked as free. */
3381 if (deleteIfSwapped(c
->db
,c
->argv
[1]))
3382 incrRefCount(c
->argv
[1]);
3383 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3384 incrRefCount(c
->argv
[2]);
3386 addReply(c
,shared
.czero
);
/* Fresh insert: both key and value are now referenced by the dict. */
3390 incrRefCount(c
->argv
[1]);
3391 incrRefCount(c
->argv
[2]);
3394 removeExpire(c
->db
,c
->argv
[1]);
3395 addReply(c
, nx
? shared
.cone
: shared
.ok
);
/* Thin wrapper: setGenericCommand() with the nx flag cleared. */
3398 static void setCommand(redisClient
*c
) {
3399 setGenericCommand(c
,0);
/* Thin wrapper: setGenericCommand() with the nx flag set. */
3402 static void setnxCommand(redisClient
*c
) {
3403 setGenericCommand(c
,1);
/* Look up argv[1] for reading and send its value as a bulk reply.
 * Replies visible here: shared.nullbulk (presumably when the key is missing
 * -- the test is not visible in this extract), shared.wrongtypeerr when the
 * object is not a string, otherwise the bulk length header followed by a
 * CRLF (the line sending the payload is not visible).
 * Returns an int status; getsetCommand() below compares it with REDIS_ERR. */
3406 static int getGenericCommand(redisClient
*c
) {
3407 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3410 addReply(c
,shared
.nullbulk
);
3413 if (o
->type
!= REDIS_STRING
) {
3414 addReply(c
,shared
.wrongtypeerr
);
3417 addReplyBulkLen(c
,o
);
3419 addReply(c
,shared
.crlf
);
/* Thin wrapper: runs getGenericCommand() and discards its status. */
3425 static void getCommand(redisClient
*c
) {
3426 getGenericCommand(c
);
/* Reply with the current value of argv[1] via getGenericCommand(), then
 * store argv[2] under that key. Nothing is stored when getGenericCommand()
 * reports REDIS_ERR. dictAdd() is tried first and dictReplace() is used when
 * the key already exists; references are taken on the stored objects and
 * any expire on the key is removed. */
3429 static void getsetCommand(redisClient
*c
) {
3430 if (getGenericCommand(c
) == REDIS_ERR
) return;
3431 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
3432 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3434 incrRefCount(c
->argv
[1]);
3436 incrRefCount(c
->argv
[2]);
3438 removeExpire(c
->db
,c
->argv
[1]);
/* Multi-key get: emits a multi-bulk header with argc-1 entries, then one
 * entry per key argument. A key holding a non-string object yields
 * shared.nullbulk rather than an error; the first nullbulk reply is
 * presumably the missing-key case (its test is not visible in this
 * extract). Other keys yield a bulk length header and a trailing CRLF. */
3441 static void mgetCommand(redisClient
*c
) {
3444 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
3445 for (j
= 1; j
< c
->argc
; j
++) {
3446 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
3448 addReply(c
,shared
.nullbulk
);
3450 if (o
->type
!= REDIS_STRING
) {
3451 addReply(c
,shared
.nullbulk
);
3453 addReplyBulkLen(c
,o
);
3455 addReply(c
,shared
.crlf
);
/* Shared implementation behind msetCommand() (nx = 0) and msetnxCommand()
 * (nx = 1). Arguments are key/value pairs starting at argv[1], so argc must
 * be odd. Every pair is stored with dictAdd(), falling back to dictReplace()
 * for existing keys; expires on the keys are removed and server.dirty grows
 * by the number of pairs. The reply is shared.cone when nx is set and
 * shared.ok otherwise. */
3461 static void msetGenericCommand(redisClient
*c
, int nx
) {
3462 int j
, busykeys
= 0;
/* NOTE(review): the message names MSET even when reached through the nx
 * variant. */
3464 if ((c
->argc
% 2) == 0) {
3465 addReplySds(c
,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3468 /* Handle the NX flag. The MSETNX semantics are to return zero and set
3469 * nothing at all if at least one key already exists. */
3471 for (j
= 1; j
< c
->argc
; j
+= 2) {
3472 if (lookupKeyWrite(c
->db
,c
->argv
[j
]) != NULL
) {
3478 addReply(c
, shared
.czero
);
3482 for (j
= 1; j
< c
->argc
; j
+= 2) {
3485 tryObjectEncoding(c
->argv
[j
+1]);
3486 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3487 if (retval
== DICT_ERR
) {
3488 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3489 incrRefCount(c
->argv
[j
+1]);
3491 incrRefCount(c
->argv
[j
]);
3492 incrRefCount(c
->argv
[j
+1]);
3494 removeExpire(c
->db
,c
->argv
[j
]);
/* One change per key/value pair. */
3496 server
.dirty
+= (c
->argc
-1)/2;
3497 addReply(c
, nx
? shared
.cone
: shared
.ok
);
/* Thin wrapper: msetGenericCommand() with the nx flag cleared. */
3500 static void msetCommand(redisClient
*c
) {
3501 msetGenericCommand(c
,0);
/* Thin wrapper: msetGenericCommand() with the nx flag set. */
3504 static void msetnxCommand(redisClient
*c
) {
3505 msetGenericCommand(c
,1);
/* Shared implementation of the increment/decrement commands; 'incr' is the
 * signed amount supplied by the wrappers below.
 * The current value of argv[1] is read either by parsing a raw string with
 * strtoll() or by taking the integer kept directly in o->ptr for integer
 * encoded objects. A new string object holding the decimal text of 'value'
 * is then stored under the key (replacing an old entry and dropping its
 * expire when the key exists) and the reply starts with shared.colon and
 * ends with CRLF.
 * NOTE(review): the statement applying 'incr' to 'value' and any use of
 * 'eptr' are not visible in this extract, so overflow and partial-parse
 * handling cannot be confirmed from here. */
3508 static void incrDecrCommand(redisClient
*c
, long long incr
) {
3513 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3517 if (o
->type
!= REDIS_STRING
) {
3522 if (o
->encoding
== REDIS_ENCODING_RAW
)
3523 value
= strtoll(o
->ptr
, &eptr
, 10);
3524 else if (o
->encoding
== REDIS_ENCODING_INT
)
3525 value
= (long)o
->ptr
;
/* Any other encoding is unexpected: always-false assertion. */
3527 redisAssert(1 != 1);
3532 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
3533 tryObjectEncoding(o
);
3534 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
3535 if (retval
== DICT_ERR
) {
3536 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
3537 removeExpire(c
->db
,c
->argv
[1]);
3539 incrRefCount(c
->argv
[1]);
3542 addReply(c
,shared
.colon
);
3544 addReply(c
,shared
.crlf
);
/* Thin wrapper: incrDecrCommand() with an amount of +1. */
3547 static void incrCommand(redisClient
*c
) {
3548 incrDecrCommand(c
,1);
/* Thin wrapper: incrDecrCommand() with an amount of -1. */
3551 static void decrCommand(redisClient
*c
) {
3552 incrDecrCommand(c
,-1);
/* Parse argv[2] as a base 10 integer and pass it to incrDecrCommand().
 * NOTE(review): strtoll() is called without an end pointer, so an argument
 * that is not a number is silently treated as 0 (or as its numeric prefix). */
3555 static void incrbyCommand(redisClient
*c
) {
3556 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3557 incrDecrCommand(c
,incr
);
/* Parse argv[2] as a base 10 integer and pass its negation to
 * incrDecrCommand().
 * NOTE(review): as in incrbyCommand() the parse is unchecked; additionally
 * negating the most negative long long value overflows. */
3560 static void decrbyCommand(redisClient
*c
) {
3561 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3562 incrDecrCommand(c
,-incr
);
3565 /* ========================= Type agnostic commands ========================= */
/* Delete every key named in argv[1..argc-1] with deleteKey().
 * Three replies are visible: shared.czero, shared.cone and a formatted
 * ":<deleted>" integer; the statements counting deletions and choosing
 * between the replies are not visible in this extract. */
3567 static void delCommand(redisClient
*c
) {
3570 for (j
= 1; j
< c
->argc
; j
++) {
3571 if (deleteKey(c
->db
,c
->argv
[j
])) {
3578 addReply(c
,shared
.czero
);
3581 addReply(c
,shared
.cone
);
3584 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
/* Reply shared.cone when lookupKeyRead() finds argv[1] in the client's
 * database, shared.czero otherwise. */
3589 static void existsCommand(redisClient
*c
) {
3590 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
/* Switch the client to the database whose index is given in argv[1].
 * Replies "-ERR invalid DB index" when selectDb() fails; shared.ok is the
 * other reply visible here.
 * NOTE(review): atoi() returns 0 for text that is not a number, so such an
 * argument selects database 0 instead of being rejected. */
3593 static void selectCommand(redisClient
*c
) {
3594 int id
= atoi(c
->argv
[1]->ptr
);
3596 if (selectDb(c
,id
) == REDIS_ERR
) {
3597 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
3599 addReply(c
,shared
.ok
);
/* Reply with a random key of the client's database. Entries are drawn with
 * dictGetRandomKey() until either the dict yields nothing or the drawn key
 * is one for which expireIfNeeded() returns 0. Two replies are visible:
 * "+" followed by CRLF (presumably the empty-database case; the test is not
 * visible in this extract), and "+", the key object, CRLF. */
3603 static void randomkeyCommand(redisClient
*c
) {
3607 de
= dictGetRandomKey(c
->db
->dict
);
3608 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
3611 addReply(c
,shared
.plus
);
3612 addReply(c
,shared
.crlf
);
3614 addReply(c
,shared
.plus
);
3615 addReply(c
,dictGetEntryKey(de
));
3616 addReply(c
,shared
.crlf
);
/* Reply with the keys matching the glob pattern in argv[1] as one bulk
 * string. The bulk length is unknown until the iteration ends, so an empty
 * placeholder object (lenobj) is created up front and its payload is filled
 * in afterwards with the total: the sum of the key lengths plus one
 * separator between consecutive keys. */
3620 static void keysCommand(redisClient
*c
) {
3623 sds pattern
= c
->argv
[1]->ptr
;
3624 int plen
= sdslen(pattern
);
3625 unsigned long numkeys
= 0, keyslen
= 0;
3626 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3628 di
= dictGetIterator(c
->db
->dict
);
/* NOTE(review): lenobj is released here yet written again after the loop;
 * presumably a statement not visible in this extract queued it on the reply
 * first, so that it stays alive -- confirm. */
3630 decrRefCount(lenobj
);
3631 while((de
= dictNext(di
)) != NULL
) {
3632 robj
*keyobj
= dictGetEntryKey(de
);
3634 sds key
= keyobj
->ptr
;
/* A lone "*" matches everything, so the matcher is skipped. */
3635 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3636 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3637 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3639 addReply(c
,shared
.space
);
3642 keyslen
+= sdslen(key
);
3646 dictReleaseIterator(di
);
3647 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3648 addReply(c
,shared
.crlf
);
/* Format the number of entries in the client's database dict as an integer
 * reply (":<n>"). The call consuming the formatted string is not visible in
 * this extract. */
3651 static void dbsizeCommand(redisClient
*c
) {
3653 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
/* Format server.lastsave as an integer reply (":<n>"). The call consuming
 * the formatted string is not visible in this extract.
 * NOTE(review): the value is printed with %lu; confirm that the declared
 * type of server.lastsave matches unsigned long on all targets. */
3656 static void lastsaveCommand(redisClient
*c
) {
3658 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
/* Reply with the type name of the object stored at argv[1], followed by
 * CRLF. The handling of a missing key is not visible in this extract.
 * NOTE(review): every known type string carries the leading "+" of a status
 * reply, while the default "unknown" does not. */
3661 static void typeCommand(redisClient
*c
) {
3665 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3670 case REDIS_STRING
: type
= "+string"; break;
3671 case REDIS_LIST
: type
= "+list"; break;
3672 case REDIS_SET
: type
= "+set"; break;
3673 case REDIS_ZSET
: type
= "+zset"; break;
3674 default: type
= "unknown"; break;
3677 addReplySds(c
,sdsnew(type
));
3678 addReply(c
,shared
.crlf
);
/* Synchronous save of the dataset to server.dbfilename. Replies
 * "-ERR background save in progress" while a background save child exists
 * (server.bgsavechildpid != -1). shared.ok is sent when rdbSave() succeeds;
 * shared.err is the other reply visible here. */
3681 static void saveCommand(redisClient
*c
) {
3682 if (server
.bgsavechildpid
!= -1) {
3683 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3686 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3687 addReply(c
,shared
.ok
);
3689 addReply(c
,shared
.err
);
/* Start a background save to server.dbfilename. Replies
 * "-ERR background save already in progress" while a save child exists.
 * "+Background saving started" is sent when rdbSaveBackground() succeeds;
 * shared.err is the other reply visible here. */
3693 static void bgsaveCommand(redisClient
*c
) {
3694 if (server
.bgsavechildpid
!= -1) {
3695 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3698 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3699 char *status
= "+Background saving started\r\n";
3700 addReplySds(c
,sdsnew(status
));
3702 addReply(c
,shared
.err
);
/* Persist the dataset and shut the server down. A running background save
 * child is killed first and its temp file handled via rdbRemoveTempFile().
 * With append only mode enabled the AOF descriptor is fsync()ed; otherwise a
 * synchronous rdbSave() is performed. When that save fails the client gets
 * an error and the server keeps running. The statements terminating the
 * process are not visible in this extract. */
3706 static void shutdownCommand(redisClient
*c
) {
3707 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
3708 /* Kill the saving child if there is a background saving in progress.
3709 We want to avoid race conditions, for instance our saving child may
3710 overwrite the synchronous saving did by SHUTDOWN. */
3711 if (server
.bgsavechildpid
!= -1) {
3712 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3713 kill(server
.bgsavechildpid
,SIGKILL
);
3714 rdbRemoveTempFile(server
.bgsavechildpid
);
3716 if (server
.appendonly
) {
3717 /* Append only file: fsync() the AOF and exit */
/* NOTE(review): the fsync() result is ignored. */
3718 fsync(server
.appendfd
);
3721 /* Snapshotting. Perform a SYNC SAVE and exit */
3722 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
/* The pid file is only removed when running daemonized. */
3723 if (server
.daemonize
)
3724 unlink(server
.pidfile
);
3725 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3726 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3729 /* Ooops.. error saving! The best we can do is to continue operating.
3730 * Note that if there was a background saving process, in the next
3731 * cron() Redis will be notified that the background saving aborted,
3732 * handling special stuff like slaves pending for synchronization... */
3733 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3734 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
/* Shared implementation behind renameCommand() (nx = 0) and
 * renamenxCommand() (nx = 1): move the object stored at argv[1] to the key
 * argv[2].
 * Identical source and destination names are answered with
 * shared.sameobjecterr. shared.nokeyerr is presumably the missing-source
 * reply (its test is not visible in this extract). When the destination
 * already exists (dictAdd() gives DICT_ERR) the visible code either replies
 * shared.czero or overwrites with dictReplace(); the condition choosing
 * between them is not visible. The source key is deleted at the end and the
 * reply is shared.cone when nx is set and shared.ok otherwise. */
3739 static void renameGenericCommand(redisClient
*c
, int nx
) {
3742 /* To use the same key as src and dst is probably an error */
3743 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3744 addReply(c
,shared
.sameobjecterr
);
3748 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3750 addReply(c
,shared
.nokeyerr
);
/* An expired destination must not count as an existing key. */
3754 deleteIfVolatile(c
->db
,c
->argv
[2]);
3755 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3758 addReply(c
,shared
.czero
);
3761 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3763 incrRefCount(c
->argv
[2]);
3765 deleteKey(c
->db
,c
->argv
[1]);
3767 addReply(c
,nx
? shared
.cone
: shared
.ok
);
/* Thin wrapper: renameGenericCommand() with the nx flag cleared. */
3770 static void renameCommand(redisClient
*c
) {
3771 renameGenericCommand(c
,0);
/* Thin wrapper: renameGenericCommand() with the nx flag set. */
3774 static void renamenxCommand(redisClient
*c
) {
3775 renameGenericCommand(c
,1);
/* Move the key argv[1] from the client's current database to the database
 * whose index is given in argv[2]. The target is resolved by temporarily
 * selecting it on the client and then switching back to the source.
 * Replies visible here: shared.outofrangeerr when the target index cannot
 * be selected, shared.sameobjecterr (presumably when source and target are
 * the same database; the test is not visible in this extract),
 * shared.czero when the key already exists in the target, and shared.cone
 * after the move. The second shared.czero is presumably the missing-key
 * case. */
3778 static void moveCommand(redisClient
*c
) {
3783 /* Obtain source and target DB pointers */
/* NOTE(review): atoi() turns text that is not a number into index 0. */
3786 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3787 addReply(c
,shared
.outofrangeerr
);
3791 selectDb(c
,srcid
); /* Back to the source DB */
3793 /* If the user is moving using as target the same
3794 * DB as the source DB it is probably an error. */
3796 addReply(c
,shared
.sameobjecterr
);
3800 /* Check if the element exists and get a reference */
3801 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3803 addReply(c
,shared
.czero
);
3807 /* Try to add the element to the target DB */
3808 deleteIfVolatile(dst
,c
->argv
[1]);
3809 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3810 addReply(c
,shared
.czero
);
3813 incrRefCount(c
->argv
[1]);
3816 /* OK! key moved, free the entry in the source DB */
3817 deleteKey(src
,c
->argv
[1]);
3819 addReply(c
,shared
.cone
);
3822 /* =================================== Lists ================================ */
/* Shared implementation behind lpushCommand() and rpushCommand(): add
 * argv[2] to the list stored at argv[1], at the head when 'where' is
 * REDIS_HEAD, at the tail in the other visible branch.
 * Two paths are visible. In the first a new list object is created and
 * added to the database; in the second an existing object is used and
 * shared.wrongtypeerr is sent if it is not a list. On both paths
 * handleClientsWaitingListPush() is consulted first; a non-zero result is
 * answered with shared.ok right away. The return statements between these
 * paths are not visible in this extract. */
3823 static void pushGenericCommand(redisClient
*c
, int where
) {
3827 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3829 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3830 addReply(c
,shared
.ok
);
3833 lobj
= createListObject();
3835 if (where
== REDIS_HEAD
) {
3836 listAddNodeHead(list
,c
->argv
[2]);
3838 listAddNodeTail(list
,c
->argv
[2]);
3840 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
/* The dict now references the key, the list references the element. */
3841 incrRefCount(c
->argv
[1]);
3842 incrRefCount(c
->argv
[2]);
3844 if (lobj
->type
!= REDIS_LIST
) {
3845 addReply(c
,shared
.wrongtypeerr
);
3848 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3849 addReply(c
,shared
.ok
);
3853 if (where
== REDIS_HEAD
) {
3854 listAddNodeHead(list
,c
->argv
[2]);
3856 listAddNodeTail(list
,c
->argv
[2]);
3858 incrRefCount(c
->argv
[2]);
3861 addReply(c
,shared
.ok
);
/* Thin wrapper: pushGenericCommand() at REDIS_HEAD. */
3864 static void lpushCommand(redisClient
*c
) {
3865 pushGenericCommand(c
,REDIS_HEAD
);
/* Thin wrapper: pushGenericCommand() at REDIS_TAIL. */
3868 static void rpushCommand(redisClient
*c
) {
3869 pushGenericCommand(c
,REDIS_TAIL
);
/* Reply with the length of the list stored at argv[1] as an integer reply.
 * shared.wrongtypeerr is sent for non-list objects. shared.czero is
 * presumably the missing-key reply (its test is not visible in this
 * extract). */
3872 static void llenCommand(redisClient
*c
) {
3876 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3878 addReply(c
,shared
.czero
);
3881 if (o
->type
!= REDIS_LIST
) {
3882 addReply(c
,shared
.wrongtypeerr
);
3885 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
/* Reply with the element at position argv[2] of the list stored at argv[1]
 * as a bulk reply. shared.wrongtypeerr is sent for non-list objects;
 * shared.nullbulk appears twice, presumably for a missing key and for an
 * index listIndex() cannot resolve (the tests are not visible in this
 * extract).
 * NOTE(review): the index is parsed with atoi(), so text that is not a
 * number is read as index 0. */
3890 static void lindexCommand(redisClient
*c
) {
3892 int index
= atoi(c
->argv
[2]->ptr
);
3894 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3896 addReply(c
,shared
.nullbulk
);
3898 if (o
->type
!= REDIS_LIST
) {
3899 addReply(c
,shared
.wrongtypeerr
);
3901 list
*list
= o
->ptr
;
3904 ln
= listIndex(list
, index
);
3906 addReply(c
,shared
.nullbulk
);
3908 robj
*ele
= listNodeValue(ln
);
3909 addReplyBulkLen(c
,ele
);
3911 addReply(c
,shared
.crlf
);
/* Replace the element at position argv[2] of the list stored at argv[1]
 * with argv[3], take a reference on the new value and reply shared.ok.
 * shared.wrongtypeerr is sent for non-list objects. shared.nokeyerr and
 * shared.outofrangeerr are presumably the missing-key and bad-index replies
 * (their tests are not visible in this extract). What happens to the old
 * element fetched into 'ele' is not visible either.
 * NOTE(review): the index is parsed with atoi(), so text that is not a
 * number is read as index 0. */
3917 static void lsetCommand(redisClient
*c
) {
3919 int index
= atoi(c
->argv
[2]->ptr
);
3921 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3923 addReply(c
,shared
.nokeyerr
);
3925 if (o
->type
!= REDIS_LIST
) {
3926 addReply(c
,shared
.wrongtypeerr
);
3928 list
*list
= o
->ptr
;
3931 ln
= listIndex(list
, index
);
3933 addReply(c
,shared
.outofrangeerr
);
3935 robj
*ele
= listNodeValue(ln
);
3938 listNodeValue(ln
) = c
->argv
[3];
3939 incrRefCount(c
->argv
[3]);
3940 addReply(c
,shared
.ok
);
/* Shared implementation behind lpopCommand() and rpopCommand(): remove one
 * element from the list stored at argv[1] and send it as a bulk reply. The
 * first node is taken when 'where' is REDIS_HEAD, the last node in the
 * other visible branch. shared.wrongtypeerr is sent for non-list objects;
 * shared.nullbulk appears twice, presumably for a missing key and for an
 * empty list (the tests are not visible in this extract). */
3947 static void popGenericCommand(redisClient
*c
, int where
) {
3950 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3952 addReply(c
,shared
.nullbulk
);
3954 if (o
->type
!= REDIS_LIST
) {
3955 addReply(c
,shared
.wrongtypeerr
);
3957 list
*list
= o
->ptr
;
3960 if (where
== REDIS_HEAD
)
3961 ln
= listFirst(list
);
3963 ln
= listLast(list
);
3966 addReply(c
,shared
.nullbulk
);
3968 robj
*ele
= listNodeValue(ln
);
3969 addReplyBulkLen(c
,ele
);
3971 addReply(c
,shared
.crlf
);
/* The node is unlinked only after the reply has been queued. */
3972 listDelNode(list
,ln
);
/* Thin wrapper: popGenericCommand() at REDIS_HEAD. */
3979 static void lpopCommand(redisClient
*c
) {
3980 popGenericCommand(c
,REDIS_HEAD
);
/* Thin wrapper: popGenericCommand() at REDIS_TAIL. */
3983 static void rpopCommand(redisClient
*c
) {
3984 popGenericCommand(c
,REDIS_TAIL
);
/* Reply with the elements of the list stored at argv[1] between the
 * positions argv[2] (start) and argv[3] (end), both inclusive, as a
 * multi-bulk reply. Negative positions count from the end of the list and
 * are clamped to 0 if still negative after conversion; an end past the last
 * element is clamped to the last element. An empty multi-bulk is sent when
 * start > end or start is past the end of the list. shared.wrongtypeerr is
 * sent for non-list objects; shared.nullmultibulk is presumably the
 * missing-key reply (its test is not visible in this extract).
 * NOTE(review): both positions are parsed with atoi(), so text that is not
 * a number is read as 0. */
3987 static void lrangeCommand(redisClient
*c
) {
3989 int start
= atoi(c
->argv
[2]->ptr
);
3990 int end
= atoi(c
->argv
[3]->ptr
);
3992 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3994 addReply(c
,shared
.nullmultibulk
);
3996 if (o
->type
!= REDIS_LIST
) {
3997 addReply(c
,shared
.wrongtypeerr
);
3999 list
*list
= o
->ptr
;
4001 int llen
= listLength(list
);
4005 /* convert negative indexes */
4006 if (start
< 0) start
= llen
+start
;
4007 if (end
< 0) end
= llen
+end
;
4008 if (start
< 0) start
= 0;
4009 if (end
< 0) end
= 0;
4011 /* indexes sanity checks */
4012 if (start
> end
|| start
>= llen
) {
4013 /* Out of range start or start > end result in empty list */
4014 addReply(c
,shared
.emptymultibulk
);
4017 if (end
>= llen
) end
= llen
-1;
4018 rangelen
= (end
-start
)+1;
4020 /* Return the result in form of a multi-bulk reply */
4021 ln
= listIndex(list
, start
);
4022 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
4023 for (j
= 0; j
< rangelen
; j
++) {
4024 ele
= listNodeValue(ln
);
4025 addReplyBulkLen(c
,ele
);
4027 addReply(c
,shared
.crlf
);
/* Trim the list stored at argv[1] so that only the elements between the
 * positions argv[2] (start) and argv[3] (end) remain. Positions are
 * normalised as in lrangeCommand(): negative values count from the end and
 * are clamped to 0, an end past the last element is clamped to it. 'ltrim'
 * nodes are then removed from the head and 'rtrim' nodes from the tail; the
 * statements computing the two counts are not visible in this extract.
 * shared.wrongtypeerr is sent for non-list objects and shared.ok after the
 * trim; the first shared.ok is presumably the missing-key reply.
 * NOTE(review): both positions are parsed with atoi(), so text that is not
 * a number is read as 0. */
4034 static void ltrimCommand(redisClient
*c
) {
4036 int start
= atoi(c
->argv
[2]->ptr
);
4037 int end
= atoi(c
->argv
[3]->ptr
);
4039 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4041 addReply(c
,shared
.ok
);
4043 if (o
->type
!= REDIS_LIST
) {
4044 addReply(c
,shared
.wrongtypeerr
);
4046 list
*list
= o
->ptr
;
4048 int llen
= listLength(list
);
4049 int j
, ltrim
, rtrim
;
4051 /* convert negative indexes */
4052 if (start
< 0) start
= llen
+start
;
4053 if (end
< 0) end
= llen
+end
;
4054 if (start
< 0) start
= 0;
4055 if (end
< 0) end
= 0;
4057 /* indexes sanity checks */
4058 if (start
> end
|| start
>= llen
) {
4059 /* Out of range start or start > end result in empty list */
4063 if (end
>= llen
) end
= llen
-1;
4068 /* Remove list elements to perform the trim */
4069 for (j
= 0; j
< ltrim
; j
++) {
4070 ln
= listFirst(list
);
4071 listDelNode(list
,ln
);
4073 for (j
= 0; j
< rtrim
; j
++) {
4074 ln
= listLast(list
);
4075 listDelNode(list
,ln
);
4078 addReply(c
,shared
.ok
);
/* Remove elements equal to argv[3] from the list stored at argv[1] and
 * reply with the number removed as an integer reply. argv[2] is the count:
 * the walk starts from the tail when 'fromtail' is set (with the count
 * negated, presumably for negative counts -- the test is not visible in
 * this extract) and from the head otherwise. A count of 0 never triggers
 * the early exit, so every match is removed. shared.wrongtypeerr is sent
 * for non-list objects; shared.czero is presumably the missing-key reply. */
4083 static void lremCommand(redisClient
*c
) {
4086 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4088 addReply(c
,shared
.czero
);
4090 if (o
->type
!= REDIS_LIST
) {
4091 addReply(c
,shared
.wrongtypeerr
);
4093 list
*list
= o
->ptr
;
4094 listNode
*ln
, *next
;
4095 int toremove
= atoi(c
->argv
[2]->ptr
);
4100 toremove
= -toremove
;
4103 ln
= fromtail
? list
->tail
: list
->head
;
4105 robj
*ele
= listNodeValue(ln
);
/* The successor is saved first because the node may be deleted below. */
4107 next
= fromtail
? ln
->prev
: ln
->next
;
4108 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
4109 listDelNode(list
,ln
);
4112 if (toremove
&& removed
== toremove
) break;
4116 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
4121 /* These are the semantics of this command:
4122 * RPOPLPUSH srclist dstlist:
4123 * IF LLEN(srclist) > 0
4124 * element = RPOP srclist
4125 * LPUSH dstlist element
4126 * RETURN element
4127 * ELSE
4128 * RETURN nil
4129 * END
4130 * END
4131 *
4132 * The idea is to be able to get an element from a list in a reliable way
4133 * since the element is not just returned but pushed onto another list
4134 * as well. This command was originally proposed by Ezra Zygmuntowicz.
4135 */
/* Take the last element of the list stored at argv[1], push it on the head
 * of the list stored at argv[2] and send it to the client as a bulk reply.
 * shared.wrongtypeerr is sent when the source, or an existing destination,
 * is not a list. shared.nullbulk appears twice, presumably for a missing
 * source key and for an empty source list (the tests are not visible in
 * this extract). */
4136 static void rpoplpushcommand(redisClient
*c
) {
4139 sobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4141 addReply(c
,shared
.nullbulk
);
4143 if (sobj
->type
!= REDIS_LIST
) {
4144 addReply(c
,shared
.wrongtypeerr
);
4146 list
*srclist
= sobj
->ptr
;
4147 listNode
*ln
= listLast(srclist
);
4150 addReply(c
,shared
.nullbulk
);
4152 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
4153 robj
*ele
= listNodeValue(ln
);
4156 if (dobj
&& dobj
->type
!= REDIS_LIST
) {
4157 addReply(c
,shared
.wrongtypeerr
);
4161 /* Add the element to the target list (unless it's directly
4162 * passed to some BLPOP-ing client */
4163 if (!handleClientsWaitingListPush(c
,c
->argv
[2],ele
)) {
4165 /* Create the list if the key does not exist */
4166 dobj
= createListObject();
4167 dictAdd(c
->db
->dict
,c
->argv
[2],dobj
);
4168 incrRefCount(c
->argv
[2]);
4170 dstlist
= dobj
->ptr
;
4171 listAddNodeHead(dstlist
,ele
);
4175 /* Send the element to the client as reply as well */
4176 addReplyBulkLen(c
,ele
);
4178 addReply(c
,shared
.crlf
);
4180 /* Finally remove the element from the source list */
4181 listDelNode(srclist
,ln
);
4189 /* ==================================== Sets ================================ */
/* Add argv[2] to the set stored at argv[1]. A new set object is created and
 * added to the database in one of the visible branches (presumably when the
 * key is missing; the test is not visible in this extract). An existing
 * object that is not a set is answered with shared.wrongtypeerr. A
 * reference on the member is taken only when dictAdd() actually inserts it.
 * shared.cone and shared.czero are the two result replies visible here. */
4191 static void saddCommand(redisClient
*c
) {
4194 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4196 set
= createSetObject();
4197 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
4198 incrRefCount(c
->argv
[1]);
4200 if (set
->type
!= REDIS_SET
) {
4201 addReply(c
,shared
.wrongtypeerr
);
4205 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
4206 incrRefCount(c
->argv
[2]);
4208 addReply(c
,shared
.cone
);
4210 addReply(c
,shared
.czero
);
/* Remove argv[2] from the set stored at argv[1]. After a successful
 * dictDelete() the hash table is shrunk if htNeedsResize() says so and
 * shared.cone is sent. shared.wrongtypeerr is sent for non-set objects.
 * shared.czero appears twice, presumably for a missing key and for a member
 * that was not in the set (the branch structure is not visible in this
 * extract). */
4214 static void sremCommand(redisClient
*c
) {
4217 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4219 addReply(c
,shared
.czero
);
4221 if (set
->type
!= REDIS_SET
) {
4222 addReply(c
,shared
.wrongtypeerr
);
4225 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
4227 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
4228 addReply(c
,shared
.cone
);
4230 addReply(c
,shared
.czero
);
/* Move the member argv[3] from the set stored at argv[1] to the set stored
 * at argv[2].
 * A missing source key is answered with shared.czero; a source or an
 * existing destination that is not a set with shared.wrongtypeerr;
 * shared.czero is also sent when the member is not in the source set. A new
 * destination set is created in one of the visible branches (its test is
 * not visible in this extract). shared.cone is the reply after the move. */
4235 static void smoveCommand(redisClient
*c
) {
4236 robj
*srcset
, *dstset
;
4238 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4239 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
4241 /* If the source key does not exist return 0, if it's of the wrong type
4243 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
4244 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
4247 /* Error if the destination key is not a set as well */
4248 if (dstset
&& dstset
->type
!= REDIS_SET
) {
4249 addReply(c
,shared
.wrongtypeerr
);
4252 /* Remove the element from the source set */
4253 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
4254 /* Key not found in the src set! return zero */
4255 addReply(c
,shared
.czero
);
4259 /* Add the element to the destination set */
4261 dstset
= createSetObject();
4262 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
4263 incrRefCount(c
->argv
[2]);
/* A reference is taken only when the member was not already present. */
4265 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
4266 incrRefCount(c
->argv
[3]);
4267 addReply(c
,shared
.cone
);
/* Reply shared.cone when argv[2] is found in the set stored at argv[1].
 * shared.wrongtypeerr is sent for non-set objects. shared.czero appears
 * twice, presumably for a missing key and for a member that is not found
 * (the branch structure is not visible in this extract). */
4270 static void sismemberCommand(redisClient
*c
) {
4273 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4275 addReply(c
,shared
.czero
);
4277 if (set
->type
!= REDIS_SET
) {
4278 addReply(c
,shared
.wrongtypeerr
);
4281 if (dictFind(set
->ptr
,c
->argv
[2]))
4282 addReply(c
,shared
.cone
);
4284 addReply(c
,shared
.czero
);
/* Reply with the cardinality of the set stored at argv[1] as an integer
 * reply. shared.wrongtypeerr is sent for non-set objects. shared.czero is
 * presumably the missing-key reply (its test is not visible in this
 * extract). The argument of the final format call is cut off in this
 * extract. */
4288 static void scardCommand(redisClient
*c
) {
4292 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4294 addReply(c
,shared
.czero
);
4297 if (o
->type
!= REDIS_SET
) {
4298 addReply(c
,shared
.wrongtypeerr
);
4301 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
/* Remove a random member from the set stored at argv[1] and send it as a
 * bulk reply. The member is picked with dictGetRandomKey(), the reply is
 * queued, then the member is deleted and the hash table shrunk if
 * htNeedsResize() says so. shared.wrongtypeerr is sent for non-set objects;
 * shared.nullbulk appears twice, presumably for a missing key and for an
 * empty set (the tests are not visible in this extract). */
4307 static void spopCommand(redisClient
*c
) {
4311 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4313 addReply(c
,shared
.nullbulk
);
4315 if (set
->type
!= REDIS_SET
) {
4316 addReply(c
,shared
.wrongtypeerr
);
4319 de
= dictGetRandomKey(set
->ptr
);
4321 addReply(c
,shared
.nullbulk
);
4323 robj
*ele
= dictGetEntryKey(de
);
4325 addReplyBulkLen(c
,ele
);
4327 addReply(c
,shared
.crlf
);
4328 dictDelete(set
->ptr
,ele
);
4329 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
/* Send a random member of the set stored at argv[1] as a bulk reply without
 * removing it (no delete call appears in the visible code, unlike
 * spopCommand()). shared.wrongtypeerr is sent for non-set objects;
 * shared.nullbulk appears twice, presumably for a missing key and for an
 * empty set (the tests are not visible in this extract). */
4335 static void srandmemberCommand(redisClient
*c
) {
4339 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4341 addReply(c
,shared
.nullbulk
);
4343 if (set
->type
!= REDIS_SET
) {
4344 addReply(c
,shared
.wrongtypeerr
);
4347 de
= dictGetRandomKey(set
->ptr
);
4349 addReply(c
,shared
.nullbulk
);
4351 robj
*ele
= dictGetEntryKey(de
);
4353 addReplyBulkLen(c
,ele
);
4355 addReply(c
,shared
.crlf
);
4360 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
4361 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
4363 return dictSize(*d1
)-dictSize(*d2
);
/* Shared implementation behind sinterCommand() (dstkey == NULL) and
 * sinterstoreCommand() (dstkey names the key receiving the result).
 * setskeys/setsnum describe the keys of the sets to intersect. Each key is
 * looked up and must hold a set (shared.wrongtypeerr otherwise); the dict
 * of every set is collected in dv, dv is sorted by ascending size, and
 * every member of the smallest set is tested against all the others.
 * Members present everywhere are sent to the client as bulk entries or
 * added to the result set; which of the two happens depends on tests not
 * visible in this extract. In store mode the result replaces any old value
 * at dstkey and its size is sent as an integer reply.
 * NOTE(review): dv is allocated with zmalloc() at the top; the releases on
 * the early-exit paths are not visible in this extract -- confirm each
 * path frees it. */
4366 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, unsigned long setsnum
, robj
*dstkey
) {
4367 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4370 robj
*lenobj
= NULL
, *dstset
= NULL
;
4371 unsigned long j
, cardinality
= 0;
4373 for (j
= 0; j
< setsnum
; j
++) {
4377 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4378 lookupKeyRead(c
->db
,setskeys
[j
]);
4382 if (deleteKey(c
->db
,dstkey
))
4384 addReply(c
,shared
.czero
);
4386 addReply(c
,shared
.nullmultibulk
);
4390 if (setobj
->type
!= REDIS_SET
) {
4392 addReply(c
,shared
.wrongtypeerr
);
4395 dv
[j
] = setobj
->ptr
;
4397 /* Sort sets from the smallest to largest, this will improve our
4398 * algorithm's performance */
4399 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
4401 /* The first thing we should output is the total number of elements...
4402 * since this is a multi-bulk write, but at this stage we don't know
4403 * the intersection set size, so we use a trick, append an empty object
4404 * to the output list and save the pointer to later modify it with the
4407 lenobj
= createObject(REDIS_STRING
,NULL
);
4409 decrRefCount(lenobj
);
4411 /* If we have a target key where to store the resulting set
4412 * create this key with an empty set inside */
4413 dstset
= createSetObject();
4416 /* Iterate all the elements of the first (smallest) set, and test
4417 * the element against all the other sets, if at least one set does
4418 * not include the element it is discarded */
4419 di
= dictGetIterator(dv
[0]);
4421 while((de
= dictNext(di
)) != NULL
) {
4424 for (j
= 1; j
< setsnum
; j
++)
4425 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
4427 continue; /* at least one set does not contain the member */
4428 ele
= dictGetEntryKey(de
);
4430 addReplyBulkLen(c
,ele
);
4432 addReply(c
,shared
.crlf
);
4435 dictAdd(dstset
->ptr
,ele
,NULL
);
4439 dictReleaseIterator(di
);
4442 /* Store the resulting set into the target */
4443 deleteKey(c
->db
,dstkey
);
4444 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4445 incrRefCount(dstkey
);
/* The placeholder created earlier now receives the real element count. */
4449 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%lu\r\n",cardinality
);
4451 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4452 dictSize((dict
*)dstset
->ptr
)));
/* Thin wrapper: intersect the sets named by argv[1..argc-1]; no destination
 * key is passed. */
4458 static void sinterCommand(redisClient
*c
) {
4459 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
/* Thin wrapper: intersect the sets named by argv[2..argc-1] and pass
 * argv[1] as the destination key. */
4462 static void sinterstoreCommand(redisClient
*c
) {
4463 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
4466 #define REDIS_OP_UNION 0
4467 #define REDIS_OP_DIFF 1
/* Shared implementation of the union and difference commands. 'op' is
 * REDIS_OP_UNION or REDIS_OP_DIFF; setskeys/setsnum describe the input
 * keys; dstkey is NULL for the plain commands and names the key receiving
 * the result for the store variants.
 * Each key is looked up and must hold a set (shared.wrongtypeerr
 * otherwise); missing keys leave a NULL slot in dv and behave like empty
 * sets. A temporary set is then filled. For a union every member of every
 * input is added. For a difference the members of the first set are added
 * and the members of the following sets are removed again, stopping early
 * once the result is empty. The result is either sent as a multi-bulk
 * reply or stored under dstkey with its size sent as an integer reply.
 * NOTE(review): dv is allocated with zmalloc() at the top; its release is
 * not visible in this extract -- confirm every exit path frees it. */
4469 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
4470 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4473 robj
*dstset
= NULL
;
4474 int j
, cardinality
= 0;
4476 for (j
= 0; j
< setsnum
; j
++) {
4480 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4481 lookupKeyRead(c
->db
,setskeys
[j
]);
4486 if (setobj
->type
!= REDIS_SET
) {
4488 addReply(c
,shared
.wrongtypeerr
);
4491 dv
[j
] = setobj
->ptr
;
4494 /* We need a temp set object to store our union. If the dstkey
4495 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4496 * this set object will be the resulting object to set into the target key*/
4497 dstset
= createSetObject();
4499 /* Iterate all the elements of all the sets, add every element a single
4500 * time to the result set */
4501 for (j
= 0; j
< setsnum
; j
++) {
4502 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
4503 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
4505 di
= dictGetIterator(dv
[j
]);
4507 while((de
= dictNext(di
)) != NULL
) {
4510 /* dictAdd will not add the same element multiple times */
4511 ele
= dictGetEntryKey(de
);
4512 if (op
== REDIS_OP_UNION
|| j
== 0) {
4513 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
4517 } else if (op
== REDIS_OP_DIFF
) {
4518 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
4523 dictReleaseIterator(di
);
4525 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
4528 /* Output the content of the resulting set, if not in STORE mode */
4530 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
4531 di
= dictGetIterator(dstset
->ptr
);
4532 while((de
= dictNext(di
)) != NULL
) {
4535 ele
= dictGetEntryKey(de
);
4536 addReplyBulkLen(c
,ele
);
4538 addReply(c
,shared
.crlf
);
4540 dictReleaseIterator(di
);
4542 /* If we have a target key where to store the resulting set
4543 * create this key with the result set inside */
4544 deleteKey(c
->db
,dstkey
);
4545 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4546 incrRefCount(dstkey
);
/* NOTE(review): dstset is released here and dereferenced a few lines below;
 * the two statements presumably sit in different branches (the branch lines
 * are not visible in this extract) -- confirm. */
4551 decrRefCount(dstset
);
4553 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4554 dictSize((dict
*)dstset
->ptr
)));
4560 static void sunionCommand(redisClient
*c
) {
4561 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
4564 static void sunionstoreCommand(redisClient
*c
) {
4565 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
4568 static void sdiffCommand(redisClient
*c
) {
4569 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
4572 static void sdiffstoreCommand(redisClient
*c
) {
4573 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
4576 /* ==================================== ZSets =============================== */
4578 /* ZSETs are ordered sets using two data structures to hold the same elements
4579 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4582 * The elements are added to a hash table mapping Redis objects to scores.
4583 * At the same time the elements are added to a skip list mapping scores
4584 * to Redis objects (so objects are sorted by scores in this "view"). */
4586 /* This skiplist implementation is almost a C translation of the original
4587 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4588 * Alternative to Balanced Trees", modified in three ways:
4589 * a) this implementation allows for repeated values.
4590 * b) the comparison is not just by key (our 'score') but by satellite data.
4591 * c) there is a back pointer, so it's a doubly linked list with the back
4592 * pointers being only at "level 1". This allows to traverse the list
4593 * from tail to head, useful for ZREVRANGE. */
4595 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
4596 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
4598 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
4604 static zskiplist
*zslCreate(void) {
4608 zsl
= zmalloc(sizeof(*zsl
));
4611 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
4612 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
4613 zsl
->header
->forward
[j
] = NULL
;
4614 zsl
->header
->backward
= NULL
;
4619 static void zslFreeNode(zskiplistNode
*node
) {
4620 decrRefCount(node
->obj
);
4621 zfree(node
->forward
);
4625 static void zslFree(zskiplist
*zsl
) {
4626 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
4628 zfree(zsl
->header
->forward
);
4631 next
= node
->forward
[0];
4638 static int zslRandomLevel(void) {
4640 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
4645 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
4646 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4650 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4651 while (x
->forward
[i
] &&
4652 (x
->forward
[i
]->score
< score
||
4653 (x
->forward
[i
]->score
== score
&&
4654 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4658 /* we assume the key is not already inside, since we allow duplicated
4659 * scores, and the re-insertion of score and redis object should never
4660 * happen since the caller of zslInsert() should test in the hash table
4661 * if the element is already inside or not. */
4662 level
= zslRandomLevel();
4663 if (level
> zsl
->level
) {
4664 for (i
= zsl
->level
; i
< level
; i
++)
4665 update
[i
] = zsl
->header
;
4668 x
= zslCreateNode(level
,score
,obj
);
4669 for (i
= 0; i
< level
; i
++) {
4670 x
->forward
[i
] = update
[i
]->forward
[i
];
4671 update
[i
]->forward
[i
] = x
;
4673 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
4675 x
->forward
[0]->backward
= x
;
4681 /* Delete an element with matching score/object from the skiplist. */
4682 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
4683 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4687 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4688 while (x
->forward
[i
] &&
4689 (x
->forward
[i
]->score
< score
||
4690 (x
->forward
[i
]->score
== score
&&
4691 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4695 /* We may have multiple elements with the same score, what we need
4696 * is to find the element with both the right score and object. */
4698 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
4699 for (i
= 0; i
< zsl
->level
; i
++) {
4700 if (update
[i
]->forward
[i
] != x
) break;
4701 update
[i
]->forward
[i
] = x
->forward
[i
];
4703 if (x
->forward
[0]) {
4704 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4707 zsl
->tail
= x
->backward
;
4710 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4715 return 0; /* not found */
4717 return 0; /* not found */
4720 /* Delete all the elements with score between min and max from the skiplist.
4721 * Min and max are inclusive, so a score >= min && score <= max is deleted.
4722 * Note that this function takes the reference to the hash table view of the
4723 * sorted set, in order to remove the elements from the hash table too. */
4724 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4725 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4726 unsigned long removed
= 0;
4730 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4731 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
4735 /* Unlike zslDelete() we are not looking for one specific element here:
4736 * nodes are removed one after the other as long as their score is <= max. */
4738 while (x
&& x
->score
<= max
) {
4739 zskiplistNode
*next
;
4741 for (i
= 0; i
< zsl
->level
; i
++) {
4742 if (update
[i
]->forward
[i
] != x
) break;
4743 update
[i
]->forward
[i
] = x
->forward
[i
];
4745 if (x
->forward
[0]) {
4746 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4749 zsl
->tail
= x
->backward
;
4751 next
= x
->forward
[0];
4752 dictDelete(dict
,x
->obj
);
4754 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4760 return removed
; /* not found */
4763 /* Find the first node having a score equal or greater than the specified one.
4764 * Returns NULL if there is no match. */
4765 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4770 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4771 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
4774 /* Nodes with a score lower than 'score' are skipped at every level, so the
4775 * next node at level 0, if any, is the first one with score >= 'score'. */
4776 return x
->forward
[0];
4779 /* The actual Z-commands implementations */
4781 /* This generic command implements both ZADD and ZINCRBY.
4782 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4783 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4784 static void zaddGenericCommand(redisClient
*c
, robj
*key
, robj
*ele
, double scoreval
, int doincrement
) {
4789 zsetobj
= lookupKeyWrite(c
->db
,key
);
4790 if (zsetobj
== NULL
) {
4791 zsetobj
= createZsetObject();
4792 dictAdd(c
->db
->dict
,key
,zsetobj
);
4795 if (zsetobj
->type
!= REDIS_ZSET
) {
4796 addReply(c
,shared
.wrongtypeerr
);
4802 /* Ok now since we implement both ZADD and ZINCRBY here the code
4803 * needs to handle the two different conditions. It's all about setting
4804 * '*score', that is, the new score to set, to the right value. */
4805 score
= zmalloc(sizeof(double));
4809 /* Read the old score. If the element was not present starts from 0 */
4810 de
= dictFind(zs
->dict
,ele
);
4812 double *oldscore
= dictGetEntryVal(de
);
4813 *score
= *oldscore
+ scoreval
;
4821 /* What follows is a simple remove and re-insert operation that is common
4822 * to both ZADD and ZINCRBY... */
4823 if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) {
4824 /* case 1: New element */
4825 incrRefCount(ele
); /* added to hash */
4826 zslInsert(zs
->zsl
,*score
,ele
);
4827 incrRefCount(ele
); /* added to skiplist */
4830 addReplyDouble(c
,*score
);
4832 addReply(c
,shared
.cone
);
4837 /* case 2: Score update operation */
4838 de
= dictFind(zs
->dict
,ele
);
4839 redisAssert(de
!= NULL
);
4840 oldscore
= dictGetEntryVal(de
);
4841 if (*score
!= *oldscore
) {
4844 /* Remove and insert the element in the skip list with new score */
4845 deleted
= zslDelete(zs
->zsl
,*oldscore
,ele
);
4846 redisAssert(deleted
!= 0);
4847 zslInsert(zs
->zsl
,*score
,ele
);
4849 /* Update the score in the hash table */
4850 dictReplace(zs
->dict
,ele
,score
);
4856 addReplyDouble(c
,*score
);
4858 addReply(c
,shared
.czero
);
4862 static void zaddCommand(redisClient
*c
) {
4865 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4866 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0);
4869 static void zincrbyCommand(redisClient
*c
) {
4872 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4873 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1);
4876 static void zremCommand(redisClient
*c
) {
4880 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4881 if (zsetobj
== NULL
) {
4882 addReply(c
,shared
.czero
);
4888 if (zsetobj
->type
!= REDIS_ZSET
) {
4889 addReply(c
,shared
.wrongtypeerr
);
4893 de
= dictFind(zs
->dict
,c
->argv
[2]);
4895 addReply(c
,shared
.czero
);
4898 /* Delete from the skiplist */
4899 oldscore
= dictGetEntryVal(de
);
4900 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
4901 redisAssert(deleted
!= 0);
4903 /* Delete from the hash table */
4904 dictDelete(zs
->dict
,c
->argv
[2]);
4905 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4907 addReply(c
,shared
.cone
);
4911 static void zremrangebyscoreCommand(redisClient
*c
) {
4912 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4913 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4917 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4918 if (zsetobj
== NULL
) {
4919 addReply(c
,shared
.czero
);
4923 if (zsetobj
->type
!= REDIS_ZSET
) {
4924 addReply(c
,shared
.wrongtypeerr
);
4928 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
4929 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4930 server
.dirty
+= deleted
;
4931 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
4935 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
4937 int start
= atoi(c
->argv
[2]->ptr
);
4938 int end
= atoi(c
->argv
[3]->ptr
);
4941 if (c
->argc
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) {
4943 } else if (c
->argc
>= 5) {
4944 addReply(c
,shared
.syntaxerr
);
4948 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4950 addReply(c
,shared
.nullmultibulk
);
4952 if (o
->type
!= REDIS_ZSET
) {
4953 addReply(c
,shared
.wrongtypeerr
);
4955 zset
*zsetobj
= o
->ptr
;
4956 zskiplist
*zsl
= zsetobj
->zsl
;
4959 int llen
= zsl
->length
;
4963 /* convert negative indexes */
4964 if (start
< 0) start
= llen
+start
;
4965 if (end
< 0) end
= llen
+end
;
4966 if (start
< 0) start
= 0;
4967 if (end
< 0) end
= 0;
4969 /* indexes sanity checks */
4970 if (start
> end
|| start
>= llen
) {
4971 /* Out of range start or start > end result in empty list */
4972 addReply(c
,shared
.emptymultibulk
);
4975 if (end
>= llen
) end
= llen
-1;
4976 rangelen
= (end
-start
)+1;
4978 /* Return the result in form of a multi-bulk reply */
4984 ln
= zsl
->header
->forward
[0];
4986 ln
= ln
->forward
[0];
4989 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",
4990 withscores
? (rangelen
*2) : rangelen
));
4991 for (j
= 0; j
< rangelen
; j
++) {
4993 addReplyBulkLen(c
,ele
);
4995 addReply(c
,shared
.crlf
);
4997 addReplyDouble(c
,ln
->score
);
4998 ln
= reverse
? ln
->backward
: ln
->forward
[0];
5004 static void zrangeCommand(redisClient
*c
) {
5005 zrangeGenericCommand(c
,0);
5008 static void zrevrangeCommand(redisClient
*c
) {
5009 zrangeGenericCommand(c
,1);
5012 static void zrangebyscoreCommand(redisClient
*c
) {
5014 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
5015 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
5016 int offset
= 0, limit
= -1;
5018 if (c
->argc
!= 4 && c
->argc
!= 7) {
5020 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
5022 } else if (c
->argc
== 7 && strcasecmp(c
->argv
[4]->ptr
,"limit")) {
5023 addReply(c
,shared
.syntaxerr
);
5025 } else if (c
->argc
== 7) {
5026 offset
= atoi(c
->argv
[5]->ptr
);
5027 limit
= atoi(c
->argv
[6]->ptr
);
5028 if (offset
< 0) offset
= 0;
5031 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
5033 addReply(c
,shared
.nullmultibulk
);
5035 if (o
->type
!= REDIS_ZSET
) {
5036 addReply(c
,shared
.wrongtypeerr
);
5038 zset
*zsetobj
= o
->ptr
;
5039 zskiplist
*zsl
= zsetobj
->zsl
;
5042 unsigned int rangelen
= 0;
5044 /* Get the first node with the score >= min */
5045 ln
= zslFirstWithScore(zsl
,min
);
5047 /* No element matching the specified interval */
5048 addReply(c
,shared
.emptymultibulk
);
5052 /* We don't know in advance how many matching elements there
5053 * are in the list, so we push this object that will represent
5054 * the multi-bulk length in the output buffer, and will "fix"
5056 lenobj
= createObject(REDIS_STRING
,NULL
);
5058 decrRefCount(lenobj
);
5060 while(ln
&& ln
->score
<= max
) {
5063 ln
= ln
->forward
[0];
5066 if (limit
== 0) break;
5068 addReplyBulkLen(c
,ele
);
5070 addReply(c
,shared
.crlf
);
5071 ln
= ln
->forward
[0];
5073 if (limit
> 0) limit
--;
5075 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
5080 static void zcardCommand(redisClient
*c
) {
5084 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
5086 addReply(c
,shared
.czero
);
5089 if (o
->type
!= REDIS_ZSET
) {
5090 addReply(c
,shared
.wrongtypeerr
);
5093 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",zs
->zsl
->length
));
5098 static void zscoreCommand(redisClient
*c
) {
5102 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
5104 addReply(c
,shared
.nullbulk
);
5107 if (o
->type
!= REDIS_ZSET
) {
5108 addReply(c
,shared
.wrongtypeerr
);
5113 de
= dictFind(zs
->dict
,c
->argv
[2]);
5115 addReply(c
,shared
.nullbulk
);
5117 double *score
= dictGetEntryVal(de
);
5119 addReplyDouble(c
,*score
);
5125 /* ========================= Non type-specific commands ==================== */
5127 static void flushdbCommand(redisClient
*c
) {
5128 server
.dirty
+= dictSize(c
->db
->dict
);
5129 dictEmpty(c
->db
->dict
);
5130 dictEmpty(c
->db
->expires
);
5131 addReply(c
,shared
.ok
);
5134 static void flushallCommand(redisClient
*c
) {
5135 server
.dirty
+= emptyDb();
5136 addReply(c
,shared
.ok
);
5137 rdbSave(server
.dbfilename
);
5141 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
5142 redisSortOperation
*so
= zmalloc(sizeof(*so
));
5144 so
->pattern
= pattern
;
5148 /* Return the value associated to the key with a name obtained
5149 * substituting the first occurence of '*' in 'pattern' with 'subst' */
5150 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
5154 int prefixlen
, sublen
, postfixlen
;
5155 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
5159 char buf
[REDIS_SORTKEY_MAX
+1];
5162 /* If the pattern is "#" return the substitution object itself in order
5163 * to implement the "SORT ... GET #" feature. */
5164 spat
= pattern
->ptr
;
5165 if (spat
[0] == '#' && spat
[1] == '\0') {
5169 /* The substitution object may be specially encoded. If so we create
5170 * a decoded object on the fly. Otherwise getDecodedObject will just
5171 * increment the ref count, that we'll decrement later. */
5172 subst
= getDecodedObject(subst
);
5175 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
5176 p
= strchr(spat
,'*');
5178 decrRefCount(subst
);
5183 sublen
= sdslen(ssub
);
5184 postfixlen
= sdslen(spat
)-(prefixlen
+1);
5185 memcpy(keyname
.buf
,spat
,prefixlen
);
5186 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
5187 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
5188 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
5189 keyname
.len
= prefixlen
+sublen
+postfixlen
;
5191 initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(long)*2))
5192 decrRefCount(subst
);
5194 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
5195 return lookupKeyRead(db
,&keyobj
);
5198 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
5199 * the additional parameter is not standard but BSD-specific, we have to
5200 * pass sorting parameters via the global 'server' structure */
5201 static int sortCompare(const void *s1
, const void *s2
) {
5202 const redisSortObject
*so1
= s1
, *so2
= s2
;
5205 if (!server
.sort_alpha
) {
5206 /* Numeric sorting. Here it's trivial as we precomputed scores */
5207 if (so1
->u
.score
> so2
->u
.score
) {
5209 } else if (so1
->u
.score
< so2
->u
.score
) {
5215 /* Alphanumeric sorting */
5216 if (server
.sort_bypattern
) {
5217 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
5218 /* At least one compare object is NULL */
5219 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
5221 else if (so1
->u
.cmpobj
== NULL
)
5226 /* We have both the objects, use strcoll */
5227 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
5230 /* Compare elements directly */
5233 dec1
= getDecodedObject(so1
->obj
);
5234 dec2
= getDecodedObject(so2
->obj
);
5235 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
5240 return server
.sort_desc
? -cmp
: cmp
;
5243 /* The SORT command is the most complex command in Redis. Warning: this code
5244 * is optimized for speed and a bit less for readability */
5245 static void sortCommand(redisClient
*c
) {
5248 int desc
= 0, alpha
= 0;
5249 int limit_start
= 0, limit_count
= -1, start
, end
;
5250 int j
, dontsort
= 0, vectorlen
;
5251 int getop
= 0; /* GET operation counter */
5252 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
5253 redisSortObject
*vector
; /* Resulting vector to sort */
5255 /* Lookup the key to sort. It must be of the right types */
5256 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
5257 if (sortval
== NULL
) {
5258 addReply(c
,shared
.nullmultibulk
);
5261 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
5262 sortval
->type
!= REDIS_ZSET
)
5264 addReply(c
,shared
.wrongtypeerr
);
5268 /* Create a list of operations to perform for every sorted element.
5269 * Operations can be GET/DEL/INCR/DECR */
5270 operations
= listCreate();
5271 listSetFreeMethod(operations
,zfree
);
5274 /* Now we need to protect sortval incrementing its count, in the future
5275 * SORT may have options able to overwrite/delete keys during the sorting
5276 * and the sorted key itself may get destroyed */
5277 incrRefCount(sortval
);
5279 /* The SORT command has an SQL-alike syntax, parse it */
5280 while(j
< c
->argc
) {
5281 int leftargs
= c
->argc
-j
-1;
5282 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
5284 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
5286 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
5288 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
5289 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
5290 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
5292 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
5293 storekey
= c
->argv
[j
+1];
5295 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
5296 sortby
= c
->argv
[j
+1];
5297 /* If the BY pattern does not contain '*', i.e. it is constant,
5298 * we don't need to sort nor to lookup the weight keys. */
5299 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
5301 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
5302 listAddNodeTail(operations
,createSortOperation(
5303 REDIS_SORT_GET
,c
->argv
[j
+1]));
5307 decrRefCount(sortval
);
5308 listRelease(operations
);
5309 addReply(c
,shared
.syntaxerr
);
5315 /* Load the sorting vector with all the objects to sort */
5316 switch(sortval
->type
) {
5317 case REDIS_LIST
: vectorlen
= listLength((list
*)sortval
->ptr
); break;
5318 case REDIS_SET
: vectorlen
= dictSize((dict
*)sortval
->ptr
); break;
5319 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
5320 default: vectorlen
= 0; redisAssert(0); /* Avoid GCC warning */
5322 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
5325 if (sortval
->type
== REDIS_LIST
) {
5326 list
*list
= sortval
->ptr
;
5330 while((ln
= listYield(list
))) {
5331 robj
*ele
= ln
->value
;
5332 vector
[j
].obj
= ele
;
5333 vector
[j
].u
.score
= 0;
5334 vector
[j
].u
.cmpobj
= NULL
;
5342 if (sortval
->type
== REDIS_SET
) {
5345 zset
*zs
= sortval
->ptr
;
5349 di
= dictGetIterator(set
);
5350 while((setele
= dictNext(di
)) != NULL
) {
5351 vector
[j
].obj
= dictGetEntryKey(setele
);
5352 vector
[j
].u
.score
= 0;
5353 vector
[j
].u
.cmpobj
= NULL
;
5356 dictReleaseIterator(di
);
5358 redisAssert(j
== vectorlen
);
5360 /* Now it's time to load the right scores in the sorting vector */
5361 if (dontsort
== 0) {
5362 for (j
= 0; j
< vectorlen
; j
++) {
5366 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
5367 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
5369 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
5371 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
5372 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
5374 /* Don't need to decode the object if it's
5375 * integer-encoded (the only encoding supported) so
5376 * far. We can just cast it */
5377 if (byval
->encoding
== REDIS_ENCODING_INT
) {
5378 vector
[j
].u
.score
= (long)byval
->ptr
;
5380 redisAssert(1 != 1);
5385 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
5386 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
5388 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
5389 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
5391 redisAssert(1 != 1);
5398 /* We are ready to sort the vector... perform a bit of sanity check
5399 * on the LIMIT option too. We'll use a partial version of quicksort. */
5400 start
= (limit_start
< 0) ? 0 : limit_start
;
5401 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
5402 if (start
>= vectorlen
) {
5403 start
= vectorlen
-1;
5406 if (end
>= vectorlen
) end
= vectorlen
-1;
5408 if (dontsort
== 0) {
5409 server
.sort_desc
= desc
;
5410 server
.sort_alpha
= alpha
;
5411 server
.sort_bypattern
= sortby
? 1 : 0;
5412 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
5413 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
5415 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
5418 /* Send command output to the output buffer, performing the specified
5419 * GET/DEL/INCR/DECR operations if any. */
5420 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
5421 if (storekey
== NULL
) {
5422 /* STORE option not specified, send the sorting result to the client */
5423 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
5424 for (j
= start
; j
<= end
; j
++) {
5427 addReplyBulkLen(c
,vector
[j
].obj
);
5428 addReply(c
,vector
[j
].obj
);
5429 addReply(c
,shared
.crlf
);
5431 listRewind(operations
);
5432 while((ln
= listYield(operations
))) {
5433 redisSortOperation
*sop
= ln
->value
;
5434 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5437 if (sop
->type
== REDIS_SORT_GET
) {
5438 if (!val
|| val
->type
!= REDIS_STRING
) {
5439 addReply(c
,shared
.nullbulk
);
5441 addReplyBulkLen(c
,val
);
5443 addReply(c
,shared
.crlf
);
5446 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5451 robj
*listObject
= createListObject();
5452 list
*listPtr
= (list
*) listObject
->ptr
;
5454 /* STORE option specified, set the sorting result as a List object */
5455 for (j
= start
; j
<= end
; j
++) {
5458 listAddNodeTail(listPtr
,vector
[j
].obj
);
5459 incrRefCount(vector
[j
].obj
);
5461 listRewind(operations
);
5462 while((ln
= listYield(operations
))) {
5463 redisSortOperation
*sop
= ln
->value
;
5464 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5467 if (sop
->type
== REDIS_SORT_GET
) {
5468 if (!val
|| val
->type
!= REDIS_STRING
) {
5469 listAddNodeTail(listPtr
,createStringObject("",0));
5471 listAddNodeTail(listPtr
,val
);
5475 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5479 if (dictReplace(c
->db
->dict
,storekey
,listObject
)) {
5480 incrRefCount(storekey
);
5482 /* Note: we add 1 because the DB is dirty anyway since even if the
5483 * SORT result is empty a new key is set and maybe the old content
5485 server
.dirty
+= 1+outputlen
;
5486 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",outputlen
));
5490 decrRefCount(sortval
);
5491 listRelease(operations
);
5492 for (j
= 0; j
< vectorlen
; j
++) {
5493 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
5494 decrRefCount(vector
[j
].u
.cmpobj
);
/* Convert an amount of bytes into a human readable string in the form
 * of 100B, 2G, 100M, 4K, and so forth.
 *
 * s: destination buffer, written with sprintf(). The longest string
 *    produced is well under 32 bytes including the terminator.
 * n: number of bytes to represent.
 *
 * Values below 1024 are printed as an integer followed by 'B'. Larger values
 * are divided by the matching power of 1024 and printed with two decimals
 * followed by K, M, G or T. The last branch has no upper bound, so 's' is
 * written for every possible value of 'n'. */
static void bytesToHuman(char *s, unsigned long long n) {
    double d;

    if (n < 1024) {
        /* Bytes */
        sprintf(s,"%lluB",n);
    } else if (n < (1024*1024)) {
        /* Kilobytes */
        d = (double)n/(1024);
        sprintf(s,"%.2fK",d);
    } else if (n < (1024LL*1024*1024)) {
        /* Megabytes */
        d = (double)n/(1024*1024);
        sprintf(s,"%.2fM",d);
    } else if (n < (1024LL*1024*1024*1024)) {
        /* Gigabytes */
        d = (double)n/(1024LL*1024*1024);
        sprintf(s,"%.2fG",d);
    } else {
        /* Terabytes, and anything bigger */
        d = (double)n/(1024LL*1024*1024*1024);
        sprintf(s,"%.2fT",d);
    }
}
5520 /* Create the string returned by the INFO command. This is decoupled
5521 * from the INFO command itself as we need to report the same information
5522 * on memory corruption problems. */
5523 static sds
genRedisInfoString(void) {
5525 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5529 bytesToHuman(hmem
,server
.usedmemory
);
5530 info
= sdscatprintf(sdsempty(),
5531 "redis_version:%s\r\n"
5533 "multiplexing_api:%s\r\n"
5534 "process_id:%ld\r\n"
5535 "uptime_in_seconds:%ld\r\n"
5536 "uptime_in_days:%ld\r\n"
5537 "connected_clients:%d\r\n"
5538 "connected_slaves:%d\r\n"
5539 "blocked_clients:%d\r\n"
5540 "used_memory:%zu\r\n"
5541 "used_memory_human:%s\r\n"
5542 "changes_since_last_save:%lld\r\n"
5543 "bgsave_in_progress:%d\r\n"
5544 "last_save_time:%ld\r\n"
5545 "bgrewriteaof_in_progress:%d\r\n"
5546 "total_connections_received:%lld\r\n"
5547 "total_commands_processed:%lld\r\n"
5551 (sizeof(long) == 8) ? "64" : "32",
5556 listLength(server
.clients
)-listLength(server
.slaves
),
5557 listLength(server
.slaves
),
5558 server
.blockedclients
,
5562 server
.bgsavechildpid
!= -1,
5564 server
.bgrewritechildpid
!= -1,
5565 server
.stat_numconnections
,
5566 server
.stat_numcommands
,
5567 server
.vm_enabled
!= 0,
5568 server
.masterhost
== NULL
? "master" : "slave"
5570 if (server
.masterhost
) {
5571 info
= sdscatprintf(info
,
5572 "master_host:%s\r\n"
5573 "master_port:%d\r\n"
5574 "master_link_status:%s\r\n"
5575 "master_last_io_seconds_ago:%d\r\n"
5578 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
5580 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
5583 if (server
.vm_enabled
) {
5584 info
= sdscatprintf(info
,
5585 "vm_conf_max_memory:%llu\r\n"
5586 "vm_conf_page_size:%llu\r\n"
5587 "vm_conf_pages:%llu\r\n"
5588 "vm_stats_used_pages:%llu\r\n"
5589 "vm_stats_swapped_objects:%llu\r\n"
5590 "vm_stats_swappin_count:%llu\r\n"
5591 "vm_stats_swappout_count:%llu\r\n"
5592 ,(unsigned long long) server
.vm_max_memory
,
5593 (unsigned long long) server
.vm_page_size
,
5594 (unsigned long long) server
.vm_pages
,
5595 (unsigned long long) server
.vm_stats_used_pages
,
5596 (unsigned long long) server
.vm_stats_swapped_objects
,
5597 (unsigned long long) server
.vm_stats_swapins
,
5598 (unsigned long long) server
.vm_stats_swapouts
5601 for (j
= 0; j
< server
.dbnum
; j
++) {
5602 long long keys
, vkeys
;
5604 keys
= dictSize(server
.db
[j
].dict
);
5605 vkeys
= dictSize(server
.db
[j
].expires
);
5606 if (keys
|| vkeys
) {
5607 info
= sdscatprintf(info
, "db%d:keys=%lld,expires=%lld\r\n",
5614 static void infoCommand(redisClient
*c
) {
5615 sds info
= genRedisInfoString();
5616 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
5617 (unsigned long)sdslen(info
)));
5618 addReplySds(c
,info
);
5619 addReply(c
,shared
.crlf
);
5622 static void monitorCommand(redisClient
*c
) {
5623 /* ignore MONITOR if already slave or in monitor mode */
5624 if (c
->flags
& REDIS_SLAVE
) return;
5626 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
5628 listAddNodeTail(server
.monitors
,c
);
5629 addReply(c
,shared
.ok
);
5632 /* ================================= Expire ================================= */
5633 static int removeExpire(redisDb
*db
, robj
*key
) {
5634 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
5641 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
5642 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
5650 /* Return the expire time of the specified key, or -1 if no expire
5651 * is associated with this key (i.e. the key is non volatile) */
5652 static time_t getExpire(redisDb
*db
, robj
*key
) {
5655 /* No expire? return ASAP */
5656 if (dictSize(db
->expires
) == 0 ||
5657 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
5659 return (time_t) dictGetEntryVal(de
);
5662 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
5666 /* No expire? return ASAP */
5667 if (dictSize(db
->expires
) == 0 ||
5668 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5670 /* Lookup the expire */
5671 when
= (time_t) dictGetEntryVal(de
);
5672 if (time(NULL
) <= when
) return 0;
5674 /* Delete the key */
5675 dictDelete(db
->expires
,key
);
5676 return dictDelete(db
->dict
,key
) == DICT_OK
;
5679 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
5682 /* No expire? return ASAP */
5683 if (dictSize(db
->expires
) == 0 ||
5684 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5686 /* Delete the key */
5688 dictDelete(db
->expires
,key
);
5689 return dictDelete(db
->dict
,key
) == DICT_OK
;
5692 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
5695 de
= dictFind(c
->db
->dict
,key
);
5697 addReply(c
,shared
.czero
);
5701 if (deleteKey(c
->db
,key
)) server
.dirty
++;
5702 addReply(c
, shared
.cone
);
5705 time_t when
= time(NULL
)+seconds
;
5706 if (setExpire(c
->db
,key
,when
)) {
5707 addReply(c
,shared
.cone
);
5710 addReply(c
,shared
.czero
);
5716 static void expireCommand(redisClient
*c
) {
5717 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
5720 static void expireatCommand(redisClient
*c
) {
5721 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
5724 static void ttlCommand(redisClient
*c
) {
5728 expire
= getExpire(c
->db
,c
->argv
[1]);
5730 ttl
= (int) (expire
-time(NULL
));
5731 if (ttl
< 0) ttl
= -1;
5733 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
5736 /* ================================ MULTI/EXEC ============================== */
5738 /* Client state initialization for MULTI/EXEC */
5739 static void initClientMultiState(redisClient
*c
) {
5740 c
->mstate
.commands
= NULL
;
5741 c
->mstate
.count
= 0;
5744 /* Release all the resources associated with MULTI/EXEC state */
5745 static void freeClientMultiState(redisClient
*c
) {
5748 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5750 multiCmd
*mc
= c
->mstate
.commands
+j
;
5752 for (i
= 0; i
< mc
->argc
; i
++)
5753 decrRefCount(mc
->argv
[i
]);
5756 zfree(c
->mstate
.commands
);
5759 /* Add a new command into the MULTI commands queue */
5760 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
) {
5764 c
->mstate
.commands
= zrealloc(c
->mstate
.commands
,
5765 sizeof(multiCmd
)*(c
->mstate
.count
+1));
5766 mc
= c
->mstate
.commands
+c
->mstate
.count
;
5769 mc
->argv
= zmalloc(sizeof(robj
*)*c
->argc
);
5770 memcpy(mc
->argv
,c
->argv
,sizeof(robj
*)*c
->argc
);
5771 for (j
= 0; j
< c
->argc
; j
++)
5772 incrRefCount(mc
->argv
[j
]);
5776 static void multiCommand(redisClient
*c
) {
5777 c
->flags
|= REDIS_MULTI
;
5778 addReply(c
,shared
.ok
);
/* EXEC handler. If the client is not flagged REDIS_MULTI an error reply is
 * queued. Otherwise a multi bulk header with the number of queued commands is
 * emitted, every queued command is run through call() with the client's
 * argc/argv temporarily pointed at the queued ones, and finally the MULTI
 * state is released, re-initialized, and the REDIS_MULTI flag cleared. */
5781 static void execCommand(redisClient
*c
) {
5786 if (!(c
->flags
& REDIS_MULTI
)) {
5787 addReplySds(c
,sdsnew("-ERR EXEC without MULTI\r\n"));
/* Remember the real argument vector: it is swapped out for each queued
 * command below and must be put back afterwards. */
5791 orig_argv
= c
->argv
;
5792 orig_argc
= c
->argc
;
5793 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->mstate
.count
));
5794 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5795 c
->argc
= c
->mstate
.commands
[j
].argc
;
5796 c
->argv
= c
->mstate
.commands
[j
].argv
;
5797 call(c
,c
->mstate
.commands
[j
].cmd
);
5799 c
->argv
= orig_argv
;
5800 c
->argc
= orig_argc
;
5801 freeClientMultiState(c
);
5802 initClientMultiState(c
);
5803 c
->flags
&= (~REDIS_MULTI
);
5806 /* =========================== Blocking Operations ========================= */
5808 /* Currently Redis blocking operations support is limited to list POP ops,
5809 * so the current implementation is not fully generic, but it is also not
5810 * completely specific so it will not require a rewrite to support new
5811 * kind of blocking operations in the future.
5813 * Still it's important to note that list blocking operations can be already
5814 * used as a notification mechanism in order to implement other blocking
5815 * operations at application level, so there must be a very strong evidence
5816 * of usefulness and generality before new blocking operations are implemented.
5818 * This is how the current blocking POP works, we use BLPOP as example:
5819 * - If the user calls BLPOP and the key exists and contains a non-empty list
5820 * then LPOP is called instead. So BLPOP is semantically the same as LPOP
5821 * if there is no need to block.
5822 * - If instead BLPOP is called and the key does not exist or the list is
5823 * empty we need to block. In order to do so we remove the notification for
5824 * new data to read in the client socket (so that we'll not serve new
5825 * requests if the blocking request is not served). Also we put the client
5826 * in a dictionary (db->blockingkeys) mapping keys to a list of clients
5827 * blocking for these keys.
5828 * - If a PUSH operation against a key with blocked clients waiting is
5829 * performed, we serve the first in the list: basically instead of pushing
5830 * the new element inside the list we return it to the (first / oldest)
5831 * blocking client, unblock the client, and remove it from the list.
5833 * The above comment and the source code should be enough in order to understand
5834 * the implementation and modify / fix it later.
5837 /* Set a client in blocking mode for the specified key, with the specified
/* Put client 'c' in blocking mode on the 'numkeys' keys in 'keys'.
 *
 * The keys are recorded in c->blockingkeys (one reference taken on each),
 * 'timeout' is stored in c->blockingto, and the client is appended to the
 * per-key list of waiting clients kept in c->db->blockingkeys. Finally the
 * client is flagged REDIS_BLOCKED, its readable event is removed from the
 * event loop and server.blockedclients is incremented. */
5839 static void blockForKeys(redisClient
*c
, robj
**keys
, int numkeys
, time_t timeout
) {
5844 c
->blockingkeys
= zmalloc(sizeof(robj
*)*numkeys
);
5845 c
->blockingkeysnum
= numkeys
;
5846 c
->blockingto
= timeout
;
5847 for (j
= 0; j
< numkeys
; j
++) {
5848 /* Add the key in the client structure, to map clients -> keys */
5849 c
->blockingkeys
[j
] = keys
[j
];
5850 incrRefCount(keys
[j
]);
5852 /* And in the other "side", to map keys -> clients */
5853 de
= dictFind(c
->db
->blockingkeys
,keys
[j
]);
5857 /* For every key we take a list of clients blocked for it */
5859 retval
= dictAdd(c
->db
->blockingkeys
,keys
[j
],l
);
/* The dictionary now holds the key too, hence the extra reference. */
5860 incrRefCount(keys
[j
]);
5861 assert(retval
== DICT_OK
);
5863 l
= dictGetEntryVal(de
);
5865 listAddNodeTail(l
,c
);
5867 /* Mark the client as a blocked client */
5868 c
->flags
|= REDIS_BLOCKED
;
5869 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
5870 server
.blockedclients
++;
5873 /* Unblock a client that's waiting in a blocking operation such as BLPOP.
 * The client is removed from the waiting list of every key it was blocked
 * on, its blocking state is cleared, the readable event on its socket is
 * registered again, and any data already sitting in its query buffer is
 * processed. */
5874 static void unblockClient(redisClient
*c
) {
5879 assert(c
->blockingkeys
!= NULL
);
5880 /* The client may wait for multiple keys, so unblock it for every key. */
5881 for (j
= 0; j
< c
->blockingkeysnum
; j
++) {
5882 /* Remove this client from the list of clients waiting for this key. */
5883 de
= dictFind(c
->db
->blockingkeys
,c
->blockingkeys
[j
]);
5885 l
= dictGetEntryVal(de
);
5886 listDelNode(l
,listSearchKey(l
,c
));
5887 /* If the list is empty we need to remove it to avoid wasting memory */
5888 if (listLength(l
) == 0)
5889 dictDelete(c
->db
->blockingkeys
,c
->blockingkeys
[j
]);
5890 decrRefCount(c
->blockingkeys
[j
]);
5892 /* Cleanup the client structure */
5893 zfree(c
->blockingkeys
);
5894 c
->blockingkeys
= NULL
;
5895 c
->flags
&= (~REDIS_BLOCKED
);
5896 server
.blockedclients
--;
5897 /* Ok now we are ready to get read events from socket, note that we
5898 * can't trap errors here as it's possible that unblockClient() is
5899 * called from freeClient() itself, and the only thing we can do
5900 * if we failed to register the READABLE event is to kill the client.
5901 * Still the following function should never fail in the real world as
5902 * we are sure the file descriptor is sane, and we exit on out of mem. */
5903 aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
, readQueryFromClient
, c
);
5904 /* As a final step we want to process data if there is some command waiting
5905 * in the input buffer. Note that this is safe even if unblockClient()
5906 * gets called from freeClient() because freeClient() will be smart
5907 * enough to call this function *after* c->querybuf was set to NULL. */
5908 if (c
->querybuf
&& sdslen(c
->querybuf
) > 0) processInputBuffer(c
);
5911 /* This should be called from any function PUSHing into lists.
5912 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5913 * 'ele' is the element pushed.
5915 * If the function returns 0 there was no client waiting for a list push
5918 * If the function returns 1 there was a client waiting for a list push
5919 * against this key, the element was passed to this client thus it's not
5920 * needed to actually add it to the list and the caller should return asap. */
5921 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
) {
5922 struct dictEntry
*de
;
5923 redisClient
*receiver
;
/* No entry in the blocking keys dictionary means nobody is waiting. */
5927 de
= dictFind(c
->db
->blockingkeys
,key
);
5928 if (de
== NULL
) return 0;
5929 l
= dictGetEntryVal(de
);
5932 receiver
= ln
->value
;
/* The receiver gets a two-element multi bulk reply: the key name followed
 * by the pushed element, each as a bulk. */
5934 addReplySds(receiver
,sdsnew("*2\r\n"));
5935 addReplyBulkLen(receiver
,key
);
5936 addReply(receiver
,key
);
5937 addReply(receiver
,shared
.crlf
);
5938 addReplyBulkLen(receiver
,ele
);
5939 addReply(receiver
,ele
);
5940 addReply(receiver
,shared
.crlf
);
5941 unblockClient(receiver
);
5945 /* Blocking RPOP/LPOP. Arguments 1..argc-2 are keys and the last argument is
 * the timeout. The keys are scanned in order: the first one holding a
 * non-empty list is served immediately through popGenericCommand(); a key of
 * the wrong type produces a wrong-type error. If no key can be served the
 * client is blocked on all of them via blockForKeys(). 'where' is passed
 * through to popGenericCommand() untouched. */
5946 static void blockingPopGenericCommand(redisClient
*c
, int where
) {
5951 for (j
= 1; j
< c
->argc
-1; j
++) {
5952 o
= lookupKeyWrite(c
->db
,c
->argv
[j
]);
5954 if (o
->type
!= REDIS_LIST
) {
5955 addReply(c
,shared
.wrongtypeerr
);
5958 list
*list
= o
->ptr
;
5959 if (listLength(list
) != 0) {
5960 /* If the list contains elements fall back to the usual
5961 * non-blocking POP operation */
5962 robj
*argv
[2], **orig_argv
;
5965 /* We need to alter the command arguments before to call
5966 * popGenericCommand() as the command takes a single key. */
5967 orig_argv
= c
->argv
;
5968 orig_argc
= c
->argc
;
5969 argv
[1] = c
->argv
[j
];
5973 /* Also the return value is different, we need to output
5974 * the multi bulk reply header and the key name. The
5975 * "real" command will add the last element (the value)
5976 * for us. If this sounds like a hack to you it's just
5977 * because it is... */
5978 addReplySds(c
,sdsnew("*2\r\n"));
5979 addReplyBulkLen(c
,argv
[1]);
5980 addReply(c
,argv
[1]);
5981 addReply(c
,shared
.crlf
);
5982 popGenericCommand(c
,where
);
5984 /* Fix the client structure with the original stuff */
5985 c
->argv
= orig_argv
;
5986 c
->argc
= orig_argc
;
5992 /* If the list is empty or the key does not exist we must block */
5993 timeout
= strtol(c
->argv
[c
->argc
-1]->ptr
,NULL
,10);
/* A positive timeout is relative: turn it into an absolute unix time. */
5994 if (timeout
> 0) timeout
+= time(NULL
);
5995 blockForKeys(c
,c
->argv
+1,c
->argc
-2,timeout
);
5998 static void blpopCommand(redisClient
*c
) {
5999 blockingPopGenericCommand(c
,REDIS_HEAD
);
6002 static void brpopCommand(redisClient
*c
) {
6003 blockingPopGenericCommand(c
,REDIS_TAIL
);
6006 /* =============================== Replication ============================= */
/* Synchronous write helper: writes data from 'ptr' to 'fd', polling for
 * writability with aeWait() in slices of 1000 milliseconds. Returns -1 as
 * soon as write() fails. The time elapsed since entry is checked against
 * 'timeout', which is therefore expressed in seconds (it is compared with a
 * difference of time() values). */
6008 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
6009 ssize_t nwritten
, ret
= size
;
6010 time_t start
= time(NULL
);
6014 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
6015 nwritten
= write(fd
,ptr
,size
);
6016 if (nwritten
== -1) return -1;
6020 if ((time(NULL
)-start
) > timeout
) {
/* Synchronous read helper: reads from 'fd' into 'ptr', polling for
 * readability with aeWait() in slices of 1000 milliseconds. Returns -1 as
 * soon as read() fails. The time elapsed since entry is checked against
 * 'timeout' (seconds, as it is compared with a difference of time() values). */
6028 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
6029 ssize_t nread
, totread
= 0;
6030 time_t start
= time(NULL
);
6034 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
6035 nread
= read(fd
,ptr
,size
);
6036 if (nread
== -1) return -1;
6041 if ((time(NULL
)-start
) > timeout
) {
/* Read a line from 'fd' into 'ptr' one byte at a time using syncRead().
 * Returns -1 if a single-byte read fails. */
6049 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
6056 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
/* A '\r' stored just before the current position is replaced by the
 * string terminator, so a CRLF line ending leaves no trailing '\r'. */
6059 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
/* SYNC handler. The client is turned into a slave (REDIS_SLAVE flag, added
 * to server.slaves) and its replication state is chosen depending on
 * whether a background save is already running:
 * - BGSAVE running and another slave is already waiting for its end: that
 *   slave's reply list is duplicated and the state is WAIT_BGSAVE_END.
 * - BGSAVE running but nobody is recording differences: WAIT_BGSAVE_START.
 * - No BGSAVE running: one is started and the state is WAIT_BGSAVE_END. */
6070 static void syncCommand(redisClient
*c
) {
6071 /* ignore SYNC if already slave or in monitor mode */
6072 if (c
->flags
& REDIS_SLAVE
) return;
6074 /* SYNC can't be issued when the server has pending data to send to
6075 * the client about already issued commands. We need a fresh reply
6076 * buffer registering the differences between the BGSAVE and the current
6077 * dataset, so that we can copy to other slaves if needed. */
6078 if (listLength(c
->reply
) != 0) {
6079 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
6083 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
6084 /* Here we need to check if there is a background saving operation
6085 * in progress, or if it is required to start one */
6086 if (server
.bgsavechildpid
!= -1) {
6087 /* Ok a background save is in progress. Let's check if it is a good
6088 * one for replication, i.e. if there is another slave that is
6089 * registering differences since the server forked to save */
6093 listRewind(server
.slaves
);
6094 while((ln
= listYield(server
.slaves
))) {
6096 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
6099 /* Perfect, the server is already registering differences for
6100 * another slave. Set the right state, and copy the buffer. */
6101 listRelease(c
->reply
);
6102 c
->reply
= listDup(slave
->reply
);
6103 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
6104 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
6106 /* No way, we need to wait for the next BGSAVE in order to
6107 * register differences */
6108 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
6109 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
6112 /* Ok we don't have a BGSAVE in progress, let's start one */
6113 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
6114 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
6115 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
/* NOTE(review): "Unalbe" in the reply below is a typo for "Unable". */
6116 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
6119 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
6122 c
->flags
|= REDIS_SLAVE
;
6124 listAddNodeTail(server
.slaves
,c
);
/* Writable-event handler used to ship the DB file to a slave ('privdata' is
 * the slave client). On the first call (repldboff == 0) the bulk count
 * header is written. Each call then reads up to REDIS_IOBUF_LEN bytes from
 * the DB file at offset repldboff and writes them to the socket, advancing
 * repldboff by the number of bytes actually written. When the whole file has
 * been sent the file is closed, the slave is marked REDIS_REPL_ONLINE and the
 * writable handler is replaced with sendReplyToClient. */
6128 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
6129 redisClient
*slave
= privdata
;
6131 REDIS_NOTUSED(mask
);
6132 char buf
[REDIS_IOBUF_LEN
];
6133 ssize_t nwritten
, buflen
;
6135 if (slave
->repldboff
== 0) {
6136 /* Write the bulk write count before to transfer the DB. In theory here
6137 * we don't know how much room there is in the output buffer of the
6138 * socket, but in practice SO_SNDLOWAT (the minimum count for output
6139 * operations) will never be smaller than the few bytes we need. */
/* NOTE(review): "%lld" is paired with an (unsigned long long) cast. */
6142 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
6144 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
/* Seek on every call: the offset only advances by what was written. */
6152 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
6153 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
6155 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
6156 (buflen
== 0) ? "premature EOF" : strerror(errno
));
6160 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
6161 redisLog(REDIS_VERBOSE
,"Write error sending DB to slave: %s",
6166 slave
->repldboff
+= nwritten
;
6167 if (slave
->repldboff
== slave
->repldbsize
) {
6168 close(slave
->repldbfd
);
6169 slave
->repldbfd
= -1;
6170 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
6171 slave
->replstate
= REDIS_REPL_ONLINE
;
6172 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
6173 sendReplyToClient
, slave
) == AE_ERR
) {
6177 addReplySds(slave
,sdsempty());
6178 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
6182 /* This function is called at the end of every background saving.
6183 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
6184 * otherwise REDIS_ERR is passed to the function.
6186 * The goal of this function is to handle slaves waiting for a successful
6187 * background saving in order to perform non-blocking synchronization. */
6188 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
6190 int startbgsave
= 0;
6192 listRewind(server
.slaves
);
6193 while((ln
= listYield(server
.slaves
))) {
6194 redisClient
*slave
= ln
->value
;
/* Slaves that were waiting for a new BGSAVE to start are moved to the
 * "waiting for its end" state. */
6196 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
6198 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
6199 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
6200 struct redis_stat buf
;
6202 if (bgsaveerr
!= REDIS_OK
) {
6204 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
/* Open the freshly saved DB file and record its size so that
 * sendBulkToSlave() knows when the transfer is complete. */
6207 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
6208 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
6210 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
6213 slave
->repldboff
= 0;
6214 slave
->repldbsize
= buf
.st_size
;
6215 slave
->replstate
= REDIS_REPL_SEND_BULK
;
6216 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
6217 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
6224 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
6225 listRewind(server
.slaves
);
6226 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
6227 while((ln
= listYield(server
.slaves
))) {
6228 redisClient
*slave
= ln
->value
;
6230 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
/* Slave side of the synchronization. Connects to server.masterhost /
 * server.masterport, optionally authenticates with server.masterauth, issues
 * SYNC, reads the "$<count>" bulk header, stores the payload in a temp file,
 * renames it to server.dbfilename and loads it with rdbLoad(). On success
 * the connection is wrapped in a client flagged REDIS_MASTER and the
 * replication state becomes REDIS_REPL_CONNECTED. */
6237 static int syncWithMaster(void) {
6238 char buf
[1024], tmpfile
[256], authcmd
[1024];
6240 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
6244 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
6249 /* AUTH with the master if required. */
6250 if(server
.masterauth
) {
6251 snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
);
/* +7 accounts for "AUTH " (5 bytes) plus the trailing CRLF (2 bytes). */
6252 if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) {
6254 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",
6258 /* Read the AUTH result. */
6259 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
6261 redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s",
6265 if (buf
[0] != '+') {
6267 redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?");
6272 /* Issue the SYNC command */
6273 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
6275 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
6279 /* Read the bulk write count */
6280 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
6282 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
6286 if (buf
[0] != '$') {
6288 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
/* NOTE(review): the dump size is parsed with atoi() and logged with "%d",
 * so it is limited to the range of int. */
6291 dumpsize
= atoi(buf
+1);
6292 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
6293 /* Read the bulk write data on a temp file */
6294 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
6295 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
6298 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
6302 int nread
, nwritten
;
/* Never ask for more than what is left of the dump, 1024 bytes at most. */
6304 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
6306 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
6312 nwritten
= write(dfd
,buf
,nread
);
6313 if (nwritten
== -1) {
6314 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
6322 if (rename(tmpfile
,server
.dbfilename
) == -1) {
6323 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
6329 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6330 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
6334 server
.master
= createClient(fd
);
6335 server
.master
->flags
|= REDIS_MASTER
;
6336 server
.master
->authenticated
= 1;
6337 server
.replstate
= REDIS_REPL_CONNECTED
;
6341 static void slaveofCommand(redisClient
*c
) {
6342 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
6343 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
6344 if (server
.masterhost
) {
6345 sdsfree(server
.masterhost
);
6346 server
.masterhost
= NULL
;
6347 if (server
.master
) freeClient(server
.master
);
6348 server
.replstate
= REDIS_REPL_NONE
;
6349 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
6352 sdsfree(server
.masterhost
);
6353 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
6354 server
.masterport
= atoi(c
->argv
[2]->ptr
);
6355 if (server
.master
) freeClient(server
.master
);
6356 server
.replstate
= REDIS_REPL_CONNECT
;
6357 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
6358 server
.masterhost
, server
.masterport
);
6360 addReply(c
,shared
.ok
);
6363 /* ============================ Maxmemory directive ======================== */
6365 /* Free one object from the pre-allocated objects free list. This is useful
6366 * under low mem conditions as by default we take 1 million free objects
/* Detach the first node of server.objfreelist, keeping a pointer to the
 * object it holds in 'o'. */
6368 static void freeOneObjectFromFreelist(void) {
6371 listNode
*head
= listFirst(server
.objfreelist
);
6372 o
= listNodeValue(head
);
6373 listDelNode(server
.objfreelist
,head
);
6377 /* This function gets called when 'maxmemory' is set on the config file to limit
6378 * the max memory used by the server, and we are out of memory.
6379 * This function will try to, in order:
6381 * - Free objects from the free list
6382 * - Try to remove keys with an EXPIRE set
6384 * If it is not possible to free enough memory to reach used-memory < maxmemory
6385 * the server will start refusing commands that will enlarge even more the
/* While server.maxmemory is set and zmalloc_used_memory() exceeds it, try to
 * release memory: first by dropping an object from server.objfreelist,
 * otherwise by deleting, in every DB that has keys with an expire set, the
 * key with the smallest expire time out of a random sample of three.
 * The loop gives up as soon as a pass frees nothing. */
6388 static void freeMemoryIfNeeded(void) {
6389 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
6390 if (listLength(server
.objfreelist
)) {
6391 freeOneObjectFromFreelist();
6393 int j
, k
, freed
= 0;
6395 for (j
= 0; j
< server
.dbnum
; j
++) {
6397 robj
*minkey
= NULL
;
6398 struct dictEntry
*de
;
6400 if (dictSize(server
.db
[j
].expires
)) {
6402 /* From a sample of three keys drop the one nearest to
6403 * the natural expire */
6404 for (k
= 0; k
< 3; k
++) {
6407 de
= dictGetRandomKey(server
.db
[j
].expires
);
/* The expire time is stored directly in the entry value. */
6408 t
= (time_t) dictGetEntryVal(de
);
6409 if (minttl
== -1 || t
< minttl
) {
6410 minkey
= dictGetEntryKey(de
);
6414 deleteKey(server
.db
+j
,minkey
);
6417 if (!freed
) return; /* nothing to free... */
6422 /* ============================== Append Only file ========================== */
/* Append a command to the append only file.
 *
 * 'dictid' is the DB the command targets: when it differs from
 * server.appendseldb a SELECT is emitted first. EXPIRE commands are rewritten
 * as EXPIREAT with an absolute time. The command is serialized as a multi
 * bulk request ("*<argc>" followed by one "$<len>" bulk per argument) and
 * written with a single write(). The same buffer is also appended to
 * server.bgrewritebuf while a background rewrite is in progress, and
 * fsync() is called according to server.appendfsync. */
6424 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
6425 sds buf
= sdsempty();
6431 /* The DB this command was targeting is not the same as the last command
6432 * we appended. To issue a SELECT command is needed. */
6433 if (dictid
!= server
.appendseldb
) {
6436 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
6437 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
6438 (unsigned long)strlen(seldb
),seldb
);
6439 server
.appendseldb
= dictid
;
6442 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6443 * EXPIREs into EXPIREATs calls */
6444 if (cmd
->proc
== expireCommand
) {
6447 tmpargv
[0] = createStringObject("EXPIREAT",8);
6448 tmpargv
[1] = argv
[1];
6449 incrRefCount(argv
[1]);
/* Relative seconds become an absolute unix time. */
6450 when
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10);
6451 tmpargv
[2] = createObject(REDIS_STRING
,
6452 sdscatprintf(sdsempty(),"%ld",when
));
6456 /* Append the actual command */
6457 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
6458 for (j
= 0; j
< argc
; j
++) {
6461 o
= getDecodedObject(o
);
6462 buf
= sdscatprintf(buf
,"$%lu\r\n",(unsigned long)sdslen(o
->ptr
));
6463 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
6464 buf
= sdscatlen(buf
,"\r\n",2);
6468 /* Free the objects from the modified argv for EXPIREAT */
6469 if (cmd
->proc
== expireCommand
) {
6470 for (j
= 0; j
< 3; j
++)
6471 decrRefCount(argv
[j
]);
6474 /* We want to perform a single write. This should be guaranteed atomic
6475 * at least if the filesystem we are writing is a real physical one.
6476 * While this will save us against the server being killed I don't think
6477 * there is much to do about the whole server stopping for power problems
6479 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
6480 if (nwritten
!= (signed)sdslen(buf
)) {
6481 /* Ooops, we are in troubles. The best thing to do for now is
6482 * to simply exit instead to give the illusion that everything is
6483 * working as expected. */
6484 if (nwritten
== -1) {
6485 redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
));
6487 redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
));
6491 /* If a background append only file rewriting is in progress we want to
6492 * accumulate the differences between the child DB and the current one
6493 * in a buffer, so that when the child process will do its work we
6494 * can append the differences to the new append only file. */
6495 if (server
.bgrewritechildpid
!= -1)
6496 server
.bgrewritebuf
= sdscatlen(server
.bgrewritebuf
,buf
,sdslen(buf
));
/* fsync always, or at most about once per second, depending on policy. */
6500 if (server
.appendfsync
== APPENDFSYNC_ALWAYS
||
6501 (server
.appendfsync
== APPENDFSYNC_EVERYSEC
&&
6502 now
-server
.lastfsync
> 1))
6504 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
6505 server
.lastfsync
= now
;
6509 /* In Redis commands are always executed in the context of a client, so in
6510 * order to load the append only file we need to create a fake client.
 * The client is allocated with zmalloc() and gets an empty query buffer and
 * an empty reply list. */
6511 static struct redisClient
*createFakeClient(void) {
6512 struct redisClient
*c
= zmalloc(sizeof(*c
));
6516 c
->querybuf
= sdsempty();
6520 /* We set the fake client as a slave waiting for the synchronization
6521 * so that Redis will not try to send replies to this client. */
6522 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
6523 c
->reply
= listCreate();
6524 listSetFreeMethod(c
->reply
,decrRefCount
);
6525 listSetDupMethod(c
->reply
,dupClientReplyValue
);
/* Release the resources owned by a client made with createFakeClient():
 * its query buffer and its reply list. */
6529 static void freeFakeClient(struct redisClient
*c
) {
6530 sdsfree(c
->querybuf
);
6531 listRelease(c
->reply
);
6535 /* Replay the append log file. On success REDIS_OK is returned. On non fatal
6536 * error (the append only file is zero-length) REDIS_ERR is returned. On
6537 * fatal error an error message is logged and the program exits.
 *
 * Every command is read in the multi bulk format ("*<argc>" followed by
 * "$<len>" bulks), looked up with lookupCommand() and executed in the
 * context of a fake client whose replies are discarded. */
6538 int loadAppendOnlyFile(char *filename
) {
6539 struct redisClient
*fakeClient
;
6540 FILE *fp
= fopen(filename
,"r");
6541 struct redis_stat sb
;
6542 unsigned long long loadedkeys
= 0;
/* NOTE(review): fileno(fp) is evaluated here, before the "can't open"
 * diagnostic below; if fopen() failed this dereferences a NULL stream.
 * Confirm the NULL check ordering. */
6544 if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size
== 0)
6548 redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
));
6552 fakeClient
= createFakeClient();
6559 struct redisCommand
*cmd
;
6561 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) {
6567 if (buf
[0] != '*') goto fmterr
;
6569 argv
= zmalloc(sizeof(robj
*)*argc
);
6570 for (j
= 0; j
< argc
; j
++) {
6571 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
;
6572 if (buf
[0] != '$') goto fmterr
;
6573 len
= strtol(buf
+1,NULL
,10);
6574 argsds
= sdsnewlen(NULL
,len
);
6575 if (len
&& fread(argsds
,len
,1,fp
) == 0) goto fmterr
;
6576 argv
[j
] = createObject(REDIS_STRING
,argsds
);
6577 if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */
6580 /* Command lookup */
6581 cmd
= lookupCommand(argv
[0]->ptr
);
6583 redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
);
6586 /* Try object sharing and encoding */
6587 if (server
.shareobjects
) {
6589 for(j
= 1; j
< argc
; j
++)
6590 argv
[j
] = tryObjectSharing(argv
[j
]);
6592 if (cmd
->flags
& REDIS_CMD_BULK
)
6593 tryObjectEncoding(argv
[argc
-1]);
6594 /* Run the command in the context of a fake client */
6595 fakeClient
->argc
= argc
;
6596 fakeClient
->argv
= argv
;
6597 cmd
->proc(fakeClient
);
6598 /* Discard the reply objects list from the fake client */
6599 while(listLength(fakeClient
->reply
))
6600 listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
));
6601 /* Clean up, ready for the next command */
6602 for (j
= 0; j
< argc
; j
++) decrRefCount(argv
[j
]);
6604 /* Handle swapping while loading big datasets when VM is on */
6606 if (server
.vm_enabled
&& (loadedkeys
% 5000) == 0) {
6607 while (zmalloc_used_memory() > server
.vm_max_memory
) {
6608 if (vmSwapOneObject() == REDIS_ERR
) break;
6613 freeFakeClient(fakeClient
);
6618 redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file");
6620 redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
));
6624 redisLog(REDIS_WARNING
,"Bad file format reading the append only file");
6628 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n
 * The object is first passed through getDecodedObject() so that obj->ptr
 * can be used as an sds string. A failed fwrite() jumps to the 'err'
 * label. */
6629 static int fwriteBulk(FILE *fp
, robj
*obj
) {
6631 obj
= getDecodedObject(obj
);
6632 snprintf(buf
,sizeof(buf
),"$%ld\r\n",(long)sdslen(obj
->ptr
));
6633 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) goto err
;
/* fwrite() with a zero-sized item would return 0, so empty payloads are
 * skipped instead of being treated as a write error. */
6634 if (sdslen(obj
->ptr
) && fwrite(obj
->ptr
,sdslen(obj
->ptr
),1,fp
) == 0)
6636 if (fwrite("\r\n",2,1,fp
) == 0) goto err
;
/* Write a double value in bulk format $<count>\r\n<payload>\r\n
 *
 * The payload is 'd' formatted with "%.17g"; <count> is the length of the
 * payload without its trailing CRLF.
 *
 * Returns 1 on success, 0 if one of the two fwrite() calls fails. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char buf[128], dbuf[128];
    size_t dlen;

    snprintf(dbuf,sizeof(dbuf),"%.17g\r\n",d);
    dlen = strlen(dbuf);
    /* The count announced in the header excludes the CRLF terminator. */
    snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)dlen-2);
    if (fwrite(buf,strlen(buf),1,fp) == 0) return 0;
    if (fwrite(dbuf,dlen,1,fp) == 0) return 0;
    return 1;
}
/* Write a long value in bulk format $<count>\r\n<payload>\r\n
 *
 * The payload is 'l' formatted in base 10; <count> is the length of the
 * payload without its trailing CRLF.
 *
 * Returns 1 on success, 0 if one of the two fwrite() calls fails. */
static int fwriteBulkLong(FILE *fp, long l) {
    char buf[128], lbuf[128];
    size_t llen;

    snprintf(lbuf,sizeof(lbuf),"%ld\r\n",l);
    llen = strlen(lbuf);
    /* The count announced in the header excludes the CRLF terminator. */
    snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)llen-2);
    if (fwrite(buf,strlen(buf),1,fp) == 0) return 0;
    if (fwrite(lbuf,llen,1,fp) == 0) return 0;
    return 1;
}
6666 /* Write a sequence of commands able to fully rebuild the dataset into
6667 * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
 *
 * The output is first written to a temp file named after the process pid
 * and then moved onto 'filename' with rename(). For every non-empty DB a
 * SELECT is emitted, followed by one SET per string, one RPUSH per list
 * element, one SADD per set member and one ZADD per sorted set member, plus
 * an EXPIREAT for keys having an expire that is not already in the past. */
6668 static int rewriteAppendOnlyFile(char *filename
) {
6669 dictIterator
*di
= NULL
;
6674 time_t now
= time(NULL
);
6676 /* Note that we have to use a different temp name here compared to the
6677 * one used by rewriteAppendOnlyFileBackground() function. */
6678 snprintf(tmpfile
,256,"temp-rewriteaof-%d.aof", (int) getpid());
6679 fp
= fopen(tmpfile
,"w");
6681 redisLog(REDIS_WARNING
, "Failed rewriting the append only file: %s", strerror(errno
));
6684 for (j
= 0; j
< server
.dbnum
; j
++) {
6685 char selectcmd
[] = "*2\r\n$6\r\nSELECT\r\n";
6686 redisDb
*db
= server
.db
+j
;
6688 if (dictSize(d
) == 0) continue;
6689 di
= dictGetIterator(d
);
6695 /* SELECT the new DB */
6696 if (fwrite(selectcmd
,sizeof(selectcmd
)-1,1,fp
) == 0) goto werr
;
6697 if (fwriteBulkLong(fp
,j
) == 0) goto werr
;
6699 /* Iterate this DB writing every entry */
6700 while((de
= dictNext(di
)) != NULL
) {
6705 key
= dictGetEntryKey(de
);
/* Values held in memory are used directly; otherwise a preview of
 * the value and a duplicate of the key are created as temp objects. */
6706 if (!server
.vm_enabled
|| key
->storage
== REDIS_VM_MEMORY
||
6707 key
->storage
== REDIS_VM_SWAPPING
) {
6708 o
= dictGetEntryVal(de
);
6711 o
= vmPreviewObject(key
);
6712 key
= dupStringObject(key
);
6715 expiretime
= getExpire(db
,key
);
6717 /* Save the key and associated value */
6718 if (o
->type
== REDIS_STRING
) {
6719 /* Emit a SET command */
6720 char cmd
[]="*3\r\n$3\r\nSET\r\n";
6721 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6723 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6724 if (fwriteBulk(fp
,o
) == 0) goto werr
;
6725 } else if (o
->type
== REDIS_LIST
) {
6726 /* Emit the RPUSHes needed to rebuild the list */
6727 list
*list
= o
->ptr
;
6731 while((ln
= listYield(list
))) {
6732 char cmd
[]="*3\r\n$5\r\nRPUSH\r\n";
6733 robj
*eleobj
= listNodeValue(ln
);
6735 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6736 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6737 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6739 } else if (o
->type
== REDIS_SET
) {
6740 /* Emit the SADDs needed to rebuild the set */
6742 dictIterator
*di
= dictGetIterator(set
);
6745 while((de
= dictNext(di
)) != NULL
) {
6746 char cmd
[]="*3\r\n$4\r\nSADD\r\n";
6747 robj
*eleobj
= dictGetEntryKey(de
);
6749 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6750 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6751 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6753 dictReleaseIterator(di
);
6754 } else if (o
->type
== REDIS_ZSET
) {
6755 /* Emit the ZADDs needed to rebuild the sorted set */
6757 dictIterator
*di
= dictGetIterator(zs
->dict
);
6760 while((de
= dictNext(di
)) != NULL
) {
6761 char cmd
[]="*4\r\n$4\r\nZADD\r\n";
6762 robj
*eleobj
= dictGetEntryKey(de
);
6763 double *score
= dictGetEntryVal(de
);
6765 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6766 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6767 if (fwriteBulkDouble(fp
,*score
) == 0) goto werr
;
6768 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6770 dictReleaseIterator(di
);
6772 redisAssert(0 != 0);
6774 /* Save the expire time */
6775 if (expiretime
!= -1) {
6776 char cmd
[]="*3\r\n$8\r\nEXPIREAT\r\n";
6777 /* If this key is already expired skip it */
6778 if (expiretime
< now
) continue;
6779 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6780 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6781 if (fwriteBulkLong(fp
,expiretime
) == 0) goto werr
;
6783 /* We created a few temp objects if the key->value pair
6784 * was about a swapped out object. Free both. */
6790 dictReleaseIterator(di
);
6793 /* Make sure data will not remain on the OS's output buffers */
6798 /* Use RENAME to make sure the DB file is changed atomically only
6799 * if the generate DB file is ok. */
6800 if (rename(tmpfile
,filename
) == -1) {
6801 redisLog(REDIS_WARNING
,"Error moving temp append only file on the final destination: %s", strerror(errno
));
6805 redisLog(REDIS_NOTICE
,"SYNC append only file rewrite performed");
6811 redisLog(REDIS_WARNING
,"Write error writing append only file on disk: %s", strerror(errno
));
6812 if (di
) dictReleaseIterator(di
);
6816 /* This is how rewriting of the append only file in background works:
6818 * 1) The user calls BGREWRITEAOF
6819 * 2) Redis calls this function, that forks():
6820 * 2a) the child rewrite the append only file in a temp file.
6821 * 2b) the parent accumulates differences in server.bgrewritebuf.
6822 * 3) When the child finished '2a' exits.
6823 * 4) The parent will trap the exit code, if it's OK, will append the
6824 * data accumulated into server.bgrewritebuf into the temp file, and
6825 * finally will rename(2) the temp file in the actual file name.
6826 * Then the new file is reopened as the new append only file. Profit!
/* Start an append only file rewrite in a forked child.
 * Returns REDIS_ERR right away if a rewrite child is already running.
 * The child rewrites the AOF into a temp file named after its own pid; the
 * parent logs the outcome of fork() and, when it succeeded, records the
 * child pid in server.bgrewritechildpid. */
6828 static int rewriteAppendOnlyFileBackground(void) {
6831 if (server
.bgrewritechildpid
!= -1) return REDIS_ERR
;
6832 if ((childpid
= fork()) == 0) {
6837 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6838 if (rewriteAppendOnlyFile(tmpfile
) == REDIS_OK
) {
6845 if (childpid
== -1) {
6846 redisLog(REDIS_WARNING
,
6847 "Can't rewrite append only file in background: fork: %s",
6851 redisLog(REDIS_NOTICE
,
6852 "Background append only file rewriting started by pid %d",childpid
);
6853 server
.bgrewritechildpid
= childpid
;
6854 /* We set appendseldb to -1 in order to force the next call to the
6855 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6856 * accumulated by the parent into server.bgrewritebuf will start
6857 * with a SELECT statement and it will be safe to merge. */
6858 server
.appendseldb
= -1;
6861 return REDIS_OK
; /* unreached */
/* Client command that requests a background AOF rewrite.
 * Replies with an error if server.bgrewritechildpid shows a rewrite already
 * running; otherwise calls rewriteAppendOnlyFileBackground() and reports a
 * status line on REDIS_OK, or shared.err on the other visible reply path. */
6864 static void bgrewriteaofCommand(redisClient
*c
) {
6865 if (server
.bgrewritechildpid
!= -1) {
6866 addReplySds(c
,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6869 if (rewriteAppendOnlyFileBackground() == REDIS_OK
) {
6870 char *status
= "+Background append only file rewriting started\r\n";
6871 addReplySds(c
,sdsnew(status
));
6873 addReply(c
,shared
.err
);
/* Build the name of the temp file used by the background rewrite child
 * with pid 'childpid'. The name pattern is the same one used in
 * rewriteAppendOnlyFileBackground().
 * NOTE(review): the removal call itself is not visible in this listing;
 * presumably the file is unlink()ed right after -- confirm. */
6877 static void aofRemoveTempFile(pid_t childpid
) {
6880 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) childpid
);
6884 /* Virtual Memory is composed mainly of two subsystems:
6885 * - Blocking Virtual Memory
6886 * - Threaded Virtual Memory I/O
6887 * The two parts are not fully decoupled, but functions are split among two
6888 * different sections of the source code (delimited by comments) in order to
6889 * make it clearer which functionality belongs to the blocking VM and which
6890 * to the threaded (non-blocking) VM.
6894 * Redis VM is a blocking VM (one that blocks reading swapped values from
6895 * disk into memory when a value swapped out is needed in memory) that is made
6896 * unblocking by trying to examine the command argument vector in order to
6897 * load in background values that will likely be needed in order to exec
6898 * the command. The command is executed only once all the relevant keys
6899 * are loaded into memory.
6901 * This is basically almost as simple as a blocking VM, but almost as parallel
6902 * as a fully non-blocking VM.
6905 /* =================== Virtual Memory - Blocking Side ====================== */
/* Initialize the Virtual Memory subsystem: open and size the swap file,
 * allocate the page bitmap, then set up the state used by threaded I/O
 * (job lists, mutex and a notification pipe). */
6906 static void vmInit(void) {
/* NOTE(review): the swap file path is hard-coded, so every instance on the
 * same host would open the same name -- confirm this is acceptable. */
6910 server
.vm_fp
= fopen("/tmp/redisvm","w+b");
6911 if (server
.vm_fp
== NULL
) {
6912 redisLog(REDIS_WARNING
,"Impossible to open the swap file. Exiting.");
/* Keep the raw descriptor too: it is the one passed to ftruncate() below. */
6915 server
.vm_fd
= fileno(server
.vm_fp
);
/* Reset the page search cursor and the VM statistics. */
6916 server
.vm_next_page
= 0;
6917 server
.vm_near_pages
= 0;
6918 server
.vm_stats_used_pages
= 0;
6919 server
.vm_stats_swapped_objects
= 0;
6920 server
.vm_stats_swapouts
= 0;
6921 server
.vm_stats_swapins
= 0;
/* Swap file size in bytes = number of pages * page size. */
6922 totsize
= server
.vm_pages
*server
.vm_page_size
;
/* NOTE(review): totsize is passed to %lld without a cast and its declaration
 * is not visible in this listing -- confirm it is long long sized on every
 * target. */
6923 redisLog(REDIS_NOTICE
,"Allocating %lld bytes of swap file",totsize
);
6924 if (ftruncate(server
.vm_fd
,totsize
) == -1) {
6925 redisLog(REDIS_WARNING
,"Can't ftruncate swap file: %s. Exiting.",
6929 redisLog(REDIS_NOTICE
,"Swap file allocated with success");
/* Page table: one bit per page, rounded up to whole bytes. It is zeroed
 * below, and a clear bit is what vmFreePage() reports as a free page. */
6931 server
.vm_bitmap
= zmalloc((server
.vm_pages
+7)/8);
/* NOTE(review): only the first %lld argument is cast to long long; the second
 * one (server.vm_pages) is passed as is -- confirm its type matches. */
6932 redisLog(REDIS_VERBOSE
,"Allocated %lld bytes page table for %lld pages",
6933 (long long) (server
.vm_pages
+7)/8, server
.vm_pages
);
6934 memset(server
.vm_bitmap
,0,(server
.vm_pages
+7)/8);
6935 /* Try to remove the swap file, so the OS will really delete it from the
6936 * file system when Redis exits. */
6937 unlink("/tmp/redisvm");
6939 /* Initialize threaded I/O (used by Virtual Memory) */
6940 server
.io_newjobs
= listCreate();
6941 server
.io_processing
= listCreate();
6942 server
.io_processed
= listCreate();
6943 server
.io_clients
= listCreate();
6944 pthread_mutex_init(&server
.io_mutex
,NULL
);
6945 server
.io_active_threads
= 0;
/* Pipe used to signal completed jobs: pipefds[0] becomes the read end and
 * pipefds[1] the write end. */
6946 if (pipe(pipefds
) == -1) {
6947 redisLog(REDIS_WARNING
,"Unable to intialized VM: pipe(2): %s. Exiting."
6951 server
.io_ready_pipe_read
= pipefds
[0];
6952 server
.io_ready_pipe_write
= pipefds
[1];
/* The read end is switched to non-blocking mode; failure trips the assert. */
6953 redisAssert(anetNonBlock(NULL
,server
.io_ready_pipe_read
) != ANET_ERR
);
6956 /* Mark the page as used: set the page's bit in server.vm_bitmap and log
 * the operation at debug level.
 * The byte index is page/8; 'bit' selects the bit inside that byte (its
 * computation is not visible in this listing -- presumably page modulo 8,
 * confirm). */
6957 static void vmMarkPageUsed(off_t page
) {
6958 off_t byte
= page
/8;
6960 server
.vm_bitmap
[byte
] |= 1<<bit
;
6961 redisLog(REDIS_DEBUG
,"Mark used: %lld (byte:%lld bit:%d)\n",
6962 (long long)page
, (long long)byte
, bit
);
6965 /* Mark N contiguous pages as used, with 'page' being the first.
 * Calls vmMarkPageUsed() once per page and then adds 'count' to
 * server.vm_stats_used_pages. */
6966 static void vmMarkPagesUsed(off_t page
, off_t count
) {
6969 for (j
= 0; j
< count
; j
++)
6970 vmMarkPageUsed(page
+j
);
6971 server
.vm_stats_used_pages
+= count
;
6974 /* Mark the page as free: clear the page's bit in server.vm_bitmap.
 * Same byte/bit addressing as vmMarkPageUsed(); the computation of 'bit'
 * is not visible in this listing. */
6975 static void vmMarkPageFree(off_t page
) {
6976 off_t byte
= page
/8;
6978 server
.vm_bitmap
[byte
] &= ~(1<<bit
);
6981 /* Mark N contiguous pages as free, with 'page' being the first.
 * Calls vmMarkPageFree() once per page and then subtracts 'count' from
 * server.vm_stats_used_pages. */
6982 static void vmMarkPagesFree(off_t page
, off_t count
) {
6985 for (j
= 0; j
< count
; j
++)
6986 vmMarkPageFree(page
+j
);
6987 server
.vm_stats_used_pages
-= count
;
6990 /* Test if the page is free.
 * Returns non-zero when the page's bit in server.vm_bitmap is clear and 0
 * when it is set. No bounds check on 'page' is visible here, so callers
 * are expected to pass a page inside the bitmap. */
6991 static int vmFreePage(off_t page
) {
6992 off_t byte
= page
/8;
6994 return (server
.vm_bitmap
[byte
] & (1<<bit
)) == 0;
6997 /* Find N contiguous free pages storing the first page of the cluster in *first.
6998 * Returns REDIS_OK if it was able to find N contiguous pages, otherwise
6999 * REDIS_ERR is returned.
7001 * This function uses a simple algorithm: we try to allocate
7002 * REDIS_VM_MAX_NEAR_PAGES sequentially, when we reach this limit we start
7003 * again from the start of the swap file searching for free spaces.
7005 * If it looks pretty clear that there are no free pages near our offset
7006 * we try to find less populated places doing a forward jump of
7007 * REDIS_VM_MAX_RANDOM_JUMP, then we start scanning again a few pages
7008 * without hurry, and then we jump again and so forth...
7010 * This function can be improved using a free list to avoid guessing
7011 * too much, since we could collect data about freed pages.
7013 * note: I implemented this function just after watching an episode of
7014 * Battlestar Galactica, where the hybrid was continuing to say "JUMP!"
/* Scan the page bitmap for 'n' contiguous free pages, starting at
 * server.vm_next_page and wrapping around at server.vm_pages.
 * On success the first page of the run is stored in *first and
 * server.vm_next_page is moved just past the run. */
7016 static int vmFindContiguousPages(off_t
*first
, int n
) {
7017 off_t base
, offset
= 0, since_jump
= 0, numfree
= 0;
/* Once REDIS_VM_MAX_NEAR_PAGES searches have started near the previous
 * position, restart the next search from page zero. */
7019 if (server
.vm_near_pages
== REDIS_VM_MAX_NEAR_PAGES
) {
7020 server
.vm_near_pages
= 0;
7021 server
.vm_next_page
= 0;
7023 server
.vm_near_pages
++; /* Yet another try for pages near to the old ones */
7024 base
= server
.vm_next_page
;
/* 'offset' is relative to 'base'; at most server.vm_pages offsets are
 * examined. */
7026 while(offset
< server
.vm_pages
) {
7027 off_t
this = base
+offset
;
/* NOTE(review): this debug line calls vmFreePage(this) before the wrap-around
 * check below, so 'this' may still be >= server.vm_pages here and index past
 * the end of server.vm_bitmap -- confirm. */
7029 redisLog(REDIS_DEBUG
, "THIS: %lld (%c)\n", (long long) this, vmFreePage(this) ? 'F' : 'X');
7030 /* If we overflow, restart from page zero */
7031 if (this >= server
.vm_pages
) {
7032 this -= server
.vm_pages
;
7034 /* Just overflowed, what we found on tail is no longer
7035 * interesting, as it's no longer contiguous. */
7039 if (vmFreePage(this)) {
7040 /* This is a free page */
7042 /* Already got N free pages? Return to the caller, with success */
/* 'this' is the last page of the run, so the run starts n-1 pages before
 * it; the next search resumes right after the run. */
7044 *first
= this-(n
-1);
7045 server
.vm_next_page
= this+1;
7049 /* The current one is not a free page */
7053 /* Fast-forward if the current page is not free and we already
7054 * searched enough near this place. */
7056 if (!numfree
&& since_jump
>= REDIS_VM_MAX_RANDOM_JUMP
/4) {
/* The jump length is random, from 0 up to REDIS_VM_MAX_RANDOM_JUMP-1. */
7057 offset
+= random() % REDIS_VM_MAX_RANDOM_JUMP
;
7059 /* Note that even if we rewind after the jump, we don't need
7060 * to make sure numfree is set to zero as we only jump *if* it
7061 * is set to zero. */
7063 /* Otherwise just check the next page */
7070 /* Swap the 'val' object relative to 'key' into disk. Store all the information
7071 * needed to later retrieve the object into the key object.
7072 * If we can't find enough contiguous empty pages to swap the object on disk
7073 * REDIS_ERR is returned.
 *
 * The key must be in memory and not shared (refcount 1), as asserted below;
 * both callers visible in this file duplicate a shared key before calling.
 * This function drops its reference to 'val'; the visible callers then set
 * the dictionary entry's value to NULL. */
7074 static int vmSwapObject(robj
*key
, robj
*val
) {
/* Number of swap pages needed to hold the serialized value. */
7075 off_t pages
= rdbSavedObjectPages(val
);
7078 assert(key
->storage
== REDIS_VM_MEMORY
);
7079 assert(key
->refcount
== 1);
/* NOTE(review): 'pages' is an off_t but vmFindContiguousPages() takes an int
 * count -- confirm the value always fits. */
7080 if (vmFindContiguousPages(&page
,pages
) == REDIS_ERR
) return REDIS_ERR
;
/* Position the swap file at the byte offset of the first allocated page. */
7081 if (fseeko(server
.vm_fp
,page
*server
.vm_page_size
,SEEK_SET
) == -1) {
7082 redisLog(REDIS_WARNING
,
7083 "Critical VM problem in vmSwapObject(): can't seek: %s",
/* NOTE(review): the result of rdbSaveObject() is not checked here, so a
 * failed write would still mark the key as swapped -- confirm. */
7087 rdbSaveObject(server
.vm_fp
,val
);
/* Record where the value lives on disk and which type it has, so it can be
 * loaded back later using only the key object. */
7088 key
->vm
.page
= page
;
7089 key
->vm
.usedpages
= pages
;
7090 key
->storage
= REDIS_VM_SWAPPED
;
7091 key
->vtype
= val
->type
;
7092 decrRefCount(val
); /* Deallocate the object from memory. */
7093 vmMarkPagesUsed(page
,pages
);
7094 redisLog(REDIS_DEBUG
,"VM: object %s swapped out at %lld (%lld pages)",
7095 (unsigned char*) key
->ptr
,
7096 (unsigned long long) page
, (unsigned long long) pages
);
7097 server
.vm_stats_swapped_objects
++;
7098 server
.vm_stats_swapouts
++;
/* Push the buffered stdio data for the swap file out to the OS. */
7099 fflush(server
.vm_fp
);
7103 /* Load the value object relative to the 'key' object from swap to memory.
7104 * The newly allocated object is returned.
7106 * If preview is true the unserialized object is returned to the caller but
7107 * no changes are made to the key object, nor the pages are marked as freed */
7108 static robj
*vmGenericLoadObject(robj
*key
, int preview
) {
/* Only keys whose value is fully swapped out are accepted; vmLoadObject()
 * cancels a background load first. */
7111 redisAssert(key
->storage
== REDIS_VM_SWAPPED
);
/* Seek to the byte offset of the first page recorded in the key. */
7112 if (fseeko(server
.vm_fp
,key
->vm
.page
*server
.vm_page_size
,SEEK_SET
) == -1) {
7113 redisLog(REDIS_WARNING
,
7114 "Unrecoverable VM problem in vmLoadObject(): can't seek: %s",
/* The value type saved at swap time (key->vtype) tells the loader what to
 * deserialize. */
7118 val
= rdbLoadObject(key
->vtype
,server
.vm_fp
);
7120 redisLog(REDIS_WARNING
, "Unrecoverable VM problem in vmLoadObject(): can't load object from swap file: %s", strerror(errno
));
/* Real load (presumably the !preview branch; the condition line is not
 * visible in this listing): the key goes back to the in-memory state, its
 * access time is refreshed and its swap pages are released. */
7124 key
->storage
= REDIS_VM_MEMORY
;
7125 key
->vm
.atime
= server
.unixtime
;
7126 vmMarkPagesFree(key
->vm
.page
,key
->vm
.usedpages
);
7127 redisLog(REDIS_DEBUG
, "VM: object %s loaded from disk",
7128 (unsigned char*) key
->ptr
);
7129 server
.vm_stats_swapped_objects
--;
/* Preview: only logged, the key object is left untouched. */
7131 redisLog(REDIS_DEBUG
, "VM: object %s previewed from disk",
7132 (unsigned char*) key
->ptr
);
7134 server
.vm_stats_swapins
++;
7138 /* Plain object loading, from swap to memory.
 * Returns whatever vmGenericLoadObject() returns with preview set to 0. */
7139 static robj
*vmLoadObject(robj
*key
) {
7140 /* If we are loading the object in background, stop it, we
7141 * need to load this object synchronously ASAP. */
/* vmCancelThreadedIOJob() moves a REDIS_VM_LOADING key back to
 * REDIS_VM_SWAPPED, the state vmGenericLoadObject() asserts. */
7142 if (key
->storage
== REDIS_VM_LOADING
)
7143 vmCancelThreadedIOJob(key
);
7144 return vmGenericLoadObject(key
,0);
7147 /* Just load the value from disk, without modifying the key.
7148 * This is useful when we want to perform some operation on the value
7149 * without really bringing it from swap to memory, like while saving the
7150 * dataset or rewriting the append only log.
 * Thin wrapper: vmGenericLoadObject() with preview set to 1. */
7151 static robj
*vmPreviewObject(robj
*key
) {
7152 return vmGenericLoadObject(key
,1);
7155 /* How good a candidate is this object for swapping?
7156 * The better candidate it is, the greater the returned value.
7158 * Currently we try to perform a fast estimation of the object size in
7159 * memory, and combine it with aging information.
7161 * Basically swappability = idle-time * log(estimated size)
7163 * Bigger objects are preferred over smaller objects, but not
7164 * proportionally, this is why we use the logarithm. This algorithm is
7165 * just a first try and will probably be tuned later. */
7166 static double computeObjectSwappability(robj
*o
) {
/* Idle time in seconds, measured from the object's recorded access time. */
7167 time_t age
= server
.unixtime
- o
->vm
.atime
;
7171 struct dictEntry
*de
;
/* Objects with no idle time are never swap candidates. */
7174 if (age
<= 0) return 0;
/* Size estimate for a string value: sds payload plus object header plus two
 * longs. The handling of the non-raw encoding is not visible here. */
7177 if (o
->encoding
!= REDIS_ENCODING_RAW
) {
7180 asize
= sdslen(o
->ptr
)+sizeof(*o
)+sizeof(long)*2;
/* List estimate: the size of the first element is taken as representative
 * and multiplied by the list length. */
7185 listNode
*ln
= listFirst(l
);
7187 asize
= sizeof(list
);
7189 robj
*ele
= ln
->value
;
7192 elesize
= (ele
->encoding
== REDIS_ENCODING_RAW
) ?
7193 (sizeof(*o
)+sdslen(ele
->ptr
)) :
7195 asize
+= (sizeof(listNode
)+elesize
)*listLength(l
);
/* Dict-based estimate, shared with sorted sets: 'z' tells whether the dict is
 * the one embedded in a zset or the value's own pointer. */
7200 z
= (o
->type
== REDIS_ZSET
);
7201 d
= z
? ((zset
*)o
->ptr
)->dict
: o
->ptr
;
/* Hash table overhead: the dict header plus one entry pointer per slot. */
7203 asize
= sizeof(dict
)+(sizeof(struct dictEntry
*)*dictSlots(d
));
7204 if (z
) asize
+= sizeof(zset
)-sizeof(dict
);
/* One random key is sampled and its size multiplied by the number of
 * entries. */
7209 de
= dictGetRandomKey(d
);
7210 ele
= dictGetEntryKey(de
);
7211 elesize
= (ele
->encoding
== REDIS_ENCODING_RAW
) ?
7212 (sizeof(*o
)+sdslen(ele
->ptr
)) :
7214 asize
+= (sizeof(struct dictEntry
)+elesize
)*dictSize(d
);
/* Sorted sets also pay for one skiplist node per element. */
7215 if (z
) asize
+= sizeof(zskiplistNode
)*dictSize(d
);
/* NOTE(review): the header documents idle-time * log(size), but this return
 * multiplies asize (not age) by the logarithm; in the visible lines 'age' is
 * only used for the <= 0 early return -- confirm which one is intended. */
7219 return (double)asize
*log(1+asize
);
7222 /* Try to swap an object that's a good candidate for swapping.
7223 * Returns REDIS_OK if the object was swapped, REDIS_ERR if it's not possible
7224 * to swap any object at all.
 *
 * Candidates are picked by random sampling: a few in-memory keys per
 * database are scored with computeObjectSwappability() and the best one
 * overall is swapped with vmSwapObject(). */
7225 static int vmSwapOneObject(void) {
7227 struct dictEntry
*best
= NULL
;
7228 double best_swappability
= 0;
7231 for (j
= 0; j
< server
.dbnum
; j
++) {
7232 redisDb
*db
= server
.db
+j
;
/* Budget of samples that may be retried for free in this database. */
7233 int maxtries
= 1000;
7235 if (dictSize(db
->dict
) == 0) continue;
/* Five counted samples per database. */
7236 for (i
= 0; i
< 5; i
++) {
7238 double swappability
;
7240 if (maxtries
) maxtries
--;
7241 de
= dictGetRandomKey(db
->dict
);
7242 key
= dictGetEntryKey(de
);
7243 val
= dictGetEntryVal(de
);
/* Keys not fully in memory are not candidates; while the budget lasts such a
 * sample is not counted against the five. */
7244 if (key
->storage
!= REDIS_VM_MEMORY
) {
7245 if (maxtries
) i
--; /* don't count this try */
/* NOTE(review): the score is computed on 'val', whose vm.atime is read by
 * computeObjectSwappability(), while vmGenericLoadObject() refreshes
 * key->vm.atime -- confirm the value object carries a meaningful atime. */
7248 swappability
= computeObjectSwappability(val
);
7249 if (!best
|| swappability
> best_swappability
) {
7251 best_swappability
= swappability
;
7256 redisLog(REDIS_DEBUG
,"No swappable key found!");
7259 key
= dictGetEntryKey(best
);
7260 val
= dictGetEntryVal(best
);
7262 redisLog(REDIS_DEBUG
,"Key with best swappability: %s, %f",
7263 key
->ptr
, best_swappability
);
7265 /* Unshare the key if needed */
/* vmSwapObject() asserts refcount == 1, so a shared key is replaced in the
 * dictionary entry by a private copy. */
7266 if (key
->refcount
> 1) {
7267 robj
*newkey
= dupStringObject(key
);
7269 key
= dictGetEntryKey(best
) = newkey
;
/* On success the value is now on disk: the entry's value pointer is
 * cleared. */
7272 if (vmSwapObject(key
,val
) == REDIS_OK
) {
7273 dictGetEntryVal(best
) = NULL
;
7280 /* Return true if it's safe to swap out objects in a given moment.
7281 * Basically we don't want to swap objects out while there is a BGSAVE
7282 * or a BGREWRITEAOF running in background.
 * Both child pids must be -1 for the function to return true. */
7283 static int vmCanSwapOut(void) {
7284 return (server
.bgsavechildpid
== -1 && server
.bgrewritechildpid
== -1);
7287 /* Delete a key if swapped. Returns 1 if the key was found, was swapped
7288 * and was deleted. Otherwise 0 is returned.
 * The two visible early returns cover a missing key and a key whose
 * storage is REDIS_VM_MEMORY; the deletion itself is not visible in this
 * listing. */
7289 static int deleteIfSwapped(redisDb
*db
, robj
*key
) {
7293 if ((de
= dictFind(db
->dict
,key
)) == NULL
) return 0;
/* The storage state lives on the key object stored in the dictionary, not
 * on the lookup key passed by the caller. */
7294 foundkey
= dictGetEntryKey(de
);
7295 if (foundkey
->storage
== REDIS_VM_MEMORY
) return 0;
7300 /* =================== Virtual Memory - Threaded I/O ======================= */
7302 /* Every time a thread finishes a job, it writes a byte into the write side
7303 * of a unix pipe in order to "awake" the main thread, and this function
/* Handler for completed threaded I/O jobs: reads 'fd' one byte at a time and
 * treats each byte as one finished job. The parameter list matches an event
 * loop file callback; presumably it is registered on
 * server.io_ready_pipe_read -- confirm against the registration site. */
7305 static void vmThreadedIOCompletedJob(aeEventLoop
*el
, int fd
, void *privdata
,
7311 REDIS_NOTUSED(mask
);
7312 REDIS_NOTUSED(privdata
);
7314 /* For every byte we read in the read side of the pipe, there is one
7315 * I/O job completed to process. */
7316 while((retval
= read(fd
,buf
,1)) == 1) {
7317 redisLog(REDIS_DEBUG
,"Processing I/O completed job");
/* EAGAIN is not reported as an error: it is what a non-blocking descriptor
 * returns when there is nothing left to read. */
7319 if (retval
< 0 && errno
!= EAGAIN
) {
7320 redisLog(REDIS_WARNING
,
7321 "WARNING: read(2) error in vmThreadedIOCompletedJob() %s",
/* Acquire server.io_mutex, the mutex initialized in vmInit() together with
 * the threaded I/O job lists. */
7326 static void lockThreadedIO(void) {
7327 pthread_mutex_lock(&server
.io_mutex
);
/* Release server.io_mutex; counterpart of lockThreadedIO(). */
7330 static void unlockThreadedIO(void) {
7331 pthread_mutex_unlock(&server
.io_mutex
);
7334 /* Remove the specified object from the threaded I/O queue if still not
7335 * processed, otherwise make sure to flag it as canceled.
 * 'o' is the key object of the job; it must be in the REDIS_VM_LOADING or
 * REDIS_VM_SWAPPING state. */
7336 static void vmCancelThreadedIOJob(robj
*o
) {
/* The three queues, in search order; the index into this array is what the
 * case labels below refer to. */
7338 server
.io_newjobs
, server
.io_processing
, server
.io_processed
7342 assert(o
->storage
== REDIS_VM_LOADING
|| o
->storage
== REDIS_VM_SWAPPING
);
7344 /* Search for a matching key in one of the queues */
7345 for (i
= 0; i
< 3; i
++) {
7348 listRewind(lists
[i
]);
7349 while ((ln
= listYield(lists
[i
])) != NULL
) {
7350 iojob
*job
= ln
->value
;
/* Jobs are matched by comparing key contents, not pointers. */
7352 if (compareStringObjects(job
->key
,o
) == 0) {
7354 case 0: /* io_newjobs */
7355 /* If the job was not yet processed the best thing to do
7356 * is to remove it from the queue altogether */
/* Drop the references held by the job; only swap jobs hold a value. */
7357 decrRefCount(job
->key
);
7358 if (job
->type
== REDIS_IOJOB_SWAP
)
7359 decrRefCount(job
->val
);
7360 listDelNode(lists
[i
],ln
);
7362 case 1: /* io_processing */
7363 case 2: /* io_processed */
/* Roll the key's storage state back to what it was before the job was
 * queued: a load goes back to swapped, a swap goes back to in-memory. */
7367 if (o
->storage
== REDIS_VM_LOADING
)
7368 o
->storage
= REDIS_VM_SWAPPED
;
7369 else if (o
->storage
== REDIS_VM_SWAPPING
)
7370 o
->storage
= REDIS_VM_MEMORY
;
7377 assert(1 != 1); /* We should never reach this */
7380 /* ================================= Debugging ============================== */
/* DEBUG command dispatcher. The subcommand is argv[1], compared without
 * regard to case: SEGFAULT, RELOAD, LOADAOF, OBJECT <key>, SWAPOUT <key>.
 * Anything else gets a syntax error reply.
 * NOTE(review): argv[1] is read without a visible argc check; presumably the
 * command table enforces a minimum arity -- confirm. */
7382 static void debugCommand(redisClient
*c
) {
7383 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
7385 } else if (!strcasecmp(c
->argv
[1]->ptr
,"reload")) {
/* RELOAD: save the dataset to server.dbfilename and load it back; a failure
 * in either step is reported with shared.err. */
7386 if (rdbSave(server
.dbfilename
) != REDIS_OK
) {
7387 addReply(c
,shared
.err
);
7391 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
7392 addReply(c
,shared
.err
);
7395 redisLog(REDIS_WARNING
,"DB reloaded by DEBUG RELOAD");
7396 addReply(c
,shared
.ok
);
7397 } else if (!strcasecmp(c
->argv
[1]->ptr
,"loadaof")) {
/* LOADAOF: load the dataset from the append only file. */
7399 if (loadAppendOnlyFile(server
.appendfilename
) != REDIS_OK
) {
7400 addReply(c
,shared
.err
);
7403 redisLog(REDIS_WARNING
,"Append Only File loaded by DEBUG LOADAOF");
7404 addReply(c
,shared
.ok
);
7405 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
/* OBJECT <key>: report addresses, refcounts and either encoding details or
 * the swap location of the value. */
7406 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
7410 addReply(c
,shared
.nokeyerr
);
7413 key
= dictGetEntryKey(de
);
7414 val
= dictGetEntryVal(de
);
/* NOTE(review): when server.vm_enabled is false this condition is false, so
 * the other reply (swap page report) would be the one produced even though
 * nothing can be swapped -- confirm the condition should not instead be
 * "VM disabled OR key in memory". */
7415 if (server
.vm_enabled
&& key
->storage
== REDIS_VM_MEMORY
) {
7416 addReplySds(c
,sdscatprintf(sdsempty(),
7417 "+Key at:%p refcount:%d, value at:%p refcount:%d "
7418 "encoding:%d serializedlength:%lld\r\n",
7419 (void*)key
, key
->refcount
, (void*)val
, val
->refcount
,
7420 val
->encoding
, rdbSavedObjectLen(val
)));
7422 addReplySds(c
,sdscatprintf(sdsempty(),
7423 "+Key at:%p refcount:%d, value swapped at: page %llu "
7424 "using %llu pages\r\n",
7425 (void*)key
, key
->refcount
, (unsigned long long) key
->vm
.page
,
7426 (unsigned long long) key
->vm
.usedpages
));
7428 } else if (!strcasecmp(c
->argv
[1]->ptr
,"swapout") && c
->argc
== 3) {
/* SWAPOUT <key>: force the value of the key to be swapped to disk. */
7429 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
7432 if (!server
.vm_enabled
) {
7433 addReplySds(c
,sdsnew("-ERR Virtual Memory is disabled\r\n"));
7437 addReply(c
,shared
.nokeyerr
);
7440 key
= dictGetEntryKey(de
);
7441 val
= dictGetEntryVal(de
);
7442 /* If the key is shared we want to create a copy */
/* vmSwapObject() asserts the key has refcount 1. */
7443 if (key
->refcount
> 1) {
7444 robj
*newkey
= dupStringObject(key
);
7446 key
= dictGetEntryKey(de
) = newkey
;
7449 if (key
->storage
!= REDIS_VM_MEMORY
) {
7450 addReplySds(c
,sdsnew("-ERR This key is not in memory\r\n"));
7451 } else if (vmSwapObject(key
,val
) == REDIS_OK
) {
/* The value now lives on disk only: clear the entry's value pointer. */
7452 dictGetEntryVal(de
) = NULL
;
7453 addReply(c
,shared
.ok
);
7455 addReply(c
,shared
.err
);
/* NOTE(review): the usage text below does not list LOADAOF although that
 * subcommand is handled above. */
7458 addReplySds(c
,sdsnew(
7459 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPOUT <key>|RELOAD]\r\n"));
/* Report a failed assertion: logs a banner and 'estr' (the text describing
 * the failed expression) at warning level. */
7463 static void _redisAssert(char *estr
) {
7464 redisLog(REDIS_WARNING
,"=== ASSERTION FAILED ===");
7465 redisLog(REDIS_WARNING
,"==> %s\n",estr
);
/* With backtrace support a SIGSEGV is forced so that the crash handler
 * prints the stack trace; the statement that triggers it is not visible in
 * this listing. */
7466 #ifdef HAVE_BACKTRACE
7467 redisLog(REDIS_WARNING
,"(forcing SIGSEGV in order to print the stack trace)");
7472 /* =================================== Main! ================================ */
/* Read the Linux overcommit policy from /proc/sys/vm/overcommit_memory.
 * At most 63 characters of the first line are read into 'buf'. The handling
 * of a failed fopen() and the conversion of the text to the returned int are
 * not visible in this listing. */
7475 int linuxOvercommitMemoryValue(void) {
7476 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
7480 if (fgets(buf
,64,fp
) == NULL
) {
/* Log a warning when linuxOvercommitMemoryValue() returns 0, explaining how
 * to set vm.overcommit_memory to 1. Any other value logs nothing. */
7489 void linuxOvercommitMemoryWarning(void) {
7490 if (linuxOvercommitMemoryValue() == 0) {
7491 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
7494 #endif /* __linux__ */
/* Turn the process into a daemon: fork and let the parent exit, start a new
 * session, point the standard descriptors at /dev/null and write the pid
 * file named by server.pidfile. */
7496 static void daemonize(void) {
/* NOTE(review): a failed fork() returns -1, which also takes this exit(0)
 * path, so the server would terminate silently with status 0 -- confirm. */
7500 if (fork() != 0) exit(0); /* parent exits */
7501 printf("New pid: %d\n", getpid());
7502 setsid(); /* create a new session */
7504 /* Every output goes to /dev/null. If Redis is daemonized but
7505 * the 'logfile' is set to 'stdout' in the configuration file
7506 * it will not log at all. */
7507 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
7508 dup2(fd
, STDIN_FILENO
);
7509 dup2(fd
, STDOUT_FILENO
);
7510 dup2(fd
, STDERR_FILENO
);
/* Close the extra descriptor unless open() returned one of the three
 * standard descriptors itself. */
7511 if (fd
> STDERR_FILENO
) close(fd
);
7513 /* Try to write the pid file */
7514 fp
= fopen(server
.pidfile
,"w");
7516 fprintf(fp
,"%d\n",getpid());
/* Server entry point: load the configuration, optionally daemonize, load the
 * dataset from the append only file or from the RDB file, then serve until
 * the event loop is torn down. Several startup steps are not visible in this
 * listing. */
7521 int main(int argc
, char **argv
) {
/* Config file given (presumably the argc == 2 case; the condition line is
 * not visible): the save parameters are reset before the file is parsed. */
7524 resetServerSaveParams();
7525 loadServerConfig(argv
[1]);
7526 } else if (argc
> 2) {
7527 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
7530 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
7532 if (server
.daemonize
) daemonize();
7534 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
7536 linuxOvercommitMemoryWarning();
/* When the append only file is enabled it is the source of the dataset;
 * the RDB file is the other visible source. */
7538 if (server
.appendonly
) {
7539 if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
)
7540 redisLog(REDIS_NOTICE
,"DB loaded from append only file");
7542 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
7543 redisLog(REDIS_NOTICE
,"DB loaded from disk");
7545 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
7547 aeDeleteEventLoop(server
.el
);
7551 /* ============================= Backtrace support ========================= */
7553 #ifdef HAVE_BACKTRACE
7554 static char *findFuncName(void *pointer
, unsigned long *offset
);
/* Return the instruction pointer saved in the signal context 'uc'. The field
 * that holds it differs per platform, hence one preprocessor branch per
 * supported OS / architecture. segvHandler() uses the result to patch the
 * backtrace. */
7556 static void *getMcontextEip(ucontext_t
*uc
) {
7557 #if defined(__FreeBSD__)
7558 return (void*) uc
->uc_mcontext
.mc_eip
;
7559 #elif defined(__dietlibc__)
7560 return (void*) uc
->uc_mcontext
.eip
;
7561 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
/* Two alternatives (64 bit __rip, 32 bit __eip); the preprocessor test that
 * selects between them is not visible in this listing. */
7563 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
7565 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
7567 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
7568 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
7569 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
7571 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
7573 #elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
/* NOTE(review): REG_EIP is used for the x86_64 case as well; glibc names the
 * 64 bit register index REG_RIP -- confirm this builds on x86_64 Linux. */
7574 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
]; /* Linux 32/64 bit */
7575 #elif defined(__ia64__) /* Linux IA64 */
7576 return (void*) uc
->uc_mcontext
.sc_ip
;
/* Crash signal handler installed by setupSigSegvAction(): logs the signal
 * number, the server info string and a symbolized stack trace. 'secret' is
 * the ucontext pointer passed to SA_SIGINFO handlers. */
7582 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
7584 char **messages
= NULL
;
7585 int i
, trace_size
= 0;
7586 unsigned long offset
=0;
7587 ucontext_t
*uc
= (ucontext_t
*) secret
;
7589 REDIS_NOTUSED(info
);
7591 redisLog(REDIS_WARNING
,
7592 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
7593 infostring
= genRedisInfoString();
7594 redisLog(REDIS_WARNING
, "%s",infostring
);
7595 /* It's not safe to sdsfree() the returned string under memory
7596 * corruption conditions. Let it leak as we are going to abort */
/* Collect up to 100 return addresses. */
7598 trace_size
= backtrace(trace
, 100);
7599 /* overwrite sigaction with caller's address */
7600 if (getMcontextEip(uc
) != NULL
) {
7601 trace
[1] = getMcontextEip(uc
);
/* NOTE(review): the result of backtrace_symbols() is indexed below without a
 * visible NULL check -- confirm. */
7603 messages
= backtrace_symbols(trace
, trace_size
);
/* Frame 0 (this handler) is skipped. */
7605 for (i
=1; i
<trace_size
; ++i
) {
7606 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
/* Use the system's symbol text when our own table has no match, or when the
 * offset printed after '+' by backtrace_symbols() is smaller than the one
 * from our table (a closer symbol). */
7608 p
= strchr(messages
[i
],'+');
7609 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
7610 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
7612 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
7615 /* free(messages); Don't call free() with possibly corrupted memory. */
/* Install segvHandler() for the fatal signals SIGSEGV, SIGBUS, SIGFPE and
 * SIGILL. */
7619 static void setupSigSegvAction(void) {
7620 struct sigaction act
;
7622 sigemptyset (&act
.sa_mask
);
7623 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
7624 * is used. Otherwise, sa_handler is used */
/* SA_RESETHAND restores the default action once the handler has been entered
 * and SA_NODEFER leaves the signal unblocked while it runs. */
7625 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
7626 act
.sa_sigaction
= segvHandler
;
7627 sigaction (SIGSEGV
, &act
, NULL
);
7628 sigaction (SIGBUS
, &act
, NULL
);
7629 sigaction (SIGFPE
, &act
, NULL
);
7630 sigaction (SIGILL
, &act
, NULL
);
/* NOTE(review): SIGBUS is registered a second time here; the call repeats the
 * one above with the same arguments and looks redundant -- confirm whether a
 * different signal was intended. */
7631 sigaction (SIGBUS
, &act
, NULL
);
7635 #include "staticsymbols.h"
7636 /* This function tries to convert a pointer into a function name. It's used in
7637 * order to provide a backtrace under segmentation fault that's able to
7638 * display functions declared as static (otherwise the backtrace is useless).
 *
 * The lookup walks symsTable (from staticsymbols.h), which ends at the first
 * entry whose pointer is zero. Returns the matching name, or NULL when 'ret'
 * is still -1 after the scan. The lines that store the match in 'ret' and in
 * *offset are not visible in this listing; segvHandler() reads *offset after
 * the call. */
7639 static char *findFuncName(void *pointer
, unsigned long *offset
){
7641 unsigned long off
, minoff
= 0;
7643 /* Try to match against the Symbol with the smallest offset */
7644 for (i
=0; symsTable
[i
].pointer
; i
++) {
7645 unsigned long lp
= (unsigned long) pointer
;
/* Only symbols that start at or before the address can contain it. */
7647 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
7648 off
=lp
-symsTable
[i
].pointer
;
/* First candidate, or a symbol closer to the address than the best so far. */
7649 if (ret
< 0 || off
< minoff
) {
7655 if (ret
== -1) return NULL
;
7657 return symsTable
[ret
].name
;
7659 #else /* HAVE_BACKTRACE */
/* Definition compiled when HAVE_BACKTRACE is not defined (this is the #else
 * branch of the backtrace support section). Its body is not visible in this
 * listing. */
7660 static void setupSigSegvAction(void) {
7662 #endif /* HAVE_BACKTRACE */