2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.3.2"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
65 #include "solarisfixes.h"
69 #include "ae.h" /* Event driven programming library */
70 #include "sds.h" /* Dynamic safe strings */
71 #include "anet.h" /* Networking the easy way */
72 #include "dict.h" /* Hash tables */
73 #include "adlist.h" /* Linked lists */
74 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
75 #include "lzf.h" /* LZF compression library */
76 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
78 /* #define REDIS_HELGRIND_FRIENDLY */
79 #if defined(__GNUC__) && defined(REDIS_HELGRIND_FRIENDLY)
80 #warning "Remember to undef REDIS_HELGRIND_FRIENDLY before to commit"
87 /* Static server configuration */
88 #define REDIS_SERVERPORT 6379 /* TCP port */
89 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
90 #define REDIS_IOBUF_LEN 1024
91 #define REDIS_LOADBUF_LEN 1024
92 #define REDIS_STATIC_ARGS 4
93 #define REDIS_DEFAULT_DBNUM 16
94 #define REDIS_CONFIGLINE_MAX 1024
95 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
96 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
97 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
98 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
99 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
102 #define REDIS_WRITEV_THRESHOLD 3
103 /* Max number of iovecs used for each writev call */
104 #define REDIS_WRITEV_IOVEC_COUNT 256
106 /* Hash table parameters */
107 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
110 #define REDIS_CMD_BULK 1 /* Bulk write command */
111 #define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short these commands are denied on low memory conditions. */
116 #define REDIS_CMD_DENYOOM 4
119 #define REDIS_STRING 0
125 /* Objects encoding */
126 #define REDIS_ENCODING_RAW 0 /* Raw representation */
127 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
129 /* Object types only used for dumping to disk */
130 #define REDIS_EXPIRETIME 253
131 #define REDIS_SELECTDB 254
132 #define REDIS_EOF 255
/* Defines related to the dump file format. To store 32 bits lengths for short
 * keys requires a lot of space, so we check the most significant 2 bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte, most DB keys, and many
 * values, will fit inside. */
147 #define REDIS_RDB_6BITLEN 0
148 #define REDIS_RDB_14BITLEN 1
149 #define REDIS_RDB_32BITLEN 2
150 #define REDIS_RDB_ENCVAL 3
151 #define REDIS_RDB_LENERR UINT_MAX
/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
156 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
157 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
158 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
159 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
161 /* Virtual memory object->where field. */
162 #define REDIS_VM_MEMORY 0 /* The object is on memory */
163 #define REDIS_VM_SWAPPED 1 /* The object is on disk */
164 #define REDIS_VM_SWAPPING 2 /* Redis is swapping this object on disk */
165 #define REDIS_VM_LOADING 3 /* Redis is loading this object from disk */
/* Virtual memory static configuration stuff.
 * Check vmFindContiguousPages() to know more about these magic numbers. */
169 #define REDIS_VM_MAX_NEAR_PAGES 65536
170 #define REDIS_VM_MAX_RANDOM_JUMP 4096
171 #define REDIS_VM_MAX_THREADS 32
172 #define REDIS_THREAD_STACK_SIZE (1024*1024*4)
/* The following is the number of completed I/O jobs to process when the
 * handler is called. 1 is the minimum, and also the default, as it allows
 * to block as little as possible other accessing clients. While Virtual
 * Memory I/O operations are performed by threads, these operations must
 * be processed by the main thread when completed to take effect. */
178 #define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1
181 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
182 #define REDIS_SLAVE 2 /* This client is a slave server */
183 #define REDIS_MASTER 4 /* This client is a master server */
184 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
185 #define REDIS_MULTI 16 /* This client is in a MULTI context */
186 #define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */
187 #define REDIS_IO_WAIT 64 /* The client is waiting for Virtual Memory I/O */
189 /* Slave replication state - slave side */
190 #define REDIS_REPL_NONE 0 /* No active replication */
191 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
192 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
194 /* Slave replication state - from the point of view of master
195 * Note that in SEND_BULK and ONLINE state the slave receives new updates
196 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
197 * to start the next background saving in order to send updates to it. */
198 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
199 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
200 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
201 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
203 /* List related stuff */
207 /* Sort operations */
208 #define REDIS_SORT_GET 0
209 #define REDIS_SORT_ASC 1
210 #define REDIS_SORT_DESC 2
211 #define REDIS_SORTKEY_MAX 1024
214 #define REDIS_DEBUG 0
215 #define REDIS_VERBOSE 1
216 #define REDIS_NOTICE 2
217 #define REDIS_WARNING 3
/* Anti-warning macro: evaluates V and discards the result, so the compiler
 * does not complain about an otherwise unused variable or parameter.
 * The argument is parenthesized so that a compound expression such as
 * `a + b` is cast to void as a whole, instead of only its first operand
 * (which would not even compile). */
#define REDIS_NOTUSED(V) ((void)(V))
222 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
223 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
225 /* Append only defines */
226 #define APPENDFSYNC_NO 0
227 #define APPENDFSYNC_ALWAYS 1
228 #define APPENDFSYNC_EVERYSEC 2
/* We can print the stacktrace, so we use our own assert: when the checked
 * expression is false, its source text together with the current file and
 * line is handed to _redisAssert(), and then the process exits with
 * status 1. When the expression is true the macro does nothing. */
#define redisAssert(_e) \
    ((_e) ? (void)0 : (_redisAssert(#_e, __FILE__, __LINE__), exit(1)))
static void _redisAssert(char *estr, char *file, int line);
234 /*================================= Data types ============================== */
236 /* A redis object, that is a type able to hold a string / list / set */
238 /* The VM object structure */
239 struct redisObjectVM
{
240 off_t page
; /* the page at witch the object is stored on disk */
241 off_t usedpages
; /* number of pages used on disk */
242 time_t atime
; /* Last access time */
245 /* The actual Redis Object */
246 typedef struct redisObject
{
249 unsigned char encoding
;
250 unsigned char storage
; /* If this object is a key, where is the value?
251 * REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
252 unsigned char vtype
; /* If this object is a key, and value is swapped out,
253 * this is the type of the swapped out object. */
/* VM fields, these are only allocated if VM is active, otherwise the
 * object allocation function will just allocate
 * sizeof(redisObject) minus sizeof(redisObjectVM), so using
 * Redis without VM active will not have any overhead. */
259 struct redisObjectVM vm
;
/* Macro used to initialize a Redis object allocated on the stack.
 * Note that this macro is kept near the structure definition to make sure
 * we'll update it when the structure is changed, to avoid bugs like
 * bug #85 introduced exactly in this way. */
266 #define initStaticStringObject(_var,_ptr) do { \
268 _var.type = REDIS_STRING; \
269 _var.encoding = REDIS_ENCODING_RAW; \
271 if (server.vm_enabled) _var.storage = REDIS_VM_MEMORY; \
274 typedef struct redisDb
{
275 dict
*dict
; /* The keyspace for this DB */
276 dict
*expires
; /* Timeout of keys with a timeout set */
277 dict
*blockingkeys
; /* Keys with clients waiting for data (BLPOP) */
281 /* Client MULTI/EXEC state */
282 typedef struct multiCmd
{
285 struct redisCommand
*cmd
;
288 typedef struct multiState
{
289 multiCmd
*commands
; /* Array of MULTI commands */
290 int count
; /* Total number of MULTI commands */
/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list. */
295 typedef struct redisClient
{
300 robj
**argv
, **mbargv
;
302 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
303 int multibulk
; /* multi bulk command format active */
306 time_t lastinteraction
; /* time of the last interaction, used for timeout */
307 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
309 int slaveseldb
; /* slave selected db, if this client is a slave */
310 int authenticated
; /* when requirepass is non-NULL */
311 int replstate
; /* replication state if this is a slave */
312 int repldbfd
; /* replication DB file descriptor */
313 long repldboff
; /* replication DB file offset */
314 off_t repldbsize
; /* replication DB file size */
315 multiState mstate
; /* MULTI/EXEC state */
316 robj
**blockingkeys
; /* The key we waiting to terminate a blocking
317 * operation such as BLPOP. Otherwise NULL. */
318 int blockingkeysnum
; /* Number of blocking keys */
319 time_t blockingto
; /* Blocking operation timeout. If UNIX current time
320 * is >= blockingto then the operation timed out. */
321 list
*io_keys
; /* Keys this client is waiting to be loaded from the
322 * swap file in order to continue. */
330 /* Global server state structure */
335 dict
*sharingpool
; /* Poll used for object sharing */
336 unsigned int sharingpoolsize
;
337 long long dirty
; /* changes to DB from the last save */
339 list
*slaves
, *monitors
;
340 char neterr
[ANET_ERR_LEN
];
342 int cronloops
; /* number of times the cron function run */
343 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
344 time_t lastsave
; /* Unix time of last save succeeede */
345 size_t usedmemory
; /* Used memory in megabytes */
346 /* Fields used only for stats */
347 time_t stat_starttime
; /* server start time */
348 long long stat_numcommands
; /* number of processed commands */
349 long long stat_numconnections
; /* number of connections received */
362 pid_t bgsavechildpid
;
363 pid_t bgrewritechildpid
;
364 sds bgrewritebuf
; /* buffer taken by parent during oppend only rewrite */
365 struct saveparam
*saveparams
;
370 char *appendfilename
;
374 /* Replication related */
379 redisClient
*master
; /* client that is master for this slave */
381 unsigned int maxclients
;
382 unsigned long long maxmemory
;
383 unsigned int blockedclients
;
384 /* Sort parameters - qsort_r() is only available under BSD so we
385 * have to take this state global, in order to pass it to sortCompare() */
389 /* Virtual memory configuration */
394 unsigned long long vm_max_memory
;
395 /* Virtual memory state */
398 off_t vm_next_page
; /* Next probably empty page */
399 off_t vm_near_pages
; /* Number of pages allocated sequentially */
400 unsigned char *vm_bitmap
; /* Bitmap of free/used pages */
401 time_t unixtime
; /* Unix time sampled every second. */
402 /* Virtual memory I/O threads stuff */
/* An I/O thread processes an element taken from the io_newjobs queue and
 * puts the result of the operation in the io_processed list. While the
 * job is being processed, it's put on the io_processing queue. */
406 list
*io_newjobs
; /* List of VM I/O jobs yet to be processed */
407 list
*io_processing
; /* List of VM I/O jobs being processed */
408 list
*io_processed
; /* List of VM I/O jobs already processed */
409 list
*io_clients
; /* All the clients waiting for SWAP I/O operations */
410 pthread_mutex_t io_mutex
; /* lock to access io_jobs/io_done/io_thread_job */
411 pthread_mutex_t obj_freelist_mutex
; /* safe redis objects creation/free */
412 pthread_mutex_t io_swapfile_mutex
; /* So we can lseek + write */
413 pthread_attr_t io_threads_attr
; /* attributes for threads creation */
414 int io_active_threads
; /* Number of running I/O threads */
415 int vm_max_threads
; /* Max number of I/O threads running at the same time */
/* Our main thread is blocked on the event loop, looking for sockets ready
 * to be read or written, so when a threaded I/O operation is ready to be
 * processed by the main thread, the I/O thread will use a unix pipe to
 * awake the main thread. The following are the two pipe FDs. */
420 int io_ready_pipe_read
;
421 int io_ready_pipe_write
;
422 /* Virtual memory stats */
423 unsigned long long vm_stats_used_pages
;
424 unsigned long long vm_stats_swapped_objects
;
425 unsigned long long vm_stats_swapouts
;
426 unsigned long long vm_stats_swapins
;
430 typedef void redisCommandProc(redisClient
*c
);
431 struct redisCommand
{
433 redisCommandProc
*proc
;
438 struct redisFunctionSym
{
440 unsigned long pointer
;
443 typedef struct _redisSortObject
{
451 typedef struct _redisSortOperation
{
454 } redisSortOperation
;
456 /* ZSETs use a specialized version of Skiplists */
458 typedef struct zskiplistNode
{
459 struct zskiplistNode
**forward
;
460 struct zskiplistNode
*backward
;
465 typedef struct zskiplist
{
466 struct zskiplistNode
*header
, *tail
;
467 unsigned long length
;
471 typedef struct zset
{
476 /* Our shared "common" objects */
478 struct sharedObjectsStruct
{
479 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
480 *colon
, *nullbulk
, *nullmultibulk
, *queued
,
481 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
482 *outofrangeerr
, *plus
,
483 *select0
, *select1
, *select2
, *select3
, *select4
,
484 *select5
, *select6
, *select7
, *select8
, *select9
;
/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */
static double R_Zero;
static double R_PosInf;
static double R_NegInf;
static double R_Nan;
493 /* VM threaded I/O request message */
494 #define REDIS_IOJOB_LOAD 0 /* Load from disk to memory */
495 #define REDIS_IOJOB_PREPARE_SWAP 1 /* Compute needed pages */
496 #define REDIS_IOJOB_DO_SWAP 2 /* Swap from memory to disk */
497 typedef struct iojon
{
498 int type
; /* Request type, REDIS_IOJOB_* */
499 redisDb
*db
;/* Redis database */
500 robj
*key
; /* This I/O request is about swapping this key */
501 robj
*val
; /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this
502 * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */
503 off_t page
; /* Swap page where to read/write the object */
504 off_t pages
; /* Swap pages needed to safe object. PREPARE_SWAP return val */
505 int canceled
; /* True if this command was canceled by blocking side of VM */
506 pthread_t thread
; /* ID of the thread processing this entry */
509 /*================================ Prototypes =============================== */
511 static void freeStringObject(robj
*o
);
512 static void freeListObject(robj
*o
);
513 static void freeSetObject(robj
*o
);
514 static void decrRefCount(void *o
);
515 static robj
*createObject(int type
, void *ptr
);
516 static void freeClient(redisClient
*c
);
517 static int rdbLoad(char *filename
);
518 static void addReply(redisClient
*c
, robj
*obj
);
519 static void addReplySds(redisClient
*c
, sds s
);
520 static void incrRefCount(robj
*o
);
521 static int rdbSaveBackground(char *filename
);
522 static robj
*createStringObject(char *ptr
, size_t len
);
523 static robj
*dupStringObject(robj
*o
);
524 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
525 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
526 static int syncWithMaster(void);
527 static robj
*tryObjectSharing(robj
*o
);
528 static int tryObjectEncoding(robj
*o
);
529 static robj
*getDecodedObject(robj
*o
);
530 static int removeExpire(redisDb
*db
, robj
*key
);
531 static int expireIfNeeded(redisDb
*db
, robj
*key
);
532 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
533 static int deleteIfSwapped(redisDb
*db
, robj
*key
);
534 static int deleteKey(redisDb
*db
, robj
*key
);
535 static time_t getExpire(redisDb
*db
, robj
*key
);
536 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
537 static void updateSlavesWaitingBgsave(int bgsaveerr
);
538 static void freeMemoryIfNeeded(void);
539 static int processCommand(redisClient
*c
);
540 static void setupSigSegvAction(void);
541 static void rdbRemoveTempFile(pid_t childpid
);
542 static void aofRemoveTempFile(pid_t childpid
);
543 static size_t stringObjectLen(robj
*o
);
544 static void processInputBuffer(redisClient
*c
);
545 static zskiplist
*zslCreate(void);
546 static void zslFree(zskiplist
*zsl
);
547 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
548 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
549 static void initClientMultiState(redisClient
*c
);
550 static void freeClientMultiState(redisClient
*c
);
551 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
);
552 static void unblockClient(redisClient
*c
);
553 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
);
554 static void vmInit(void);
555 static void vmMarkPagesFree(off_t page
, off_t count
);
556 static robj
*vmLoadObject(robj
*key
);
557 static robj
*vmPreviewObject(robj
*key
);
558 static int vmSwapOneObjectBlocking(void);
559 static int vmSwapOneObjectThreaded(void);
560 static int vmCanSwapOut(void);
561 static int tryFreeOneObjectFromFreelist(void);
562 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
563 static void vmThreadedIOCompletedJob(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
564 static void vmCancelThreadedIOJob(robj
*o
);
565 static void lockThreadedIO(void);
566 static void unlockThreadedIO(void);
567 static int vmSwapObjectThreaded(robj
*key
, robj
*val
, redisDb
*db
);
568 static void freeIOJob(iojob
*j
);
569 static void queueIOJob(iojob
*j
);
570 static int vmWriteObjectOnSwap(robj
*o
, off_t page
);
571 static robj
*vmReadObjectFromSwap(off_t page
, int type
);
572 static void waitEmptyIOJobsQueue(void);
573 static void vmReopenSwapFile(void);
575 static void authCommand(redisClient
*c
);
576 static void pingCommand(redisClient
*c
);
577 static void echoCommand(redisClient
*c
);
578 static void setCommand(redisClient
*c
);
579 static void setnxCommand(redisClient
*c
);
580 static void getCommand(redisClient
*c
);
581 static void delCommand(redisClient
*c
);
582 static void existsCommand(redisClient
*c
);
583 static void incrCommand(redisClient
*c
);
584 static void decrCommand(redisClient
*c
);
585 static void incrbyCommand(redisClient
*c
);
586 static void decrbyCommand(redisClient
*c
);
587 static void selectCommand(redisClient
*c
);
588 static void randomkeyCommand(redisClient
*c
);
589 static void keysCommand(redisClient
*c
);
590 static void dbsizeCommand(redisClient
*c
);
591 static void lastsaveCommand(redisClient
*c
);
592 static void saveCommand(redisClient
*c
);
593 static void bgsaveCommand(redisClient
*c
);
594 static void bgrewriteaofCommand(redisClient
*c
);
595 static void shutdownCommand(redisClient
*c
);
596 static void moveCommand(redisClient
*c
);
597 static void renameCommand(redisClient
*c
);
598 static void renamenxCommand(redisClient
*c
);
599 static void lpushCommand(redisClient
*c
);
600 static void rpushCommand(redisClient
*c
);
601 static void lpopCommand(redisClient
*c
);
602 static void rpopCommand(redisClient
*c
);
603 static void llenCommand(redisClient
*c
);
604 static void lindexCommand(redisClient
*c
);
605 static void lrangeCommand(redisClient
*c
);
606 static void ltrimCommand(redisClient
*c
);
607 static void typeCommand(redisClient
*c
);
608 static void lsetCommand(redisClient
*c
);
609 static void saddCommand(redisClient
*c
);
610 static void sremCommand(redisClient
*c
);
611 static void smoveCommand(redisClient
*c
);
612 static void sismemberCommand(redisClient
*c
);
613 static void scardCommand(redisClient
*c
);
614 static void spopCommand(redisClient
*c
);
615 static void srandmemberCommand(redisClient
*c
);
616 static void sinterCommand(redisClient
*c
);
617 static void sinterstoreCommand(redisClient
*c
);
618 static void sunionCommand(redisClient
*c
);
619 static void sunionstoreCommand(redisClient
*c
);
620 static void sdiffCommand(redisClient
*c
);
621 static void sdiffstoreCommand(redisClient
*c
);
622 static void syncCommand(redisClient
*c
);
623 static void flushdbCommand(redisClient
*c
);
624 static void flushallCommand(redisClient
*c
);
625 static void sortCommand(redisClient
*c
);
626 static void lremCommand(redisClient
*c
);
627 static void rpoplpushcommand(redisClient
*c
);
628 static void infoCommand(redisClient
*c
);
629 static void mgetCommand(redisClient
*c
);
630 static void monitorCommand(redisClient
*c
);
631 static void expireCommand(redisClient
*c
);
632 static void expireatCommand(redisClient
*c
);
633 static void getsetCommand(redisClient
*c
);
634 static void ttlCommand(redisClient
*c
);
635 static void slaveofCommand(redisClient
*c
);
636 static void debugCommand(redisClient
*c
);
637 static void msetCommand(redisClient
*c
);
638 static void msetnxCommand(redisClient
*c
);
639 static void zaddCommand(redisClient
*c
);
640 static void zincrbyCommand(redisClient
*c
);
641 static void zrangeCommand(redisClient
*c
);
642 static void zrangebyscoreCommand(redisClient
*c
);
643 static void zrevrangeCommand(redisClient
*c
);
644 static void zcardCommand(redisClient
*c
);
645 static void zremCommand(redisClient
*c
);
646 static void zscoreCommand(redisClient
*c
);
647 static void zremrangebyscoreCommand(redisClient
*c
);
648 static void multiCommand(redisClient
*c
);
649 static void execCommand(redisClient
*c
);
650 static void blpopCommand(redisClient
*c
);
651 static void brpopCommand(redisClient
*c
);
653 /*================================= Globals ================================= */
656 static struct redisServer server
; /* server global state */
657 static struct redisCommand cmdTable
[] = {
658 {"get",getCommand
,2,REDIS_CMD_INLINE
},
659 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
660 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
661 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
662 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
663 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
664 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
665 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
666 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
667 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
668 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
669 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
670 {"brpop",brpopCommand
,-3,REDIS_CMD_INLINE
},
671 {"blpop",blpopCommand
,-3,REDIS_CMD_INLINE
},
672 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
673 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
674 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
675 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
676 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
677 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
678 {"rpoplpush",rpoplpushcommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
679 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
680 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
681 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
682 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
683 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
684 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
685 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
686 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
687 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
688 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
689 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
690 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
691 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
692 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
693 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
694 {"zincrby",zincrbyCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
695 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
696 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
697 {"zrange",zrangeCommand
,-4,REDIS_CMD_INLINE
},
698 {"zrangebyscore",zrangebyscoreCommand
,-4,REDIS_CMD_INLINE
},
699 {"zrevrange",zrevrangeCommand
,-4,REDIS_CMD_INLINE
},
700 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
701 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
702 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
703 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
704 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
705 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
706 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
707 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
708 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
709 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
710 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
711 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
712 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
713 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
714 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
715 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
716 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
717 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
718 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
719 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
720 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
721 {"bgrewriteaof",bgrewriteaofCommand
,1,REDIS_CMD_INLINE
},
722 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
723 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
724 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
725 {"multi",multiCommand
,1,REDIS_CMD_INLINE
},
726 {"exec",execCommand
,1,REDIS_CMD_INLINE
},
727 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
728 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
729 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
730 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
731 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
732 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
733 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
734 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
735 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
739 /*============================ Utility functions ============================ */
741 /* Glob-style pattern matching. */
742 int stringmatchlen(const char *pattern
, int patternLen
,
743 const char *string
, int stringLen
, int nocase
)
748 while (pattern
[1] == '*') {
753 return 1; /* match */
755 if (stringmatchlen(pattern
+1, patternLen
-1,
756 string
, stringLen
, nocase
))
757 return 1; /* match */
761 return 0; /* no match */
765 return 0; /* no match */
775 not = pattern
[0] == '^';
782 if (pattern
[0] == '\\') {
785 if (pattern
[0] == string
[0])
787 } else if (pattern
[0] == ']') {
789 } else if (patternLen
== 0) {
793 } else if (pattern
[1] == '-' && patternLen
>= 3) {
794 int start
= pattern
[0];
795 int end
= pattern
[2];
803 start
= tolower(start
);
809 if (c
>= start
&& c
<= end
)
813 if (pattern
[0] == string
[0])
816 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
826 return 0; /* no match */
832 if (patternLen
>= 2) {
839 if (pattern
[0] != string
[0])
840 return 0; /* no match */
842 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
843 return 0; /* no match */
851 if (stringLen
== 0) {
852 while(*pattern
== '*') {
859 if (patternLen
== 0 && stringLen
== 0)
864 static void redisLog(int level
, const char *fmt
, ...) {
868 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
872 if (level
>= server
.verbosity
) {
878 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
879 fprintf(fp
,"[%d] %s %c ",(int)getpid(),buf
,c
[level
]);
880 vfprintf(fp
, fmt
, ap
);
886 if (server
.logfile
) fclose(fp
);
889 /*====================== Hash table type implementation ==================== */
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and redis objects as values (objects can hold SDS strings,
895 static void dictVanillaFree(void *privdata
, void *val
)
897 DICT_NOTUSED(privdata
);
901 static void dictListDestructor(void *privdata
, void *val
)
903 DICT_NOTUSED(privdata
);
904 listRelease((list
*)val
);
907 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
911 DICT_NOTUSED(privdata
);
913 l1
= sdslen((sds
)key1
);
914 l2
= sdslen((sds
)key2
);
915 if (l1
!= l2
) return 0;
916 return memcmp(key1
, key2
, l1
) == 0;
919 static void dictRedisObjectDestructor(void *privdata
, void *val
)
921 DICT_NOTUSED(privdata
);
923 if (val
== NULL
) return; /* Values of swapped out keys as set to NULL */
927 static int dictObjKeyCompare(void *privdata
, const void *key1
,
930 const robj
*o1
= key1
, *o2
= key2
;
931 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
934 static unsigned int dictObjHash(const void *key
) {
936 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
939 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
942 robj
*o1
= (robj
*) key1
, *o2
= (robj
*) key2
;
945 o1
= getDecodedObject(o1
);
946 o2
= getDecodedObject(o2
);
947 cmp
= sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
953 static unsigned int dictEncObjHash(const void *key
) {
954 robj
*o
= (robj
*) key
;
956 o
= getDecodedObject(o
);
957 unsigned int hash
= dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
962 /* Sets type and expires */
963 static dictType setDictType
= {
964 dictEncObjHash
, /* hash function */
967 dictEncObjKeyCompare
, /* key compare */
968 dictRedisObjectDestructor
, /* key destructor */
969 NULL
/* val destructor */
972 /* Sorted sets hash (note: a skiplist is used in addition to the hash table) */
973 static dictType zsetDictType
= {
974 dictEncObjHash
, /* hash function */
977 dictEncObjKeyCompare
, /* key compare */
978 dictRedisObjectDestructor
, /* key destructor */
979 dictVanillaFree
/* val destructor of malloc(sizeof(double)) */
983 static dictType hashDictType
= {
984 dictObjHash
, /* hash function */
987 dictObjKeyCompare
, /* key compare */
988 dictRedisObjectDestructor
, /* key destructor */
989 dictRedisObjectDestructor
/* val destructor */
993 static dictType keyptrDictType
= {
994 dictObjHash
, /* hash function */
997 dictObjKeyCompare
, /* key compare */
998 dictRedisObjectDestructor
, /* key destructor */
999 NULL
/* val destructor */
1002 /* Keylist hash table type has unencoded redis objects as keys and
1003 * lists as values. It's used for blocking operations (BLPOP) */
1004 static dictType keylistDictType
= {
1005 dictObjHash
, /* hash function */
1008 dictObjKeyCompare
, /* key compare */
1009 dictRedisObjectDestructor
, /* key destructor */
1010 dictListDestructor
/* val destructor */
1013 /* ========================= Random utility functions ======================= */
1015 /* Redis generally does not try to recover from out of memory conditions
1016 * when allocating objects or strings, it is not clear if it will be possible
1017 * to report this condition to the client since the networking layer itself
1018 * is based on heap allocation for send buffers, so we simply abort.
1019 * At least the code will be simpler to read... */
1020 static void oom(const char *msg
) {
1021 redisLog(REDIS_WARNING
, "%s: Out of memory\n",msg
);
1026 /* ====================== Redis server networking stuff ===================== */
1027 static void closeTimedoutClients(void) {
1030 time_t now
= time(NULL
);
1033 listRewind(server
.clients
,&li
);
1034 while ((ln
= listNext(&li
)) != NULL
) {
1035 c
= listNodeValue(ln
);
1036 if (server
.maxidletime
&&
1037 !(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
1038 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
1039 (now
- c
->lastinteraction
> server
.maxidletime
))
1041 redisLog(REDIS_VERBOSE
,"Closing idle client");
1043 } else if (c
->flags
& REDIS_BLOCKED
) {
1044 if (c
->blockingto
!= 0 && c
->blockingto
< now
) {
1045 addReply(c
,shared
.nullmultibulk
);
1052 static int htNeedsResize(dict
*dict
) {
1053 long long size
, used
;
1055 size
= dictSlots(dict
);
1056 used
= dictSize(dict
);
1057 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
1058 (used
*100/size
< REDIS_HT_MINFILL
));
1061 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
1062 * we resize the hash table to save memory */
1063 static void tryResizeHashTables(void) {
1066 for (j
= 0; j
< server
.dbnum
; j
++) {
1067 if (htNeedsResize(server
.db
[j
].dict
)) {
1068 redisLog(REDIS_VERBOSE
,"The hash table %d is too sparse, resize it...",j
);
1069 dictResize(server
.db
[j
].dict
);
1070 redisLog(REDIS_VERBOSE
,"Hash table %d resized.",j
);
1072 if (htNeedsResize(server
.db
[j
].expires
))
1073 dictResize(server
.db
[j
].expires
);
1077 /* A background saving child (BGSAVE) terminated its work. Handle this. */
1078 void backgroundSaveDoneHandler(int statloc
) {
1079 int exitcode
= WEXITSTATUS(statloc
);
1080 int bysignal
= WIFSIGNALED(statloc
);
1082 if (!bysignal
&& exitcode
== 0) {
1083 redisLog(REDIS_NOTICE
,
1084 "Background saving terminated with success");
1086 server
.lastsave
= time(NULL
);
1087 } else if (!bysignal
&& exitcode
!= 0) {
1088 redisLog(REDIS_WARNING
, "Background saving error");
1090 redisLog(REDIS_WARNING
,
1091 "Background saving terminated by signal");
1092 rdbRemoveTempFile(server
.bgsavechildpid
);
1094 server
.bgsavechildpid
= -1;
1095 /* Possibly there are slaves waiting for a BGSAVE in order to be served
1096 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
1097 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
1100 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
1102 void backgroundRewriteDoneHandler(int statloc
) {
1103 int exitcode
= WEXITSTATUS(statloc
);
1104 int bysignal
= WIFSIGNALED(statloc
);
1106 if (!bysignal
&& exitcode
== 0) {
1110 redisLog(REDIS_NOTICE
,
1111 "Background append only file rewriting terminated with success");
1112 /* Now it's time to flush the differences accumulated by the parent */
1113 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) server
.bgrewritechildpid
);
1114 fd
= open(tmpfile
,O_WRONLY
|O_APPEND
);
1116 redisLog(REDIS_WARNING
, "Not able to open the temp append only file produced by the child: %s", strerror(errno
));
1119 /* Flush our data... */
1120 if (write(fd
,server
.bgrewritebuf
,sdslen(server
.bgrewritebuf
)) !=
1121 (signed) sdslen(server
.bgrewritebuf
)) {
1122 redisLog(REDIS_WARNING
, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno
));
1126 redisLog(REDIS_NOTICE
,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server
.bgrewritebuf
));
1127 /* Now our work is to rename the temp file into the stable file. And
1128 * switch the file descriptor used by the server for append only. */
1129 if (rename(tmpfile
,server
.appendfilename
) == -1) {
1130 redisLog(REDIS_WARNING
,"Can't rename the temp append only file into the stable one: %s", strerror(errno
));
1134 /* Mission completed... almost */
1135 redisLog(REDIS_NOTICE
,"Append only file successfully rewritten.");
1136 if (server
.appendfd
!= -1) {
1137 /* If append only is actually enabled... */
1138 close(server
.appendfd
);
1139 server
.appendfd
= fd
;
1141 server
.appendseldb
= -1; /* Make sure it will issue SELECT */
1142 redisLog(REDIS_NOTICE
,"The new append only file was selected for future appends.");
1144 /* If append only is disabled we just generate a dump in this
1145 * format. Why not? */
1148 } else if (!bysignal
&& exitcode
!= 0) {
1149 redisLog(REDIS_WARNING
, "Background append only file rewriting error");
1151 redisLog(REDIS_WARNING
,
1152 "Background append only file rewriting terminated by signal");
1155 sdsfree(server
.bgrewritebuf
);
1156 server
.bgrewritebuf
= sdsempty();
1157 aofRemoveTempFile(server
.bgrewritechildpid
);
1158 server
.bgrewritechildpid
= -1;
1161 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
1162 int j
, loops
= server
.cronloops
++;
1163 REDIS_NOTUSED(eventLoop
);
1165 REDIS_NOTUSED(clientData
);
1167 /* We take a cached value of the unix time in the global state because
1168 * with virtual memory and aging there is the need to store the current time
1169 * in objects at every object access, and accuracy is not needed.
1170 * To access a global var is faster than calling time(NULL) */
1171 server
.unixtime
= time(NULL
);
1173 /* Update the global state with the amount of used memory */
1174 server
.usedmemory
= zmalloc_used_memory();
1176 /* Show some info about non-empty databases */
1177 for (j
= 0; j
< server
.dbnum
; j
++) {
1178 long long size
, used
, vkeys
;
1180 size
= dictSlots(server
.db
[j
].dict
);
1181 used
= dictSize(server
.db
[j
].dict
);
1182 vkeys
= dictSize(server
.db
[j
].expires
);
1183 if (!(loops
% 5) && (used
|| vkeys
)) {
1184 redisLog(REDIS_VERBOSE
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
1185 /* dictPrintStats(server.dict); */
1189 /* We don't want to resize the hash tables while a background saving
1190 * is in progress: the saving child is created using fork() that is
1191 * implemented with a copy-on-write semantic in most modern systems, so
1192 * if we resize the HT while there is the saving child at work actually
1193 * a lot of memory movements in the parent will cause a lot of pages
1195 if (server
.bgsavechildpid
== -1) tryResizeHashTables();
1197 /* Show information about connected clients */
1199 redisLog(REDIS_VERBOSE
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1200 listLength(server
.clients
)-listLength(server
.slaves
),
1201 listLength(server
.slaves
),
1203 dictSize(server
.sharingpool
));
1206 /* Close connections of timedout clients */
1207 if ((server
.maxidletime
&& !(loops
% 10)) || server
.blockedclients
)
1208 closeTimedoutClients();
1210 /* Check if a background saving or AOF rewrite in progress terminated */
1211 if (server
.bgsavechildpid
!= -1 || server
.bgrewritechildpid
!= -1) {
1215 if ((pid
= wait3(&statloc
,WNOHANG
,NULL
)) != 0) {
1216 if (pid
== server
.bgsavechildpid
) {
1217 backgroundSaveDoneHandler(statloc
);
1219 backgroundRewriteDoneHandler(statloc
);
1223 /* If there is not a background saving in progress check if
1224 * we have to save now */
1225 time_t now
= time(NULL
);
1226 for (j
= 0; j
< server
.saveparamslen
; j
++) {
1227 struct saveparam
*sp
= server
.saveparams
+j
;
1229 if (server
.dirty
>= sp
->changes
&&
1230 now
-server
.lastsave
> sp
->seconds
) {
1231 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
1232 sp
->changes
, sp
->seconds
);
1233 rdbSaveBackground(server
.dbfilename
);
1239 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1240 * will use few CPU cycles if there are few expiring keys, otherwise
1241 * it will get more aggressive to avoid that too much memory is used by
1242 * keys that can be removed from the keyspace. */
1243 for (j
= 0; j
< server
.dbnum
; j
++) {
1245 redisDb
*db
= server
.db
+j
;
1247 /* Continue to expire if at the end of the cycle more than 25%
1248 * of the keys were expired. */
1250 long num
= dictSize(db
->expires
);
1251 time_t now
= time(NULL
);
1254 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
1255 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
1260 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
1261 t
= (time_t) dictGetEntryVal(de
);
1263 deleteKey(db
,dictGetEntryKey(de
));
1267 } while (expired
> REDIS_EXPIRELOOKUPS_PER_CRON
/4);
1270 /* Swap a few keys on disk if we are over the memory limit and VM
1271 * is enabled. Try to free objects from the free list first. */
1272 if (vmCanSwapOut()) {
1273 while (server
.vm_enabled
&& zmalloc_used_memory() >
1274 server
.vm_max_memory
)
1278 if (tryFreeOneObjectFromFreelist() == REDIS_OK
) continue;
1279 retval
= (server
.vm_max_threads
== 0) ?
1280 vmSwapOneObjectBlocking() :
1281 vmSwapOneObjectThreaded();
1282 if (retval
== REDIS_ERR
&& (loops
% 30) == 0 &&
1283 zmalloc_used_memory() >
1284 (server
.vm_max_memory
+server
.vm_max_memory
/10))
1286 redisLog(REDIS_WARNING
,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");
1288 /* Note that when using threaded I/O we free just one object,
1289 * because anyway when the I/O thread in charge to swap this
1290 * object out will finish, the handler of completed jobs
1291 * will try to swap more objects if we are still out of memory. */
1292 if (retval
== REDIS_ERR
|| server
.vm_max_threads
> 0) break;
1296 /* Check if we should connect to a MASTER */
1297 if (server
.replstate
== REDIS_REPL_CONNECT
) {
1298 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
1299 if (syncWithMaster() == REDIS_OK
) {
1300 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
1306 static void createSharedObjects(void) {
1307 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
1308 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
1309 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
1310 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
1311 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
1312 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
1313 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
1314 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
1315 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
1316 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
1317 shared
.queued
= createObject(REDIS_STRING
,sdsnew("+QUEUED\r\n"));
1318 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
1319 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1320 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
1321 "-ERR no such key\r\n"));
1322 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
1323 "-ERR syntax error\r\n"));
1324 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
1325 "-ERR source and destination objects are the same\r\n"));
1326 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
1327 "-ERR index out of range\r\n"));
1328 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
1329 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
1330 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
1331 shared
.select0
= createStringObject("select 0\r\n",10);
1332 shared
.select1
= createStringObject("select 1\r\n",10);
1333 shared
.select2
= createStringObject("select 2\r\n",10);
1334 shared
.select3
= createStringObject("select 3\r\n",10);
1335 shared
.select4
= createStringObject("select 4\r\n",10);
1336 shared
.select5
= createStringObject("select 5\r\n",10);
1337 shared
.select6
= createStringObject("select 6\r\n",10);
1338 shared
.select7
= createStringObject("select 7\r\n",10);
1339 shared
.select8
= createStringObject("select 8\r\n",10);
1340 shared
.select9
= createStringObject("select 9\r\n",10);
1343 static void appendServerSaveParams(time_t seconds
, int changes
) {
1344 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1345 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1346 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1347 server
.saveparamslen
++;
1350 static void resetServerSaveParams() {
1351 zfree(server
.saveparams
);
1352 server
.saveparams
= NULL
;
1353 server
.saveparamslen
= 0;
1356 static void initServerConfig() {
1357 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1358 server
.port
= REDIS_SERVERPORT
;
1359 server
.verbosity
= REDIS_VERBOSE
;
1360 server
.maxidletime
= REDIS_MAXIDLETIME
;
1361 server
.saveparams
= NULL
;
1362 server
.logfile
= NULL
; /* NULL = log on standard output */
1363 server
.bindaddr
= NULL
;
1364 server
.glueoutputbuf
= 1;
1365 server
.daemonize
= 0;
1366 server
.appendonly
= 0;
1367 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1368 server
.lastfsync
= time(NULL
);
1369 server
.appendfd
= -1;
1370 server
.appendseldb
= -1; /* Make sure the first time will not match */
1371 server
.pidfile
= "/var/run/redis.pid";
1372 server
.dbfilename
= "dump.rdb";
1373 server
.appendfilename
= "appendonly.aof";
1374 server
.requirepass
= NULL
;
1375 server
.shareobjects
= 0;
1376 server
.rdbcompression
= 1;
1377 server
.sharingpoolsize
= 1024;
1378 server
.maxclients
= 0;
1379 server
.blockedclients
= 0;
1380 server
.maxmemory
= 0;
1381 server
.vm_enabled
= 0;
1382 server
.vm_swap_file
= zstrdup("/tmp/redis-%p.vm");
1383 server
.vm_page_size
= 256; /* 256 bytes per page */
1384 server
.vm_pages
= 1024*1024*100; /* 104 millions of pages */
1385 server
.vm_max_memory
= 1024LL*1024*1024*1; /* 1 GB of RAM */
1386 server
.vm_max_threads
= 4;
1388 resetServerSaveParams();
1390 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1391 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1392 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1393 /* Replication related */
1395 server
.masterauth
= NULL
;
1396 server
.masterhost
= NULL
;
1397 server
.masterport
= 6379;
1398 server
.master
= NULL
;
1399 server
.replstate
= REDIS_REPL_NONE
;
1401 /* Double constants initialization */
1403 R_PosInf
= 1.0/R_Zero
;
1404 R_NegInf
= -1.0/R_Zero
;
1405 R_Nan
= R_Zero
/R_Zero
;
1408 static void initServer() {
1411 signal(SIGHUP
, SIG_IGN
);
1412 signal(SIGPIPE
, SIG_IGN
);
1413 setupSigSegvAction();
1415 server
.devnull
= fopen("/dev/null","w");
1416 if (server
.devnull
== NULL
) {
1417 redisLog(REDIS_WARNING
, "Can't open /dev/null: %s", server
.neterr
);
1420 server
.clients
= listCreate();
1421 server
.slaves
= listCreate();
1422 server
.monitors
= listCreate();
1423 server
.objfreelist
= listCreate();
1424 createSharedObjects();
1425 server
.el
= aeCreateEventLoop();
1426 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1427 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
1428 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1429 if (server
.fd
== -1) {
1430 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
1433 for (j
= 0; j
< server
.dbnum
; j
++) {
1434 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1435 server
.db
[j
].expires
= dictCreate(&keyptrDictType
,NULL
);
1436 server
.db
[j
].blockingkeys
= dictCreate(&keylistDictType
,NULL
);
1437 server
.db
[j
].id
= j
;
1439 server
.cronloops
= 0;
1440 server
.bgsavechildpid
= -1;
1441 server
.bgrewritechildpid
= -1;
1442 server
.bgrewritebuf
= sdsempty();
1443 server
.lastsave
= time(NULL
);
1445 server
.usedmemory
= 0;
1446 server
.stat_numcommands
= 0;
1447 server
.stat_numconnections
= 0;
1448 server
.stat_starttime
= time(NULL
);
1449 server
.unixtime
= time(NULL
);
1450 aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
);
1451 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
1452 acceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
1454 if (server
.appendonly
) {
1455 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644);
1456 if (server
.appendfd
== -1) {
1457 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1463 if (server
.vm_enabled
) vmInit();
1466 /* Empty the whole database */
1467 static long long emptyDb() {
1469 long long removed
= 0;
1471 for (j
= 0; j
< server
.dbnum
; j
++) {
1472 removed
+= dictSize(server
.db
[j
].dict
);
1473 dictEmpty(server
.db
[j
].dict
);
1474 dictEmpty(server
.db
[j
].expires
);
/* Map a configuration boolean to an integer: 1 for "yes", 0 for "no" (both
 * matched case-insensitively) and -1 for anything else. Callers compare the
 * result against -1 to reject invalid values, so every path must return. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1485 /* I agree, this is a very rudimental way to load a configuration...
1486 will improve later if the config gets more complex */
1487 static void loadServerConfig(char *filename
) {
1489 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1493 if (filename
[0] == '-' && filename
[1] == '\0')
1496 if ((fp
= fopen(filename
,"r")) == NULL
) {
1497 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1502 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1508 line
= sdstrim(line
," \t\r\n");
1510 /* Skip comments and blank lines*/
1511 if (line
[0] == '#' || line
[0] == '\0') {
1516 /* Split into arguments */
1517 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1518 sdstolower(argv
[0]);
1520 /* Execute config directives */
1521 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1522 server
.maxidletime
= atoi(argv
[1]);
1523 if (server
.maxidletime
< 0) {
1524 err
= "Invalid timeout value"; goto loaderr
;
1526 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1527 server
.port
= atoi(argv
[1]);
1528 if (server
.port
< 1 || server
.port
> 65535) {
1529 err
= "Invalid port"; goto loaderr
;
1531 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1532 server
.bindaddr
= zstrdup(argv
[1]);
1533 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1534 int seconds
= atoi(argv
[1]);
1535 int changes
= atoi(argv
[2]);
1536 if (seconds
< 1 || changes
< 0) {
1537 err
= "Invalid save parameters"; goto loaderr
;
1539 appendServerSaveParams(seconds
,changes
);
1540 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1541 if (chdir(argv
[1]) == -1) {
1542 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1543 argv
[1], strerror(errno
));
1546 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1547 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1548 else if (!strcasecmp(argv
[1],"verbose")) server
.verbosity
= REDIS_VERBOSE
;
1549 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1550 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1552 err
= "Invalid log level. Must be one of debug, notice, warning";
1555 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1558 server
.logfile
= zstrdup(argv
[1]);
1559 if (!strcasecmp(server
.logfile
,"stdout")) {
1560 zfree(server
.logfile
);
1561 server
.logfile
= NULL
;
1563 if (server
.logfile
) {
1564 /* Test if we are able to open the file. The server will not
1565 * be able to abort just for this problem later... */
1566 logfp
= fopen(server
.logfile
,"a");
1567 if (logfp
== NULL
) {
1568 err
= sdscatprintf(sdsempty(),
1569 "Can't open the log file: %s", strerror(errno
));
1574 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1575 server
.dbnum
= atoi(argv
[1]);
1576 if (server
.dbnum
< 1) {
1577 err
= "Invalid number of databases"; goto loaderr
;
1579 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1580 server
.maxclients
= atoi(argv
[1]);
1581 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1582 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1583 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1584 server
.masterhost
= sdsnew(argv
[1]);
1585 server
.masterport
= atoi(argv
[2]);
1586 server
.replstate
= REDIS_REPL_CONNECT
;
1587 } else if (!strcasecmp(argv
[0],"masterauth") && argc
== 2) {
1588 server
.masterauth
= zstrdup(argv
[1]);
1589 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1590 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1591 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1593 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1594 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1595 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1597 } else if (!strcasecmp(argv
[0],"rdbcompression") && argc
== 2) {
1598 if ((server
.rdbcompression
= yesnotoi(argv
[1])) == -1) {
1599 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1601 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1602 server
.sharingpoolsize
= atoi(argv
[1]);
1603 if (server
.sharingpoolsize
< 1) {
1604 err
= "invalid object sharing pool size"; goto loaderr
;
1606 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1607 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1608 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1610 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1611 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1612 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1614 } else if (!strcasecmp(argv
[0],"appendfsync") && argc
== 2) {
1615 if (!strcasecmp(argv
[1],"no")) {
1616 server
.appendfsync
= APPENDFSYNC_NO
;
1617 } else if (!strcasecmp(argv
[1],"always")) {
1618 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1619 } else if (!strcasecmp(argv
[1],"everysec")) {
1620 server
.appendfsync
= APPENDFSYNC_EVERYSEC
;
1622 err
= "argument must be 'no', 'always' or 'everysec'";
1625 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1626 server
.requirepass
= zstrdup(argv
[1]);
1627 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1628 server
.pidfile
= zstrdup(argv
[1]);
1629 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1630 server
.dbfilename
= zstrdup(argv
[1]);
1631 } else if (!strcasecmp(argv
[0],"vm-enabled") && argc
== 2) {
1632 if ((server
.vm_enabled
= yesnotoi(argv
[1])) == -1) {
1633 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1635 } else if (!strcasecmp(argv
[0],"vm-swap-file") && argc
== 2) {
1636 server
.vm_swap_file
= zstrdup(argv
[1]);
1637 } else if (!strcasecmp(argv
[0],"vm-max-memory") && argc
== 2) {
1638 server
.vm_max_memory
= strtoll(argv
[1], NULL
, 10);
1639 } else if (!strcasecmp(argv
[0],"vm-page-size") && argc
== 2) {
1640 server
.vm_page_size
= strtoll(argv
[1], NULL
, 10);
1641 } else if (!strcasecmp(argv
[0],"vm-pages") && argc
== 2) {
1642 server
.vm_pages
= strtoll(argv
[1], NULL
, 10);
1643 } else if (!strcasecmp(argv
[0],"vm-max-threads") && argc
== 2) {
1644 server
.vm_max_threads
= strtoll(argv
[1], NULL
, 10);
1646 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1648 for (j
= 0; j
< argc
; j
++)
1653 if (fp
!= stdin
) fclose(fp
);
1657 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1658 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1659 fprintf(stderr
, ">>> '%s'\n", line
);
1660 fprintf(stderr
, "%s\n", err
);
1664 static void freeClientArgv(redisClient
*c
) {
1667 for (j
= 0; j
< c
->argc
; j
++)
1668 decrRefCount(c
->argv
[j
]);
1669 for (j
= 0; j
< c
->mbargc
; j
++)
1670 decrRefCount(c
->mbargv
[j
]);
1675 static void freeClient(redisClient
*c
) {
1678 /* Note that if the client we are freeing is blocked into a blocking
1679 * call, we have to set querybuf to NULL *before* to call unblockClient()
1680 * to avoid processInputBuffer() will get called. Also it is important
1681 * to remove the file events after this, because this call adds
1682 * the READABLE event. */
1683 sdsfree(c
->querybuf
);
1685 if (c
->flags
& REDIS_BLOCKED
)
1688 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1689 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1690 listRelease(c
->reply
);
1693 /* Remove from the list of clients */
1694 ln
= listSearchKey(server
.clients
,c
);
1695 redisAssert(ln
!= NULL
);
1696 listDelNode(server
.clients
,ln
);
1697 /* Remove from the list of clients waiting for VM operations */
1698 if (server
.vm_enabled
&& listLength(c
->io_keys
)) {
1699 ln
= listSearchKey(server
.io_clients
,c
);
1700 if (ln
) listDelNode(server
.io_clients
,ln
);
1701 listRelease(c
->io_keys
);
1703 listRelease(c
->io_keys
);
1705 if (c
->flags
& REDIS_SLAVE
) {
1706 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1708 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1709 ln
= listSearchKey(l
,c
);
1710 redisAssert(ln
!= NULL
);
1713 if (c
->flags
& REDIS_MASTER
) {
1714 server
.master
= NULL
;
1715 server
.replstate
= REDIS_REPL_CONNECT
;
1719 freeClientMultiState(c
);
1723 #define GLUEREPLY_UP_TO (1024)
1724 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1726 char buf
[GLUEREPLY_UP_TO
];
1731 listRewind(c
->reply
,&li
);
1732 while((ln
= listNext(&li
))) {
1736 objlen
= sdslen(o
->ptr
);
1737 if (copylen
+ objlen
<= GLUEREPLY_UP_TO
) {
1738 memcpy(buf
+copylen
,o
->ptr
,objlen
);
1740 listDelNode(c
->reply
,ln
);
1742 if (copylen
== 0) return;
1746 /* Now the output buffer is empty, add the new single element */
1747 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,copylen
));
1748 listAddNodeHead(c
->reply
,o
);
1751 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1752 redisClient
*c
= privdata
;
1753 int nwritten
= 0, totwritten
= 0, objlen
;
1756 REDIS_NOTUSED(mask
);
1758 /* Use writev() if we have enough buffers to send */
1759 if (!server
.glueoutputbuf
&&
1760 listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD
&&
1761 !(c
->flags
& REDIS_MASTER
))
1763 sendReplyToClientWritev(el
, fd
, privdata
, mask
);
1767 while(listLength(c
->reply
)) {
1768 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1769 glueReplyBuffersIfNeeded(c
);
1771 o
= listNodeValue(listFirst(c
->reply
));
1772 objlen
= sdslen(o
->ptr
);
1775 listDelNode(c
->reply
,listFirst(c
->reply
));
1779 if (c
->flags
& REDIS_MASTER
) {
1780 /* Don't reply to a master */
1781 nwritten
= objlen
- c
->sentlen
;
1783 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1784 if (nwritten
<= 0) break;
1786 c
->sentlen
+= nwritten
;
1787 totwritten
+= nwritten
;
1788 /* If we fully sent the object on head go to the next one */
1789 if (c
->sentlen
== objlen
) {
1790 listDelNode(c
->reply
,listFirst(c
->reply
));
1793 /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT
1794 * bytes, in a single threaded server it's a good idea to serve
1795 * other clients as well, even if a very large request comes from
1796 * super fast link that is always able to accept data (in real world
1797 * scenario think about 'KEYS *' against the loopback interface) */
1798 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1800 if (nwritten
== -1) {
1801 if (errno
== EAGAIN
) {
1804 redisLog(REDIS_VERBOSE
,
1805 "Error writing to client: %s", strerror(errno
));
1810 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1811 if (listLength(c
->reply
) == 0) {
1813 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1817 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
)
1819 redisClient
*c
= privdata
;
1820 int nwritten
= 0, totwritten
= 0, objlen
, willwrite
;
1822 struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
];
1823 int offset
, ion
= 0;
1825 REDIS_NOTUSED(mask
);
1828 while (listLength(c
->reply
)) {
1829 offset
= c
->sentlen
;
1833 /* fill-in the iov[] array */
1834 for(node
= listFirst(c
->reply
); node
; node
= listNextNode(node
)) {
1835 o
= listNodeValue(node
);
1836 objlen
= sdslen(o
->ptr
);
1838 if (totwritten
+ objlen
- offset
> REDIS_MAX_WRITE_PER_EVENT
)
1841 if(ion
== REDIS_WRITEV_IOVEC_COUNT
)
1842 break; /* no more iovecs */
1844 iov
[ion
].iov_base
= ((char*)o
->ptr
) + offset
;
1845 iov
[ion
].iov_len
= objlen
- offset
;
1846 willwrite
+= objlen
- offset
;
1847 offset
= 0; /* just for the first item */
1854 /* write all collected blocks at once */
1855 if((nwritten
= writev(fd
, iov
, ion
)) < 0) {
1856 if (errno
!= EAGAIN
) {
1857 redisLog(REDIS_VERBOSE
,
1858 "Error writing to client: %s", strerror(errno
));
1865 totwritten
+= nwritten
;
1866 offset
= c
->sentlen
;
1868 /* remove written robjs from c->reply */
1869 while (nwritten
&& listLength(c
->reply
)) {
1870 o
= listNodeValue(listFirst(c
->reply
));
1871 objlen
= sdslen(o
->ptr
);
1873 if(nwritten
>= objlen
- offset
) {
1874 listDelNode(c
->reply
, listFirst(c
->reply
));
1875 nwritten
-= objlen
- offset
;
1879 c
->sentlen
+= nwritten
;
1887 c
->lastinteraction
= time(NULL
);
1889 if (listLength(c
->reply
) == 0) {
1891 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1895 static struct redisCommand
*lookupCommand(char *name
) {
1897 while(cmdTable
[j
].name
!= NULL
) {
1898 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1904 /* resetClient prepare the client to process the next command */
1905 static void resetClient(redisClient
*c
) {
1911 /* Call() is the core of Redis execution of a command */
1912 static void call(redisClient
*c
, struct redisCommand
*cmd
) {
1915 dirty
= server
.dirty
;
1917 if (server
.appendonly
&& server
.dirty
-dirty
)
1918 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1919 if (server
.dirty
-dirty
&& listLength(server
.slaves
))
1920 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1921 if (listLength(server
.monitors
))
1922 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1923 server
.stat_numcommands
++;
1926 /* If this function gets called we already read a whole
1927 * command, arguments are in the client argv/argc fields.
1928 * processCommand() execute the command or prepare the
1929 * server for a bulk read from the client.
1931 * If 1 is returned the client is still alive and valid and
1932 * other operations can be performed by the caller. Otherwise
1933 * if 0 is returned the client was destroyed (i.e. after QUIT). */
1934 static int processCommand(redisClient
*c
) {
1935 struct redisCommand
*cmd
;
1937 /* Free some memory if needed (maxmemory setting) */
1938 if (server
.maxmemory
) freeMemoryIfNeeded();
1940 /* Handle the multi bulk command type. This is an alternative protocol
1941 * supported by Redis in order to receive commands that are composed of
1942 * multiple binary-safe "bulk" arguments. The latency of processing is
1943 * a bit higher but this allows things like multi-sets, so if this
1944 * protocol is used only for MSET and similar commands this is a big win. */
1945 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1946 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1947 if (c
->multibulk
<= 0) {
1951 decrRefCount(c
->argv
[c
->argc
-1]);
1955 } else if (c
->multibulk
) {
1956 if (c
->bulklen
== -1) {
1957 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1958 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1962 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1963 decrRefCount(c
->argv
[0]);
1964 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1966 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1971 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1975 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1976 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1980 if (c
->multibulk
== 0) {
1984 /* Here we need to swap the multi-bulk argc/argv with the
1985 * normal argc/argv of the client structure. */
1987 c
->argv
= c
->mbargv
;
1988 c
->mbargv
= auxargv
;
1991 c
->argc
= c
->mbargc
;
1992 c
->mbargc
= auxargc
;
1994 /* We need to set bulklen to something different than -1
1995 * in order for the code below to process the command without
1996 * to try to read the last argument of a bulk command as
1997 * a special argument. */
1999 /* continue below and process the command */
2006 /* -- end of multi bulk commands processing -- */
2008 /* The QUIT command is handled as a special case. Normal command
2009 * procs are unable to close the client connection safely */
2010 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
2014 cmd
= lookupCommand(c
->argv
[0]->ptr
);
2017 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
2018 (char*)c
->argv
[0]->ptr
));
2021 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
2022 (c
->argc
< -cmd
->arity
)) {
2024 sdscatprintf(sdsempty(),
2025 "-ERR wrong number of arguments for '%s' command\r\n",
2029 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
2030 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
2033 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
2034 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
2036 decrRefCount(c
->argv
[c
->argc
-1]);
2037 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
2039 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
2044 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
2045 /* It is possible that the bulk read is already in the
2046 * buffer. Check this condition and handle it accordingly.
2047 * This is just a fast path, alternative to call processInputBuffer().
2048 * It's a good idea since the code is small and this condition
2049 * happens most of the times. */
2050 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
2051 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
2053 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
2058 /* Let's try to share objects on the command arguments vector */
2059 if (server
.shareobjects
) {
2061 for(j
= 1; j
< c
->argc
; j
++)
2062 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
2064 /* Let's try to encode the bulk object to save space. */
2065 if (cmd
->flags
& REDIS_CMD_BULK
)
2066 tryObjectEncoding(c
->argv
[c
->argc
-1]);
2068 /* Check if the user is authenticated */
2069 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
2070 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
2075 /* Exec the command */
2076 if (c
->flags
& REDIS_MULTI
&& cmd
->proc
!= execCommand
) {
2077 queueMultiCommand(c
,cmd
);
2078 addReply(c
,shared
.queued
);
2083 /* Prepare the client for the next command */
2084 if (c
->flags
& REDIS_CLOSE
) {
2092 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
2097 /* (args*2)+1 is enough room for args, spaces, newlines */
2098 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
2100 if (argc
<= REDIS_STATIC_ARGS
) {
2103 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
2106 for (j
= 0; j
< argc
; j
++) {
2107 if (j
!= 0) outv
[outc
++] = shared
.space
;
2108 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
2111 lenobj
= createObject(REDIS_STRING
,
2112 sdscatprintf(sdsempty(),"%lu\r\n",
2113 (unsigned long) stringObjectLen(argv
[j
])));
2114 lenobj
->refcount
= 0;
2115 outv
[outc
++] = lenobj
;
2117 outv
[outc
++] = argv
[j
];
2119 outv
[outc
++] = shared
.crlf
;
2121 /* Increment all the refcounts at start and decrement at end in order to
2122 * be sure to free objects if there is no slave in a replication state
2123 * able to be fed with commands */
2124 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
2125 listRewind(slaves
,&li
);
2126 while((ln
= listNext(&li
))) {
2127 redisClient
*slave
= ln
->value
;
2129 /* Don't feed slaves that are still waiting for BGSAVE to start */
2130 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
2132 /* Feed all the other slaves, MONITORs and so on */
2133 if (slave
->slaveseldb
!= dictid
) {
2137 case 0: selectcmd
= shared
.select0
; break;
2138 case 1: selectcmd
= shared
.select1
; break;
2139 case 2: selectcmd
= shared
.select2
; break;
2140 case 3: selectcmd
= shared
.select3
; break;
2141 case 4: selectcmd
= shared
.select4
; break;
2142 case 5: selectcmd
= shared
.select5
; break;
2143 case 6: selectcmd
= shared
.select6
; break;
2144 case 7: selectcmd
= shared
.select7
; break;
2145 case 8: selectcmd
= shared
.select8
; break;
2146 case 9: selectcmd
= shared
.select9
; break;
2148 selectcmd
= createObject(REDIS_STRING
,
2149 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
2150 selectcmd
->refcount
= 0;
2153 addReply(slave
,selectcmd
);
2154 slave
->slaveseldb
= dictid
;
2156 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
2158 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
2159 if (outv
!= static_outv
) zfree(outv
);
2162 static void processInputBuffer(redisClient
*c
) {
2164 /* Before processing the input buffer, make sure the client is not
2165 * waiting for a blocking operation such as BLPOP. Note that on the first
2166 * iteration the client is never blocked, otherwise processInputBuffer()
2167 * would not be called at all, but after the execution of the first commands
2168 * in the input buffer the client may be blocked, and the "goto again"
2169 * will try to reiterate. The following line will make it return asap. */
2170 if (c
->flags
& REDIS_BLOCKED
|| c
->flags
& REDIS_IO_WAIT
) return;
2171 if (c
->bulklen
== -1) {
2172 /* Read the first line of the query */
2173 char *p
= strchr(c
->querybuf
,'\n');
2180 query
= c
->querybuf
;
2181 c
->querybuf
= sdsempty();
2182 querylen
= 1+(p
-(query
));
2183 if (sdslen(query
) > querylen
) {
2184 /* leave data after the first line of the query in the buffer */
2185 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
2187 *p
= '\0'; /* remove "\n" */
2188 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
2189 sdsupdatelen(query
);
2191 /* Now we can split the query in arguments */
2192 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
2195 if (c
->argv
) zfree(c
->argv
);
2196 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
2198 for (j
= 0; j
< argc
; j
++) {
2199 if (sdslen(argv
[j
])) {
2200 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
2208 /* Execute the command. If the client is still valid
2209 * after processCommand() return and there is something
2210 * on the query buffer try to process the next command. */
2211 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
2213 /* Nothing to process, argc == 0. Just process the query
2214 * buffer if it's not empty or return to the caller */
2215 if (sdslen(c
->querybuf
)) goto again
;
2218 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
2219 redisLog(REDIS_VERBOSE
, "Client protocol error");
2224 /* Bulk read handling. Note that if we are at this point
2225 the client already sent a command terminated with a newline,
2226 we are reading the bulk data that is actually the last
2227 argument of the command. */
2228 int qbl
= sdslen(c
->querybuf
);
2230 if (c
->bulklen
<= qbl
) {
2231 /* Copy everything but the final CRLF as final argument */
2232 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
2234 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
2235 /* Process the command. If the client is still valid after
2236 * the processing and there is more data in the buffer
2237 * try to parse it. */
2238 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
2244 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2245 redisClient
*c
= (redisClient
*) privdata
;
2246 char buf
[REDIS_IOBUF_LEN
];
2249 REDIS_NOTUSED(mask
);
2251 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
2253 if (errno
== EAGAIN
) {
2256 redisLog(REDIS_VERBOSE
, "Reading from client: %s",strerror(errno
));
2260 } else if (nread
== 0) {
2261 redisLog(REDIS_VERBOSE
, "Client closed connection");
2266 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
2267 c
->lastinteraction
= time(NULL
);
2271 processInputBuffer(c
);
2274 static int selectDb(redisClient
*c
, int id
) {
2275 if (id
< 0 || id
>= server
.dbnum
)
2277 c
->db
= &server
.db
[id
];
2281 static void *dupClientReplyValue(void *o
) {
2282 incrRefCount((robj
*)o
);
2286 static redisClient
*createClient(int fd
) {
2287 redisClient
*c
= zmalloc(sizeof(*c
));
2289 anetNonBlock(NULL
,fd
);
2290 anetTcpNoDelay(NULL
,fd
);
2291 if (!c
) return NULL
;
2294 c
->querybuf
= sdsempty();
2303 c
->lastinteraction
= time(NULL
);
2304 c
->authenticated
= 0;
2305 c
->replstate
= REDIS_REPL_NONE
;
2306 c
->reply
= listCreate();
2307 listSetFreeMethod(c
->reply
,decrRefCount
);
2308 listSetDupMethod(c
->reply
,dupClientReplyValue
);
2309 c
->blockingkeys
= NULL
;
2310 c
->blockingkeysnum
= 0;
2311 c
->io_keys
= listCreate();
2312 listSetFreeMethod(c
->io_keys
,decrRefCount
);
2313 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
2314 readQueryFromClient
, c
) == AE_ERR
) {
2318 listAddNodeTail(server
.clients
,c
);
2319 initClientMultiState(c
);
2323 static void addReply(redisClient
*c
, robj
*obj
) {
2324 if (listLength(c
->reply
) == 0 &&
2325 (c
->replstate
== REDIS_REPL_NONE
||
2326 c
->replstate
== REDIS_REPL_ONLINE
) &&
2327 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
2328 sendReplyToClient
, c
) == AE_ERR
) return;
2330 if (server
.vm_enabled
&& obj
->storage
!= REDIS_VM_MEMORY
) {
2331 obj
= dupStringObject(obj
);
2332 obj
->refcount
= 0; /* getDecodedObject() will increment the refcount */
2334 listAddNodeTail(c
->reply
,getDecodedObject(obj
));
2337 static void addReplySds(redisClient
*c
, sds s
) {
2338 robj
*o
= createObject(REDIS_STRING
,s
);
2343 static void addReplyDouble(redisClient
*c
, double d
) {
2346 snprintf(buf
,sizeof(buf
),"%.17g",d
);
2347 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2348 (unsigned long) strlen(buf
),buf
));
2351 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
2354 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
2355 len
= sdslen(obj
->ptr
);
2357 long n
= (long)obj
->ptr
;
2359 /* Compute how many bytes will take this integer as a radix 10 string */
2365 while((n
= n
/10) != 0) {
2369 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len
));
2372 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2377 REDIS_NOTUSED(mask
);
2378 REDIS_NOTUSED(privdata
);
2380 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
2381 if (cfd
== AE_ERR
) {
2382 redisLog(REDIS_VERBOSE
,"Accepting client connection: %s", server
.neterr
);
2385 redisLog(REDIS_VERBOSE
,"Accepted %s:%d", cip
, cport
);
2386 if ((c
= createClient(cfd
)) == NULL
) {
2387 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
2388 close(cfd
); /* May be already closed, just ingore errors */
2391 /* If maxclient directive is set and this is one client more... close the
2392 * connection. Note that we create the client instead of checking for this
2393 * condition beforehand, since now the socket is already set in nonblocking
2394 * mode and we can send an error for free using the Kernel I/O */
2395 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
2396 char *err
= "-ERR max number of clients reached\r\n";
2398 /* That's a best effort error message, don't check write errors */
2399 if (write(c
->fd
,err
,strlen(err
)) == -1) {
2400 /* Nothing to do, Just to avoid the warning... */
2405 server
.stat_numconnections
++;
2408 /* ======================= Redis objects implementation ===================== */
2410 static robj
*createObject(int type
, void *ptr
) {
2413 if (server
.vm_enabled
) pthread_mutex_lock(&server
.obj_freelist_mutex
);
2414 if (listLength(server
.objfreelist
)) {
2415 listNode
*head
= listFirst(server
.objfreelist
);
2416 o
= listNodeValue(head
);
2417 listDelNode(server
.objfreelist
,head
);
2418 if (server
.vm_enabled
) pthread_mutex_unlock(&server
.obj_freelist_mutex
);
2420 if (server
.vm_enabled
) {
2421 pthread_mutex_unlock(&server
.obj_freelist_mutex
);
2422 o
= zmalloc(sizeof(*o
));
2424 o
= zmalloc(sizeof(*o
)-sizeof(struct redisObjectVM
));
2428 o
->encoding
= REDIS_ENCODING_RAW
;
2431 if (server
.vm_enabled
) {
2432 /* Note that this code may run in the context of an I/O thread
2433 * and accessing to server.unixtime in theory is an error
2434 * (no locks). But in practice this is safe, and even if we read
2435 * garbage Redis will not fail, as it's just a statistical info */
2436 o
->vm
.atime
= server
.unixtime
;
2437 o
->storage
= REDIS_VM_MEMORY
;
2442 static robj
*createStringObject(char *ptr
, size_t len
) {
2443 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
2446 static robj
*dupStringObject(robj
*o
) {
2447 assert(o
->encoding
== REDIS_ENCODING_RAW
);
2448 return createStringObject(o
->ptr
,sdslen(o
->ptr
));
2451 static robj
*createListObject(void) {
2452 list
*l
= listCreate();
2454 listSetFreeMethod(l
,decrRefCount
);
2455 return createObject(REDIS_LIST
,l
);
2458 static robj
*createSetObject(void) {
2459 dict
*d
= dictCreate(&setDictType
,NULL
);
2460 return createObject(REDIS_SET
,d
);
2463 static robj
*createZsetObject(void) {
2464 zset
*zs
= zmalloc(sizeof(*zs
));
2466 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
2467 zs
->zsl
= zslCreate();
2468 return createObject(REDIS_ZSET
,zs
);
2471 static void freeStringObject(robj
*o
) {
2472 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2477 static void freeListObject(robj
*o
) {
2478 listRelease((list
*) o
->ptr
);
2481 static void freeSetObject(robj
*o
) {
2482 dictRelease((dict
*) o
->ptr
);
2485 static void freeZsetObject(robj
*o
) {
2488 dictRelease(zs
->dict
);
2493 static void freeHashObject(robj
*o
) {
2494 dictRelease((dict
*) o
->ptr
);
2497 static void incrRefCount(robj
*o
) {
2498 redisAssert(!server
.vm_enabled
|| o
->storage
== REDIS_VM_MEMORY
);
2502 static void decrRefCount(void *obj
) {
2505 /* Object is swapped out, or in the process of being loaded. */
2506 if (server
.vm_enabled
&&
2507 (o
->storage
== REDIS_VM_SWAPPED
|| o
->storage
== REDIS_VM_LOADING
))
2509 if (o
->storage
== REDIS_VM_SWAPPED
|| o
->storage
== REDIS_VM_LOADING
) {
2510 redisAssert(o
->refcount
== 1);
2512 if (o
->storage
== REDIS_VM_LOADING
) vmCancelThreadedIOJob(obj
);
2513 redisAssert(o
->type
== REDIS_STRING
);
2514 freeStringObject(o
);
2515 vmMarkPagesFree(o
->vm
.page
,o
->vm
.usedpages
);
2516 pthread_mutex_lock(&server
.obj_freelist_mutex
);
2517 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2518 !listAddNodeHead(server
.objfreelist
,o
))
2520 pthread_mutex_unlock(&server
.obj_freelist_mutex
);
2521 server
.vm_stats_swapped_objects
--;
2524 /* Object is in memory, or in the process of being swapped out. */
2525 if (--(o
->refcount
) == 0) {
2526 if (server
.vm_enabled
&& o
->storage
== REDIS_VM_SWAPPING
)
2527 vmCancelThreadedIOJob(obj
);
2529 case REDIS_STRING
: freeStringObject(o
); break;
2530 case REDIS_LIST
: freeListObject(o
); break;
2531 case REDIS_SET
: freeSetObject(o
); break;
2532 case REDIS_ZSET
: freeZsetObject(o
); break;
2533 case REDIS_HASH
: freeHashObject(o
); break;
2534 default: redisAssert(0 != 0); break;
2536 if (server
.vm_enabled
) pthread_mutex_lock(&server
.obj_freelist_mutex
);
2537 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2538 !listAddNodeHead(server
.objfreelist
,o
))
2540 if (server
.vm_enabled
) pthread_mutex_unlock(&server
.obj_freelist_mutex
);
2544 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2545 dictEntry
*de
= dictFind(db
->dict
,key
);
2547 robj
*key
= dictGetEntryKey(de
);
2548 robj
*val
= dictGetEntryVal(de
);
2550 if (server
.vm_enabled
) {
2551 if (key
->storage
== REDIS_VM_MEMORY
||
2552 key
->storage
== REDIS_VM_SWAPPING
)
2554 /* If we were swapping the object out, stop it, this key
2556 if (key
->storage
== REDIS_VM_SWAPPING
)
2557 vmCancelThreadedIOJob(key
);
2558 /* Update the access time of the key for the aging algorithm. */
2559 key
->vm
.atime
= server
.unixtime
;
2561 /* Our value was swapped on disk. Bring it at home. */
2562 redisAssert(val
== NULL
);
2563 val
= vmLoadObject(key
);
2564 dictGetEntryVal(de
) = val
;
2573 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2574 expireIfNeeded(db
,key
);
2575 return lookupKey(db
,key
);
2578 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2579 deleteIfVolatile(db
,key
);
2580 return lookupKey(db
,key
);
2583 static int deleteKey(redisDb
*db
, robj
*key
) {
2586 /* We need to protect key from destruction: after the first dictDelete()
2587 * it may happen that 'key' is no longer valid if we don't increment
2588 * its count. This may happen when we get the object reference directly
2589 * from the hash table with dictRandomKey() or dict iterators */
2591 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2592 retval
= dictDelete(db
->dict
,key
);
2595 return retval
== DICT_OK
;
2598 /* Try to share an object against the shared objects pool */
2599 static robj
*tryObjectSharing(robj
*o
) {
2600 struct dictEntry
*de
;
2603 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2605 redisAssert(o
->type
== REDIS_STRING
);
2606 de
= dictFind(server
.sharingpool
,o
);
2608 robj
*shared
= dictGetEntryKey(de
);
2610 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2611 dictGetEntryVal(de
) = (void*) c
;
2612 incrRefCount(shared
);
2616 /* Here we are using a stream algorithm: every time an object is
2617 * shared we increment its count, every time there is a miss we
2618 * decrement the counter of a random object. If this object reaches
2619 * zero we remove the object and put the current object instead. */
2620 if (dictSize(server
.sharingpool
) >=
2621 server
.sharingpoolsize
) {
2622 de
= dictGetRandomKey(server
.sharingpool
);
2623 redisAssert(de
!= NULL
);
2624 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2625 dictGetEntryVal(de
) = (void*) c
;
2627 dictDelete(server
.sharingpool
,de
->key
);
2630 c
= 0; /* If the pool is empty we want to add this object */
2635 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2636 redisAssert(retval
== DICT_OK
);
2643 /* Check if the nul-terminated string 's' can be represented by a long
2644 * (that is, is a number that fits into long without any other space or
2645 * character before or after the digits).
2647 * If so, the function returns REDIS_OK and *longval is set to the value
2648 * of the number. Otherwise REDIS_ERR is returned */
2649 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2650 char buf
[32], *endptr
;
2654 value
= strtol(s
, &endptr
, 10);
2655 if (endptr
[0] != '\0') return REDIS_ERR
;
2656 slen
= snprintf(buf
,32,"%ld",value
);
2658 /* If the number converted back into a string is not identical
2659 * then it's not possible to encode the string as integer */
2660 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2661 if (longval
) *longval
= value
;
2665 /* Try to encode a string object in order to save space */
2666 static int tryObjectEncoding(robj
*o
) {
2670 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2671 return REDIS_ERR
; /* Already encoded */
2673 /* It's not safe to encode shared objects: shared objects can be shared
2674 * everywhere in the "object space" of Redis. Encoded objects can only
2675 * appear as "values" (and not, for instance, as keys) */
2676 if (o
->refcount
> 1) return REDIS_ERR
;
2678 /* Currently we try to encode only strings */
2679 redisAssert(o
->type
== REDIS_STRING
);
2681 /* Check if we can represent this string as a long integer */
2682 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2684 /* Ok, this object can be encoded */
2685 o
->encoding
= REDIS_ENCODING_INT
;
2687 o
->ptr
= (void*) value
;
2691 /* Get a decoded version of an encoded object (returned as a new object).
2692 * If the object is already raw-encoded just increment the ref count. */
2693 static robj
*getDecodedObject(robj
*o
) {
2696 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2700 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2703 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2704 dec
= createStringObject(buf
,strlen(buf
));
2707 redisAssert(1 != 1);
2711 /* Compare two string objects via strcmp() or alike.
2712 * Note that the objects may be integer-encoded. In such a case we
2713 * use snprintf() to get a string representation of the numbers on the stack
2714 * and compare the strings, it's much faster than calling getDecodedObject().
2716 * Important note: if objects are not integer encoded, but binary-safe strings,
2717 * sdscmp() from sds.c will apply memcmp() so this function can be considered
2719 static int compareStringObjects(robj
*a
, robj
*b
) {
2720 redisAssert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2721 char bufa
[128], bufb
[128], *astr
, *bstr
;
2724 if (a
== b
) return 0;
2725 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2726 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2732 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2733 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2739 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2742 static size_t stringObjectLen(robj
*o
) {
2743 redisAssert(o
->type
== REDIS_STRING
);
2744 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2745 return sdslen(o
->ptr
);
2749 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2753 /*============================ RDB saving/loading =========================== */
2755 static int rdbSaveType(FILE *fp
, unsigned char type
) {
2756 if (fwrite(&type
,1,1,fp
) == 0) return -1;
2760 static int rdbSaveTime(FILE *fp
, time_t t
) {
2761 int32_t t32
= (int32_t) t
;
2762 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
2766 /* check rdbLoadLen() comments for more info */
2767 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2768 unsigned char buf
[2];
2771 /* Save a 6 bit len */
2772 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2773 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2774 } else if (len
< (1<<14)) {
2775 /* Save a 14 bit len */
2776 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2778 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2780 /* Save a 32 bit len */
2781 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2782 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2784 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2789 /* String objects in the form "2391" "-100" without any space and with a
2790 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2791 * encoded as integers to save space */
2792 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2794 char *endptr
, buf
[32];
2796 /* Check if it's possible to encode this value as a number */
2797 value
= strtoll(s
, &endptr
, 10);
2798 if (endptr
[0] != '\0') return 0;
2799 snprintf(buf
,32,"%lld",value
);
2801 /* If the number converted back into a string is not identical
2802 * then it's not possible to encode the string as integer */
2803 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2805 /* Finally check if it fits in our ranges */
2806 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2807 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2808 enc
[1] = value
&0xFF;
2810 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2811 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2812 enc
[1] = value
&0xFF;
2813 enc
[2] = (value
>>8)&0xFF;
2815 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2816 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2817 enc
[1] = value
&0xFF;
2818 enc
[2] = (value
>>8)&0xFF;
2819 enc
[3] = (value
>>16)&0xFF;
2820 enc
[4] = (value
>>24)&0xFF;
2827 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2828 unsigned int comprlen
, outlen
;
2832 /* We require at least four bytes compression for this to be worth it */
2833 outlen
= sdslen(obj
->ptr
)-4;
2834 if (outlen
<= 0) return 0;
2835 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2836 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2837 if (comprlen
== 0) {
2841 /* Data compressed! Let's save it on disk */
2842 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2843 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2844 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2845 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2846 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2855 /* Save a string object as [len][data] on disk. If the object is a string
2856 * representation of an integer value we try to save it in a special form */
2857 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2861 len
= sdslen(obj
->ptr
);
2863 /* Try integer encoding */
2865 unsigned char buf
[5];
2866 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2867 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2872 /* Try LZF compression - under 20 bytes it's unable to compress even
2873 * aaaaaaaaaaaaaaaaaa so skip it */
2874 if (server
.rdbcompression
&& len
> 20) {
2877 retval
= rdbSaveLzfStringObject(fp
,obj
);
2878 if (retval
== -1) return -1;
2879 if (retval
> 0) return 0;
2880 /* retval == 0 means data can't be compressed, save the old way */
2883 /* Store verbatim */
2884 if (rdbSaveLen(fp
,len
) == -1) return -1;
2885 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2889 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2890 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2893 /* Avoid incr/decr ref count business when possible.
2894 * This plays well with copy-on-write given that we are probably
2895 * in a child process (BGSAVE). Also this makes sure key objects
2896 * of swapped objects are not incRefCount-ed (an assert does not allow
2897 * this in order to avoid bugs) */
2898 if (obj
->encoding
!= REDIS_ENCODING_RAW
) {
2899 obj
= getDecodedObject(obj
);
2900 retval
= rdbSaveStringObjectRaw(fp
,obj
);
2903 retval
= rdbSaveStringObjectRaw(fp
,obj
);
2908 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
2909 * 8 bit integer specifying the length of the representation.
2910 * This 8 bit integer has special values in order to specify the following
2916 static int rdbSaveDoubleValue(FILE *fp
, double val
) {
2917 unsigned char buf
[128];
2923 } else if (!isfinite(val
)) {
2925 buf
[0] = (val
< 0) ? 255 : 254;
2927 snprintf((char*)buf
+1,sizeof(buf
)-1,"%.17g",val
);
2928 buf
[0] = strlen((char*)buf
+1);
2931 if (fwrite(buf
,len
,1,fp
) == 0) return -1;
2935 /* Save a Redis object. */
2936 static int rdbSaveObject(FILE *fp
, robj
*o
) {
2937 if (o
->type
== REDIS_STRING
) {
2938 /* Save a string value */
2939 if (rdbSaveStringObject(fp
,o
) == -1) return -1;
2940 } else if (o
->type
== REDIS_LIST
) {
2941 /* Save a list value */
2942 list
*list
= o
->ptr
;
2946 if (rdbSaveLen(fp
,listLength(list
)) == -1) return -1;
2947 listRewind(list
,&li
);
2948 while((ln
= listNext(&li
))) {
2949 robj
*eleobj
= listNodeValue(ln
);
2951 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2953 } else if (o
->type
== REDIS_SET
) {
2954 /* Save a set value */
2956 dictIterator
*di
= dictGetIterator(set
);
2959 if (rdbSaveLen(fp
,dictSize(set
)) == -1) return -1;
2960 while((de
= dictNext(di
)) != NULL
) {
2961 robj
*eleobj
= dictGetEntryKey(de
);
2963 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2965 dictReleaseIterator(di
);
2966 } else if (o
->type
== REDIS_ZSET
) {
2967 /* Save a sorted set value */
2969 dictIterator
*di
= dictGetIterator(zs
->dict
);
2972 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) return -1;
2973 while((de
= dictNext(di
)) != NULL
) {
2974 robj
*eleobj
= dictGetEntryKey(de
);
2975 double *score
= dictGetEntryVal(de
);
2977 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2978 if (rdbSaveDoubleValue(fp
,*score
) == -1) return -1;
2980 dictReleaseIterator(di
);
2982 redisAssert(0 != 0);
2987 /* Return the length the object will have on disk if saved with
2988 * the rdbSaveObject() function. Currently we use a trick to get
2989 * this length with very little changes to the code. In the future
2990 * we could switch to a faster solution. */
2991 static off_t
rdbSavedObjectLen(robj
*o
, FILE *fp
) {
2992 if (fp
== NULL
) fp
= server
.devnull
;
2994 assert(rdbSaveObject(fp
,o
) != 1);
2998 /* Return the number of pages required to save this object in the swap file */
2999 static off_t
rdbSavedObjectPages(robj
*o
, FILE *fp
) {
3000 off_t bytes
= rdbSavedObjectLen(o
,fp
);
3002 return (bytes
+(server
.vm_page_size
-1))/server
.vm_page_size
;
3005 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
3006 static int rdbSave(char *filename
) {
3007 dictIterator
*di
= NULL
;
3012 time_t now
= time(NULL
);
3014 waitEmptyIOJobsQueue(); /* Otherwise other threads may fseek() the swap */
3015 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
3016 fp
= fopen(tmpfile
,"w");
3018 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
3021 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
3022 for (j
= 0; j
< server
.dbnum
; j
++) {
3023 redisDb
*db
= server
.db
+j
;
3025 if (dictSize(d
) == 0) continue;
3026 di
= dictGetIterator(d
);
3032 /* Write the SELECT DB opcode */
3033 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
3034 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
3036 /* Iterate this DB writing every entry */
3037 while((de
= dictNext(di
)) != NULL
) {
3038 robj
*key
= dictGetEntryKey(de
);
3039 robj
*o
= dictGetEntryVal(de
);
3040 time_t expiretime
= getExpire(db
,key
);
3042 /* Save the expire time */
3043 if (expiretime
!= -1) {
3044 /* If this key is already expired skip it */
3045 if (expiretime
< now
) continue;
3046 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
3047 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
3049 /* Save the key and associated value. This requires special
3050 * handling if the value is swapped out. */
3051 if (!server
.vm_enabled
|| key
->storage
== REDIS_VM_MEMORY
||
3052 key
->storage
== REDIS_VM_SWAPPING
) {
3053 /* Save type, key, value */
3054 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
3055 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
3056 if (rdbSaveObject(fp
,o
) == -1) goto werr
;
3058 /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */
3060 /* Get a preview of the object in memory */
3061 po
= vmPreviewObject(key
);
3062 /* Save type, key, value */
3063 if (rdbSaveType(fp
,key
->vtype
) == -1) goto werr
;
3064 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
3065 if (rdbSaveObject(fp
,po
) == -1) goto werr
;
3066 /* Remove the loaded object from memory */
3070 dictReleaseIterator(di
);
3073 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
3075 /* Make sure data will not remain on the OS's output buffers */
3080 /* Use RENAME to make sure the DB file is changed atomically only
3081 * if the generate DB file is ok. */
3082 if (rename(tmpfile
,filename
) == -1) {
3083 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
3087 redisLog(REDIS_NOTICE
,"DB saved on disk");
3089 server
.lastsave
= time(NULL
);
3095 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
3096 if (di
) dictReleaseIterator(di
);
3100 static int rdbSaveBackground(char *filename
) {
3103 if (server
.bgsavechildpid
!= -1) return REDIS_ERR
;
3104 if (server
.vm_enabled
) waitEmptyIOJobsQueue();
3105 if ((childpid
= fork()) == 0) {
3107 if (server
.vm_enabled
) vmReopenSwapFile();
3109 if (rdbSave(filename
) == REDIS_OK
) {
3116 if (childpid
== -1) {
3117 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
3121 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
3122 server
.bgsavechildpid
= childpid
;
3125 return REDIS_OK
; /* unreached */
3128 static void rdbRemoveTempFile(pid_t childpid
) {
3131 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
3135 static int rdbLoadType(FILE *fp
) {
3137 if (fread(&type
,1,1,fp
) == 0) return -1;
3141 static time_t rdbLoadTime(FILE *fp
) {
3143 if (fread(&t32
,4,1,fp
) == 0) return -1;
3144 return (time_t) t32
;
3147 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
3148 * of this file for a description of how this are stored on disk.
3150 * isencoded is set to 1 if the readed length is not actually a length but
3151 * an "encoding type", check the above comments for more info */
3152 static uint32_t rdbLoadLen(FILE *fp
, int *isencoded
) {
3153 unsigned char buf
[2];
3157 if (isencoded
) *isencoded
= 0;
3158 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
3159 type
= (buf
[0]&0xC0)>>6;
3160 if (type
== REDIS_RDB_6BITLEN
) {
3161 /* Read a 6 bit len */
3163 } else if (type
== REDIS_RDB_ENCVAL
) {
3164 /* Read a 6 bit len encoding type */
3165 if (isencoded
) *isencoded
= 1;
3167 } else if (type
== REDIS_RDB_14BITLEN
) {
3168 /* Read a 14 bit len */
3169 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
3170 return ((buf
[0]&0x3F)<<8)|buf
[1];
3172 /* Read a 32 bit len */
3173 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
3178 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
3179 unsigned char enc
[4];
3182 if (enctype
== REDIS_RDB_ENC_INT8
) {
3183 if (fread(enc
,1,1,fp
) == 0) return NULL
;
3184 val
= (signed char)enc
[0];
3185 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
3187 if (fread(enc
,2,1,fp
) == 0) return NULL
;
3188 v
= enc
[0]|(enc
[1]<<8);
3190 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
3192 if (fread(enc
,4,1,fp
) == 0) return NULL
;
3193 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
3196 val
= 0; /* anti-warning */
3199 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
3202 static robj
*rdbLoadLzfStringObject(FILE*fp
) {
3203 unsigned int len
, clen
;
3204 unsigned char *c
= NULL
;
3207 if ((clen
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3208 if ((len
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3209 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
3210 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
3211 if (fread(c
,clen
,1,fp
) == 0) goto err
;
3212 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
3214 return createObject(REDIS_STRING
,val
);
3221 static robj
*rdbLoadStringObject(FILE*fp
) {
3226 len
= rdbLoadLen(fp
,&isencoded
);
3229 case REDIS_RDB_ENC_INT8
:
3230 case REDIS_RDB_ENC_INT16
:
3231 case REDIS_RDB_ENC_INT32
:
3232 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
3233 case REDIS_RDB_ENC_LZF
:
3234 return tryObjectSharing(rdbLoadLzfStringObject(fp
));
3240 if (len
== REDIS_RDB_LENERR
) return NULL
;
3241 val
= sdsnewlen(NULL
,len
);
3242 if (len
&& fread(val
,len
,1,fp
) == 0) {
3246 return tryObjectSharing(createObject(REDIS_STRING
,val
));
3249 /* For information about double serialization check rdbSaveDoubleValue() */
3250 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
3254 if (fread(&len
,1,1,fp
) == 0) return -1;
3256 case 255: *val
= R_NegInf
; return 0;
3257 case 254: *val
= R_PosInf
; return 0;
3258 case 253: *val
= R_Nan
; return 0;
3260 if (fread(buf
,len
,1,fp
) == 0) return -1;
3262 sscanf(buf
, "%lg", val
);
3267 /* Load a Redis object of the specified type from the specified file.
3268 * On success a newly allocated object is returned, otherwise NULL. */
3269 static robj
*rdbLoadObject(int type
, FILE *fp
) {
3272 if (type
== REDIS_STRING
) {
3273 /* Read string value */
3274 if ((o
= rdbLoadStringObject(fp
)) == NULL
) return NULL
;
3275 tryObjectEncoding(o
);
3276 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
3277 /* Read list/set value */
3280 if ((listlen
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3281 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
3282 /* Load every single element of the list/set */
3286 if ((ele
= rdbLoadStringObject(fp
)) == NULL
) return NULL
;
3287 tryObjectEncoding(ele
);
3288 if (type
== REDIS_LIST
) {
3289 listAddNodeTail((list
*)o
->ptr
,ele
);
3291 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
3294 } else if (type
== REDIS_ZSET
) {
3295 /* Read list/set value */
3299 if ((zsetlen
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3300 o
= createZsetObject();
3302 /* Load every single element of the list/set */
3305 double *score
= zmalloc(sizeof(double));
3307 if ((ele
= rdbLoadStringObject(fp
)) == NULL
) return NULL
;
3308 tryObjectEncoding(ele
);
3309 if (rdbLoadDoubleValue(fp
,score
) == -1) return NULL
;
3310 dictAdd(zs
->dict
,ele
,score
);
3311 zslInsert(zs
->zsl
,*score
,ele
);
3312 incrRefCount(ele
); /* added to skiplist */
3315 redisAssert(0 != 0);
3320 static int rdbLoad(char *filename
) {
3322 robj
*keyobj
= NULL
;
3324 int type
, retval
, rdbver
;
3325 dict
*d
= server
.db
[0].dict
;
3326 redisDb
*db
= server
.db
+0;
3328 time_t expiretime
= -1, now
= time(NULL
);
3329 long long loadedkeys
= 0;
3331 fp
= fopen(filename
,"r");
3332 if (!fp
) return REDIS_ERR
;
3333 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
3335 if (memcmp(buf
,"REDIS",5) != 0) {
3337 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
3340 rdbver
= atoi(buf
+5);
3343 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
3350 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
3351 if (type
== REDIS_EXPIRETIME
) {
3352 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
3353 /* We read the time so we need to read the object type again */
3354 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
3356 if (type
== REDIS_EOF
) break;
3357 /* Handle SELECT DB opcode as a special case */
3358 if (type
== REDIS_SELECTDB
) {
3359 if ((dbid
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
)
3361 if (dbid
>= (unsigned)server
.dbnum
) {
3362 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
3365 db
= server
.db
+dbid
;
3370 if ((keyobj
= rdbLoadStringObject(fp
)) == NULL
) goto eoferr
;
3372 if ((o
= rdbLoadObject(type
,fp
)) == NULL
) goto eoferr
;
3373 /* Add the new object in the hash table */
3374 retval
= dictAdd(d
,keyobj
,o
);
3375 if (retval
== DICT_ERR
) {
3376 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
3379 /* Set the expire time if needed */
3380 if (expiretime
!= -1) {
3381 setExpire(db
,keyobj
,expiretime
);
3382 /* Delete this key if already expired */
3383 if (expiretime
< now
) deleteKey(db
,keyobj
);
3387 /* Handle swapping while loading big datasets when VM is on */
3389 if (server
.vm_enabled
&& (loadedkeys
% 5000) == 0) {
3390 while (zmalloc_used_memory() > server
.vm_max_memory
) {
3391 if (vmSwapOneObjectBlocking() == REDIS_ERR
) break;
3398 eoferr
: /* unexpected end of file is handled here with a fatal exit */
3399 if (keyobj
) decrRefCount(keyobj
);
3400 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
3402 return REDIS_ERR
; /* Just to avoid warning */
3405 /*================================== Commands =============================== */
3407 static void authCommand(redisClient
*c
) {
3408 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
3409 c
->authenticated
= 1;
3410 addReply(c
,shared
.ok
);
3412 c
->authenticated
= 0;
3413 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3417 static void pingCommand(redisClient
*c
) {
3418 addReply(c
,shared
.pong
);
3421 static void echoCommand(redisClient
*c
) {
3422 addReplyBulkLen(c
,c
->argv
[1]);
3423 addReply(c
,c
->argv
[1]);
3424 addReply(c
,shared
.crlf
);
3427 /*=================================== Strings =============================== */
3429 static void setGenericCommand(redisClient
*c
, int nx
) {
3432 if (nx
) deleteIfVolatile(c
->db
,c
->argv
[1]);
3433 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3434 if (retval
== DICT_ERR
) {
3436 /* If the key is about a swapped value, we want a new key object
3437 * to overwrite the old. So we delete the old key in the database.
3438 * This will also make sure that swap pages about the old object
3439 * will be marked as free. */
3440 if (deleteIfSwapped(c
->db
,c
->argv
[1]))
3441 incrRefCount(c
->argv
[1]);
3442 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3443 incrRefCount(c
->argv
[2]);
3445 addReply(c
,shared
.czero
);
3449 incrRefCount(c
->argv
[1]);
3450 incrRefCount(c
->argv
[2]);
3453 removeExpire(c
->db
,c
->argv
[1]);
3454 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3457 static void setCommand(redisClient
*c
) {
3458 setGenericCommand(c
,0);
3461 static void setnxCommand(redisClient
*c
) {
3462 setGenericCommand(c
,1);
3465 static int getGenericCommand(redisClient
*c
) {
3466 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3469 addReply(c
,shared
.nullbulk
);
3472 if (o
->type
!= REDIS_STRING
) {
3473 addReply(c
,shared
.wrongtypeerr
);
3476 addReplyBulkLen(c
,o
);
3478 addReply(c
,shared
.crlf
);
3484 static void getCommand(redisClient
*c
) {
3485 getGenericCommand(c
);
3488 static void getsetCommand(redisClient
*c
) {
3489 if (getGenericCommand(c
) == REDIS_ERR
) return;
3490 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
3491 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3493 incrRefCount(c
->argv
[1]);
3495 incrRefCount(c
->argv
[2]);
3497 removeExpire(c
->db
,c
->argv
[1]);
3500 static void mgetCommand(redisClient
*c
) {
3503 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
3504 for (j
= 1; j
< c
->argc
; j
++) {
3505 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
3507 addReply(c
,shared
.nullbulk
);
3509 if (o
->type
!= REDIS_STRING
) {
3510 addReply(c
,shared
.nullbulk
);
3512 addReplyBulkLen(c
,o
);
3514 addReply(c
,shared
.crlf
);
3520 static void msetGenericCommand(redisClient
*c
, int nx
) {
3521 int j
, busykeys
= 0;
3523 if ((c
->argc
% 2) == 0) {
3524 addReplySds(c
,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3527 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3528 * set nothing at all if at least one already key exists. */
3530 for (j
= 1; j
< c
->argc
; j
+= 2) {
3531 if (lookupKeyWrite(c
->db
,c
->argv
[j
]) != NULL
) {
3537 addReply(c
, shared
.czero
);
3541 for (j
= 1; j
< c
->argc
; j
+= 2) {
3544 tryObjectEncoding(c
->argv
[j
+1]);
3545 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3546 if (retval
== DICT_ERR
) {
3547 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3548 incrRefCount(c
->argv
[j
+1]);
3550 incrRefCount(c
->argv
[j
]);
3551 incrRefCount(c
->argv
[j
+1]);
3553 removeExpire(c
->db
,c
->argv
[j
]);
3555 server
.dirty
+= (c
->argc
-1)/2;
3556 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3559 static void msetCommand(redisClient
*c
) {
3560 msetGenericCommand(c
,0);
3563 static void msetnxCommand(redisClient
*c
) {
3564 msetGenericCommand(c
,1);
3567 static void incrDecrCommand(redisClient
*c
, long long incr
) {
3572 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3576 if (o
->type
!= REDIS_STRING
) {
3581 if (o
->encoding
== REDIS_ENCODING_RAW
)
3582 value
= strtoll(o
->ptr
, &eptr
, 10);
3583 else if (o
->encoding
== REDIS_ENCODING_INT
)
3584 value
= (long)o
->ptr
;
3586 redisAssert(1 != 1);
3591 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
3592 tryObjectEncoding(o
);
3593 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
3594 if (retval
== DICT_ERR
) {
3595 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
3596 removeExpire(c
->db
,c
->argv
[1]);
3598 incrRefCount(c
->argv
[1]);
3601 addReply(c
,shared
.colon
);
3603 addReply(c
,shared
.crlf
);
3606 static void incrCommand(redisClient
*c
) {
3607 incrDecrCommand(c
,1);
3610 static void decrCommand(redisClient
*c
) {
3611 incrDecrCommand(c
,-1);
3614 static void incrbyCommand(redisClient
*c
) {
3615 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3616 incrDecrCommand(c
,incr
);
3619 static void decrbyCommand(redisClient
*c
) {
3620 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3621 incrDecrCommand(c
,-incr
);
3624 /* ========================= Type agnostic commands ========================= */
3626 static void delCommand(redisClient
*c
) {
3629 for (j
= 1; j
< c
->argc
; j
++) {
3630 if (deleteKey(c
->db
,c
->argv
[j
])) {
3637 addReply(c
,shared
.czero
);
3640 addReply(c
,shared
.cone
);
3643 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
3648 static void existsCommand(redisClient
*c
) {
3649 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
3652 static void selectCommand(redisClient
*c
) {
3653 int id
= atoi(c
->argv
[1]->ptr
);
3655 if (selectDb(c
,id
) == REDIS_ERR
) {
3656 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
3658 addReply(c
,shared
.ok
);
3662 static void randomkeyCommand(redisClient
*c
) {
3666 de
= dictGetRandomKey(c
->db
->dict
);
3667 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
3670 addReply(c
,shared
.plus
);
3671 addReply(c
,shared
.crlf
);
3673 addReply(c
,shared
.plus
);
3674 addReply(c
,dictGetEntryKey(de
));
3675 addReply(c
,shared
.crlf
);
3679 static void keysCommand(redisClient
*c
) {
3682 sds pattern
= c
->argv
[1]->ptr
;
3683 int plen
= sdslen(pattern
);
3684 unsigned long numkeys
= 0, keyslen
= 0;
3685 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3687 di
= dictGetIterator(c
->db
->dict
);
3689 decrRefCount(lenobj
);
3690 while((de
= dictNext(di
)) != NULL
) {
3691 robj
*keyobj
= dictGetEntryKey(de
);
3693 sds key
= keyobj
->ptr
;
3694 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3695 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3696 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3698 addReply(c
,shared
.space
);
3701 keyslen
+= sdslen(key
);
3705 dictReleaseIterator(di
);
3706 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3707 addReply(c
,shared
.crlf
);
3710 static void dbsizeCommand(redisClient
*c
) {
3712 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
3715 static void lastsaveCommand(redisClient
*c
) {
3717 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
3720 static void typeCommand(redisClient
*c
) {
3724 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3729 case REDIS_STRING
: type
= "+string"; break;
3730 case REDIS_LIST
: type
= "+list"; break;
3731 case REDIS_SET
: type
= "+set"; break;
3732 case REDIS_ZSET
: type
= "+zset"; break;
3733 default: type
= "unknown"; break;
3736 addReplySds(c
,sdsnew(type
));
3737 addReply(c
,shared
.crlf
);
3740 static void saveCommand(redisClient
*c
) {
3741 if (server
.bgsavechildpid
!= -1) {
3742 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3745 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3746 addReply(c
,shared
.ok
);
3748 addReply(c
,shared
.err
);
3752 static void bgsaveCommand(redisClient
*c
) {
3753 if (server
.bgsavechildpid
!= -1) {
3754 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3757 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3758 char *status
= "+Background saving started\r\n";
3759 addReplySds(c
,sdsnew(status
));
3761 addReply(c
,shared
.err
);
3765 static void shutdownCommand(redisClient
*c
) {
3766 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
3767 /* Kill the saving child if there is a background saving in progress.
3768 We want to avoid race conditions, for instance our saving child may
3769 overwrite the synchronous saving did by SHUTDOWN. */
3770 if (server
.bgsavechildpid
!= -1) {
3771 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3772 kill(server
.bgsavechildpid
,SIGKILL
);
3773 rdbRemoveTempFile(server
.bgsavechildpid
);
3775 if (server
.appendonly
) {
3776 /* Append only file: fsync() the AOF and exit */
3777 fsync(server
.appendfd
);
3778 if (server
.vm_enabled
) unlink(server
.vm_swap_file
);
3781 /* Snapshotting. Perform a SYNC SAVE and exit */
3782 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3783 if (server
.daemonize
)
3784 unlink(server
.pidfile
);
3785 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3786 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3787 if (server
.vm_enabled
) unlink(server
.vm_swap_file
);
3790 /* Ooops.. error saving! The best we can do is to continue operating.
3791 * Note that if there was a background saving process, in the next
3792 * cron() Redis will be notified that the background saving aborted,
3793 * handling special stuff like slaves pending for synchronization... */
3794 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3795 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3800 static void renameGenericCommand(redisClient
*c
, int nx
) {
3803 /* To use the same key as src and dst is probably an error */
3804 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3805 addReply(c
,shared
.sameobjecterr
);
3809 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3811 addReply(c
,shared
.nokeyerr
);
3815 deleteIfVolatile(c
->db
,c
->argv
[2]);
3816 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3819 addReply(c
,shared
.czero
);
3822 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3824 incrRefCount(c
->argv
[2]);
3826 deleteKey(c
->db
,c
->argv
[1]);
3828 addReply(c
,nx
? shared
.cone
: shared
.ok
);
3831 static void renameCommand(redisClient
*c
) {
3832 renameGenericCommand(c
,0);
3835 static void renamenxCommand(redisClient
*c
) {
3836 renameGenericCommand(c
,1);
3839 static void moveCommand(redisClient
*c
) {
3844 /* Obtain source and target DB pointers */
3847 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3848 addReply(c
,shared
.outofrangeerr
);
3852 selectDb(c
,srcid
); /* Back to the source DB */
3854 /* If the user is moving using as target the same
3855 * DB as the source DB it is probably an error. */
3857 addReply(c
,shared
.sameobjecterr
);
3861 /* Check if the element exists and get a reference */
3862 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3864 addReply(c
,shared
.czero
);
3868 /* Try to add the element to the target DB */
3869 deleteIfVolatile(dst
,c
->argv
[1]);
3870 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3871 addReply(c
,shared
.czero
);
3874 incrRefCount(c
->argv
[1]);
3877 /* OK! key moved, free the entry in the source DB */
3878 deleteKey(src
,c
->argv
[1]);
3880 addReply(c
,shared
.cone
);
3883 /* =================================== Lists ================================ */
3884 static void pushGenericCommand(redisClient
*c
, int where
) {
3888 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3890 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3891 addReply(c
,shared
.ok
);
3894 lobj
= createListObject();
3896 if (where
== REDIS_HEAD
) {
3897 listAddNodeHead(list
,c
->argv
[2]);
3899 listAddNodeTail(list
,c
->argv
[2]);
3901 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
3902 incrRefCount(c
->argv
[1]);
3903 incrRefCount(c
->argv
[2]);
3905 if (lobj
->type
!= REDIS_LIST
) {
3906 addReply(c
,shared
.wrongtypeerr
);
3909 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3910 addReply(c
,shared
.ok
);
3914 if (where
== REDIS_HEAD
) {
3915 listAddNodeHead(list
,c
->argv
[2]);
3917 listAddNodeTail(list
,c
->argv
[2]);
3919 incrRefCount(c
->argv
[2]);
3922 addReply(c
,shared
.ok
);
3925 static void lpushCommand(redisClient
*c
) {
3926 pushGenericCommand(c
,REDIS_HEAD
);
3929 static void rpushCommand(redisClient
*c
) {
3930 pushGenericCommand(c
,REDIS_TAIL
);
3933 static void llenCommand(redisClient
*c
) {
3937 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3939 addReply(c
,shared
.czero
);
3942 if (o
->type
!= REDIS_LIST
) {
3943 addReply(c
,shared
.wrongtypeerr
);
3946 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3951 static void lindexCommand(redisClient
*c
) {
3953 int index
= atoi(c
->argv
[2]->ptr
);
3955 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3957 addReply(c
,shared
.nullbulk
);
3959 if (o
->type
!= REDIS_LIST
) {
3960 addReply(c
,shared
.wrongtypeerr
);
3962 list
*list
= o
->ptr
;
3965 ln
= listIndex(list
, index
);
3967 addReply(c
,shared
.nullbulk
);
3969 robj
*ele
= listNodeValue(ln
);
3970 addReplyBulkLen(c
,ele
);
3972 addReply(c
,shared
.crlf
);
3978 static void lsetCommand(redisClient
*c
) {
3980 int index
= atoi(c
->argv
[2]->ptr
);
3982 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3984 addReply(c
,shared
.nokeyerr
);
3986 if (o
->type
!= REDIS_LIST
) {
3987 addReply(c
,shared
.wrongtypeerr
);
3989 list
*list
= o
->ptr
;
3992 ln
= listIndex(list
, index
);
3994 addReply(c
,shared
.outofrangeerr
);
3996 robj
*ele
= listNodeValue(ln
);
3999 listNodeValue(ln
) = c
->argv
[3];
4000 incrRefCount(c
->argv
[3]);
4001 addReply(c
,shared
.ok
);
4008 static void popGenericCommand(redisClient
*c
, int where
) {
4011 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4013 addReply(c
,shared
.nullbulk
);
4015 if (o
->type
!= REDIS_LIST
) {
4016 addReply(c
,shared
.wrongtypeerr
);
4018 list
*list
= o
->ptr
;
4021 if (where
== REDIS_HEAD
)
4022 ln
= listFirst(list
);
4024 ln
= listLast(list
);
4027 addReply(c
,shared
.nullbulk
);
4029 robj
*ele
= listNodeValue(ln
);
4030 addReplyBulkLen(c
,ele
);
4032 addReply(c
,shared
.crlf
);
4033 listDelNode(list
,ln
);
4040 static void lpopCommand(redisClient
*c
) {
4041 popGenericCommand(c
,REDIS_HEAD
);
4044 static void rpopCommand(redisClient
*c
) {
4045 popGenericCommand(c
,REDIS_TAIL
);
4048 static void lrangeCommand(redisClient
*c
) {
4050 int start
= atoi(c
->argv
[2]->ptr
);
4051 int end
= atoi(c
->argv
[3]->ptr
);
4053 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4055 addReply(c
,shared
.nullmultibulk
);
4057 if (o
->type
!= REDIS_LIST
) {
4058 addReply(c
,shared
.wrongtypeerr
);
4060 list
*list
= o
->ptr
;
4062 int llen
= listLength(list
);
4066 /* convert negative indexes */
4067 if (start
< 0) start
= llen
+start
;
4068 if (end
< 0) end
= llen
+end
;
4069 if (start
< 0) start
= 0;
4070 if (end
< 0) end
= 0;
4072 /* indexes sanity checks */
4073 if (start
> end
|| start
>= llen
) {
4074 /* Out of range start or start > end result in empty list */
4075 addReply(c
,shared
.emptymultibulk
);
4078 if (end
>= llen
) end
= llen
-1;
4079 rangelen
= (end
-start
)+1;
4081 /* Return the result in form of a multi-bulk reply */
4082 ln
= listIndex(list
, start
);
4083 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
4084 for (j
= 0; j
< rangelen
; j
++) {
4085 ele
= listNodeValue(ln
);
4086 addReplyBulkLen(c
,ele
);
4088 addReply(c
,shared
.crlf
);
4095 static void ltrimCommand(redisClient
*c
) {
4097 int start
= atoi(c
->argv
[2]->ptr
);
4098 int end
= atoi(c
->argv
[3]->ptr
);
4100 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4102 addReply(c
,shared
.ok
);
4104 if (o
->type
!= REDIS_LIST
) {
4105 addReply(c
,shared
.wrongtypeerr
);
4107 list
*list
= o
->ptr
;
4109 int llen
= listLength(list
);
4110 int j
, ltrim
, rtrim
;
4112 /* convert negative indexes */
4113 if (start
< 0) start
= llen
+start
;
4114 if (end
< 0) end
= llen
+end
;
4115 if (start
< 0) start
= 0;
4116 if (end
< 0) end
= 0;
4118 /* indexes sanity checks */
4119 if (start
> end
|| start
>= llen
) {
4120 /* Out of range start or start > end result in empty list */
4124 if (end
>= llen
) end
= llen
-1;
4129 /* Remove list elements to perform the trim */
4130 for (j
= 0; j
< ltrim
; j
++) {
4131 ln
= listFirst(list
);
4132 listDelNode(list
,ln
);
4134 for (j
= 0; j
< rtrim
; j
++) {
4135 ln
= listLast(list
);
4136 listDelNode(list
,ln
);
4139 addReply(c
,shared
.ok
);
4144 static void lremCommand(redisClient
*c
) {
4147 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4149 addReply(c
,shared
.czero
);
4151 if (o
->type
!= REDIS_LIST
) {
4152 addReply(c
,shared
.wrongtypeerr
);
4154 list
*list
= o
->ptr
;
4155 listNode
*ln
, *next
;
4156 int toremove
= atoi(c
->argv
[2]->ptr
);
4161 toremove
= -toremove
;
4164 ln
= fromtail
? list
->tail
: list
->head
;
4166 robj
*ele
= listNodeValue(ln
);
4168 next
= fromtail
? ln
->prev
: ln
->next
;
4169 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
4170 listDelNode(list
,ln
);
4173 if (toremove
&& removed
== toremove
) break;
4177 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
4182 /* This is the semantic of this command:
4183 * RPOPLPUSH srclist dstlist:
4184 * IF LLEN(srclist) > 0
4185 * element = RPOP srclist
4186 * LPUSH dstlist element
4193 * The idea is to be able to get an element from a list in a reliable way
4194 * since the element is not just returned but pushed against another list
4195 * as well. This command was originally proposed by Ezra Zygmuntowicz.
4197 static void rpoplpushcommand(redisClient
*c
) {
4200 sobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4202 addReply(c
,shared
.nullbulk
);
4204 if (sobj
->type
!= REDIS_LIST
) {
4205 addReply(c
,shared
.wrongtypeerr
);
4207 list
*srclist
= sobj
->ptr
;
4208 listNode
*ln
= listLast(srclist
);
4211 addReply(c
,shared
.nullbulk
);
4213 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
4214 robj
*ele
= listNodeValue(ln
);
4217 if (dobj
&& dobj
->type
!= REDIS_LIST
) {
4218 addReply(c
,shared
.wrongtypeerr
);
4222 /* Add the element to the target list (unless it's directly
4223 * passed to some BLPOP-ing client */
4224 if (!handleClientsWaitingListPush(c
,c
->argv
[2],ele
)) {
4226 /* Create the list if the key does not exist */
4227 dobj
= createListObject();
4228 dictAdd(c
->db
->dict
,c
->argv
[2],dobj
);
4229 incrRefCount(c
->argv
[2]);
4231 dstlist
= dobj
->ptr
;
4232 listAddNodeHead(dstlist
,ele
);
4236 /* Send the element to the client as reply as well */
4237 addReplyBulkLen(c
,ele
);
4239 addReply(c
,shared
.crlf
);
4241 /* Finally remove the element from the source list */
4242 listDelNode(srclist
,ln
);
4250 /* ==================================== Sets ================================ */
4252 static void saddCommand(redisClient
*c
) {
4255 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4257 set
= createSetObject();
4258 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
4259 incrRefCount(c
->argv
[1]);
4261 if (set
->type
!= REDIS_SET
) {
4262 addReply(c
,shared
.wrongtypeerr
);
4266 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
4267 incrRefCount(c
->argv
[2]);
4269 addReply(c
,shared
.cone
);
4271 addReply(c
,shared
.czero
);
4275 static void sremCommand(redisClient
*c
) {
4278 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4280 addReply(c
,shared
.czero
);
4282 if (set
->type
!= REDIS_SET
) {
4283 addReply(c
,shared
.wrongtypeerr
);
4286 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
4288 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
4289 addReply(c
,shared
.cone
);
4291 addReply(c
,shared
.czero
);
4296 static void smoveCommand(redisClient
*c
) {
4297 robj
*srcset
, *dstset
;
4299 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4300 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
4302 /* If the source key does not exist return 0, if it's of the wrong type
4304 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
4305 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
4308 /* Error if the destination key is not a set as well */
4309 if (dstset
&& dstset
->type
!= REDIS_SET
) {
4310 addReply(c
,shared
.wrongtypeerr
);
4313 /* Remove the element from the source set */
4314 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
4315 /* Key not found in the src set! return zero */
4316 addReply(c
,shared
.czero
);
4320 /* Add the element to the destination set */
4322 dstset
= createSetObject();
4323 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
4324 incrRefCount(c
->argv
[2]);
4326 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
4327 incrRefCount(c
->argv
[3]);
4328 addReply(c
,shared
.cone
);
4331 static void sismemberCommand(redisClient
*c
) {
4334 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4336 addReply(c
,shared
.czero
);
4338 if (set
->type
!= REDIS_SET
) {
4339 addReply(c
,shared
.wrongtypeerr
);
4342 if (dictFind(set
->ptr
,c
->argv
[2]))
4343 addReply(c
,shared
.cone
);
4345 addReply(c
,shared
.czero
);
4349 static void scardCommand(redisClient
*c
) {
4353 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4355 addReply(c
,shared
.czero
);
4358 if (o
->type
!= REDIS_SET
) {
4359 addReply(c
,shared
.wrongtypeerr
);
4362 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4368 static void spopCommand(redisClient
*c
) {
4372 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4374 addReply(c
,shared
.nullbulk
);
4376 if (set
->type
!= REDIS_SET
) {
4377 addReply(c
,shared
.wrongtypeerr
);
4380 de
= dictGetRandomKey(set
->ptr
);
4382 addReply(c
,shared
.nullbulk
);
4384 robj
*ele
= dictGetEntryKey(de
);
4386 addReplyBulkLen(c
,ele
);
4388 addReply(c
,shared
.crlf
);
4389 dictDelete(set
->ptr
,ele
);
4390 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
4396 static void srandmemberCommand(redisClient
*c
) {
4400 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4402 addReply(c
,shared
.nullbulk
);
4404 if (set
->type
!= REDIS_SET
) {
4405 addReply(c
,shared
.wrongtypeerr
);
4408 de
= dictGetRandomKey(set
->ptr
);
4410 addReply(c
,shared
.nullbulk
);
4412 robj
*ele
= dictGetEntryKey(de
);
4414 addReplyBulkLen(c
,ele
);
4416 addReply(c
,shared
.crlf
);
4421 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
4422 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
4424 return dictSize(*d1
)-dictSize(*d2
);
4427 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, unsigned long setsnum
, robj
*dstkey
) {
4428 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4431 robj
*lenobj
= NULL
, *dstset
= NULL
;
4432 unsigned long j
, cardinality
= 0;
4434 for (j
= 0; j
< setsnum
; j
++) {
4438 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4439 lookupKeyRead(c
->db
,setskeys
[j
]);
4443 if (deleteKey(c
->db
,dstkey
))
4445 addReply(c
,shared
.czero
);
4447 addReply(c
,shared
.nullmultibulk
);
4451 if (setobj
->type
!= REDIS_SET
) {
4453 addReply(c
,shared
.wrongtypeerr
);
4456 dv
[j
] = setobj
->ptr
;
4458 /* Sort sets from the smallest to largest, this will improve our
4459 * algorithm's performace */
4460 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
4462 /* The first thing we should output is the total number of elements...
4463 * since this is a multi-bulk write, but at this stage we don't know
4464 * the intersection set size, so we use a trick, append an empty object
4465 * to the output list and save the pointer to later modify it with the
4468 lenobj
= createObject(REDIS_STRING
,NULL
);
4470 decrRefCount(lenobj
);
4472 /* If we have a target key where to store the resulting set
4473 * create this key with an empty set inside */
4474 dstset
= createSetObject();
4477 /* Iterate all the elements of the first (smallest) set, and test
4478 * the element against all the other sets, if at least one set does
4479 * not include the element it is discarded */
4480 di
= dictGetIterator(dv
[0]);
4482 while((de
= dictNext(di
)) != NULL
) {
4485 for (j
= 1; j
< setsnum
; j
++)
4486 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
4488 continue; /* at least one set does not contain the member */
4489 ele
= dictGetEntryKey(de
);
4491 addReplyBulkLen(c
,ele
);
4493 addReply(c
,shared
.crlf
);
4496 dictAdd(dstset
->ptr
,ele
,NULL
);
4500 dictReleaseIterator(di
);
4503 /* Store the resulting set into the target */
4504 deleteKey(c
->db
,dstkey
);
4505 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4506 incrRefCount(dstkey
);
4510 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%lu\r\n",cardinality
);
4512 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4513 dictSize((dict
*)dstset
->ptr
)));
4519 static void sinterCommand(redisClient
*c
) {
4520 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
4523 static void sinterstoreCommand(redisClient
*c
) {
4524 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
4527 #define REDIS_OP_UNION 0
4528 #define REDIS_OP_DIFF 1
4530 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
4531 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4534 robj
*dstset
= NULL
;
4535 int j
, cardinality
= 0;
4537 for (j
= 0; j
< setsnum
; j
++) {
4541 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4542 lookupKeyRead(c
->db
,setskeys
[j
]);
4547 if (setobj
->type
!= REDIS_SET
) {
4549 addReply(c
,shared
.wrongtypeerr
);
4552 dv
[j
] = setobj
->ptr
;
4555 /* We need a temp set object to store our union. If the dstkey
4556 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4557 * this set object will be the resulting object to set into the target key*/
4558 dstset
= createSetObject();
4560 /* Iterate all the elements of all the sets, add every element a single
4561 * time to the result set */
4562 for (j
= 0; j
< setsnum
; j
++) {
4563 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
4564 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
4566 di
= dictGetIterator(dv
[j
]);
4568 while((de
= dictNext(di
)) != NULL
) {
4571 /* dictAdd will not add the same element multiple times */
4572 ele
= dictGetEntryKey(de
);
4573 if (op
== REDIS_OP_UNION
|| j
== 0) {
4574 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
4578 } else if (op
== REDIS_OP_DIFF
) {
4579 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
4584 dictReleaseIterator(di
);
4586 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
4589 /* Output the content of the resulting set, if not in STORE mode */
4591 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
4592 di
= dictGetIterator(dstset
->ptr
);
4593 while((de
= dictNext(di
)) != NULL
) {
4596 ele
= dictGetEntryKey(de
);
4597 addReplyBulkLen(c
,ele
);
4599 addReply(c
,shared
.crlf
);
4601 dictReleaseIterator(di
);
4603 /* If we have a target key where to store the resulting set
4604 * create this key with the result set inside */
4605 deleteKey(c
->db
,dstkey
);
4606 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4607 incrRefCount(dstkey
);
4612 decrRefCount(dstset
);
4614 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4615 dictSize((dict
*)dstset
->ptr
)));
4621 static void sunionCommand(redisClient
*c
) {
4622 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
4625 static void sunionstoreCommand(redisClient
*c
) {
4626 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
4629 static void sdiffCommand(redisClient
*c
) {
4630 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
4633 static void sdiffstoreCommand(redisClient
*c
) {
4634 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
4637 /* ==================================== ZSets =============================== */
4639 /* ZSETs are ordered sets using two data structures to hold the same elements
4640 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4643 * The elements are added to an hash table mapping Redis objects to scores.
4644 * At the same time the elements are added to a skip list mapping scores
4645 * to Redis objects (so objects are sorted by scores in this "view"). */
4647 /* This skiplist implementation is almost a C translation of the original
4648 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4649 * Alternative to Balanced Trees", modified in three ways:
4650 * a) this implementation allows for repeated values.
4651 * b) the comparison is not just by key (our 'score') but by satellite data.
4652 * c) there is a back pointer, so it's a doubly linked list with the back
4653 * pointers being only at "level 1". This allows to traverse the list
4654 * from tail to head, useful for ZREVRANGE. */
4656 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
4657 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
4659 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
4665 static zskiplist
*zslCreate(void) {
4669 zsl
= zmalloc(sizeof(*zsl
));
4672 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
4673 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
4674 zsl
->header
->forward
[j
] = NULL
;
4675 zsl
->header
->backward
= NULL
;
4680 static void zslFreeNode(zskiplistNode
*node
) {
4681 decrRefCount(node
->obj
);
4682 zfree(node
->forward
);
4686 static void zslFree(zskiplist
*zsl
) {
4687 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
4689 zfree(zsl
->header
->forward
);
4692 next
= node
->forward
[0];
4699 static int zslRandomLevel(void) {
4701 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
4706 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
4707 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4711 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4712 while (x
->forward
[i
] &&
4713 (x
->forward
[i
]->score
< score
||
4714 (x
->forward
[i
]->score
== score
&&
4715 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4719 /* we assume the key is not already inside, since we allow duplicated
4720 * scores, and the re-insertion of score and redis object should never
4721 * happpen since the caller of zslInsert() should test in the hash table
4722 * if the element is already inside or not. */
4723 level
= zslRandomLevel();
4724 if (level
> zsl
->level
) {
4725 for (i
= zsl
->level
; i
< level
; i
++)
4726 update
[i
] = zsl
->header
;
4729 x
= zslCreateNode(level
,score
,obj
);
4730 for (i
= 0; i
< level
; i
++) {
4731 x
->forward
[i
] = update
[i
]->forward
[i
];
4732 update
[i
]->forward
[i
] = x
;
4734 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
4736 x
->forward
[0]->backward
= x
;
4742 /* Delete an element with matching score/object from the skiplist. */
4743 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
4744 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4748 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4749 while (x
->forward
[i
] &&
4750 (x
->forward
[i
]->score
< score
||
4751 (x
->forward
[i
]->score
== score
&&
4752 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4756 /* We may have multiple elements with the same score, what we need
4757 * is to find the element with both the right score and object. */
4759 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
4760 for (i
= 0; i
< zsl
->level
; i
++) {
4761 if (update
[i
]->forward
[i
] != x
) break;
4762 update
[i
]->forward
[i
] = x
->forward
[i
];
4764 if (x
->forward
[0]) {
4765 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4768 zsl
->tail
= x
->backward
;
4771 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4776 return 0; /* not found */
4778 return 0; /* not found */
4781 /* Delete all the elements with score between min and max from the skiplist.
4782 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4783 * Note that this function takes the reference to the hash table view of the
4784 * sorted set, in order to remove the elements from the hash table too. */
4785 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4786 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4787 unsigned long removed
= 0;
4791 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4792 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
4796 /* We may have multiple elements with the same score, what we need
4797 * is to find the element with both the right score and object. */
4799 while (x
&& x
->score
<= max
) {
4800 zskiplistNode
*next
;
4802 for (i
= 0; i
< zsl
->level
; i
++) {
4803 if (update
[i
]->forward
[i
] != x
) break;
4804 update
[i
]->forward
[i
] = x
->forward
[i
];
4806 if (x
->forward
[0]) {
4807 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4810 zsl
->tail
= x
->backward
;
4812 next
= x
->forward
[0];
4813 dictDelete(dict
,x
->obj
);
4815 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4821 return removed
; /* not found */
4824 /* Find the first node having a score equal or greater than the specified one.
4825 * Returns NULL if there is no match. */
4826 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4831 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4832 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
4835 /* We may have multiple elements with the same score, what we need
4836 * is to find the element with both the right score and object. */
4837 return x
->forward
[0];
4840 /* The actual Z-commands implementations */
4842 /* This generic command implements both ZADD and ZINCRBY.
4843 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4844 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4845 static void zaddGenericCommand(redisClient
*c
, robj
*key
, robj
*ele
, double scoreval
, int doincrement
) {
4850 zsetobj
= lookupKeyWrite(c
->db
,key
);
4851 if (zsetobj
== NULL
) {
4852 zsetobj
= createZsetObject();
4853 dictAdd(c
->db
->dict
,key
,zsetobj
);
4856 if (zsetobj
->type
!= REDIS_ZSET
) {
4857 addReply(c
,shared
.wrongtypeerr
);
4863 /* Ok now since we implement both ZADD and ZINCRBY here the code
4864 * needs to handle the two different conditions. It's all about setting
4865 * '*score', that is, the new score to set, to the right value. */
4866 score
= zmalloc(sizeof(double));
4870 /* Read the old score. If the element was not present starts from 0 */
4871 de
= dictFind(zs
->dict
,ele
);
4873 double *oldscore
= dictGetEntryVal(de
);
4874 *score
= *oldscore
+ scoreval
;
4882 /* What follows is a simple remove and re-insert operation that is common
4883 * to both ZADD and ZINCRBY... */
4884 if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) {
4885 /* case 1: New element */
4886 incrRefCount(ele
); /* added to hash */
4887 zslInsert(zs
->zsl
,*score
,ele
);
4888 incrRefCount(ele
); /* added to skiplist */
4891 addReplyDouble(c
,*score
);
4893 addReply(c
,shared
.cone
);
4898 /* case 2: Score update operation */
4899 de
= dictFind(zs
->dict
,ele
);
4900 redisAssert(de
!= NULL
);
4901 oldscore
= dictGetEntryVal(de
);
4902 if (*score
!= *oldscore
) {
4905 /* Remove and insert the element in the skip list with new score */
4906 deleted
= zslDelete(zs
->zsl
,*oldscore
,ele
);
4907 redisAssert(deleted
!= 0);
4908 zslInsert(zs
->zsl
,*score
,ele
);
4910 /* Update the score in the hash table */
4911 dictReplace(zs
->dict
,ele
,score
);
4917 addReplyDouble(c
,*score
);
4919 addReply(c
,shared
.czero
);
4923 static void zaddCommand(redisClient
*c
) {
4926 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4927 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0);
4930 static void zincrbyCommand(redisClient
*c
) {
4933 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4934 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1);
4937 static void zremCommand(redisClient
*c
) {
4941 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4942 if (zsetobj
== NULL
) {
4943 addReply(c
,shared
.czero
);
4949 if (zsetobj
->type
!= REDIS_ZSET
) {
4950 addReply(c
,shared
.wrongtypeerr
);
4954 de
= dictFind(zs
->dict
,c
->argv
[2]);
4956 addReply(c
,shared
.czero
);
4959 /* Delete from the skiplist */
4960 oldscore
= dictGetEntryVal(de
);
4961 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
4962 redisAssert(deleted
!= 0);
4964 /* Delete from the hash table */
4965 dictDelete(zs
->dict
,c
->argv
[2]);
4966 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4968 addReply(c
,shared
.cone
);
4972 static void zremrangebyscoreCommand(redisClient
*c
) {
4973 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4974 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4978 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4979 if (zsetobj
== NULL
) {
4980 addReply(c
,shared
.czero
);
4984 if (zsetobj
->type
!= REDIS_ZSET
) {
4985 addReply(c
,shared
.wrongtypeerr
);
4989 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
4990 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4991 server
.dirty
+= deleted
;
4992 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
4996 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
4998 int start
= atoi(c
->argv
[2]->ptr
);
4999 int end
= atoi(c
->argv
[3]->ptr
);
5002 if (c
->argc
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) {
5004 } else if (c
->argc
>= 5) {
5005 addReply(c
,shared
.syntaxerr
);
5009 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
5011 addReply(c
,shared
.nullmultibulk
);
5013 if (o
->type
!= REDIS_ZSET
) {
5014 addReply(c
,shared
.wrongtypeerr
);
5016 zset
*zsetobj
= o
->ptr
;
5017 zskiplist
*zsl
= zsetobj
->zsl
;
5020 int llen
= zsl
->length
;
5024 /* convert negative indexes */
5025 if (start
< 0) start
= llen
+start
;
5026 if (end
< 0) end
= llen
+end
;
5027 if (start
< 0) start
= 0;
5028 if (end
< 0) end
= 0;
5030 /* indexes sanity checks */
5031 if (start
> end
|| start
>= llen
) {
5032 /* Out of range start or start > end result in empty list */
5033 addReply(c
,shared
.emptymultibulk
);
5036 if (end
>= llen
) end
= llen
-1;
5037 rangelen
= (end
-start
)+1;
5039 /* Return the result in form of a multi-bulk reply */
5045 ln
= zsl
->header
->forward
[0];
5047 ln
= ln
->forward
[0];
5050 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",
5051 withscores
? (rangelen
*2) : rangelen
));
5052 for (j
= 0; j
< rangelen
; j
++) {
5054 addReplyBulkLen(c
,ele
);
5056 addReply(c
,shared
.crlf
);
5058 addReplyDouble(c
,ln
->score
);
5059 ln
= reverse
? ln
->backward
: ln
->forward
[0];
5065 static void zrangeCommand(redisClient
*c
) {
5066 zrangeGenericCommand(c
,0);
5069 static void zrevrangeCommand(redisClient
*c
) {
5070 zrangeGenericCommand(c
,1);
5073 static void zrangebyscoreCommand(redisClient
*c
) {
5075 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
5076 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
5077 int offset
= 0, limit
= -1;
5079 if (c
->argc
!= 4 && c
->argc
!= 7) {
5081 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
5083 } else if (c
->argc
== 7 && strcasecmp(c
->argv
[4]->ptr
,"limit")) {
5084 addReply(c
,shared
.syntaxerr
);
5086 } else if (c
->argc
== 7) {
5087 offset
= atoi(c
->argv
[5]->ptr
);
5088 limit
= atoi(c
->argv
[6]->ptr
);
5089 if (offset
< 0) offset
= 0;
5092 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
5094 addReply(c
,shared
.nullmultibulk
);
5096 if (o
->type
!= REDIS_ZSET
) {
5097 addReply(c
,shared
.wrongtypeerr
);
5099 zset
*zsetobj
= o
->ptr
;
5100 zskiplist
*zsl
= zsetobj
->zsl
;
5103 unsigned int rangelen
= 0;
5105 /* Get the first node with the score >= min */
5106 ln
= zslFirstWithScore(zsl
,min
);
5108 /* No element matching the speciifed interval */
5109 addReply(c
,shared
.emptymultibulk
);
5113 /* We don't know in advance how many matching elements there
5114 * are in the list, so we push this object that will represent
5115 * the multi-bulk length in the output buffer, and will "fix"
5117 lenobj
= createObject(REDIS_STRING
,NULL
);
5119 decrRefCount(lenobj
);
5121 while(ln
&& ln
->score
<= max
) {
5124 ln
= ln
->forward
[0];
5127 if (limit
== 0) break;
5129 addReplyBulkLen(c
,ele
);
5131 addReply(c
,shared
.crlf
);
5132 ln
= ln
->forward
[0];
5134 if (limit
> 0) limit
--;
5136 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
5141 static void zcardCommand(redisClient
*c
) {
5145 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
5147 addReply(c
,shared
.czero
);
5150 if (o
->type
!= REDIS_ZSET
) {
5151 addReply(c
,shared
.wrongtypeerr
);
5154 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",zs
->zsl
->length
));
5159 static void zscoreCommand(redisClient
*c
) {
5163 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
5165 addReply(c
,shared
.nullbulk
);
5168 if (o
->type
!= REDIS_ZSET
) {
5169 addReply(c
,shared
.wrongtypeerr
);
5174 de
= dictFind(zs
->dict
,c
->argv
[2]);
5176 addReply(c
,shared
.nullbulk
);
5178 double *score
= dictGetEntryVal(de
);
5180 addReplyDouble(c
,*score
);
5186 /* ========================= Non type-specific commands ==================== */
5188 static void flushdbCommand(redisClient
*c
) {
5189 server
.dirty
+= dictSize(c
->db
->dict
);
5190 dictEmpty(c
->db
->dict
);
5191 dictEmpty(c
->db
->expires
);
5192 addReply(c
,shared
.ok
);
5195 static void flushallCommand(redisClient
*c
) {
5196 server
.dirty
+= emptyDb();
5197 addReply(c
,shared
.ok
);
5198 rdbSave(server
.dbfilename
);
5202 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
5203 redisSortOperation
*so
= zmalloc(sizeof(*so
));
5205 so
->pattern
= pattern
;
5209 /* Return the value associated to the key with a name obtained
5210 * substituting the first occurence of '*' in 'pattern' with 'subst' */
5211 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
5215 int prefixlen
, sublen
, postfixlen
;
5216 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
5220 char buf
[REDIS_SORTKEY_MAX
+1];
5223 /* If the pattern is "#" return the substitution object itself in order
5224 * to implement the "SORT ... GET #" feature. */
5225 spat
= pattern
->ptr
;
5226 if (spat
[0] == '#' && spat
[1] == '\0') {
5230 /* The substitution object may be specially encoded. If so we create
5231 * a decoded object on the fly. Otherwise getDecodedObject will just
5232 * increment the ref count, that we'll decrement later. */
5233 subst
= getDecodedObject(subst
);
5236 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
5237 p
= strchr(spat
,'*');
5239 decrRefCount(subst
);
5244 sublen
= sdslen(ssub
);
5245 postfixlen
= sdslen(spat
)-(prefixlen
+1);
5246 memcpy(keyname
.buf
,spat
,prefixlen
);
5247 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
5248 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
5249 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
5250 keyname
.len
= prefixlen
+sublen
+postfixlen
;
5252 initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(long)*2))
5253 decrRefCount(subst
);
5255 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
5256 return lookupKeyRead(db
,&keyobj
);
5259 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
5260 * the additional parameter is not standard but a BSD-specific we have to
5261 * pass sorting parameters via the global 'server' structure */
5262 static int sortCompare(const void *s1
, const void *s2
) {
5263 const redisSortObject
*so1
= s1
, *so2
= s2
;
5266 if (!server
.sort_alpha
) {
5267 /* Numeric sorting. Here it's trivial as we precomputed scores */
5268 if (so1
->u
.score
> so2
->u
.score
) {
5270 } else if (so1
->u
.score
< so2
->u
.score
) {
5276 /* Alphanumeric sorting */
5277 if (server
.sort_bypattern
) {
5278 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
5279 /* At least one compare object is NULL */
5280 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
5282 else if (so1
->u
.cmpobj
== NULL
)
5287 /* We have both the objects, use strcoll */
5288 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
5291 /* Compare elements directly */
5294 dec1
= getDecodedObject(so1
->obj
);
5295 dec2
= getDecodedObject(so2
->obj
);
5296 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
5301 return server
.sort_desc
? -cmp
: cmp
;
5304 /* The SORT command is the most complex command in Redis. Warning: this code
5305 * is optimized for speed and a bit less for readability */
5306 static void sortCommand(redisClient
*c
) {
5309 int desc
= 0, alpha
= 0;
5310 int limit_start
= 0, limit_count
= -1, start
, end
;
5311 int j
, dontsort
= 0, vectorlen
;
5312 int getop
= 0; /* GET operation counter */
5313 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
5314 redisSortObject
*vector
; /* Resulting vector to sort */
5316 /* Lookup the key to sort. It must be of the right types */
5317 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
5318 if (sortval
== NULL
) {
5319 addReply(c
,shared
.nullmultibulk
);
5322 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
5323 sortval
->type
!= REDIS_ZSET
)
5325 addReply(c
,shared
.wrongtypeerr
);
5329 /* Create a list of operations to perform for every sorted element.
5330 * Operations can be GET/DEL/INCR/DECR */
5331 operations
= listCreate();
5332 listSetFreeMethod(operations
,zfree
);
5335 /* Now we need to protect sortval incrementing its count, in the future
5336 * SORT may have options able to overwrite/delete keys during the sorting
5337 * and the sorted key itself may get destroied */
5338 incrRefCount(sortval
);
5340 /* The SORT command has an SQL-alike syntax, parse it */
5341 while(j
< c
->argc
) {
5342 int leftargs
= c
->argc
-j
-1;
5343 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
5345 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
5347 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
5349 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
5350 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
5351 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
5353 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
5354 storekey
= c
->argv
[j
+1];
5356 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
5357 sortby
= c
->argv
[j
+1];
5358 /* If the BY pattern does not contain '*', i.e. it is constant,
5359 * we don't need to sort nor to lookup the weight keys. */
5360 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
5362 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
5363 listAddNodeTail(operations
,createSortOperation(
5364 REDIS_SORT_GET
,c
->argv
[j
+1]));
5368 decrRefCount(sortval
);
5369 listRelease(operations
);
5370 addReply(c
,shared
.syntaxerr
);
5376 /* Load the sorting vector with all the objects to sort */
5377 switch(sortval
->type
) {
5378 case REDIS_LIST
: vectorlen
= listLength((list
*)sortval
->ptr
); break;
5379 case REDIS_SET
: vectorlen
= dictSize((dict
*)sortval
->ptr
); break;
5380 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
5381 default: vectorlen
= 0; redisAssert(0); /* Avoid GCC warning */
5383 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
5386 if (sortval
->type
== REDIS_LIST
) {
5387 list
*list
= sortval
->ptr
;
5391 listRewind(list
,&li
);
5392 while((ln
= listNext(&li
))) {
5393 robj
*ele
= ln
->value
;
5394 vector
[j
].obj
= ele
;
5395 vector
[j
].u
.score
= 0;
5396 vector
[j
].u
.cmpobj
= NULL
;
5404 if (sortval
->type
== REDIS_SET
) {
5407 zset
*zs
= sortval
->ptr
;
5411 di
= dictGetIterator(set
);
5412 while((setele
= dictNext(di
)) != NULL
) {
5413 vector
[j
].obj
= dictGetEntryKey(setele
);
5414 vector
[j
].u
.score
= 0;
5415 vector
[j
].u
.cmpobj
= NULL
;
5418 dictReleaseIterator(di
);
5420 redisAssert(j
== vectorlen
);
5422 /* Now it's time to load the right scores in the sorting vector */
5423 if (dontsort
== 0) {
5424 for (j
= 0; j
< vectorlen
; j
++) {
5428 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
5429 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
5431 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
5433 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
5434 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
5436 /* Don't need to decode the object if it's
5437 * integer-encoded (the only encoding supported) so
5438 * far. We can just cast it */
5439 if (byval
->encoding
== REDIS_ENCODING_INT
) {
5440 vector
[j
].u
.score
= (long)byval
->ptr
;
5442 redisAssert(1 != 1);
5447 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
5448 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
5450 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
5451 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
5453 redisAssert(1 != 1);
5460 /* We are ready to sort the vector... perform a bit of sanity check
5461 * on the LIMIT option too. We'll use a partial version of quicksort. */
5462 start
= (limit_start
< 0) ? 0 : limit_start
;
5463 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
5464 if (start
>= vectorlen
) {
5465 start
= vectorlen
-1;
5468 if (end
>= vectorlen
) end
= vectorlen
-1;
5470 if (dontsort
== 0) {
5471 server
.sort_desc
= desc
;
5472 server
.sort_alpha
= alpha
;
5473 server
.sort_bypattern
= sortby
? 1 : 0;
5474 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
5475 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
5477 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
5480 /* Send command output to the output buffer, performing the specified
5481 * GET/DEL/INCR/DECR operations if any. */
5482 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
5483 if (storekey
== NULL
) {
5484 /* STORE option not specified, sent the sorting result to client */
5485 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
5486 for (j
= start
; j
<= end
; j
++) {
5491 addReplyBulkLen(c
,vector
[j
].obj
);
5492 addReply(c
,vector
[j
].obj
);
5493 addReply(c
,shared
.crlf
);
5495 listRewind(operations
,&li
);
5496 while((ln
= listNext(&li
))) {
5497 redisSortOperation
*sop
= ln
->value
;
5498 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5501 if (sop
->type
== REDIS_SORT_GET
) {
5502 if (!val
|| val
->type
!= REDIS_STRING
) {
5503 addReply(c
,shared
.nullbulk
);
5505 addReplyBulkLen(c
,val
);
5507 addReply(c
,shared
.crlf
);
5510 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5515 robj
*listObject
= createListObject();
5516 list
*listPtr
= (list
*) listObject
->ptr
;
5518 /* STORE option specified, set the sorting result as a List object */
5519 for (j
= start
; j
<= end
; j
++) {
5524 listAddNodeTail(listPtr
,vector
[j
].obj
);
5525 incrRefCount(vector
[j
].obj
);
5527 listRewind(operations
,&li
);
5528 while((ln
= listNext(&li
))) {
5529 redisSortOperation
*sop
= ln
->value
;
5530 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5533 if (sop
->type
== REDIS_SORT_GET
) {
5534 if (!val
|| val
->type
!= REDIS_STRING
) {
5535 listAddNodeTail(listPtr
,createStringObject("",0));
5537 listAddNodeTail(listPtr
,val
);
5541 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5545 if (dictReplace(c
->db
->dict
,storekey
,listObject
)) {
5546 incrRefCount(storekey
);
5548 /* Note: we add 1 because the DB is dirty anyway since even if the
5549 * SORT result is empty a new key is set and maybe the old content
5551 server
.dirty
+= 1+outputlen
;
5552 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",outputlen
));
5556 decrRefCount(sortval
);
5557 listRelease(operations
);
5558 for (j
= 0; j
< vectorlen
; j
++) {
5559 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
5560 decrRefCount(vector
[j
].u
.cmpobj
);
/* Convert an amount of bytes into a human readable string in the form
 * of 100B, 2G, 100M, 4K, and so forth.
 *
 * 's' must be large enough for the formatted value plus the terminating
 * NUL. Values below 1024 are printed as an integer followed by 'B'; larger
 * values are divided by the matching power of 1024 and printed with two
 * decimals followed by K, M, G or T. Every value writes the buffer: amounts
 * of one tebibyte or more use the T suffix. */
static void bytesToHuman(char *s, unsigned long long n) {
    double d;

    if (n < 1024) {
        /* Bytes */
        sprintf(s,"%lluB",n);
    } else if (n < (1024*1024)) {
        d = (double)n/(1024);
        sprintf(s,"%.2fK",d);
    } else if (n < (1024LL*1024*1024)) {
        d = (double)n/(1024*1024);
        sprintf(s,"%.2fM",d);
    } else if (n < (1024LL*1024*1024*1024)) {
        d = (double)n/(1024LL*1024*1024);
        sprintf(s,"%.2fG",d);
    } else {
        d = (double)n/(1024LL*1024*1024*1024);
        sprintf(s,"%.2fT",d);
    }
}
5586 /* Create the string returned by the INFO command. This is decoupled
5587 * by the INFO command itself as we need to report the same information
5588 * on memory corruption problems. */
5589 static sds
genRedisInfoString(void) {
5591 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5595 bytesToHuman(hmem
,server
.usedmemory
);
5596 info
= sdscatprintf(sdsempty(),
5597 "redis_version:%s\r\n"
5599 "multiplexing_api:%s\r\n"
5600 "process_id:%ld\r\n"
5601 "uptime_in_seconds:%ld\r\n"
5602 "uptime_in_days:%ld\r\n"
5603 "connected_clients:%d\r\n"
5604 "connected_slaves:%d\r\n"
5605 "blocked_clients:%d\r\n"
5606 "used_memory:%zu\r\n"
5607 "used_memory_human:%s\r\n"
5608 "changes_since_last_save:%lld\r\n"
5609 "bgsave_in_progress:%d\r\n"
5610 "last_save_time:%ld\r\n"
5611 "bgrewriteaof_in_progress:%d\r\n"
5612 "total_connections_received:%lld\r\n"
5613 "total_commands_processed:%lld\r\n"
5617 (sizeof(long) == 8) ? "64" : "32",
5622 listLength(server
.clients
)-listLength(server
.slaves
),
5623 listLength(server
.slaves
),
5624 server
.blockedclients
,
5628 server
.bgsavechildpid
!= -1,
5630 server
.bgrewritechildpid
!= -1,
5631 server
.stat_numconnections
,
5632 server
.stat_numcommands
,
5633 server
.vm_enabled
!= 0,
5634 server
.masterhost
== NULL
? "master" : "slave"
5636 if (server
.masterhost
) {
5637 info
= sdscatprintf(info
,
5638 "master_host:%s\r\n"
5639 "master_port:%d\r\n"
5640 "master_link_status:%s\r\n"
5641 "master_last_io_seconds_ago:%d\r\n"
5644 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
5646 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
5649 if (server
.vm_enabled
) {
5651 info
= sdscatprintf(info
,
5652 "vm_conf_max_memory:%llu\r\n"
5653 "vm_conf_page_size:%llu\r\n"
5654 "vm_conf_pages:%llu\r\n"
5655 "vm_stats_used_pages:%llu\r\n"
5656 "vm_stats_swapped_objects:%llu\r\n"
5657 "vm_stats_swappin_count:%llu\r\n"
5658 "vm_stats_swappout_count:%llu\r\n"
5659 "vm_stats_io_newjobs_len:%lu\r\n"
5660 "vm_stats_io_processing_len:%lu\r\n"
5661 "vm_stats_io_processed_len:%lu\r\n"
5662 "vm_stats_io_waiting_clients:%lu\r\n"
5663 "vm_stats_io_active_threads:%lu\r\n"
5664 ,(unsigned long long) server
.vm_max_memory
,
5665 (unsigned long long) server
.vm_page_size
,
5666 (unsigned long long) server
.vm_pages
,
5667 (unsigned long long) server
.vm_stats_used_pages
,
5668 (unsigned long long) server
.vm_stats_swapped_objects
,
5669 (unsigned long long) server
.vm_stats_swapins
,
5670 (unsigned long long) server
.vm_stats_swapouts
,
5671 (unsigned long) listLength(server
.io_newjobs
),
5672 (unsigned long) listLength(server
.io_processing
),
5673 (unsigned long) listLength(server
.io_processed
),
5674 (unsigned long) listLength(server
.io_clients
),
5675 (unsigned long) server
.io_active_threads
5679 for (j
= 0; j
< server
.dbnum
; j
++) {
5680 long long keys
, vkeys
;
5682 keys
= dictSize(server
.db
[j
].dict
);
5683 vkeys
= dictSize(server
.db
[j
].expires
);
5684 if (keys
|| vkeys
) {
5685 info
= sdscatprintf(info
, "db%d:keys=%lld,expires=%lld\r\n",
5692 static void infoCommand(redisClient
*c
) {
5693 sds info
= genRedisInfoString();
5694 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
5695 (unsigned long)sdslen(info
)));
5696 addReplySds(c
,info
);
5697 addReply(c
,shared
.crlf
);
5700 static void monitorCommand(redisClient
*c
) {
5701 /* ignore MONITOR if aleady slave or in monitor mode */
5702 if (c
->flags
& REDIS_SLAVE
) return;
5704 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
5706 listAddNodeTail(server
.monitors
,c
);
5707 addReply(c
,shared
.ok
);
5710 /* ================================= Expire ================================= */
5711 static int removeExpire(redisDb
*db
, robj
*key
) {
5712 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
5719 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
5720 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
5728 /* Return the expire time of the specified key, or -1 if no expire
5729 * is associated with this key (i.e. the key is non volatile) */
5730 static time_t getExpire(redisDb
*db
, robj
*key
) {
5733 /* No expire? return ASAP */
5734 if (dictSize(db
->expires
) == 0 ||
5735 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
5737 return (time_t) dictGetEntryVal(de
);
5740 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
5744 /* No expire? return ASAP */
5745 if (dictSize(db
->expires
) == 0 ||
5746 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5748 /* Lookup the expire */
5749 when
= (time_t) dictGetEntryVal(de
);
5750 if (time(NULL
) <= when
) return 0;
5752 /* Delete the key */
5753 dictDelete(db
->expires
,key
);
5754 return dictDelete(db
->dict
,key
) == DICT_OK
;
5757 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
5760 /* No expire? return ASAP */
5761 if (dictSize(db
->expires
) == 0 ||
5762 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5764 /* Delete the key */
5766 dictDelete(db
->expires
,key
);
5767 return dictDelete(db
->dict
,key
) == DICT_OK
;
/* Shared implementation of the expire commands for 'key' in c->db, where
 * 'seconds' is a relative timeout.
 *
 * From the statements in this function: the key is looked up in
 * c->db->dict; one path replies shared.czero; one path deletes the key with
 * deleteKey() (incrementing server.dirty when the deletion succeeds) and
 * replies shared.cone; otherwise the absolute time time(NULL)+seconds is
 * registered with setExpire(), replying shared.cone when setExpire()
 * returns non-zero and shared.czero when it returns zero.
 *
 * NOTE(review): the conditions that select between these paths are not
 * readable at this point of the file -- confirm them (missing key? negative
 * timeout?) before relying on this summary. */
5770 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
5773 de
= dictFind(c
->db
->dict
,key
);
5775 addReply(c
,shared
.czero
);
5779 if (deleteKey(c
->db
,key
)) server
.dirty
++;
5780 addReply(c
, shared
.cone
);
5783 time_t when
= time(NULL
)+seconds
;
5784 if (setExpire(c
->db
,key
,when
)) {
5785 addReply(c
,shared
.cone
);
5788 addReply(c
,shared
.czero
);
5794 static void expireCommand(redisClient
*c
) {
5795 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
5798 static void expireatCommand(redisClient
*c
) {
5799 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
5802 static void ttlCommand(redisClient
*c
) {
5806 expire
= getExpire(c
->db
,c
->argv
[1]);
5808 ttl
= (int) (expire
-time(NULL
));
5809 if (ttl
< 0) ttl
= -1;
5811 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
5814 /* ================================ MULTI/EXEC ============================== */
5816 /* Client state initialization for MULTI/EXEC */
5817 static void initClientMultiState(redisClient
*c
) {
5818 c
->mstate
.commands
= NULL
;
5819 c
->mstate
.count
= 0;
5822 /* Release all the resources associated with MULTI/EXEC state */
5823 static void freeClientMultiState(redisClient
*c
) {
5826 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5828 multiCmd
*mc
= c
->mstate
.commands
+j
;
5830 for (i
= 0; i
< mc
->argc
; i
++)
5831 decrRefCount(mc
->argv
[i
]);
5834 zfree(c
->mstate
.commands
);
5837 /* Add a new command into the MULTI commands queue */
5838 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
) {
5842 c
->mstate
.commands
= zrealloc(c
->mstate
.commands
,
5843 sizeof(multiCmd
)*(c
->mstate
.count
+1));
5844 mc
= c
->mstate
.commands
+c
->mstate
.count
;
5847 mc
->argv
= zmalloc(sizeof(robj
*)*c
->argc
);
5848 memcpy(mc
->argv
,c
->argv
,sizeof(robj
*)*c
->argc
);
5849 for (j
= 0; j
< c
->argc
; j
++)
5850 incrRefCount(mc
->argv
[j
]);
5854 static void multiCommand(redisClient
*c
) {
5855 c
->flags
|= REDIS_MULTI
;
5856 addReply(c
,shared
.ok
);
5859 static void execCommand(redisClient
*c
) {
5864 if (!(c
->flags
& REDIS_MULTI
)) {
5865 addReplySds(c
,sdsnew("-ERR EXEC without MULTI\r\n"));
5869 orig_argv
= c
->argv
;
5870 orig_argc
= c
->argc
;
5871 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->mstate
.count
));
5872 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5873 c
->argc
= c
->mstate
.commands
[j
].argc
;
5874 c
->argv
= c
->mstate
.commands
[j
].argv
;
5875 call(c
,c
->mstate
.commands
[j
].cmd
);
5877 c
->argv
= orig_argv
;
5878 c
->argc
= orig_argc
;
5879 freeClientMultiState(c
);
5880 initClientMultiState(c
);
5881 c
->flags
&= (~REDIS_MULTI
);
5884 /* =========================== Blocking Operations ========================= */
5886 /* Currently Redis blocking operations support is limited to list POP ops,
5887 * so the current implementation is not fully generic, but it is also not
5888 * completely specific so it will not require a rewrite to support new
5889 * kind of blocking operations in the future.
5891 * Still it's important to note that list blocking operations can be already
5892 * used as a notification mechanism in order to implement other blocking
5893 * operations at application level, so there must be a very strong evidence
5894 * of usefulness and generality before new blocking operations are implemented.
5896 * This is how the current blocking POP works, we use BLPOP as example:
5897 * - If the user calls BLPOP and the key exists and contains a non empty list
5898 * then LPOP is called instead. So BLPOP is semantically the same as LPOP
5899 * if there is no need to block.
5900 * - If instead BLPOP is called and the key does not exist or the list is
5901 * empty we need to block. In order to do so we remove the notification for
5902 * new data to read in the client socket (so that we'll not serve new
5903 * requests if the blocking request is not served). Also we put the client
5904 * in a dictionary (db->blockingkeys) mapping keys to a list of clients
5905 * blocking for these keys.
5906 * - If a PUSH operation against a key with blocked clients waiting is
5907 * performed, we serve the first in the list: basically instead of pushing
5908 * the new element inside the list we return it to the (first / oldest)
5909 * blocking client, unblock the client, and remove it from the list.
5911 * The above comment and the source code should be enough in order to understand
5912 * the implementation and modify / fix it later.
5915 /* Set a client in blocking mode for the specified key, with the specified
5917 static void blockForKeys(redisClient
*c
, robj
**keys
, int numkeys
, time_t timeout
) {
5922 c
->blockingkeys
= zmalloc(sizeof(robj
*)*numkeys
);
5923 c
->blockingkeysnum
= numkeys
;
5924 c
->blockingto
= timeout
;
5925 for (j
= 0; j
< numkeys
; j
++) {
5926 /* Add the key in the client structure, to map clients -> keys */
5927 c
->blockingkeys
[j
] = keys
[j
];
5928 incrRefCount(keys
[j
]);
5930 /* And in the other "side", to map keys -> clients */
5931 de
= dictFind(c
->db
->blockingkeys
,keys
[j
]);
5935 /* For every key we take a list of clients blocked for it */
5937 retval
= dictAdd(c
->db
->blockingkeys
,keys
[j
],l
);
5938 incrRefCount(keys
[j
]);
5939 assert(retval
== DICT_OK
);
5941 l
= dictGetEntryVal(de
);
5943 listAddNodeTail(l
,c
);
5945 /* Mark the client as a blocked client */
5946 c
->flags
|= REDIS_BLOCKED
;
5947 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
5948 server
.blockedclients
++;
5951 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
5952 static void unblockClient(redisClient
*c
) {
5957 assert(c
->blockingkeys
!= NULL
);
5958 /* The client may wait for multiple keys, so unblock it for every key. */
5959 for (j
= 0; j
< c
->blockingkeysnum
; j
++) {
5960 /* Remove this client from the list of clients waiting for this key. */
5961 de
= dictFind(c
->db
->blockingkeys
,c
->blockingkeys
[j
]);
5963 l
= dictGetEntryVal(de
);
5964 listDelNode(l
,listSearchKey(l
,c
));
5965 /* If the list is empty we need to remove it to avoid wasting memory */
5966 if (listLength(l
) == 0)
5967 dictDelete(c
->db
->blockingkeys
,c
->blockingkeys
[j
]);
5968 decrRefCount(c
->blockingkeys
[j
]);
5970 /* Cleanup the client structure */
5971 zfree(c
->blockingkeys
);
5972 c
->blockingkeys
= NULL
;
5973 c
->flags
&= (~REDIS_BLOCKED
);
5974 server
.blockedclients
--;
5975 /* Ok now we are ready to get read events from socket, note that we
5976 * can't trap errors here as it's possible that unblockClients() is
5977 * called from freeClient() itself, and the only thing we can do
5978 * if we failed to register the READABLE event is to kill the client.
5979 * Still the following function should never fail in the real world as
5980 * we are sure the file descriptor is sane, and we exit on out of mem. */
5981 aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
, readQueryFromClient
, c
);
5982 /* As a final step we want to process data if there is some command waiting
5983 * in the input buffer. Note that this is safe even if unblockClient()
5984 * gets called from freeClient() because freeClient() will be smart
5985 * enough to call this function *after* c->querybuf was set to NULL. */
5986 if (c
->querybuf
&& sdslen(c
->querybuf
) > 0) processInputBuffer(c
);
5989 /* This should be called from any function PUSHing into lists.
5990 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5991 * 'ele' is the element pushed.
5993 * If the function returns 0 there was no client waiting for a list push
5996 * If the function returns 1 there was a client waiting for a list push
5997 * against this key, the element was passed to this client thus it's not
5998 * needed to actually add it to the list and the caller should return asap. */
5999 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
) {
6000 struct dictEntry
*de
;
6001 redisClient
*receiver
;
6005 de
= dictFind(c
->db
->blockingkeys
,key
);
6006 if (de
== NULL
) return 0;
6007 l
= dictGetEntryVal(de
);
6010 receiver
= ln
->value
;
6012 addReplySds(receiver
,sdsnew("*2\r\n"));
6013 addReplyBulkLen(receiver
,key
);
6014 addReply(receiver
,key
);
6015 addReply(receiver
,shared
.crlf
);
6016 addReplyBulkLen(receiver
,ele
);
6017 addReply(receiver
,ele
);
6018 addReply(receiver
,shared
.crlf
);
6019 unblockClient(receiver
);
6023 /* Blocking RPOP/LPOP */
6024 static void blockingPopGenericCommand(redisClient
*c
, int where
) {
6029 for (j
= 1; j
< c
->argc
-1; j
++) {
6030 o
= lookupKeyWrite(c
->db
,c
->argv
[j
]);
6032 if (o
->type
!= REDIS_LIST
) {
6033 addReply(c
,shared
.wrongtypeerr
);
6036 list
*list
= o
->ptr
;
6037 if (listLength(list
) != 0) {
6038 /* If the list contains elements fall back to the usual
6039 * non-blocking POP operation */
6040 robj
*argv
[2], **orig_argv
;
6043 /* We need to alter the command arguments before to call
6044 * popGenericCommand() as the command takes a single key. */
6045 orig_argv
= c
->argv
;
6046 orig_argc
= c
->argc
;
6047 argv
[1] = c
->argv
[j
];
6051 /* Also the return value is different, we need to output
6052 * the multi bulk reply header and the key name. The
6053 * "real" command will add the last element (the value)
6054 * for us. If this souds like an hack to you it's just
6055 * because it is... */
6056 addReplySds(c
,sdsnew("*2\r\n"));
6057 addReplyBulkLen(c
,argv
[1]);
6058 addReply(c
,argv
[1]);
6059 addReply(c
,shared
.crlf
);
6060 popGenericCommand(c
,where
);
6062 /* Fix the client structure with the original stuff */
6063 c
->argv
= orig_argv
;
6064 c
->argc
= orig_argc
;
6070 /* If the list is empty or the key does not exists we must block */
6071 timeout
= strtol(c
->argv
[c
->argc
-1]->ptr
,NULL
,10);
6072 if (timeout
> 0) timeout
+= time(NULL
);
6073 blockForKeys(c
,c
->argv
+1,c
->argc
-2,timeout
);
6076 static void blpopCommand(redisClient
*c
) {
6077 blockingPopGenericCommand(c
,REDIS_HEAD
);
6080 static void brpopCommand(redisClient
*c
) {
6081 blockingPopGenericCommand(c
,REDIS_TAIL
);
6084 /* =============================== Replication ============================= */
6086 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
6087 ssize_t nwritten
, ret
= size
;
6088 time_t start
= time(NULL
);
6092 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
6093 nwritten
= write(fd
,ptr
,size
);
6094 if (nwritten
== -1) return -1;
6098 if ((time(NULL
)-start
) > timeout
) {
6106 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
6107 ssize_t nread
, totread
= 0;
6108 time_t start
= time(NULL
);
6112 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
6113 nread
= read(fd
,ptr
,size
);
6114 if (nread
== -1) return -1;
6119 if ((time(NULL
)-start
) > timeout
) {
/* Read a line from 'fd' one byte at a time into 'ptr', a buffer of 'size'
 * bytes. Reading stops at the first '\n'; the newline is not stored and a
 * preceding '\r' is stripped. The buffer is kept NUL terminated after
 * every stored byte.
 *
 * Returns the number of bytes stored before the terminator was found (the
 * count still includes a stripped '\r'), or -1 when syncRead() fails. When
 * size-1 bytes have been stored without finding a newline the function
 * stops and returns that count, so the buffer can never be overrun. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    size--; /* Reserve room for the trailing NUL. */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
            /* Account for the byte just stored: the remaining room is what
             * bounds the loop. */
            size--;
        }
    }
    return nread;
}
6148 static void syncCommand(redisClient
*c
) {
6149 /* ignore SYNC if aleady slave or in monitor mode */
6150 if (c
->flags
& REDIS_SLAVE
) return;
6152 /* SYNC can't be issued when the server has pending data to send to
6153 * the client about already issued commands. We need a fresh reply
6154 * buffer registering the differences between the BGSAVE and the current
6155 * dataset, so that we can copy to other slaves if needed. */
6156 if (listLength(c
->reply
) != 0) {
6157 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
6161 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
6162 /* Here we need to check if there is a background saving operation
6163 * in progress, or if it is required to start one */
6164 if (server
.bgsavechildpid
!= -1) {
6165 /* Ok a background save is in progress. Let's check if it is a good
6166 * one for replication, i.e. if there is another slave that is
6167 * registering differences since the server forked to save */
6172 listRewind(server
.slaves
,&li
);
6173 while((ln
= listNext(&li
))) {
6175 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
6178 /* Perfect, the server is already registering differences for
6179 * another slave. Set the right state, and copy the buffer. */
6180 listRelease(c
->reply
);
6181 c
->reply
= listDup(slave
->reply
);
6182 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
6183 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
6185 /* No way, we need to wait for the next BGSAVE in order to
6186 * register differences */
6187 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
6188 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
6191 /* Ok we don't have a BGSAVE in progress, let's start one */
6192 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
6193 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
6194 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
6195 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
6198 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
6201 c
->flags
|= REDIS_SLAVE
;
6203 listAddNodeTail(server
.slaves
,c
);
6207 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
6208 redisClient
*slave
= privdata
;
6210 REDIS_NOTUSED(mask
);
6211 char buf
[REDIS_IOBUF_LEN
];
6212 ssize_t nwritten
, buflen
;
6214 if (slave
->repldboff
== 0) {
6215 /* Write the bulk write count before to transfer the DB. In theory here
6216 * we don't know how much room there is in the output buffer of the
6217 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
6218 * operations) will never be smaller than the few bytes we need. */
6221 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
6223 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
6231 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
6232 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
6234 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
6235 (buflen
== 0) ? "premature EOF" : strerror(errno
));
6239 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
6240 redisLog(REDIS_VERBOSE
,"Write error sending DB to slave: %s",
6245 slave
->repldboff
+= nwritten
;
6246 if (slave
->repldboff
== slave
->repldbsize
) {
6247 close(slave
->repldbfd
);
6248 slave
->repldbfd
= -1;
6249 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
6250 slave
->replstate
= REDIS_REPL_ONLINE
;
6251 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
6252 sendReplyToClient
, slave
) == AE_ERR
) {
6256 addReplySds(slave
,sdsempty());
6257 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
6261 /* This function is called at the end of every backgrond saving.
6262 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
6263 * otherwise REDIS_ERR is passed to the function.
6265 * The goal of this function is to handle slaves waiting for a successful
6266 * background saving in order to perform non-blocking synchronization. */
6267 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
6269 int startbgsave
= 0;
6272 listRewind(server
.slaves
,&li
);
6273 while((ln
= listNext(&li
))) {
6274 redisClient
*slave
= ln
->value
;
6276 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
6278 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
6279 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
6280 struct redis_stat buf
;
6282 if (bgsaveerr
!= REDIS_OK
) {
6284 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
6287 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
6288 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
6290 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
6293 slave
->repldboff
= 0;
6294 slave
->repldbsize
= buf
.st_size
;
6295 slave
->replstate
= REDIS_REPL_SEND_BULK
;
6296 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
6297 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
6304 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
6307 listRewind(server
.slaves
,&li
);
6308 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
6309 while((ln
= listNext(&li
))) {
6310 redisClient
*slave
= ln
->value
;
6312 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
6319 static int syncWithMaster(void) {
6320 char buf
[1024], tmpfile
[256], authcmd
[1024];
6322 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
6326 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
6331 /* AUTH with the master if required. */
6332 if(server
.masterauth
) {
6333 snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
);
6334 if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) {
6336 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",
6340 /* Read the AUTH result. */
6341 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
6343 redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s",
6347 if (buf
[0] != '+') {
6349 redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?");
6354 /* Issue the SYNC command */
6355 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
6357 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
6361 /* Read the bulk write count */
6362 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
6364 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
6368 if (buf
[0] != '$') {
6370 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
6373 dumpsize
= atoi(buf
+1);
6374 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
6375 /* Read the bulk write data on a temp file */
6376 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
6377 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
6380 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
6384 int nread
, nwritten
;
6386 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
6388 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
6394 nwritten
= write(dfd
,buf
,nread
);
6395 if (nwritten
== -1) {
6396 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
6404 if (rename(tmpfile
,server
.dbfilename
) == -1) {
6405 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
6411 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6412 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
6416 server
.master
= createClient(fd
);
6417 server
.master
->flags
|= REDIS_MASTER
;
6418 server
.master
->authenticated
= 1;
6419 server
.replstate
= REDIS_REPL_CONNECTED
;
6423 static void slaveofCommand(redisClient
*c
) {
6424 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
6425 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
6426 if (server
.masterhost
) {
6427 sdsfree(server
.masterhost
);
6428 server
.masterhost
= NULL
;
6429 if (server
.master
) freeClient(server
.master
);
6430 server
.replstate
= REDIS_REPL_NONE
;
6431 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
6434 sdsfree(server
.masterhost
);
6435 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
6436 server
.masterport
= atoi(c
->argv
[2]->ptr
);
6437 if (server
.master
) freeClient(server
.master
);
6438 server
.replstate
= REDIS_REPL_CONNECT
;
6439 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
6440 server
.masterhost
, server
.masterport
);
6442 addReply(c
,shared
.ok
);
6445 /* ============================ Maxmemory directive ======================== */
6447 /* Try to free one object form the pre-allocated objects free list.
6448 * This is useful under low mem conditions as by default we take 1 million
6449 * free objects allocated. On success REDIS_OK is returned, otherwise
6451 static int tryFreeOneObjectFromFreelist(void) {
6454 if (server
.vm_enabled
) pthread_mutex_lock(&server
.obj_freelist_mutex
);
6455 if (listLength(server
.objfreelist
)) {
6456 listNode
*head
= listFirst(server
.objfreelist
);
6457 o
= listNodeValue(head
);
6458 listDelNode(server
.objfreelist
,head
);
6459 if (server
.vm_enabled
) pthread_mutex_unlock(&server
.obj_freelist_mutex
);
6463 if (server
.vm_enabled
) pthread_mutex_unlock(&server
.obj_freelist_mutex
);
6468 /* This function gets called when 'maxmemory' is set on the config file to limit
6469 * the max memory used by the server, and we are out of memory.
6470 * This function will try to, in order:
6472 * - Free objects from the free list
6473 * - Try to remove keys with an EXPIRE set
6475 * It is not possible to free enough memory to reach used-memory < maxmemory
6476 * the server will start refusing commands that will enlarge even more the
6479 static void freeMemoryIfNeeded(void) {
6480 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
6481 int j
, k
, freed
= 0;
6483 if (tryFreeOneObjectFromFreelist() == REDIS_OK
) continue;
6484 for (j
= 0; j
< server
.dbnum
; j
++) {
6486 robj
*minkey
= NULL
;
6487 struct dictEntry
*de
;
6489 if (dictSize(server
.db
[j
].expires
)) {
6491 /* From a sample of three keys drop the one nearest to
6492 * the natural expire */
6493 for (k
= 0; k
< 3; k
++) {
6496 de
= dictGetRandomKey(server
.db
[j
].expires
);
6497 t
= (time_t) dictGetEntryVal(de
);
6498 if (minttl
== -1 || t
< minttl
) {
6499 minkey
= dictGetEntryKey(de
);
6503 deleteKey(server
.db
+j
,minkey
);
6506 if (!freed
) return; /* nothing to free... */
6510 /* ============================== Append Only file ========================== */
6512 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
6513 sds buf
= sdsempty();
6519 /* The DB this command was targetting is not the same as the last command
6520 * we appendend. To issue a SELECT command is needed. */
6521 if (dictid
!= server
.appendseldb
) {
6524 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
6525 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
6526 (unsigned long)strlen(seldb
),seldb
);
6527 server
.appendseldb
= dictid
;
6530 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6531 * EXPIREs into EXPIREATs calls */
6532 if (cmd
->proc
== expireCommand
) {
6535 tmpargv
[0] = createStringObject("EXPIREAT",8);
6536 tmpargv
[1] = argv
[1];
6537 incrRefCount(argv
[1]);
6538 when
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10);
6539 tmpargv
[2] = createObject(REDIS_STRING
,
6540 sdscatprintf(sdsempty(),"%ld",when
));
6544 /* Append the actual command */
6545 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
6546 for (j
= 0; j
< argc
; j
++) {
6549 o
= getDecodedObject(o
);
6550 buf
= sdscatprintf(buf
,"$%lu\r\n",(unsigned long)sdslen(o
->ptr
));
6551 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
6552 buf
= sdscatlen(buf
,"\r\n",2);
6556 /* Free the objects from the modified argv for EXPIREAT */
6557 if (cmd
->proc
== expireCommand
) {
6558 for (j
= 0; j
< 3; j
++)
6559 decrRefCount(argv
[j
]);
6562 /* We want to perform a single write. This should be guaranteed atomic
6563 * at least if the filesystem we are writing is a real physical one.
6564 * While this will save us against the server being killed I don't think
6565 * there is much to do about the whole server stopping for power problems
6567 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
6568 if (nwritten
!= (signed)sdslen(buf
)) {
6569 /* Ooops, we are in troubles. The best thing to do for now is
6570 * to simply exit instead to give the illusion that everything is
6571 * working as expected. */
6572 if (nwritten
== -1) {
6573 redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
));
6575 redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
));
6579 /* If a background append only file rewriting is in progress we want to
6580 * accumulate the differences between the child DB and the current one
6581 * in a buffer, so that when the child process will do its work we
6582 * can append the differences to the new append only file. */
6583 if (server
.bgrewritechildpid
!= -1)
6584 server
.bgrewritebuf
= sdscatlen(server
.bgrewritebuf
,buf
,sdslen(buf
));
6588 if (server
.appendfsync
== APPENDFSYNC_ALWAYS
||
6589 (server
.appendfsync
== APPENDFSYNC_EVERYSEC
&&
6590 now
-server
.lastfsync
> 1))
6592 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
6593 server
.lastfsync
= now
;
6597 /* In Redis commands are always executed in the context of a client, so in
6598 * order to load the append only file we need to create a fake client. */
6599 static struct redisClient
*createFakeClient(void) {
6600 struct redisClient
*c
= zmalloc(sizeof(*c
));
6604 c
->querybuf
= sdsempty();
6608 /* We set the fake client as a slave waiting for the synchronization
6609 * so that Redis will not try to send replies to this client. */
6610 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
6611 c
->reply
= listCreate();
6612 listSetFreeMethod(c
->reply
,decrRefCount
);
6613 listSetDupMethod(c
->reply
,dupClientReplyValue
);
6617 static void freeFakeClient(struct redisClient
*c
) {
6618 sdsfree(c
->querybuf
);
6619 listRelease(c
->reply
);
6623 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
6624 * error (the append only file is zero-length) REDIS_ERR is returned. On
6625 * fatal error an error message is logged and the program exists. */
6626 int loadAppendOnlyFile(char *filename
) {
6627 struct redisClient
*fakeClient
;
6628 FILE *fp
= fopen(filename
,"r");
6629 struct redis_stat sb
;
6630 unsigned long long loadedkeys
= 0;
6632 if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size
== 0)
6636 redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
));
6640 fakeClient
= createFakeClient();
6647 struct redisCommand
*cmd
;
6649 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) {
6655 if (buf
[0] != '*') goto fmterr
;
6657 argv
= zmalloc(sizeof(robj
*)*argc
);
6658 for (j
= 0; j
< argc
; j
++) {
6659 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
;
6660 if (buf
[0] != '$') goto fmterr
;
6661 len
= strtol(buf
+1,NULL
,10);
6662 argsds
= sdsnewlen(NULL
,len
);
6663 if (len
&& fread(argsds
,len
,1,fp
) == 0) goto fmterr
;
6664 argv
[j
] = createObject(REDIS_STRING
,argsds
);
6665 if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */
6668 /* Command lookup */
6669 cmd
= lookupCommand(argv
[0]->ptr
);
6671 redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
);
6674 /* Try object sharing and encoding */
6675 if (server
.shareobjects
) {
6677 for(j
= 1; j
< argc
; j
++)
6678 argv
[j
] = tryObjectSharing(argv
[j
]);
6680 if (cmd
->flags
& REDIS_CMD_BULK
)
6681 tryObjectEncoding(argv
[argc
-1]);
6682 /* Run the command in the context of a fake client */
6683 fakeClient
->argc
= argc
;
6684 fakeClient
->argv
= argv
;
6685 cmd
->proc(fakeClient
);
6686 /* Discard the reply objects list from the fake client */
6687 while(listLength(fakeClient
->reply
))
6688 listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
));
6689 /* Clean up, ready for the next command */
6690 for (j
= 0; j
< argc
; j
++) decrRefCount(argv
[j
]);
6692 /* Handle swapping while loading big datasets when VM is on */
6694 if (server
.vm_enabled
&& (loadedkeys
% 5000) == 0) {
6695 while (zmalloc_used_memory() > server
.vm_max_memory
) {
6696 if (vmSwapOneObjectBlocking() == REDIS_ERR
) break;
6701 freeFakeClient(fakeClient
);
6706 redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file");
6708 redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
));
6712 redisLog(REDIS_WARNING
,"Bad file format reading the append only file");
6716 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6717 static int fwriteBulk(FILE *fp
, robj
*obj
) {
6721 /* Avoid the incr/decr ref count business if possible to help
6722 * copy-on-write (we are often in a child process when this function
6724 * Also makes sure that key objects don't get incrRefCount-ed when VM
6726 if (obj
->encoding
!= REDIS_ENCODING_RAW
) {
6727 obj
= getDecodedObject(obj
);
6730 snprintf(buf
,sizeof(buf
),"$%ld\r\n",(long)sdslen(obj
->ptr
));
6731 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) goto err
;
6732 if (sdslen(obj
->ptr
) && fwrite(obj
->ptr
,sdslen(obj
->ptr
),1,fp
) == 0)
6734 if (fwrite("\r\n",2,1,fp
) == 0) goto err
;
6735 if (decrrc
) decrRefCount(obj
);
6738 if (decrrc
) decrRefCount(obj
);
/* Write a double value in bulk format $<count>\r\n<payload>\r\n
 * The payload is the value formatted with "%.17g".
 * Returns 1 on success, 0 if a write failed. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char payload[128], header[128];
    size_t paylen;

    /* The payload buffer already carries the trailing CRLF, which is not
     * part of the advertised length. */
    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    paylen = strlen(payload);
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)paylen-2);

    if (fwrite(header,strlen(header),1,fp) == 0) return 0;
    return fwrite(payload,paylen,1,fp) != 0;
}
/* Write a long value in bulk format $<count>\r\n<payload>\r\n
 * The payload is the value in base 10.
 * Returns 1 on success, 0 if a write failed. */
static int fwriteBulkLong(FILE *fp, long l) {
    char payload[128], header[128];
    size_t paylen;

    /* The payload buffer already carries the trailing CRLF, which is not
     * part of the advertised length. */
    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    paylen = strlen(payload);
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)paylen-2);

    if (fwrite(header,strlen(header),1,fp) == 0) return 0;
    return fwrite(payload,paylen,1,fp) != 0;
}
6764 /* Write a sequence of commands able to fully rebuild the dataset into
6765 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6766 static int rewriteAppendOnlyFile(char *filename
) {
6767 dictIterator
*di
= NULL
;
6772 time_t now
= time(NULL
);
6774 /* Note that we have to use a different temp name here compared to the
6775 * one used by rewriteAppendOnlyFileBackground() function. */
6776 snprintf(tmpfile
,256,"temp-rewriteaof-%d.aof", (int) getpid());
6777 fp
= fopen(tmpfile
,"w");
6779 redisLog(REDIS_WARNING
, "Failed rewriting the append only file: %s", strerror(errno
));
6782 for (j
= 0; j
< server
.dbnum
; j
++) {
6783 char selectcmd
[] = "*2\r\n$6\r\nSELECT\r\n";
6784 redisDb
*db
= server
.db
+j
;
6786 if (dictSize(d
) == 0) continue;
6787 di
= dictGetIterator(d
);
6793 /* SELECT the new DB */
6794 if (fwrite(selectcmd
,sizeof(selectcmd
)-1,1,fp
) == 0) goto werr
;
6795 if (fwriteBulkLong(fp
,j
) == 0) goto werr
;
6797 /* Iterate this DB writing every entry */
6798 while((de
= dictNext(di
)) != NULL
) {
6803 key
= dictGetEntryKey(de
);
6804 /* If the value for this key is swapped, load a preview in memory.
6805 * We use a "swapped" flag to remember if we need to free the
6806 * value object instead to just increment the ref count anyway
6807 * in order to avoid copy-on-write of pages if we are forked() */
6808 if (!server
.vm_enabled
|| key
->storage
== REDIS_VM_MEMORY
||
6809 key
->storage
== REDIS_VM_SWAPPING
) {
6810 o
= dictGetEntryVal(de
);
6813 o
= vmPreviewObject(key
);
6816 expiretime
= getExpire(db
,key
);
6818 /* Save the key and associated value */
6819 if (o
->type
== REDIS_STRING
) {
6820 /* Emit a SET command */
6821 char cmd
[]="*3\r\n$3\r\nSET\r\n";
6822 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6824 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6825 if (fwriteBulk(fp
,o
) == 0) goto werr
;
6826 } else if (o
->type
== REDIS_LIST
) {
6827 /* Emit the RPUSHes needed to rebuild the list */
6828 list
*list
= o
->ptr
;
6832 listRewind(list
,&li
);
6833 while((ln
= listNext(&li
))) {
6834 char cmd
[]="*3\r\n$5\r\nRPUSH\r\n";
6835 robj
*eleobj
= listNodeValue(ln
);
6837 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6838 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6839 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6841 } else if (o
->type
== REDIS_SET
) {
6842 /* Emit the SADDs needed to rebuild the set */
6844 dictIterator
*di
= dictGetIterator(set
);
6847 while((de
= dictNext(di
)) != NULL
) {
6848 char cmd
[]="*3\r\n$4\r\nSADD\r\n";
6849 robj
*eleobj
= dictGetEntryKey(de
);
6851 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6852 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6853 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6855 dictReleaseIterator(di
);
6856 } else if (o
->type
== REDIS_ZSET
) {
6857 /* Emit the ZADDs needed to rebuild the sorted set */
6859 dictIterator
*di
= dictGetIterator(zs
->dict
);
6862 while((de
= dictNext(di
)) != NULL
) {
6863 char cmd
[]="*4\r\n$4\r\nZADD\r\n";
6864 robj
*eleobj
= dictGetEntryKey(de
);
6865 double *score
= dictGetEntryVal(de
);
6867 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6868 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6869 if (fwriteBulkDouble(fp
,*score
) == 0) goto werr
;
6870 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6872 dictReleaseIterator(di
);
6874 redisAssert(0 != 0);
6876 /* Save the expire time */
6877 if (expiretime
!= -1) {
6878 char cmd
[]="*3\r\n$8\r\nEXPIREAT\r\n";
6879 /* If this key is already expired skip it */
6880 if (expiretime
< now
) continue;
6881 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6882 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6883 if (fwriteBulkLong(fp
,expiretime
) == 0) goto werr
;
6885 if (swapped
) decrRefCount(o
);
6887 dictReleaseIterator(di
);
6890 /* Make sure data will not remain on the OS's output buffers */
6895 /* Use RENAME to make sure the DB file is changed atomically only
6896 * if the generate DB file is ok. */
6897 if (rename(tmpfile
,filename
) == -1) {
6898 redisLog(REDIS_WARNING
,"Error moving temp append only file on the final destination: %s", strerror(errno
));
6902 redisLog(REDIS_NOTICE
,"SYNC append only file rewrite performed");
6908 redisLog(REDIS_WARNING
,"Write error writing append only file on disk: %s", strerror(errno
));
6909 if (di
) dictReleaseIterator(di
);
/* rewriteAppendOnlyFileBackground(): start an append only file rewrite in a
 * fork()ed child.
 *
 * Returns REDIS_ERR right away when server.bgrewritechildpid is not -1 (a
 * rewrite is already running). When VM is enabled the I/O jobs queue is
 * drained before fork(), and the child reopens the swap file. The child
 * writes the dataset to "temp-rewriteaof-bg-<pid>.aof" using
 * rewriteAppendOnlyFile(). The parent logs the fork() failure when childpid
 * is -1; otherwise it records the child pid in server.bgrewritechildpid and
 * resets server.appendseldb to -1.
 *
 * NOTE(review): the lines with the child exit path and the parent return
 * values are not visible in this view of the file; confirm them against the
 * complete source before relying on this description. */
6913 /* This is how rewriting of the append only file in background works:
6915 * 1) The user calls BGREWRITEAOF
6916 * 2) Redis calls this function, that forks():
6917 * 2a) the child rewrites the append only file in a temp file.
6918 * 2b) the parent accumulates differences in server.bgrewritebuf.
6919 * 3) When the child finishes '2a' it exits.
6920 * 4) The parent will trap the exit code, if it's OK, will append the
6921 * data accumulated into server.bgrewritebuf into the temp file, and
6922 * finally will rename(2) the temp file in the actual file name.
6923 * Then the new file is reopened as the new append only file. Profit!
6925 static int rewriteAppendOnlyFileBackground(void) {
6928 if (server
.bgrewritechildpid
!= -1) return REDIS_ERR
;
6929 if (server
.vm_enabled
) waitEmptyIOJobsQueue();
6930 if ((childpid
= fork()) == 0) {
6934 if (server
.vm_enabled
) vmReopenSwapFile();
6936 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6937 if (rewriteAppendOnlyFile(tmpfile
) == REDIS_OK
) {
6944 if (childpid
== -1) {
6945 redisLog(REDIS_WARNING
,
6946 "Can't rewrite append only file in background: fork: %s",
6950 redisLog(REDIS_NOTICE
,
6951 "Background append only file rewriting started by pid %d",childpid
);
6952 server
.bgrewritechildpid
= childpid
;
6953 /* We set appendseldb to -1 in order to force the next call to the
6954 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6955 * accumulated by the parent into server.bgrewritebuf will start
6956 * with a SELECT statement and it will be safe to merge. */
6957 server
.appendseldb
= -1;
6960 return REDIS_OK
; /* unreached */
6963 static void bgrewriteaofCommand(redisClient
*c
) {
6964 if (server
.bgrewritechildpid
!= -1) {
6965 addReplySds(c
,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6968 if (rewriteAppendOnlyFileBackground() == REDIS_OK
) {
6969 char *status
= "+Background append only file rewriting started\r\n";
6970 addReplySds(c
,sdsnew(status
));
6972 addReply(c
,shared
.err
);
/* Remove the temp file "temp-rewriteaof-bg-<childpid>.aof" from the current
 * working directory. The result of unlink(2) is not checked. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];
    int pid = (int) childpid;

    snprintf(path,256,"temp-rewriteaof-bg-%d.aof",pid);
    unlink(path);
}
6983 /* Virtual Memory is composed mainly of two subsystems:
6984 * - Blocking Virtual Memory
6985 * - Threaded Virtual Memory I/O
6986 * The two parts are not fully decoupled, but functions are split among two
6987 * different sections of the source code (delimited by comments) in order to
6988 * make more clear what functionality is about the blocking VM and what about
6989 * the threaded (not blocking) VM.
6993 * Redis VM is a blocking VM (one that blocks reading swapped values from
6994 * disk into memory when a value swapped out is needed in memory) that is made
6995 * unblocking by trying to examine the command argument vector in order to
6996 * load in background values that will likely be needed in order to exec
6997 * the command. The command is executed only once all the relevant keys
6998 * are loaded into memory.
7000 * This is basically almost as simple as a blocking VM, but almost as parallel
7001 * as a fully non-blocking VM.
7004 /* =================== Virtual Memory - Blocking Side ====================== */
7006 /* substitute the first occurrence of '%p' with the process pid in the
7007 * swap file name. */
7008 static void expandVmSwapFilename(void) {
7009 char *p
= strstr(server
.vm_swap_file
,"%p");
7015 new = sdscat(new,server
.vm_swap_file
);
7016 new = sdscatprintf(new,"%ld",(long) getpid());
7017 new = sdscat(new,p
+2);
7018 zfree(server
.vm_swap_file
);
7019 server
.vm_swap_file
= new;
7022 static void vmInit(void) {
7027 if (server
.vm_max_threads
!= 0)
7028 zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */
7030 expandVmSwapFilename();
7031 redisLog(REDIS_NOTICE
,"Using '%s' as swap file",server
.vm_swap_file
);
7032 server
.vm_fp
= fopen(server
.vm_swap_file
,"r+b");
7033 if (server
.vm_fp
== NULL
) {
7034 redisLog(REDIS_WARNING
,"Impossible to open the swap file. Exiting.");
7037 server
.vm_fd
= fileno(server
.vm_fp
);
7038 server
.vm_next_page
= 0;
7039 server
.vm_near_pages
= 0;
7040 server
.vm_stats_used_pages
= 0;
7041 server
.vm_stats_swapped_objects
= 0;
7042 server
.vm_stats_swapouts
= 0;
7043 server
.vm_stats_swapins
= 0;
7044 totsize
= server
.vm_pages
*server
.vm_page_size
;
7045 redisLog(REDIS_NOTICE
,"Allocating %lld bytes of swap file",totsize
);
7046 if (ftruncate(server
.vm_fd
,totsize
) == -1) {
7047 redisLog(REDIS_WARNING
,"Can't ftruncate swap file: %s. Exiting.",
7051 redisLog(REDIS_NOTICE
,"Swap file allocated with success");
7053 server
.vm_bitmap
= zmalloc((server
.vm_pages
+7)/8);
7054 redisLog(REDIS_VERBOSE
,"Allocated %lld bytes page table for %lld pages",
7055 (long long) (server
.vm_pages
+7)/8, server
.vm_pages
);
7056 memset(server
.vm_bitmap
,0,(server
.vm_pages
+7)/8);
7058 /* Initialize threaded I/O (used by Virtual Memory) */
7059 server
.io_newjobs
= listCreate();
7060 server
.io_processing
= listCreate();
7061 server
.io_processed
= listCreate();
7062 server
.io_clients
= listCreate();
7063 pthread_mutex_init(&server
.io_mutex
,NULL
);
7064 pthread_mutex_init(&server
.obj_freelist_mutex
,NULL
);
7065 pthread_mutex_init(&server
.io_swapfile_mutex
,NULL
);
7066 server
.io_active_threads
= 0;
7067 if (pipe(pipefds
) == -1) {
7068 redisLog(REDIS_WARNING
,"Unable to intialized VM: pipe(2): %s. Exiting."
7072 server
.io_ready_pipe_read
= pipefds
[0];
7073 server
.io_ready_pipe_write
= pipefds
[1];
7074 redisAssert(anetNonBlock(NULL
,server
.io_ready_pipe_read
) != ANET_ERR
);
7075 /* LZF requires a lot of stack */
7076 pthread_attr_init(&server
.io_threads_attr
);
7077 pthread_attr_getstacksize(&server
.io_threads_attr
, &stacksize
);
7078 while (stacksize
< REDIS_THREAD_STACK_SIZE
) stacksize
*= 2;
7079 pthread_attr_setstacksize(&server
.io_threads_attr
, stacksize
);
7080 /* Listen for events in the threaded I/O pipe */
7081 if (aeCreateFileEvent(server
.el
, server
.io_ready_pipe_read
, AE_READABLE
,
7082 vmThreadedIOCompletedJob
, NULL
) == AE_ERR
)
7083 oom("creating file event");
7086 /* Mark the page as used */
7087 static void vmMarkPageUsed(off_t page
) {
7088 off_t byte
= page
/8;
7090 server
.vm_bitmap
[byte
] |= 1<<bit
;
7091 redisLog(REDIS_DEBUG
,"Mark used: %lld (byte:%lld bit:%d)\n",
7092 (long long)page
, (long long)byte
, bit
);
7095 /* Mark N contiguous pages as used, with 'page' being the first. */
7096 static void vmMarkPagesUsed(off_t page
, off_t count
) {
7099 for (j
= 0; j
< count
; j
++)
7100 vmMarkPageUsed(page
+j
);
7101 server
.vm_stats_used_pages
+= count
;
7104 /* Mark the page as free */
7105 static void vmMarkPageFree(off_t page
) {
7106 off_t byte
= page
/8;
7108 server
.vm_bitmap
[byte
] &= ~(1<<bit
);
7111 /* Mark N contiguous pages as free, with 'page' being the first. */
7112 static void vmMarkPagesFree(off_t page
, off_t count
) {
7115 for (j
= 0; j
< count
; j
++)
7116 vmMarkPageFree(page
+j
);
7117 server
.vm_stats_used_pages
-= count
;
7120 /* Test if the page is free */
7121 static int vmFreePage(off_t page
) {
7122 off_t byte
= page
/8;
7124 return (server
.vm_bitmap
[byte
] & (1<<bit
)) == 0;
7127 /* Find N contiguous free pages storing the first page of the cluster in *first.
7128 * Returns REDIS_OK if it was able to find N contiguous pages, otherwise
7129 * REDIS_ERR is returned.
7131 * This function uses a simple algorithm: we try to allocate
7132 * REDIS_VM_MAX_NEAR_PAGES sequentially, when we reach this limit we start
7133 * again from the start of the swap file searching for free spaces.
7135 * If it looks pretty clear that there are no free pages near our offset
7136 * we try to find less populated places doing a forward jump of
7137 * REDIS_VM_MAX_RANDOM_JUMP, then we start scanning again a few pages
7138 * without hurry, and then we jump again and so forth...
7140 * This function can be improved using a free list to avoid to guess
7141 * too much, since we could collect data about freed pages.
7143 * note: I implemented this function just after watching an episode of
7144 * Battlestar Galactica, where the hybrid was continuing to say "JUMP!"
7146 static int vmFindContiguousPages(off_t
*first
, off_t n
) {
7147 off_t base
, offset
= 0, since_jump
= 0, numfree
= 0;
7149 if (server
.vm_near_pages
== REDIS_VM_MAX_NEAR_PAGES
) {
7150 server
.vm_near_pages
= 0;
7151 server
.vm_next_page
= 0;
7153 server
.vm_near_pages
++; /* Yet another try for pages near to the old ones */
7154 base
= server
.vm_next_page
;
7156 while(offset
< server
.vm_pages
) {
7157 off_t
this = base
+offset
;
7159 redisLog(REDIS_DEBUG
, "THIS: %lld (%c)\n", (long long) this, vmFreePage(this) ? 'F' : 'X');
7160 /* If we overflow, restart from page zero */
7161 if (this >= server
.vm_pages
) {
7162 this -= server
.vm_pages
;
7164 /* Just overflowed, what we found on tail is no longer
7165 * interesting, as it's no longer contiguous. */
7169 if (vmFreePage(this)) {
7170 /* This is a free page */
7172 /* Already got N free pages? Return to the caller, with success */
7174 *first
= this-(n
-1);
7175 server
.vm_next_page
= this+1;
7179 /* The current one is not a free page */
7183 /* Fast-forward if the current page is not free and we already
7184 * searched enough near this place. */
7186 if (!numfree
&& since_jump
>= REDIS_VM_MAX_RANDOM_JUMP
/4) {
7187 offset
+= random() % REDIS_VM_MAX_RANDOM_JUMP
;
7189 /* Note that even if we rewind after the jump, we are don't need
7190 * to make sure numfree is set to zero as we only jump *if* it
7191 * is set to zero. */
7193 /* Otherwise just check the next page */
7200 /* Write the specified object at the specified page of the swap file */
7201 static int vmWriteObjectOnSwap(robj
*o
, off_t page
) {
7202 if (server
.vm_enabled
) pthread_mutex_lock(&server
.io_swapfile_mutex
);
7203 if (fseeko(server
.vm_fp
,page
*server
.vm_page_size
,SEEK_SET
) == -1) {
7204 if (server
.vm_enabled
) pthread_mutex_unlock(&server
.io_swapfile_mutex
);
7205 redisLog(REDIS_WARNING
,
7206 "Critical VM problem in vmSwapObjectBlocking(): can't seek: %s",
7210 rdbSaveObject(server
.vm_fp
,o
);
7211 if (server
.vm_enabled
) pthread_mutex_unlock(&server
.io_swapfile_mutex
);
7215 /* Swap the 'val' object relative to 'key' into disk. Store all the information
7216 * needed to later retrieve the object into the key object.
7217 * If we can't find enough contiguous empty pages to swap the object on disk
7218 * REDIS_ERR is returned. */
7219 static int vmSwapObjectBlocking(robj
*key
, robj
*val
) {
7220 off_t pages
= rdbSavedObjectPages(val
,NULL
);
7223 assert(key
->storage
== REDIS_VM_MEMORY
);
7224 assert(key
->refcount
== 1);
7225 if (vmFindContiguousPages(&page
,pages
) == REDIS_ERR
) return REDIS_ERR
;
7226 if (vmWriteObjectOnSwap(val
,page
) == REDIS_ERR
) return REDIS_ERR
;
7227 key
->vm
.page
= page
;
7228 key
->vm
.usedpages
= pages
;
7229 key
->storage
= REDIS_VM_SWAPPED
;
7230 key
->vtype
= val
->type
;
7231 decrRefCount(val
); /* Deallocate the object from memory. */
7232 vmMarkPagesUsed(page
,pages
);
7233 redisLog(REDIS_DEBUG
,"VM: object %s swapped out at %lld (%lld pages)",
7234 (unsigned char*) key
->ptr
,
7235 (unsigned long long) page
, (unsigned long long) pages
);
7236 server
.vm_stats_swapped_objects
++;
7237 server
.vm_stats_swapouts
++;
7238 fflush(server
.vm_fp
);
/* vmReadObjectFromSwap(): seek the swap file to page 'page' and load from
 * there an object of type 'type' using rdbLoadObject().
 *
 * When VM is enabled the swap file mutex is taken before the seek and
 * released after the load. Both a failed seek and a failed load are logged
 * as unrecoverable.
 *
 * NOTE(review): the statements following the two warnings and the final
 * return are not visible in this view of the file; presumably the process
 * terminates on failure and the loaded object is returned. Confirm against
 * the complete source. */
7242 static robj
*vmReadObjectFromSwap(off_t page
, int type
) {
7245 if (server
.vm_enabled
) pthread_mutex_lock(&server
.io_swapfile_mutex
);
7246 if (fseeko(server
.vm_fp
,page
*server
.vm_page_size
,SEEK_SET
) == -1) {
7247 redisLog(REDIS_WARNING
,
7248 "Unrecoverable VM problem in vmLoadObject(): can't seek: %s",
7252 o
= rdbLoadObject(type
,server
.vm_fp
);
7254 redisLog(REDIS_WARNING
, "Unrecoverable VM problem in vmLoadObject(): can't load object from swap file: %s", strerror(errno
));
7257 if (server
.vm_enabled
) pthread_mutex_unlock(&server
.io_swapfile_mutex
);
7261 /* Load the value object relative to the 'key' object from swap to memory.
7262 * The newly allocated object is returned.
7264 * If preview is true the unserialized object is returned to the caller but
7265 * no changes are made to the key object, nor the pages are marked as freed */
7266 static robj
*vmGenericLoadObject(robj
*key
, int preview
) {
7269 redisAssert(key
->storage
== REDIS_VM_SWAPPED
);
7270 val
= vmReadObjectFromSwap(key
->vm
.page
,key
->vtype
);
7272 key
->storage
= REDIS_VM_MEMORY
;
7273 key
->vm
.atime
= server
.unixtime
;
7274 vmMarkPagesFree(key
->vm
.page
,key
->vm
.usedpages
);
7275 redisLog(REDIS_DEBUG
, "VM: object %s loaded from disk",
7276 (unsigned char*) key
->ptr
);
7277 server
.vm_stats_swapped_objects
--;
7279 redisLog(REDIS_DEBUG
, "VM: object %s previewed from disk",
7280 (unsigned char*) key
->ptr
);
7282 server
.vm_stats_swapins
++;
7286 /* Plain object loading, from swap to memory */
7287 static robj
*vmLoadObject(robj
*key
) {
7288 /* If we are loading the object in background, stop it, we
7289 * need to load this object synchronously ASAP. */
7290 if (key
->storage
== REDIS_VM_LOADING
)
7291 vmCancelThreadedIOJob(key
);
7292 return vmGenericLoadObject(key
,0);
7295 /* Just load the value on disk, without to modify the key.
7296 * This is useful when we want to perform some operation on the value
7297 * without to really bring it from swap to memory, like while saving the
7298 * dataset or rewriting the append only log. */
7299 static robj
*vmPreviewObject(robj
*key
) {
7300 return vmGenericLoadObject(key
,1);
7303 /* How a good candidate is this object for swapping?
7304 * The better candidate it is, the greater the returned value.
7306 * Currently we try to perform a fast estimation of the object size in
7307 * memory, and combine it with aging informations.
7309 * Basically swappability = idle-time * log(estimated size)
7311 * Bigger objects are preferred over smaller objects, but not
7312 * proportionally, this is why we use the logarithm. This algorithm is
7313 * just a first try and will probably be tuned later. */
7314 static double computeObjectSwappability(robj
*o
) {
7315 time_t age
= server
.unixtime
- o
->vm
.atime
;
7319 struct dictEntry
*de
;
7322 if (age
<= 0) return 0;
7325 if (o
->encoding
!= REDIS_ENCODING_RAW
) {
7328 asize
= sdslen(o
->ptr
)+sizeof(*o
)+sizeof(long)*2;
7333 listNode
*ln
= listFirst(l
);
7335 asize
= sizeof(list
);
7337 robj
*ele
= ln
->value
;
7340 elesize
= (ele
->encoding
== REDIS_ENCODING_RAW
) ?
7341 (sizeof(*o
)+sdslen(ele
->ptr
)) :
7343 asize
+= (sizeof(listNode
)+elesize
)*listLength(l
);
7348 z
= (o
->type
== REDIS_ZSET
);
7349 d
= z
? ((zset
*)o
->ptr
)->dict
: o
->ptr
;
7351 asize
= sizeof(dict
)+(sizeof(struct dictEntry
*)*dictSlots(d
));
7352 if (z
) asize
+= sizeof(zset
)-sizeof(dict
);
7357 de
= dictGetRandomKey(d
);
7358 ele
= dictGetEntryKey(de
);
7359 elesize
= (ele
->encoding
== REDIS_ENCODING_RAW
) ?
7360 (sizeof(*o
)+sdslen(ele
->ptr
)) :
7362 asize
+= (sizeof(struct dictEntry
)+elesize
)*dictSize(d
);
7363 if (z
) asize
+= sizeof(zskiplistNode
)*dictSize(d
);
7367 return (double)asize
*log(1+asize
);
7370 /* Try to swap an object that's a good candidate for swapping.
7371 * Returns REDIS_OK if the object was swapped, REDIS_ERR if it's not possible
7372 * to swap any object at all.
7374 * If 'usethreaded' is true, Redis will try to swap the object in background
7375 * using I/O threads. */
7376 static int vmSwapOneObject(int usethreads
) {
7378 struct dictEntry
*best
= NULL
;
7379 double best_swappability
= 0;
7380 redisDb
*best_db
= NULL
;
7383 for (j
= 0; j
< server
.dbnum
; j
++) {
7384 redisDb
*db
= server
.db
+j
;
7385 int maxtries
= 1000;
7387 if (dictSize(db
->dict
) == 0) continue;
7388 for (i
= 0; i
< 5; i
++) {
7390 double swappability
;
7392 if (maxtries
) maxtries
--;
7393 de
= dictGetRandomKey(db
->dict
);
7394 key
= dictGetEntryKey(de
);
7395 val
= dictGetEntryVal(de
);
7396 /* Only swap objects that are currently in memory.
7398 * Also don't swap shared objects if threaded VM is on, as we
7399 * try to ensure that the main thread does not touch the
7400 * object while the I/O thread is using it, but we can't
7401 * control other keys without adding additional mutex. */
7402 if (key
->storage
!= REDIS_VM_MEMORY
||
7403 (server
.vm_max_threads
!= 0 && val
->refcount
!= 1)) {
7404 if (maxtries
) i
--; /* don't count this try */
7407 swappability
= computeObjectSwappability(val
);
7408 if (!best
|| swappability
> best_swappability
) {
7410 best_swappability
= swappability
;
7416 redisLog(REDIS_DEBUG
,"No swappable key found!");
7419 key
= dictGetEntryKey(best
);
7420 val
= dictGetEntryVal(best
);
7422 redisLog(REDIS_DEBUG
,"Key with best swappability: %s, %f",
7423 key
->ptr
, best_swappability
);
7425 /* Unshare the key if needed */
7426 if (key
->refcount
> 1) {
7427 robj
*newkey
= dupStringObject(key
);
7429 key
= dictGetEntryKey(best
) = newkey
;
7433 vmSwapObjectThreaded(key
,val
,best_db
);
7436 if (vmSwapObjectBlocking(key
,val
) == REDIS_OK
) {
7437 dictGetEntryVal(best
) = NULL
;
/* Swap one object out synchronously. See vmSwapOneObject() for the return
 * value. The parameter list is spelled (void): an empty list would declare
 * a function taking unspecified arguments. */
static int vmSwapOneObjectBlocking(void) {
    return vmSwapOneObject(0);
}
/* Swap one object out using the I/O threads. See vmSwapOneObject() for the
 * return value. The parameter list is spelled (void): an empty list would
 * declare a function taking unspecified arguments. */
static int vmSwapOneObjectThreaded(void) {
    return vmSwapOneObject(1);
}
7453 /* Return true if it's safe to swap out objects in a given moment.
7454 * Basically we don't want to swap objects out while there is a BGSAVE
7455 * or a BGAEOREWRITE running in backgroud. */
7456 static int vmCanSwapOut(void) {
7457 return (server
.bgsavechildpid
== -1 && server
.bgrewritechildpid
== -1);
7460 /* Delete a key if swapped. Returns 1 if the key was found, was swapped
7461 * and was deleted. Otherwise 0 is returned. */
7462 static int deleteIfSwapped(redisDb
*db
, robj
*key
) {
7466 if ((de
= dictFind(db
->dict
,key
)) == NULL
) return 0;
7467 foundkey
= dictGetEntryKey(de
);
7468 if (foundkey
->storage
== REDIS_VM_MEMORY
) return 0;
7473 /* =================== Virtual Memory - Threaded I/O ======================= */
7475 static void freeIOJob(iojob
*j
) {
7476 if (j
->type
== REDIS_IOJOB_PREPARE_SWAP
||
7477 j
->type
== REDIS_IOJOB_DO_SWAP
)
7478 decrRefCount(j
->val
);
7479 decrRefCount(j
->key
);
7483 /* Every time a thread finished a Job, it writes a byte into the write side
7484 * of an unix pipe in order to "awake" the main thread, and this function
7486 static void vmThreadedIOCompletedJob(aeEventLoop
*el
, int fd
, void *privdata
,
7493 REDIS_NOTUSED(mask
);
7494 REDIS_NOTUSED(privdata
);
7496 /* For every byte we read in the read side of the pipe, there is one
7497 * I/O job completed to process. */
7498 while((retval
= read(fd
,buf
,1)) == 1) {
7502 struct dictEntry
*de
;
7504 redisLog(REDIS_DEBUG
,"Processing I/O completed job");
7506 /* Get the processed element (the oldest one) */
7508 assert(listLength(server
.io_processed
) != 0);
7509 ln
= listFirst(server
.io_processed
);
7511 listDelNode(server
.io_processed
,ln
);
7513 /* If this job is marked as canceled, just ignore it */
7518 /* Post process it in the main thread, as there are things we
7519 * can do just here to avoid race conditions and/or invasive locks */
7520 redisLog(REDIS_DEBUG
,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j
, j
->type
, (void*)j
->key
, (char*)j
->key
->ptr
, j
->key
->refcount
);
7521 de
= dictFind(j
->db
->dict
,j
->key
);
7523 key
= dictGetEntryKey(de
);
7524 if (j
->type
== REDIS_IOJOB_LOAD
) {
7525 /* Key loaded, bring it at home */
7526 key
->storage
= REDIS_VM_MEMORY
;
7527 key
->vm
.atime
= server
.unixtime
;
7528 vmMarkPagesFree(key
->vm
.page
,key
->vm
.usedpages
);
7529 redisLog(REDIS_DEBUG
, "VM: object %s loaded from disk (threaded)",
7530 (unsigned char*) key
->ptr
);
7531 server
.vm_stats_swapped_objects
--;
7532 server
.vm_stats_swapins
++;
7534 } else if (j
->type
== REDIS_IOJOB_PREPARE_SWAP
) {
7535 /* Now we know the amount of pages required to swap this object.
7536 * Let's find some space for it, and queue this task again
7537 * rebranded as REDIS_IOJOB_DO_SWAP. */
7538 if (!vmCanSwapOut() ||
7539 vmFindContiguousPages(&j
->page
,j
->pages
) == REDIS_ERR
)
7541 /* Ooops... no space or we can't swap as there is
7542 * a fork()ed Redis trying to save stuff on disk. */
7544 key
->storage
= REDIS_VM_MEMORY
; /* undo operation */
7546 /* Note that we need to mark this pages as used now,
7547 * if the job will be canceled, we'll mark them as freed
7549 vmMarkPagesUsed(j
->page
,j
->pages
);
7550 j
->type
= REDIS_IOJOB_DO_SWAP
;
7555 } else if (j
->type
== REDIS_IOJOB_DO_SWAP
) {
7558 /* Key swapped. We can finally free some memory. */
7559 if (key
->storage
!= REDIS_VM_SWAPPING
) {
7560 printf("key->storage: %d\n",key
->storage
);
7561 printf("key->name: %s\n",(char*)key
->ptr
);
7562 printf("key->refcount: %d\n",key
->refcount
);
7563 printf("val: %p\n",(void*)j
->val
);
7564 printf("val->type: %d\n",j
->val
->type
);
7565 printf("val->ptr: %s\n",(char*)j
->val
->ptr
);
7567 redisAssert(key
->storage
== REDIS_VM_SWAPPING
);
7568 val
= dictGetEntryVal(de
);
7569 key
->vm
.page
= j
->page
;
7570 key
->vm
.usedpages
= j
->pages
;
7571 key
->storage
= REDIS_VM_SWAPPED
;
7572 key
->vtype
= j
->val
->type
;
7573 decrRefCount(val
); /* Deallocate the object from memory. */
7574 dictGetEntryVal(de
) = NULL
;
7575 redisLog(REDIS_DEBUG
,
7576 "VM: object %s swapped out at %lld (%lld pages) (threaded)",
7577 (unsigned char*) key
->ptr
,
7578 (unsigned long long) j
->page
, (unsigned long long) j
->pages
);
7579 server
.vm_stats_swapped_objects
++;
7580 server
.vm_stats_swapouts
++;
7582 /* Put a few more swap requests in queue if we are still
7584 if (vmCanSwapOut() && zmalloc_used_memory() > server
.vm_max_memory
){
7588 more
= listLength(server
.io_newjobs
) <
7589 (unsigned) server
.vm_max_threads
;
7591 /* Don't waste CPU time if swappable objects are rare. */
7592 if (vmSwapOneObjectThreaded() == REDIS_ERR
) break;
7597 if (processed
== REDIS_MAX_COMPLETED_JOBS_PROCESSED
) return;
7599 if (retval
< 0 && errno
!= EAGAIN
) {
7600 redisLog(REDIS_WARNING
,
7601 "WARNING: read(2) error in vmThreadedIOCompletedJob() %s",
7606 static void lockThreadedIO(void) {
7607 pthread_mutex_lock(&server
.io_mutex
);
7610 static void unlockThreadedIO(void) {
7611 pthread_mutex_unlock(&server
.io_mutex
);
7614 /* Remove the specified object from the threaded I/O queue if still not
7615 * processed, otherwise make sure to flag it as canceled. */
7616 static void vmCancelThreadedIOJob(robj
*o
) {
7618 server
.io_newjobs
, /* 0 */
7619 server
.io_processing
, /* 1 */
7620 server
.io_processed
/* 2 */
7624 assert(o
->storage
== REDIS_VM_LOADING
|| o
->storage
== REDIS_VM_SWAPPING
);
7627 /* Search for a matching key in one of the queues */
7628 for (i
= 0; i
< 3; i
++) {
7632 listRewind(lists
[i
],&li
);
7633 while ((ln
= listNext(&li
)) != NULL
) {
7634 iojob
*job
= ln
->value
;
7636 if (job
->canceled
) continue; /* Skip this, already canceled. */
7637 if (compareStringObjects(job
->key
,o
) == 0) {
7638 redisLog(REDIS_DEBUG
,"*** CANCELED %p (%s) (LIST ID %d)\n",
7639 (void*)job
, (char*)o
->ptr
, i
);
7640 /* Mark the pages as free since the swap didn't happened
7641 * or happened but is now discarded. */
7642 if (job
->type
== REDIS_IOJOB_DO_SWAP
)
7643 vmMarkPagesFree(job
->page
,job
->pages
);
7644 /* Cancel the job. It depends on the list the job is
7647 case 0: /* io_newjobs */
7648 /* If the job was yet not processed the best thing to do
7649 * is to remove it from the queue at all */
7651 listDelNode(lists
[i
],ln
);
7653 case 1: /* io_processing */
7654 /* Oh Shi- the thread is messing with the Job, and
7655 * probably with the object if this is a
7656 * PREPARE_SWAP or DO_SWAP job. Better to wait for the
7657 * job to move into the next queue... */
7658 if (job
->type
!= REDIS_IOJOB_LOAD
) {
7659 /* Yes, we try again and again until the job
7662 /* But let's wait some time for the I/O thread
7663 * to finish with this job. After all this condition
7664 * should be very rare. */
7671 case 2: /* io_processed */
7672 /* The job was already processed, that's easy...
7673 * just mark it as canceled so that we'll ignore it
7674 * when processing completed jobs. */
7678 /* Finally we have to adjust the storage type of the object
7679 * in order to "UNDO" the operaiton. */
7680 if (o
->storage
== REDIS_VM_LOADING
)
7681 o
->storage
= REDIS_VM_SWAPPED
;
7682 else if (o
->storage
== REDIS_VM_SWAPPING
)
7683 o
->storage
= REDIS_VM_MEMORY
;
7690 assert(1 != 1); /* We should never reach this */
7693 static void *IOThreadEntryPoint(void *arg
) {
7698 pthread_detach(pthread_self());
7700 /* Get a new job to process */
7702 if (listLength(server
.io_newjobs
) == 0) {
7703 #ifdef REDIS_HELGRIND_FRIENDLY
7704 /* No new jobs? Wait and retry, because to be Helgrind
7705 * (valgrind --tool=helgrind) what's needed is to take
7706 * the same threads running instead to create/destroy threads
7707 * as needed (otherwise valgrind will fail) */
7709 usleep(1); /* Give some time for the I/O thread to work. */
7712 /* No new jobs in queue, exit. */
7713 redisLog(REDIS_DEBUG
,"Thread %lld exiting, nothing to do",
7714 (long long) pthread_self());
7715 server
.io_active_threads
--;
7719 ln
= listFirst(server
.io_newjobs
);
7721 listDelNode(server
.io_newjobs
,ln
);
7722 /* Add the job in the processing queue */
7723 j
->thread
= pthread_self();
7724 listAddNodeTail(server
.io_processing
,j
);
7725 ln
= listLast(server
.io_processing
); /* We use ln later to remove it */
7727 redisLog(REDIS_DEBUG
,"Thread %lld got a new job (type %d): %p about key '%s'",
7728 (long long) pthread_self(), j
->type
, (void*)j
, (char*)j
->key
->ptr
);
7730 /* Process the Job */
7731 if (j
->type
== REDIS_IOJOB_LOAD
) {
7732 } else if (j
->type
== REDIS_IOJOB_PREPARE_SWAP
) {
7733 FILE *fp
= fopen("/dev/null","w+");
7734 j
->pages
= rdbSavedObjectPages(j
->val
,fp
);
7736 } else if (j
->type
== REDIS_IOJOB_DO_SWAP
) {
7737 if (vmWriteObjectOnSwap(j
->val
,j
->page
) == REDIS_ERR
)
7741 /* Done: insert the job into the processed queue */
7742 redisLog(REDIS_DEBUG
,"Thread %lld completed the job: %p (key %s)",
7743 (long long) pthread_self(), (void*)j
, (char*)j
->key
->ptr
);
7745 listDelNode(server
.io_processing
,ln
);
7746 listAddNodeTail(server
.io_processed
,j
);
7749 /* Signal the main thread there is new stuff to process */
7750 assert(write(server
.io_ready_pipe_write
,"x",1) == 1);
7752 return NULL
; /* never reached */
7755 static void spawnIOThread(void) {
7758 pthread_create(&thread
,&server
.io_threads_attr
,IOThreadEntryPoint
,NULL
);
7759 server
.io_active_threads
++;
7762 /* We need to wait for the last thread to exit before we are able to
7763 * fork() in order to BGSAVE or BGREWRITEAOF. */
7764 static void waitEmptyIOJobsQueue(void) {
7767 if (listLength(server
.io_newjobs
) == 0 &&
7768 listLength(server
.io_processing
) == 0 &&
7769 server
.io_active_threads
== 0)
7775 usleep(10000); /* 10 milliseconds */
/* vmReopenSwapFile(): close the swap file stream and open it again from
 * server.vm_swap_file in "r+b" mode, then refresh server.vm_fd with the
 * descriptor of the new stream. A failure to reopen is logged as a warning.
 *
 * NOTE(review): the statement following the warning is not visible in this
 * view of the file; the message says "Exiting.", so presumably the process
 * terminates there. Confirm against the complete source. */
7779 static void vmReopenSwapFile(void) {
7780 fclose(server
.vm_fp
);
7781 server
.vm_fp
= fopen(server
.vm_swap_file
,"r+b");
7782 if (server
.vm_fp
== NULL
) {
7783 redisLog(REDIS_WARNING
,"Can't re-open the VM swap file: %s. Exiting.",
7784 server
.vm_swap_file
);
7787 server
.vm_fd
= fileno(server
.vm_fp
);
7790 /* This function must be called while with threaded IO locked */
7791 static void queueIOJob(iojob
*j
) {
7792 redisLog(REDIS_DEBUG
,"Queued IO Job %p type %d about key '%s'\n",
7793 (void*)j
, j
->type
, (char*)j
->key
->ptr
);
7794 listAddNodeTail(server
.io_newjobs
,j
);
7795 if (server
.io_active_threads
< server
.vm_max_threads
)
7799 static int vmSwapObjectThreaded(robj
*key
, robj
*val
, redisDb
*db
) {
7802 assert(key
->storage
== REDIS_VM_MEMORY
);
7803 assert(key
->refcount
== 1);
7805 j
= zmalloc(sizeof(*j
));
7806 j
->type
= REDIS_IOJOB_PREPARE_SWAP
;
7808 j
->key
= dupStringObject(key
);
7812 j
->thread
= (pthread_t
) -1;
7813 key
->storage
= REDIS_VM_SWAPPING
;
7821 /* ================================= Debugging ============================== */
/* Implementation of the DEBUG command.
 *
 * The subcommand is c->argv[1], matched case-insensitively with
 * strcasecmp(). Subcommands visible in this listing:
 *
 *   segfault     - body not visible here.
 *   reload       - rdbSave() then rdbLoad() on server.dbfilename. Replies
 *                  shared.err when either call does not return REDIS_OK,
 *                  otherwise logs a warning and replies shared.ok.
 *   loadaof      - loadAppendOnlyFile(server.appendfilename), with the same
 *                  error / ok reply scheme.
 *   object KEY   - only when c->argc == 3. Looks KEY up in c->db->dict and
 *                  replies either with the key/value addresses, refcounts,
 *                  value encoding and serialized length, or with the swap
 *                  page and page count taken from key->vm.
 *   swapout KEY  - only when c->argc == 3. Replies with an error when VM is
 *                  disabled or the key is not in memory; otherwise calls
 *                  vmSwapObjectBlocking() and, on REDIS_OK, sets the dict
 *                  entry value to NULL and replies shared.ok.
 *
 * Anything else receives the syntax error reply at the bottom.
 *
 * NOTE(review): several lines of this function (early returns, else
 * branches, local declarations, the key-not-found tests) are not visible in
 * this listing; the summary above covers only what can be seen. */
7823 static void debugCommand(redisClient
*c
) {
7824 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
/* DEBUG RELOAD: write the dataset to server.dbfilename and load it back. */
7826 } else if (!strcasecmp(c
->argv
[1]->ptr
,"reload")) {
7827 if (rdbSave(server
.dbfilename
) != REDIS_OK
) {
7828 addReply(c
,shared
.err
);
7832 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
7833 addReply(c
,shared
.err
);
7836 redisLog(REDIS_WARNING
,"DB reloaded by DEBUG RELOAD");
7837 addReply(c
,shared
.ok
);
/* DEBUG LOADAOF: load the append only file named by server.appendfilename. */
7838 } else if (!strcasecmp(c
->argv
[1]->ptr
,"loadaof")) {
7840 if (loadAppendOnlyFile(server
.appendfilename
) != REDIS_OK
) {
7841 addReply(c
,shared
.err
);
7844 redisLog(REDIS_WARNING
,"Append Only File loaded by DEBUG LOADAOF");
7845 addReply(c
,shared
.ok
);
/* DEBUG OBJECT KEY: report where the key and its value currently live. */
7846 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
7847 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
7851 addReply(c
,shared
.nokeyerr
);
7854 key
= dictGetEntryKey(de
);
7855 val
= dictGetEntryVal(de
);
/* NOTE(review): this test is false whenever server.vm_enabled is 0, so with
 * VM disabled an in-memory key would presumably fall through to the
 * "value swapped at" reply below. Confirm whether the intended condition
 * was "VM disabled OR key in memory / being swapped". */
7856 if (server
.vm_enabled
&& (key
->storage
== REDIS_VM_MEMORY
||
7857 key
->storage
== REDIS_VM_SWAPPING
)) {
7858 addReplySds(c
,sdscatprintf(sdsempty(),
7859 "+Key at:%p refcount:%d, value at:%p refcount:%d "
7860 "encoding:%d serializedlength:%lld\r\n",
7861 (void*)key
, key
->refcount
, (void*)val
, val
->refcount
,
7862 val
->encoding
, rdbSavedObjectLen(val
,NULL
)));
/* Reply used for a value that is on the swap file: page and page count. */
7864 addReplySds(c
,sdscatprintf(sdsempty(),
7865 "+Key at:%p refcount:%d, value swapped at: page %llu "
7866 "using %llu pages\r\n",
7867 (void*)key
, key
->refcount
, (unsigned long long) key
->vm
.page
,
7868 (unsigned long long) key
->vm
.usedpages
));
/* DEBUG SWAPOUT KEY: swap the value out through vmSwapObjectBlocking(). */
7870 } else if (!strcasecmp(c
->argv
[1]->ptr
,"swapout") && c
->argc
== 3) {
7871 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
7874 if (!server
.vm_enabled
) {
7875 addReplySds(c
,sdsnew("-ERR Virtual Memory is disabled\r\n"));
7879 addReply(c
,shared
.nokeyerr
);
7882 key
= dictGetEntryKey(de
);
7883 val
= dictGetEntryVal(de
);
7884 /* If the key is shared we want to create a copy */
7885 if (key
->refcount
> 1) {
7886 robj
*newkey
= dupStringObject(key
);
/* Install the private copy as the dict entry key and keep using it. */
7888 key
= dictGetEntryKey(de
) = newkey
;
7891 if (key
->storage
!= REDIS_VM_MEMORY
) {
7892 addReplySds(c
,sdsnew("-ERR This key is not in memory\r\n"));
7893 } else if (vmSwapObjectBlocking(key
,val
) == REDIS_OK
) {
/* The value was swapped out: the entry no longer points to it. */
7894 dictGetEntryVal(de
) = NULL
;
7895 addReply(c
,shared
.ok
);
7897 addReply(c
,shared
.err
);
/* NOTE(review): the usage text below does not mention the LOADAOF
 * subcommand handled above. */
7900 addReplySds(c
,sdsnew(
7901 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPOUT <key>|RELOAD]\r\n"));
/* Reports a failed assertion.
 *
 * estr: text of the expression that was not true.
 * file: source file name reported in the log.
 * line: source line number reported in the log.
 *
 * Logs a banner and then "file:line 'estr' is not true" at warning level.
 * When HAVE_BACKTRACE is defined it additionally logs that a SIGSEGV is
 * being forced in order to print the stack trace.
 *
 * NOTE(review): the statement that actually triggers the signal is not
 * visible in this listing. */
7905 static void _redisAssert(char *estr
, char *file
, int line
) {
7906 redisLog(REDIS_WARNING
,"=== ASSERTION FAILED ===");
7907 redisLog(REDIS_WARNING
,"==> %s:%d '%s' is not true\n",file
,line
,estr
);
7908 #ifdef HAVE_BACKTRACE
7909 redisLog(REDIS_WARNING
,"(forcing SIGSEGV in order to print the stack trace)");
7914 /* =================================== Main! ================================ */
/* Reads the kernel overcommit setting.
 *
 * Opens /proc/sys/vm/overcommit_memory for reading and fetches its first
 * line into 'buf' with fgets(), which stores at most 63 characters plus the
 * terminating NUL for a size argument of 64.
 *
 * NOTE(review): the declaration of 'buf', the handling of a failed fopen()
 * or fgets(), the conversion of the text to an int, the closing of 'fp' and
 * the return statements are not visible in this listing. The caller below
 * compares the result with 0. */
7917 int linuxOvercommitMemoryValue(void) {
7918 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
7922 if (fgets(buf
,64,fp
) == NULL
) {
/* Logs a warning when linuxOvercommitMemoryValue() returns 0. The message
 * explains that background saves may fail and how to set
 * vm.overcommit_memory to 1, either through /etc/sysctl.conf or with the
 * sysctl command. Nothing is logged for any other return value. */
7931 void linuxOvercommitMemoryWarning(void) {
7932 if (linuxOvercommitMemoryValue() == 0) {
7933 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
7936 #endif /* __linux__ */
/* Turns the running process into a daemon.
 *
 * Steps visible in this listing:
 *   1. fork(); every process for which fork() returns non-zero calls
 *      exit(0), so only the child continues.
 *   2. setsid() to create a new session.
 *   3. Open /dev/null read-write and duplicate it onto stdin, stdout and
 *      stderr; the extra descriptor is closed when it is above
 *      STDERR_FILENO.
 *   4. Open server.pidfile for writing and print the pid followed by a
 *      newline.
 *
 * NOTE(review): fork() returning -1 (failure) is also non-zero, so that
 * case takes the exit(0) path as well.
 * NOTE(review): the declarations of 'fd' and 'fp', any NULL check on 'fp'
 * and the fclose() call are not visible in this listing -- confirm. */
7938 static void daemonize(void) {
7942 if (fork() != 0) exit(0); /* parent exits */
7943 setsid(); /* create a new session */
7945 /* Every output goes to /dev/null. If Redis is daemonized but
7946 * the 'logfile' is set to 'stdout' in the configuration file
7947 * it will not log at all. */
7948 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
7949 dup2(fd
, STDIN_FILENO
);
7950 dup2(fd
, STDOUT_FILENO
);
7951 dup2(fd
, STDERR_FILENO
);
/* Do not close fd when it is itself one of the three standard descriptors. */
7952 if (fd
> STDERR_FILENO
) close(fd
);
7954 /* Try to write the pid file */
7955 fp
= fopen(server
.pidfile
,"w");
7957 fprintf(fp
,"%d\n",getpid());
/* Program entry point.
 *
 * Sequence visible in this listing:
 *   - Configuration: one branch resets the save parameters and loads the
 *     configuration file named by argv[1]; when argc > 2 the usage line is
 *     printed on stderr; otherwise a warning says that the default
 *     configuration is being used.
 *   - daemonize() is called when server.daemonize is set.
 *   - The start-up notice with REDIS_VERSION is logged and
 *     linuxOvercommitMemoryWarning() is called.
 *   - Data loading: with server.appendonly set, the append only file
 *     (server.appendfilename) is loaded; the other visible path loads
 *     server.dbfilename with rdbLoad(). Success is logged in both cases.
 *   - The "ready to accept connections" notice is logged with server.port.
 *   - aeDeleteEventLoop(server.el) releases the event loop.
 *
 * NOTE(review): server initialisation, the condition of the first branch,
 * the event loop run and the return statement are not visible in this
 * listing -- confirm against the complete source. */
7962 int main(int argc
, char **argv
) {
7965 resetServerSaveParams();
7966 loadServerConfig(argv
[1]);
7967 } else if (argc
> 2) {
7968 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
7971 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
7973 if (server
.daemonize
) daemonize();
7975 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
7977 linuxOvercommitMemoryWarning();
/* Load the dataset: append only file when enabled, RDB file otherwise. */
7979 if (server
.appendonly
) {
7980 if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
)
7981 redisLog(REDIS_NOTICE
,"DB loaded from append only file");
7983 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
7984 redisLog(REDIS_NOTICE
,"DB loaded from disk");
7986 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
7988 aeDeleteEventLoop(server
.el
);
7992 /* ============================= Backtrace support ========================= */
7994 #ifdef HAVE_BACKTRACE
7995 static char *findFuncName(void *pointer
, unsigned long *offset
);
/* Returns, as a void pointer, the saved instruction pointer register stored
 * in the machine context of 'uc'. The field that is read is chosen at
 * compile time:
 *
 *   FreeBSD                      uc_mcontext.mc_eip
 *   dietlibc                     uc_mcontext.eip
 *   Apple (both branches)        uc_mcontext->__ss.__rip or __ss.__eip
 *   i386 / x86_64                uc_mcontext.gregs[REG_EIP]
 *   ia64                         uc_mcontext.sc_ip
 *
 * NOTE(review): the inner preprocessor tests of the first Apple branch and
 * the fallback for unlisted platforms are not visible in this listing.
 * NOTE(review): the i386 / x86_64 branch indexes gregs with REG_EIP for
 * both word sizes; glibc appears to name the 64 bit register REG_RIP --
 * confirm that this branch builds on x86_64. */
7997 static void *getMcontextEip(ucontext_t
*uc
) {
7998 #if defined(__FreeBSD__)
7999 return (void*) uc
->uc_mcontext
.mc_eip
;
8000 #elif defined(__dietlibc__)
8001 return (void*) uc
->uc_mcontext
.eip
;
8002 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
8004 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
8006 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
8008 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
8009 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
8010 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
8012 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
8014 #elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
8015 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
]; /* Linux 32/64 bit */
8016 #elif defined(__ia64__) /* Linux IA64 */
8017 return (void*) uc
->uc_mcontext
.sc_ip
;
/* Signal handler that logs diagnostic information about a crash.
 *
 * sig:    signal number, printed in the banner.
 * info:   unused (marked with REDIS_NOTUSED).
 * secret: treated as a ucontext_t pointer, from which the faulting address
 *         is obtained through getMcontextEip().
 *
 * Steps visible in this listing:
 *   - Log a banner with REDIS_VERSION and the signal number.
 *   - Log the string returned by genRedisInfoString(); it is deliberately
 *     never freed.
 *   - Collect up to 100 frames with backtrace(); when getMcontextEip() is
 *     not NULL, frame 1 is replaced with that address.
 *   - Resolve the frames with backtrace_symbols() and, starting at frame 1,
 *     log either the resolved symbol text or a line built from
 *     findFuncName() (frame index, address, function name and offset).
 *   - The 'messages' array is deliberately not freed.
 *
 * NOTE(review): the declarations of 'trace' and 'infostring', the else
 * branch keyword of the loop body and the statements that end the process
 * are not visible in this listing. */
8023 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
8025 char **messages
= NULL
;
8026 int i
, trace_size
= 0;
8027 unsigned long offset
=0;
8028 ucontext_t
*uc
= (ucontext_t
*) secret
;
8030 REDIS_NOTUSED(info
);
8032 redisLog(REDIS_WARNING
,
8033 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
8034 infostring
= genRedisInfoString();
8035 redisLog(REDIS_WARNING
, "%s",infostring
);
8036 /* It's not safe to sdsfree() the returned string under memory
8037 * corruption conditions. Let it leak as we are going to abort */
8039 trace_size
= backtrace(trace
, 100);
8040 /* overwrite sigaction with caller's address */
8041 if (getMcontextEip(uc
) != NULL
) {
8042 trace
[1] = getMcontextEip(uc
);
8044 messages
= backtrace_symbols(trace
, trace_size
);
8046 for (i
=1; i
<trace_size
; ++i
) {
8047 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
/* 'p' points at the '+' that precedes the offset in the resolved symbol
 * text, when there is one. The resolved text is logged when no function
 * name was found in the static table, or when its offset is smaller than
 * the one computed by findFuncName(). */
8049 p
= strchr(messages
[i
],'+');
8050 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
8051 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
8053 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
8056 /* free(messages); Don't call free() with possibly corrupted memory. */
/* Installs segvHandler() as the handler for SIGSEGV, SIGBUS, SIGFPE and
 * SIGILL.
 *
 * The signal mask is emptied and the action uses the flags SA_NODEFER,
 * SA_ONSTACK, SA_RESETHAND and SA_SIGINFO; because SA_SIGINFO is set the
 * handler is stored in sa_sigaction. The return values of sigaction() are
 * not checked.
 *
 * NOTE(review): SIGBUS is registered twice (second and last call); the
 * last call repeats the earlier one with the same arguments. */
8060 static void setupSigSegvAction(void) {
8061 struct sigaction act
;
8063 sigemptyset (&act
.sa_mask
);
8064 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
8065 * is used. Otherwise, sa_handler is used */
8066 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
8067 act
.sa_sigaction
= segvHandler
;
8068 sigaction (SIGSEGV
, &act
, NULL
);
8069 sigaction (SIGBUS
, &act
, NULL
);
8070 sigaction (SIGFPE
, &act
, NULL
);
8071 sigaction (SIGILL
, &act
, NULL
);
8072 sigaction (SIGBUS
, &act
, NULL
);
8076 #include "staticsymbols.h"
8077 /* This function tries to convert a pointer into a function name. It's used in
8078 * order to provide a backtrace under segmentation fault that's able to
8079 * display functions declared as static (otherwise the backtrace is useless). */
/* pointer: address to look up.
 * offset:  output parameter supplied by the caller.
 *
 * Walks symsTable until an entry whose 'pointer' field is zero. An entry is
 * a candidate when 'pointer', taken as an unsigned long, is not
 * (unsigned long)-1 and is greater than or equal to the entry address; the
 * distance between the two is kept in 'off' and compared with the best
 * distance so far ('minoff').
 *
 * Returns NULL when 'ret' is still -1 after the scan, otherwise the name of
 * the table entry at index 'ret'.
 *
 * NOTE(review): the declarations of 'i' and 'ret', the statements that
 * record a new best candidate and the store into *offset are not visible in
 * this listing -- confirm against the complete source. */
8080 static char *findFuncName(void *pointer
, unsigned long *offset
){
8082 unsigned long off
, minoff
= 0;
8084 /* Try to match against the Symbol with the smallest offset */
8085 for (i
=0; symsTable
[i
].pointer
; i
++) {
8086 unsigned long lp
= (unsigned long) pointer
;
8088 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
8089 off
=lp
-symsTable
[i
].pointer
;
/* Taken for the first candidate (ret < 0) or for a closer one. */
8090 if (ret
< 0 || off
< minoff
) {
8096 if (ret
== -1) return NULL
;
8098 return symsTable
[ret
].name
;
8100 #else /* HAVE_BACKTRACE */
/* Variant of setupSigSegvAction() compiled when HAVE_BACKTRACE is not
 * defined (this is the #else branch of that test).
 * NOTE(review): its body is not visible in this listing -- presumably it is
 * empty; confirm against the complete source. */
8101 static void setupSigSegvAction(void) {
8103 #endif /* HAVE_BACKTRACE */