2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.3.2"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
65 #include "solarisfixes.h"
69 #include "ae.h" /* Event driven programming library */
70 #include "sds.h" /* Dynamic safe strings */
71 #include "anet.h" /* Networking the easy way */
72 #include "dict.h" /* Hash tables */
73 #include "adlist.h" /* Linked lists */
74 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
75 #include "lzf.h" /* LZF compression library */
76 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
82 /* Static server configuration */
83 #define REDIS_SERVERPORT 6379 /* TCP port */
84 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
85 #define REDIS_IOBUF_LEN 1024
86 #define REDIS_LOADBUF_LEN 1024
87 #define REDIS_STATIC_ARGS 4
88 #define REDIS_DEFAULT_DBNUM 16
89 #define REDIS_CONFIGLINE_MAX 1024
90 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
91 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
92 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
93 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
94 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
96 /* If more than REDIS_WRITEV_THRESHOLD write packets are pending, use writev */
97 #define REDIS_WRITEV_THRESHOLD 3
98 /* Max number of iovecs used for each writev call */
99 #define REDIS_WRITEV_IOVEC_COUNT 256
101 /* Hash table parameters */
102 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
105 #define REDIS_CMD_BULK 1 /* Bulk write command */
106 #define REDIS_CMD_INLINE 2 /* Inline command */
107 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
108 this flag will return an error when the 'maxmemory' option is set in the
109 config file and the server is using more than maxmemory bytes of memory.
110 In short, these commands are denied on low memory conditions. */
111 #define REDIS_CMD_DENYOOM 4
114 #define REDIS_STRING 0
120 /* Objects encoding */
121 #define REDIS_ENCODING_RAW 0 /* Raw representation */
122 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
124 /* Object types only used for dumping to disk */
125 #define REDIS_EXPIRETIME 253
126 #define REDIS_SELECTDB 254
127 #define REDIS_EOF 255
129 /* Defines related to the dump file format. Storing 32 bit lengths for short
130 * keys requires a lot of space, so we check the most significant 2 bits of
131 * the first byte to interpret the length:
133 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
134 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
135 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
136 * 11|000000 this means: specially encoded object will follow. The remaining
137 * six bits specify the kind of object that follows.
138 * See the REDIS_RDB_ENC_* defines.
140 * Lengths up to 63 are stored using a single byte, most DB keys, and many
141 * values, will fit inside. */
142 #define REDIS_RDB_6BITLEN 0
143 #define REDIS_RDB_14BITLEN 1
144 #define REDIS_RDB_32BITLEN 2
145 #define REDIS_RDB_ENCVAL 3
146 #define REDIS_RDB_LENERR UINT_MAX
148 /* When a length of a string object stored on disk has the first two bits
149 * set, the remaining six bits specify a special encoding for the object
150 * according to the following defines: */
151 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
152 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
153 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
154 #define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
156 /* Virtual memory object->where field. */
157 #define REDIS_VM_MEMORY 0 /* The object is on memory */
158 #define REDIS_VM_SWAPPED 1 /* The object is on disk */
159 #define REDIS_VM_SWAPPING 2 /* Redis is swapping this object on disk */
160 #define REDIS_VM_LOADING 3 /* Redis is loading this object from disk */
162 /* Virtual memory static configuration stuff.
163 * Check vmFindContiguousPages() to know more about these magic numbers. */
164 #define REDIS_VM_MAX_NEAR_PAGES 65536
165 #define REDIS_VM_MAX_RANDOM_JUMP 4096
166 #define REDIS_VM_MAX_THREADS 32
169 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
170 #define REDIS_SLAVE 2 /* This client is a slave server */
171 #define REDIS_MASTER 4 /* This client is a master server */
172 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
173 #define REDIS_MULTI 16 /* This client is in a MULTI context */
174 #define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */
175 #define REDIS_IO_WAIT 64 /* The client is waiting for Virtual Memory I/O */
177 /* Slave replication state - slave side */
178 #define REDIS_REPL_NONE 0 /* No active replication */
179 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
180 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
182 /* Slave replication state - from the point of view of master
183 * Note that in SEND_BULK and ONLINE state the slave receives new updates
184 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
185 * to start the next background saving in order to send updates to it. */
186 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
187 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
188 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
189 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
191 /* List related stuff */
195 /* Sort operations */
196 #define REDIS_SORT_GET 0
197 #define REDIS_SORT_ASC 1
198 #define REDIS_SORT_DESC 2
199 #define REDIS_SORTKEY_MAX 1024
202 #define REDIS_DEBUG 0
203 #define REDIS_VERBOSE 1
204 #define REDIS_NOTICE 2
205 #define REDIS_WARNING 3
207 /* Anti-warning macro... */
208 #define REDIS_NOTUSED(V) ((void) V)
210 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
211 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
213 /* Append only defines */
214 #define APPENDFSYNC_NO 0
215 #define APPENDFSYNC_ALWAYS 1
216 #define APPENDFSYNC_EVERYSEC 2
218 /* We can print the stacktrace, so our assert is defined this way: */
219 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),exit(1)))
220 static void _redisAssert(char *estr
, char *file
, int line
);
222 /*================================= Data types ============================== */
224 /* A redis object, that is a type able to hold a string / list / set */
226 /* The VM object structure */
227 struct redisObjectVM
{
228 off_t page
; /* the page at which the object is stored on disk */
229 off_t usedpages
; /* number of pages used on disk */
230 time_t atime
; /* Last access time */
233 /* The actual Redis Object */
234 typedef struct redisObject
{
237 unsigned char encoding
;
238 unsigned char storage
; /* If this object is a key, where is the value?
239 * REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
240 unsigned char vtype
; /* If this object is a key, and value is swapped out,
241 * this is the type of the swapped out object. */
243 /* VM fields, these are only allocated if VM is active, otherwise the
244 * object allocation function will just allocate
245 * sizeof(redisObject) minus sizeof(redisObjectVM), so using
246 * Redis without VM active will not have any overhead. */
247 struct redisObjectVM vm
;
250 /* Macro used to initialize a Redis object allocated on the stack.
251 * Note that this macro is kept near the structure definition to make sure
252 * we'll update it when the structure is changed, to avoid bugs like
253 * bug #85 introduced exactly in this way. */
254 #define initStaticStringObject(_var,_ptr) do { \
256 _var.type = REDIS_STRING; \
257 _var.encoding = REDIS_ENCODING_RAW; \
259 if (server.vm_enabled) _var.storage = REDIS_VM_MEMORY; \
262 typedef struct redisDb
{
263 dict
*dict
; /* The keyspace for this DB */
264 dict
*expires
; /* Timeout of keys with a timeout set */
265 dict
*blockingkeys
; /* Keys with clients waiting for data (BLPOP) */
269 /* Client MULTI/EXEC state */
270 typedef struct multiCmd
{
273 struct redisCommand
*cmd
;
276 typedef struct multiState
{
277 multiCmd
*commands
; /* Array of MULTI commands */
278 int count
; /* Total number of MULTI commands */
281 /* With multiplexing we need to keep per-client state.
282 * Clients are kept in a linked list. */
283 typedef struct redisClient
{
288 robj
**argv
, **mbargv
;
290 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
291 int multibulk
; /* multi bulk command format active */
294 time_t lastinteraction
; /* time of the last interaction, used for timeout */
295 int flags
; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
297 int slaveseldb
; /* slave selected db, if this client is a slave */
298 int authenticated
; /* when requirepass is non-NULL */
299 int replstate
; /* replication state if this is a slave */
300 int repldbfd
; /* replication DB file descriptor */
301 long repldboff
; /* replication DB file offset */
302 off_t repldbsize
; /* replication DB file size */
303 multiState mstate
; /* MULTI/EXEC state */
304 robj
**blockingkeys
; /* The keys we are waiting on to terminate a blocking
305 * operation such as BLPOP. Otherwise NULL. */
306 int blockingkeysnum
; /* Number of blocking keys */
307 time_t blockingto
; /* Blocking operation timeout. If UNIX current time
308 * is >= blockingto then the operation timed out. */
309 list
*io_keys
; /* Keys this client is waiting to be loaded from the
310 * swap file in order to continue. */
318 /* Global server state structure */
323 dict
*sharingpool
; /* Pool used for object sharing */
324 unsigned int sharingpoolsize
;
325 long long dirty
; /* changes to DB from the last save */
327 list
*slaves
, *monitors
;
328 char neterr
[ANET_ERR_LEN
];
330 int cronloops
; /* number of times the cron function run */
331 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
332 time_t lastsave
; /* Unix time of last successful save */
333 size_t usedmemory
; /* Used memory in megabytes */
334 /* Fields used only for stats */
335 time_t stat_starttime
; /* server start time */
336 long long stat_numcommands
; /* number of processed commands */
337 long long stat_numconnections
; /* number of connections received */
350 pid_t bgsavechildpid
;
351 pid_t bgrewritechildpid
;
352 sds bgrewritebuf
; /* buffer kept by parent during append only rewrite */
353 struct saveparam
*saveparams
;
358 char *appendfilename
;
362 /* Replication related */
367 redisClient
*master
; /* client that is master for this slave */
369 unsigned int maxclients
;
370 unsigned long long maxmemory
;
371 unsigned int blockedclients
;
372 /* Sort parameters - qsort_r() is only available under BSD so we
373 * have to keep this state global, in order to pass it to sortCompare() */
377 /* Virtual memory configuration */
381 unsigned long long vm_max_memory
;
382 /* Virtual memory state */
385 off_t vm_next_page
; /* Next probably empty page */
386 off_t vm_near_pages
; /* Number of pages allocated sequentially */
387 unsigned char *vm_bitmap
; /* Bitmap of free/used pages */
388 time_t unixtime
; /* Unix time sampled every second. */
389 /* Virtual memory I/O threads stuff */
390 /* An I/O thread processes an element taken from the io_newjobs queue and
391 * puts the result of the operation in the io_processed list. While the
392 * job is being processed, it's put on the io_processing queue. */
393 list
*io_newjobs
; /* List of VM I/O jobs yet to be processed */
394 list
*io_processing
; /* List of VM I/O jobs being processed */
395 list
*io_processed
; /* List of VM I/O jobs already processed */
396 list
*io_clients
; /* All the clients waiting for SWAP I/O operations */
397 pthread_mutex_t io_mutex
; /* lock to access io_jobs/io_done/io_thread_job */
398 pthread_mutex_t obj_freelist_mutex
; /* safe redis objects creation/free */
399 pthread_mutex_t io_swapfile_mutex
; /* So we can lseek + write */
400 int io_active_threads
; /* Number of running I/O threads */
401 int vm_max_threads
; /* Max number of I/O threads running at the same time */
402 /* Our main thread is blocked on the event loop, looking for sockets ready
403 * to be read or written, so when a threaded I/O operation is ready to be
404 * processed by the main thread, the I/O thread will use a unix pipe to
405 * awake the main thread. The following are the two pipe FDs. */
406 int io_ready_pipe_read
;
407 int io_ready_pipe_write
;
408 /* Virtual memory stats */
409 unsigned long long vm_stats_used_pages
;
410 unsigned long long vm_stats_swapped_objects
;
411 unsigned long long vm_stats_swapouts
;
412 unsigned long long vm_stats_swapins
;
416 typedef void redisCommandProc(redisClient
*c
);
417 struct redisCommand
{
419 redisCommandProc
*proc
;
424 struct redisFunctionSym
{
426 unsigned long pointer
;
429 typedef struct _redisSortObject
{
437 typedef struct _redisSortOperation
{
440 } redisSortOperation
;
442 /* ZSETs use a specialized version of Skiplists */
444 typedef struct zskiplistNode
{
445 struct zskiplistNode
**forward
;
446 struct zskiplistNode
*backward
;
451 typedef struct zskiplist
{
452 struct zskiplistNode
*header
, *tail
;
453 unsigned long length
;
457 typedef struct zset
{
462 /* Our shared "common" objects */
464 struct sharedObjectsStruct
{
465 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
466 *colon
, *nullbulk
, *nullmultibulk
, *queued
,
467 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
468 *outofrangeerr
, *plus
,
469 *select0
, *select1
, *select2
, *select3
, *select4
,
470 *select5
, *select6
, *select7
, *select8
, *select9
;
473 /* Global vars that are actually used as constants. The following double
474 * values are used for double on-disk serialization, and are initialized
475 * at runtime to avoid strange compiler optimizations. */
477 static double R_Zero
, R_PosInf
, R_NegInf
, R_Nan
;
479 /* VM threaded I/O request message */
480 #define REDIS_IOJOB_LOAD 0 /* Load from disk to memory */
481 #define REDIS_IOJOB_PREPARE_SWAP 1 /* Compute needed pages */
482 #define REDIS_IOJOB_DO_SWAP 2 /* Swap from memory to disk */
483 typedef struct iojon
{
484 int type
; /* Request type, REDIS_IOJOB_* */
485 redisDb
*db
;/* Redis database */
486 robj
*key
; /* This I/O request is about swapping this key */
487 robj
*val
; /* the value to swap for REDIS_IOJOB_*_SWAP, otherwise this
488 * field is populated by the I/O thread for REDIS_IOJOB_LOAD. */
489 off_t page
; /* Swap page where to read/write the object */
490 off_t pages
; /* Swap pages needed to save object. PREPARE_SWAP return val */
491 int canceled
; /* True if this command was canceled by blocking side of VM */
492 pthread_t thread
; /* ID of the thread processing this entry */
495 /*================================ Prototypes =============================== */
497 static void freeStringObject(robj
*o
);
498 static void freeListObject(robj
*o
);
499 static void freeSetObject(robj
*o
);
500 static void decrRefCount(void *o
);
501 static robj
*createObject(int type
, void *ptr
);
502 static void freeClient(redisClient
*c
);
503 static int rdbLoad(char *filename
);
504 static void addReply(redisClient
*c
, robj
*obj
);
505 static void addReplySds(redisClient
*c
, sds s
);
506 static void incrRefCount(robj
*o
);
507 static int rdbSaveBackground(char *filename
);
508 static robj
*createStringObject(char *ptr
, size_t len
);
509 static robj
*dupStringObject(robj
*o
);
510 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
511 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
512 static int syncWithMaster(void);
513 static robj
*tryObjectSharing(robj
*o
);
514 static int tryObjectEncoding(robj
*o
);
515 static robj
*getDecodedObject(robj
*o
);
516 static int removeExpire(redisDb
*db
, robj
*key
);
517 static int expireIfNeeded(redisDb
*db
, robj
*key
);
518 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
519 static int deleteIfSwapped(redisDb
*db
, robj
*key
);
520 static int deleteKey(redisDb
*db
, robj
*key
);
521 static time_t getExpire(redisDb
*db
, robj
*key
);
522 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
523 static void updateSlavesWaitingBgsave(int bgsaveerr
);
524 static void freeMemoryIfNeeded(void);
525 static int processCommand(redisClient
*c
);
526 static void setupSigSegvAction(void);
527 static void rdbRemoveTempFile(pid_t childpid
);
528 static void aofRemoveTempFile(pid_t childpid
);
529 static size_t stringObjectLen(robj
*o
);
530 static void processInputBuffer(redisClient
*c
);
531 static zskiplist
*zslCreate(void);
532 static void zslFree(zskiplist
*zsl
);
533 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
534 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
535 static void initClientMultiState(redisClient
*c
);
536 static void freeClientMultiState(redisClient
*c
);
537 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
);
538 static void unblockClient(redisClient
*c
);
539 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
);
540 static void vmInit(void);
541 static void vmMarkPagesFree(off_t page
, off_t count
);
542 static robj
*vmLoadObject(robj
*key
);
543 static robj
*vmPreviewObject(robj
*key
);
544 static int vmSwapOneObjectBlocking(void);
545 static int vmSwapOneObjectThreaded(void);
546 static int vmCanSwapOut(void);
547 static int tryFreeOneObjectFromFreelist(void);
548 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
549 static void vmThreadedIOCompletedJob(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
550 static void vmCancelThreadedIOJob(robj
*o
);
551 static void lockThreadedIO(void);
552 static void unlockThreadedIO(void);
553 static int vmSwapObjectThreaded(robj
*key
, robj
*val
, redisDb
*db
);
554 static void freeIOJob(iojob
*j
);
555 static void queueIOJob(iojob
*j
);
556 static int vmWriteObjectOnSwap(robj
*o
, off_t page
);
557 static robj
*vmReadObjectFromSwap(off_t page
, int type
);
559 static void authCommand(redisClient
*c
);
560 static void pingCommand(redisClient
*c
);
561 static void echoCommand(redisClient
*c
);
562 static void setCommand(redisClient
*c
);
563 static void setnxCommand(redisClient
*c
);
564 static void getCommand(redisClient
*c
);
565 static void delCommand(redisClient
*c
);
566 static void existsCommand(redisClient
*c
);
567 static void incrCommand(redisClient
*c
);
568 static void decrCommand(redisClient
*c
);
569 static void incrbyCommand(redisClient
*c
);
570 static void decrbyCommand(redisClient
*c
);
571 static void selectCommand(redisClient
*c
);
572 static void randomkeyCommand(redisClient
*c
);
573 static void keysCommand(redisClient
*c
);
574 static void dbsizeCommand(redisClient
*c
);
575 static void lastsaveCommand(redisClient
*c
);
576 static void saveCommand(redisClient
*c
);
577 static void bgsaveCommand(redisClient
*c
);
578 static void bgrewriteaofCommand(redisClient
*c
);
579 static void shutdownCommand(redisClient
*c
);
580 static void moveCommand(redisClient
*c
);
581 static void renameCommand(redisClient
*c
);
582 static void renamenxCommand(redisClient
*c
);
583 static void lpushCommand(redisClient
*c
);
584 static void rpushCommand(redisClient
*c
);
585 static void lpopCommand(redisClient
*c
);
586 static void rpopCommand(redisClient
*c
);
587 static void llenCommand(redisClient
*c
);
588 static void lindexCommand(redisClient
*c
);
589 static void lrangeCommand(redisClient
*c
);
590 static void ltrimCommand(redisClient
*c
);
591 static void typeCommand(redisClient
*c
);
592 static void lsetCommand(redisClient
*c
);
593 static void saddCommand(redisClient
*c
);
594 static void sremCommand(redisClient
*c
);
595 static void smoveCommand(redisClient
*c
);
596 static void sismemberCommand(redisClient
*c
);
597 static void scardCommand(redisClient
*c
);
598 static void spopCommand(redisClient
*c
);
599 static void srandmemberCommand(redisClient
*c
);
600 static void sinterCommand(redisClient
*c
);
601 static void sinterstoreCommand(redisClient
*c
);
602 static void sunionCommand(redisClient
*c
);
603 static void sunionstoreCommand(redisClient
*c
);
604 static void sdiffCommand(redisClient
*c
);
605 static void sdiffstoreCommand(redisClient
*c
);
606 static void syncCommand(redisClient
*c
);
607 static void flushdbCommand(redisClient
*c
);
608 static void flushallCommand(redisClient
*c
);
609 static void sortCommand(redisClient
*c
);
610 static void lremCommand(redisClient
*c
);
611 static void rpoplpushcommand(redisClient
*c
);
612 static void infoCommand(redisClient
*c
);
613 static void mgetCommand(redisClient
*c
);
614 static void monitorCommand(redisClient
*c
);
615 static void expireCommand(redisClient
*c
);
616 static void expireatCommand(redisClient
*c
);
617 static void getsetCommand(redisClient
*c
);
618 static void ttlCommand(redisClient
*c
);
619 static void slaveofCommand(redisClient
*c
);
620 static void debugCommand(redisClient
*c
);
621 static void msetCommand(redisClient
*c
);
622 static void msetnxCommand(redisClient
*c
);
623 static void zaddCommand(redisClient
*c
);
624 static void zincrbyCommand(redisClient
*c
);
625 static void zrangeCommand(redisClient
*c
);
626 static void zrangebyscoreCommand(redisClient
*c
);
627 static void zrevrangeCommand(redisClient
*c
);
628 static void zcardCommand(redisClient
*c
);
629 static void zremCommand(redisClient
*c
);
630 static void zscoreCommand(redisClient
*c
);
631 static void zremrangebyscoreCommand(redisClient
*c
);
632 static void multiCommand(redisClient
*c
);
633 static void execCommand(redisClient
*c
);
634 static void blpopCommand(redisClient
*c
);
635 static void brpopCommand(redisClient
*c
);
637 /*================================= Globals ================================= */
640 static struct redisServer server
; /* server global state */
641 static struct redisCommand cmdTable
[] = {
642 {"get",getCommand
,2,REDIS_CMD_INLINE
},
643 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
644 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
645 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
646 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
647 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
648 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
649 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
650 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
651 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
652 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
653 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
654 {"brpop",brpopCommand
,-3,REDIS_CMD_INLINE
},
655 {"blpop",blpopCommand
,-3,REDIS_CMD_INLINE
},
656 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
657 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
658 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
659 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
660 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
661 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
662 {"rpoplpush",rpoplpushcommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
663 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
664 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
665 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
666 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
667 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
668 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
669 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
670 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
671 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
672 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
673 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
674 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
675 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
676 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
677 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
678 {"zincrby",zincrbyCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
679 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
680 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
681 {"zrange",zrangeCommand
,-4,REDIS_CMD_INLINE
},
682 {"zrangebyscore",zrangebyscoreCommand
,-4,REDIS_CMD_INLINE
},
683 {"zrevrange",zrevrangeCommand
,-4,REDIS_CMD_INLINE
},
684 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
685 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
686 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
687 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
688 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
689 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
690 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
691 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
692 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
693 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
694 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
695 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
696 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
697 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
698 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
699 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
700 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
701 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
702 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
703 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
704 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
705 {"bgrewriteaof",bgrewriteaofCommand
,1,REDIS_CMD_INLINE
},
706 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
707 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
708 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
709 {"multi",multiCommand
,1,REDIS_CMD_INLINE
},
710 {"exec",execCommand
,1,REDIS_CMD_INLINE
},
711 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
712 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
713 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
714 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
715 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
716 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
717 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
718 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
719 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
723 /*============================ Utility functions ============================ */
725 /* Glob-style pattern matching. */
726 int stringmatchlen(const char *pattern
, int patternLen
,
727 const char *string
, int stringLen
, int nocase
)
732 while (pattern
[1] == '*') {
737 return 1; /* match */
739 if (stringmatchlen(pattern
+1, patternLen
-1,
740 string
, stringLen
, nocase
))
741 return 1; /* match */
745 return 0; /* no match */
749 return 0; /* no match */
759 not = pattern
[0] == '^';
766 if (pattern
[0] == '\\') {
769 if (pattern
[0] == string
[0])
771 } else if (pattern
[0] == ']') {
773 } else if (patternLen
== 0) {
777 } else if (pattern
[1] == '-' && patternLen
>= 3) {
778 int start
= pattern
[0];
779 int end
= pattern
[2];
787 start
= tolower(start
);
793 if (c
>= start
&& c
<= end
)
797 if (pattern
[0] == string
[0])
800 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
810 return 0; /* no match */
816 if (patternLen
>= 2) {
823 if (pattern
[0] != string
[0])
824 return 0; /* no match */
826 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
827 return 0; /* no match */
835 if (stringLen
== 0) {
836 while(*pattern
== '*') {
843 if (patternLen
== 0 && stringLen
== 0)
848 static void redisLog(int level
, const char *fmt
, ...) {
852 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
856 if (level
>= server
.verbosity
) {
862 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
863 fprintf(fp
,"%s %c ",buf
,c
[level
]);
864 vfprintf(fp
, fmt
, ap
);
870 if (server
.logfile
) fclose(fp
);
873 /*====================== Hash table type implementation ==================== */
875 /* This is a hash table type that uses the SDS dynamic strings library as
876 * keys and redis objects as values (objects can hold SDS strings,
879 static void dictVanillaFree(void *privdata
, void *val
)
881 DICT_NOTUSED(privdata
);
885 static void dictListDestructor(void *privdata
, void *val
)
887 DICT_NOTUSED(privdata
);
888 listRelease((list
*)val
);
891 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
895 DICT_NOTUSED(privdata
);
897 l1
= sdslen((sds
)key1
);
898 l2
= sdslen((sds
)key2
);
899 if (l1
!= l2
) return 0;
900 return memcmp(key1
, key2
, l1
) == 0;
903 static void dictRedisObjectDestructor(void *privdata
, void *val
)
905 DICT_NOTUSED(privdata
);
907 if (val
== NULL
) return; /* Values of swapped out keys are set to NULL */
911 static int dictObjKeyCompare(void *privdata
, const void *key1
,
914 const robj
*o1
= key1
, *o2
= key2
;
915 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
918 static unsigned int dictObjHash(const void *key
) {
920 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
923 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
926 robj
*o1
= (robj
*) key1
, *o2
= (robj
*) key2
;
929 o1
= getDecodedObject(o1
);
930 o2
= getDecodedObject(o2
);
931 cmp
= sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
937 static unsigned int dictEncObjHash(const void *key
) {
938 robj
*o
= (robj
*) key
;
940 o
= getDecodedObject(o
);
941 unsigned int hash
= dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
946 static dictType setDictType
= {
947 dictEncObjHash
, /* hash function */
950 dictEncObjKeyCompare
, /* key compare */
951 dictRedisObjectDestructor
, /* key destructor */
952 NULL
/* val destructor */
955 static dictType zsetDictType
= {
956 dictEncObjHash
, /* hash function */
959 dictEncObjKeyCompare
, /* key compare */
960 dictRedisObjectDestructor
, /* key destructor */
961 dictVanillaFree
/* val destructor of malloc(sizeof(double)) */
964 static dictType hashDictType
= {
965 dictObjHash
, /* hash function */
968 dictObjKeyCompare
, /* key compare */
969 dictRedisObjectDestructor
, /* key destructor */
970 dictRedisObjectDestructor
/* val destructor */
973 /* Keylist hash table type has unencoded redis objects as keys and
974 * lists as values. It's used for blocking operations (BLPOP) */
975 static dictType keylistDictType
= {
976 dictObjHash
, /* hash function */
979 dictObjKeyCompare
, /* key compare */
980 dictRedisObjectDestructor
, /* key destructor */
981 dictListDestructor
/* val destructor */
984 /* ========================= Random utility functions ======================= */
986 /* Redis generally does not try to recover from out of memory conditions
987 * when allocating objects or strings, it is not clear if it will be possible
988 * to report this condition to the client since the networking layer itself
989 * is based on heap allocation for send buffers, so we simply abort.
990 * At least the code will be simpler to read... */
991 static void oom(const char *msg
) {
992 redisLog(REDIS_WARNING
, "%s: Out of memory\n",msg
);
997 /* ====================== Redis server networking stuff ===================== */
998 static void closeTimedoutClients(void) {
1001 time_t now
= time(NULL
);
1003 listRewind(server
.clients
);
1004 while ((ln
= listYield(server
.clients
)) != NULL
) {
1005 c
= listNodeValue(ln
);
1006 if (server
.maxidletime
&&
1007 !(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
1008 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
1009 (now
- c
->lastinteraction
> server
.maxidletime
))
1011 redisLog(REDIS_VERBOSE
,"Closing idle client");
1013 } else if (c
->flags
& REDIS_BLOCKED
) {
1014 if (c
->blockingto
!= 0 && c
->blockingto
< now
) {
1015 addReply(c
,shared
.nullmultibulk
);
1022 static int htNeedsResize(dict
*dict
) {
1023 long long size
, used
;
1025 size
= dictSlots(dict
);
1026 used
= dictSize(dict
);
1027 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
1028 (used
*100/size
< REDIS_HT_MINFILL
));
1031 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
1032 * we resize the hash table to save memory */
1033 static void tryResizeHashTables(void) {
1036 for (j
= 0; j
< server
.dbnum
; j
++) {
1037 if (htNeedsResize(server
.db
[j
].dict
)) {
1038 redisLog(REDIS_VERBOSE
,"The hash table %d is too sparse, resize it...",j
);
1039 dictResize(server
.db
[j
].dict
);
1040 redisLog(REDIS_VERBOSE
,"Hash table %d resized.",j
);
1042 if (htNeedsResize(server
.db
[j
].expires
))
1043 dictResize(server
.db
[j
].expires
);
1047 /* A background saving child (BGSAVE) terminated its work. Handle this. */
1048 void backgroundSaveDoneHandler(int statloc
) {
1049 int exitcode
= WEXITSTATUS(statloc
);
1050 int bysignal
= WIFSIGNALED(statloc
);
1052 if (!bysignal
&& exitcode
== 0) {
1053 redisLog(REDIS_NOTICE
,
1054 "Background saving terminated with success");
1056 server
.lastsave
= time(NULL
);
1057 } else if (!bysignal
&& exitcode
!= 0) {
1058 redisLog(REDIS_WARNING
, "Background saving error");
1060 redisLog(REDIS_WARNING
,
1061 "Background saving terminated by signal");
1062 rdbRemoveTempFile(server
.bgsavechildpid
);
1064 server
.bgsavechildpid
= -1;
1065 /* Possibly there are slaves waiting for a BGSAVE in order to be served
1066 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
1067 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
1070 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
1072 void backgroundRewriteDoneHandler(int statloc
) {
1073 int exitcode
= WEXITSTATUS(statloc
);
1074 int bysignal
= WIFSIGNALED(statloc
);
1076 if (!bysignal
&& exitcode
== 0) {
1080 redisLog(REDIS_NOTICE
,
1081 "Background append only file rewriting terminated with success");
1082 /* Now it's time to flush the differences accumulated by the parent */
1083 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) server
.bgrewritechildpid
);
1084 fd
= open(tmpfile
,O_WRONLY
|O_APPEND
);
1086 redisLog(REDIS_WARNING
, "Not able to open the temp append only file produced by the child: %s", strerror(errno
));
1089 /* Flush our data... */
1090 if (write(fd
,server
.bgrewritebuf
,sdslen(server
.bgrewritebuf
)) !=
1091 (signed) sdslen(server
.bgrewritebuf
)) {
1092 redisLog(REDIS_WARNING
, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno
));
1096 redisLog(REDIS_NOTICE
,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server
.bgrewritebuf
));
1097 /* Now our work is to rename the temp file into the stable file. And
1098 * switch the file descriptor used by the server for append only. */
1099 if (rename(tmpfile
,server
.appendfilename
) == -1) {
1100 redisLog(REDIS_WARNING
,"Can't rename the temp append only file into the stable one: %s", strerror(errno
));
1104 /* Mission completed... almost */
1105 redisLog(REDIS_NOTICE
,"Append only file successfully rewritten.");
1106 if (server
.appendfd
!= -1) {
1107 /* If append only is actually enabled... */
1108 close(server
.appendfd
);
1109 server
.appendfd
= fd
;
1111 server
.appendseldb
= -1; /* Make sure it will issue SELECT */
1112 redisLog(REDIS_NOTICE
,"The new append only file was selected for future appends.");
1114 /* If append only is disabled we just generate a dump in this
1115 * format. Why not? */
1118 } else if (!bysignal
&& exitcode
!= 0) {
1119 redisLog(REDIS_WARNING
, "Background append only file rewriting error");
1121 redisLog(REDIS_WARNING
,
1122 "Background append only file rewriting terminated by signal");
1125 sdsfree(server
.bgrewritebuf
);
1126 server
.bgrewritebuf
= sdsempty();
1127 aofRemoveTempFile(server
.bgrewritechildpid
);
1128 server
.bgrewritechildpid
= -1;
1131 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
1132 int j
, loops
= server
.cronloops
++;
1133 REDIS_NOTUSED(eventLoop
);
1135 REDIS_NOTUSED(clientData
);
1137 /* We take a cached value of the unix time in the global state because
1138 * with virtual memory and aging there is to store the current time
1139 * in objects at every object access, and accuracy is not needed.
1140 * To access a global var is faster than calling time(NULL) */
1141 server
.unixtime
= time(NULL
);
1143 /* Update the global state with the amount of used memory */
1144 server
.usedmemory
= zmalloc_used_memory();
1146 /* Show some info about non-empty databases */
1147 for (j
= 0; j
< server
.dbnum
; j
++) {
1148 long long size
, used
, vkeys
;
1150 size
= dictSlots(server
.db
[j
].dict
);
1151 used
= dictSize(server
.db
[j
].dict
);
1152 vkeys
= dictSize(server
.db
[j
].expires
);
1153 if (!(loops
% 5) && (used
|| vkeys
)) {
1154 redisLog(REDIS_VERBOSE
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
1155 /* dictPrintStats(server.dict); */
1159 /* We don't want to resize the hash tables while a bacground saving
1160 * is in progress: the saving child is created using fork() that is
1161 * implemented with a copy-on-write semantic in most modern systems, so
1162 * if we resize the HT while there is the saving child at work actually
1163 * a lot of memory movements in the parent will cause a lot of pages
1165 if (server
.bgsavechildpid
== -1) tryResizeHashTables();
1167 /* Show information about connected clients */
1169 redisLog(REDIS_VERBOSE
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1170 listLength(server
.clients
)-listLength(server
.slaves
),
1171 listLength(server
.slaves
),
1173 dictSize(server
.sharingpool
));
1176 /* Close connections of timedout clients */
1177 if ((server
.maxidletime
&& !(loops
% 10)) || server
.blockedclients
)
1178 closeTimedoutClients();
1180 /* Check if a background saving or AOF rewrite in progress terminated */
1181 if (server
.bgsavechildpid
!= -1 || server
.bgrewritechildpid
!= -1) {
1185 if ((pid
= wait3(&statloc
,WNOHANG
,NULL
)) != 0) {
1186 if (pid
== server
.bgsavechildpid
) {
1187 backgroundSaveDoneHandler(statloc
);
1189 backgroundRewriteDoneHandler(statloc
);
1193 /* If there is not a background saving in progress check if
1194 * we have to save now */
1195 time_t now
= time(NULL
);
1196 for (j
= 0; j
< server
.saveparamslen
; j
++) {
1197 struct saveparam
*sp
= server
.saveparams
+j
;
1199 if (server
.dirty
>= sp
->changes
&&
1200 now
-server
.lastsave
> sp
->seconds
) {
1201 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
1202 sp
->changes
, sp
->seconds
);
1203 rdbSaveBackground(server
.dbfilename
);
1209 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1210 * will use few CPU cycles if there are few expiring keys, otherwise
1211 * it will get more aggressive to avoid that too much memory is used by
1212 * keys that can be removed from the keyspace. */
1213 for (j
= 0; j
< server
.dbnum
; j
++) {
1215 redisDb
*db
= server
.db
+j
;
1217 /* Continue to expire if at the end of the cycle more than 25%
1218 * of the keys were expired. */
1220 long num
= dictSize(db
->expires
);
1221 time_t now
= time(NULL
);
1224 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
1225 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
1230 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
1231 t
= (time_t) dictGetEntryVal(de
);
1233 deleteKey(db
,dictGetEntryKey(de
));
1237 } while (expired
> REDIS_EXPIRELOOKUPS_PER_CRON
/4);
1240 /* Swap a few keys on disk if we are over the memory limit and VM
1241 * is enbled. Try to free objects from the free list first. */
1242 if (vmCanSwapOut()) {
1243 while (server
.vm_enabled
&& zmalloc_used_memory() >
1244 server
.vm_max_memory
)
1246 if (tryFreeOneObjectFromFreelist() == REDIS_OK
) continue;
1247 if (vmSwapOneObjectThreaded() == REDIS_ERR
) {
1248 if ((loops
% 30) == 0 && zmalloc_used_memory() >
1249 (server
.vm_max_memory
+server
.vm_max_memory
/10)) {
1250 redisLog(REDIS_WARNING
,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");
1253 /* Note that we freed just one object, because anyway when
1254 * the I/O thread in charge to swap this object out will
1255 * do its work, the handler of completed jobs will try to swap
1256 * more objects if we are out of memory. */
1261 /* Check if we should connect to a MASTER */
1262 if (server
.replstate
== REDIS_REPL_CONNECT
) {
1263 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
1264 if (syncWithMaster() == REDIS_OK
) {
1265 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
1271 static void createSharedObjects(void) {
1272 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
1273 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
1274 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
1275 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
1276 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
1277 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
1278 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
1279 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
1280 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
1281 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
1282 shared
.queued
= createObject(REDIS_STRING
,sdsnew("+QUEUED\r\n"));
1283 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
1284 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1285 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
1286 "-ERR no such key\r\n"));
1287 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
1288 "-ERR syntax error\r\n"));
1289 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
1290 "-ERR source and destination objects are the same\r\n"));
1291 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
1292 "-ERR index out of range\r\n"));
1293 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
1294 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
1295 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
1296 shared
.select0
= createStringObject("select 0\r\n",10);
1297 shared
.select1
= createStringObject("select 1\r\n",10);
1298 shared
.select2
= createStringObject("select 2\r\n",10);
1299 shared
.select3
= createStringObject("select 3\r\n",10);
1300 shared
.select4
= createStringObject("select 4\r\n",10);
1301 shared
.select5
= createStringObject("select 5\r\n",10);
1302 shared
.select6
= createStringObject("select 6\r\n",10);
1303 shared
.select7
= createStringObject("select 7\r\n",10);
1304 shared
.select8
= createStringObject("select 8\r\n",10);
1305 shared
.select9
= createStringObject("select 9\r\n",10);
1308 static void appendServerSaveParams(time_t seconds
, int changes
) {
1309 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1310 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1311 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1312 server
.saveparamslen
++;
1315 static void resetServerSaveParams() {
1316 zfree(server
.saveparams
);
1317 server
.saveparams
= NULL
;
1318 server
.saveparamslen
= 0;
1321 static void initServerConfig() {
1322 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1323 server
.port
= REDIS_SERVERPORT
;
1324 server
.verbosity
= REDIS_VERBOSE
;
1325 server
.maxidletime
= REDIS_MAXIDLETIME
;
1326 server
.saveparams
= NULL
;
1327 server
.logfile
= NULL
; /* NULL = log on standard output */
1328 server
.bindaddr
= NULL
;
1329 server
.glueoutputbuf
= 1;
1330 server
.daemonize
= 0;
1331 server
.appendonly
= 0;
1332 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1333 server
.lastfsync
= time(NULL
);
1334 server
.appendfd
= -1;
1335 server
.appendseldb
= -1; /* Make sure the first time will not match */
1336 server
.pidfile
= "/var/run/redis.pid";
1337 server
.dbfilename
= "dump.rdb";
1338 server
.appendfilename
= "appendonly.aof";
1339 server
.requirepass
= NULL
;
1340 server
.shareobjects
= 0;
1341 server
.rdbcompression
= 1;
1342 server
.sharingpoolsize
= 1024;
1343 server
.maxclients
= 0;
1344 server
.blockedclients
= 0;
1345 server
.maxmemory
= 0;
1346 server
.vm_enabled
= 0;
1347 server
.vm_page_size
= 256; /* 256 bytes per page */
1348 server
.vm_pages
= 1024*1024*100; /* 104 millions of pages */
1349 server
.vm_max_memory
= 1024LL*1024*1024*1; /* 1 GB of RAM */
1350 server
.vm_max_threads
= 4;
1352 resetServerSaveParams();
1354 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1355 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1356 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1357 /* Replication related */
1359 server
.masterauth
= NULL
;
1360 server
.masterhost
= NULL
;
1361 server
.masterport
= 6379;
1362 server
.master
= NULL
;
1363 server
.replstate
= REDIS_REPL_NONE
;
1365 /* Double constants initialization */
1367 R_PosInf
= 1.0/R_Zero
;
1368 R_NegInf
= -1.0/R_Zero
;
1369 R_Nan
= R_Zero
/R_Zero
;
1372 static void initServer() {
1375 signal(SIGHUP
, SIG_IGN
);
1376 signal(SIGPIPE
, SIG_IGN
);
1377 setupSigSegvAction();
1379 server
.devnull
= fopen("/dev/null","w");
1380 if (server
.devnull
== NULL
) {
1381 redisLog(REDIS_WARNING
, "Can't open /dev/null: %s", server
.neterr
);
1384 server
.clients
= listCreate();
1385 server
.slaves
= listCreate();
1386 server
.monitors
= listCreate();
1387 server
.objfreelist
= listCreate();
1388 createSharedObjects();
1389 server
.el
= aeCreateEventLoop();
1390 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1391 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
1392 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1393 if (server
.fd
== -1) {
1394 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
1397 for (j
= 0; j
< server
.dbnum
; j
++) {
1398 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1399 server
.db
[j
].expires
= dictCreate(&setDictType
,NULL
);
1400 server
.db
[j
].blockingkeys
= dictCreate(&keylistDictType
,NULL
);
1401 server
.db
[j
].id
= j
;
1403 server
.cronloops
= 0;
1404 server
.bgsavechildpid
= -1;
1405 server
.bgrewritechildpid
= -1;
1406 server
.bgrewritebuf
= sdsempty();
1407 server
.lastsave
= time(NULL
);
1409 server
.usedmemory
= 0;
1410 server
.stat_numcommands
= 0;
1411 server
.stat_numconnections
= 0;
1412 server
.stat_starttime
= time(NULL
);
1413 server
.unixtime
= time(NULL
);
1414 aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
);
1415 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
1416 acceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
1418 if (server
.appendonly
) {
1419 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644);
1420 if (server
.appendfd
== -1) {
1421 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1427 if (server
.vm_enabled
) vmInit();
1430 /* Empty the whole database */
1431 static long long emptyDb() {
1433 long long removed
= 0;
1435 for (j
= 0; j
< server
.dbnum
; j
++) {
1436 removed
+= dictSize(server
.db
[j
].dict
);
1437 dictEmpty(server
.db
[j
].dict
);
1438 dictEmpty(server
.db
[j
].expires
);
1443 static int yesnotoi(char *s
) {
1444 if (!strcasecmp(s
,"yes")) return 1;
1445 else if (!strcasecmp(s
,"no")) return 0;
1449 /* I agree, this is a very rudimental way to load a configuration...
1450 will improve later if the config gets more complex */
1451 static void loadServerConfig(char *filename
) {
1453 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1457 if (filename
[0] == '-' && filename
[1] == '\0')
1460 if ((fp
= fopen(filename
,"r")) == NULL
) {
1461 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1466 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1472 line
= sdstrim(line
," \t\r\n");
1474 /* Skip comments and blank lines*/
1475 if (line
[0] == '#' || line
[0] == '\0') {
1480 /* Split into arguments */
1481 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1482 sdstolower(argv
[0]);
1484 /* Execute config directives */
1485 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1486 server
.maxidletime
= atoi(argv
[1]);
1487 if (server
.maxidletime
< 0) {
1488 err
= "Invalid timeout value"; goto loaderr
;
1490 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1491 server
.port
= atoi(argv
[1]);
1492 if (server
.port
< 1 || server
.port
> 65535) {
1493 err
= "Invalid port"; goto loaderr
;
1495 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1496 server
.bindaddr
= zstrdup(argv
[1]);
1497 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1498 int seconds
= atoi(argv
[1]);
1499 int changes
= atoi(argv
[2]);
1500 if (seconds
< 1 || changes
< 0) {
1501 err
= "Invalid save parameters"; goto loaderr
;
1503 appendServerSaveParams(seconds
,changes
);
1504 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1505 if (chdir(argv
[1]) == -1) {
1506 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1507 argv
[1], strerror(errno
));
1510 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1511 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1512 else if (!strcasecmp(argv
[1],"verbose")) server
.verbosity
= REDIS_VERBOSE
;
1513 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1514 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1516 err
= "Invalid log level. Must be one of debug, notice, warning";
1519 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1522 server
.logfile
= zstrdup(argv
[1]);
1523 if (!strcasecmp(server
.logfile
,"stdout")) {
1524 zfree(server
.logfile
);
1525 server
.logfile
= NULL
;
1527 if (server
.logfile
) {
1528 /* Test if we are able to open the file. The server will not
1529 * be able to abort just for this problem later... */
1530 logfp
= fopen(server
.logfile
,"a");
1531 if (logfp
== NULL
) {
1532 err
= sdscatprintf(sdsempty(),
1533 "Can't open the log file: %s", strerror(errno
));
1538 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1539 server
.dbnum
= atoi(argv
[1]);
1540 if (server
.dbnum
< 1) {
1541 err
= "Invalid number of databases"; goto loaderr
;
1543 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1544 server
.maxclients
= atoi(argv
[1]);
1545 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1546 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1547 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1548 server
.masterhost
= sdsnew(argv
[1]);
1549 server
.masterport
= atoi(argv
[2]);
1550 server
.replstate
= REDIS_REPL_CONNECT
;
1551 } else if (!strcasecmp(argv
[0],"masterauth") && argc
== 2) {
1552 server
.masterauth
= zstrdup(argv
[1]);
1553 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1554 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1555 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1557 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1558 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1559 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1561 } else if (!strcasecmp(argv
[0],"rdbcompression") && argc
== 2) {
1562 if ((server
.rdbcompression
= yesnotoi(argv
[1])) == -1) {
1563 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1565 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1566 server
.sharingpoolsize
= atoi(argv
[1]);
1567 if (server
.sharingpoolsize
< 1) {
1568 err
= "invalid object sharing pool size"; goto loaderr
;
1570 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1571 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1572 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1574 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1575 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1576 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1578 } else if (!strcasecmp(argv
[0],"appendfsync") && argc
== 2) {
1579 if (!strcasecmp(argv
[1],"no")) {
1580 server
.appendfsync
= APPENDFSYNC_NO
;
1581 } else if (!strcasecmp(argv
[1],"always")) {
1582 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1583 } else if (!strcasecmp(argv
[1],"everysec")) {
1584 server
.appendfsync
= APPENDFSYNC_EVERYSEC
;
1586 err
= "argument must be 'no', 'always' or 'everysec'";
1589 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1590 server
.requirepass
= zstrdup(argv
[1]);
1591 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1592 server
.pidfile
= zstrdup(argv
[1]);
1593 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1594 server
.dbfilename
= zstrdup(argv
[1]);
1595 } else if (!strcasecmp(argv
[0],"vm-enabled") && argc
== 2) {
1596 if ((server
.vm_enabled
= yesnotoi(argv
[1])) == -1) {
1597 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1599 } else if (!strcasecmp(argv
[0],"vm-max-memory") && argc
== 2) {
1600 server
.vm_max_memory
= strtoll(argv
[1], NULL
, 10);
1601 } else if (!strcasecmp(argv
[0],"vm-page-size") && argc
== 2) {
1602 server
.vm_page_size
= strtoll(argv
[1], NULL
, 10);
1603 } else if (!strcasecmp(argv
[0],"vm-pages") && argc
== 2) {
1604 server
.vm_pages
= strtoll(argv
[1], NULL
, 10);
1605 } else if (!strcasecmp(argv
[0],"vm-max-threads") && argc
== 2) {
1606 server
.vm_max_threads
= strtoll(argv
[1], NULL
, 10);
1608 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1610 for (j
= 0; j
< argc
; j
++)
1615 if (fp
!= stdin
) fclose(fp
);
1619 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1620 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1621 fprintf(stderr
, ">>> '%s'\n", line
);
1622 fprintf(stderr
, "%s\n", err
);
1626 static void freeClientArgv(redisClient
*c
) {
1629 for (j
= 0; j
< c
->argc
; j
++)
1630 decrRefCount(c
->argv
[j
]);
1631 for (j
= 0; j
< c
->mbargc
; j
++)
1632 decrRefCount(c
->mbargv
[j
]);
1637 static void freeClient(redisClient
*c
) {
1640 /* Note that if the client we are freeing is blocked into a blocking
1641 * call, we have to set querybuf to NULL *before* to call unblockClient()
1642 * to avoid processInputBuffer() will get called. Also it is important
1643 * to remove the file events after this, because this call adds
1644 * the READABLE event. */
1645 sdsfree(c
->querybuf
);
1647 if (c
->flags
& REDIS_BLOCKED
)
1650 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1651 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1652 listRelease(c
->reply
);
1655 /* Remove from the list of clients */
1656 ln
= listSearchKey(server
.clients
,c
);
1657 redisAssert(ln
!= NULL
);
1658 listDelNode(server
.clients
,ln
);
1659 /* Remove from the list of clients waiting for VM operations */
1660 if (server
.vm_enabled
&& listLength(c
->io_keys
)) {
1661 ln
= listSearchKey(server
.io_clients
,c
);
1662 if (ln
) listDelNode(server
.io_clients
,ln
);
1663 listRelease(c
->io_keys
);
1665 listRelease(c
->io_keys
);
1667 if (c
->flags
& REDIS_SLAVE
) {
1668 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1670 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1671 ln
= listSearchKey(l
,c
);
1672 redisAssert(ln
!= NULL
);
1675 if (c
->flags
& REDIS_MASTER
) {
1676 server
.master
= NULL
;
1677 server
.replstate
= REDIS_REPL_CONNECT
;
1681 freeClientMultiState(c
);
1685 #define GLUEREPLY_UP_TO (1024)
1686 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1688 char buf
[GLUEREPLY_UP_TO
];
1692 listRewind(c
->reply
);
1693 while((ln
= listYield(c
->reply
))) {
1697 objlen
= sdslen(o
->ptr
);
1698 if (copylen
+ objlen
<= GLUEREPLY_UP_TO
) {
1699 memcpy(buf
+copylen
,o
->ptr
,objlen
);
1701 listDelNode(c
->reply
,ln
);
1703 if (copylen
== 0) return;
1707 /* Now the output buffer is empty, add the new single element */
1708 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,copylen
));
1709 listAddNodeHead(c
->reply
,o
);
1712 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1713 redisClient
*c
= privdata
;
1714 int nwritten
= 0, totwritten
= 0, objlen
;
1717 REDIS_NOTUSED(mask
);
1719 /* Use writev() if we have enough buffers to send */
1720 if (!server
.glueoutputbuf
&&
1721 listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD
&&
1722 !(c
->flags
& REDIS_MASTER
))
1724 sendReplyToClientWritev(el
, fd
, privdata
, mask
);
1728 while(listLength(c
->reply
)) {
1729 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1730 glueReplyBuffersIfNeeded(c
);
1732 o
= listNodeValue(listFirst(c
->reply
));
1733 objlen
= sdslen(o
->ptr
);
1736 listDelNode(c
->reply
,listFirst(c
->reply
));
1740 if (c
->flags
& REDIS_MASTER
) {
1741 /* Don't reply to a master */
1742 nwritten
= objlen
- c
->sentlen
;
1744 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1745 if (nwritten
<= 0) break;
1747 c
->sentlen
+= nwritten
;
1748 totwritten
+= nwritten
;
1749 /* If we fully sent the object on head go to the next one */
1750 if (c
->sentlen
== objlen
) {
1751 listDelNode(c
->reply
,listFirst(c
->reply
));
1754 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1755 * bytes, in a single threaded server it's a good idea to serve
1756 * other clients as well, even if a very large request comes from
1757 * super fast link that is always able to accept data (in real world
1758 * scenario think about 'KEYS *' against the loopback interfae) */
1759 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1761 if (nwritten
== -1) {
1762 if (errno
== EAGAIN
) {
1765 redisLog(REDIS_VERBOSE
,
1766 "Error writing to client: %s", strerror(errno
));
1771 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1772 if (listLength(c
->reply
) == 0) {
1774 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1778 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
)
1780 redisClient
*c
= privdata
;
1781 int nwritten
= 0, totwritten
= 0, objlen
, willwrite
;
1783 struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
];
1784 int offset
, ion
= 0;
1786 REDIS_NOTUSED(mask
);
1789 while (listLength(c
->reply
)) {
1790 offset
= c
->sentlen
;
1794 /* fill-in the iov[] array */
1795 for(node
= listFirst(c
->reply
); node
; node
= listNextNode(node
)) {
1796 o
= listNodeValue(node
);
1797 objlen
= sdslen(o
->ptr
);
1799 if (totwritten
+ objlen
- offset
> REDIS_MAX_WRITE_PER_EVENT
)
1802 if(ion
== REDIS_WRITEV_IOVEC_COUNT
)
1803 break; /* no more iovecs */
1805 iov
[ion
].iov_base
= ((char*)o
->ptr
) + offset
;
1806 iov
[ion
].iov_len
= objlen
- offset
;
1807 willwrite
+= objlen
- offset
;
1808 offset
= 0; /* just for the first item */
1815 /* write all collected blocks at once */
1816 if((nwritten
= writev(fd
, iov
, ion
)) < 0) {
1817 if (errno
!= EAGAIN
) {
1818 redisLog(REDIS_VERBOSE
,
1819 "Error writing to client: %s", strerror(errno
));
1826 totwritten
+= nwritten
;
1827 offset
= c
->sentlen
;
1829 /* remove written robjs from c->reply */
1830 while (nwritten
&& listLength(c
->reply
)) {
1831 o
= listNodeValue(listFirst(c
->reply
));
1832 objlen
= sdslen(o
->ptr
);
1834 if(nwritten
>= objlen
- offset
) {
1835 listDelNode(c
->reply
, listFirst(c
->reply
));
1836 nwritten
-= objlen
- offset
;
1840 c
->sentlen
+= nwritten
;
1848 c
->lastinteraction
= time(NULL
);
1850 if (listLength(c
->reply
) == 0) {
1852 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1856 static struct redisCommand
*lookupCommand(char *name
) {
1858 while(cmdTable
[j
].name
!= NULL
) {
1859 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1865 /* resetClient prepare the client to process the next command */
1866 static void resetClient(redisClient
*c
) {
1872 /* Call() is the core of Redis execution of a command */
1873 static void call(redisClient
*c
, struct redisCommand
*cmd
) {
1876 dirty
= server
.dirty
;
1878 if (server
.appendonly
&& server
.dirty
-dirty
)
1879 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1880 if (server
.dirty
-dirty
&& listLength(server
.slaves
))
1881 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1882 if (listLength(server
.monitors
))
1883 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1884 server
.stat_numcommands
++;
1887 /* If this function gets called we already read a whole
1888 * command, argments are in the client argv/argc fields.
1889 * processCommand() execute the command or prepare the
1890 * server for a bulk read from the client.
1892 * If 1 is returned the client is still alive and valid and
1893 * and other operations can be performed by the caller. Otherwise
1894 * if 0 is returned the client was destroied (i.e. after QUIT). */
1895 static int processCommand(redisClient
*c
) {
1896 struct redisCommand
*cmd
;
1898 /* Free some memory if needed (maxmemory setting) */
1899 if (server
.maxmemory
) freeMemoryIfNeeded();
1901 /* Handle the multi bulk command type. This is an alternative protocol
1902 * supported by Redis in order to receive commands that are composed of
1903 * multiple binary-safe "bulk" arguments. The latency of processing is
1904 * a bit higher but this allows things like multi-sets, so if this
1905 * protocol is used only for MSET and similar commands this is a big win. */
1906 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1907 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1908 if (c
->multibulk
<= 0) {
1912 decrRefCount(c
->argv
[c
->argc
-1]);
1916 } else if (c
->multibulk
) {
1917 if (c
->bulklen
== -1) {
1918 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1919 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
1923 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1924 decrRefCount(c
->argv
[0]);
1925 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
1927 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
1932 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
1936 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
1937 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
1941 if (c
->multibulk
== 0) {
1945 /* Here we need to swap the multi-bulk argc/argv with the
1946 * normal argc/argv of the client structure. */
1948 c
->argv
= c
->mbargv
;
1949 c
->mbargv
= auxargv
;
1952 c
->argc
= c
->mbargc
;
1953 c
->mbargc
= auxargc
;
1955 /* We need to set bulklen to something different than -1
1956 * in order for the code below to process the command without
1957 * to try to read the last argument of a bulk command as
1958 * a special argument. */
1960 /* continue below and process the command */
1967 /* -- end of multi bulk commands processing -- */
1969 /* The QUIT command is handled as a special case. Normal command
1970 * procs are unable to close the client connection safely */
1971 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
1975 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1978 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1979 (char*)c
->argv
[0]->ptr
));
1982 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
1983 (c
->argc
< -cmd
->arity
)) {
1985 sdscatprintf(sdsempty(),
1986 "-ERR wrong number of arguments for '%s' command\r\n",
1990 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
1991 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1994 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
1995 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
1997 decrRefCount(c
->argv
[c
->argc
-1]);
1998 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
2000 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
2005 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
2006 /* It is possible that the bulk read is already in the
2007 * buffer. Check this condition and handle it accordingly.
2008 * This is just a fast path, alternative to call processInputBuffer().
2009 * It's a good idea since the code is small and this condition
2010 * happens most of the times. */
2011 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
2012 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
2014 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
2019 /* Let's try to share objects on the command arguments vector */
2020 if (server
.shareobjects
) {
2022 for(j
= 1; j
< c
->argc
; j
++)
2023 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
2025 /* Let's try to encode the bulk object to save space. */
2026 if (cmd
->flags
& REDIS_CMD_BULK
)
2027 tryObjectEncoding(c
->argv
[c
->argc
-1]);
2029 /* Check if the user is authenticated */
2030 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
2031 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
2036 /* Exec the command */
2037 if (c
->flags
& REDIS_MULTI
&& cmd
->proc
!= execCommand
) {
2038 queueMultiCommand(c
,cmd
);
2039 addReply(c
,shared
.queued
);
2044 /* Prepare the client for the next command */
2045 if (c
->flags
& REDIS_CLOSE
) {
2053 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
2057 /* (args*2)+1 is enough room for args, spaces, newlines */
2058 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
2060 if (argc
<= REDIS_STATIC_ARGS
) {
2063 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
2066 for (j
= 0; j
< argc
; j
++) {
2067 if (j
!= 0) outv
[outc
++] = shared
.space
;
2068 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
2071 lenobj
= createObject(REDIS_STRING
,
2072 sdscatprintf(sdsempty(),"%lu\r\n",
2073 (unsigned long) stringObjectLen(argv
[j
])));
2074 lenobj
->refcount
= 0;
2075 outv
[outc
++] = lenobj
;
2077 outv
[outc
++] = argv
[j
];
2079 outv
[outc
++] = shared
.crlf
;
2081 /* Increment all the refcounts at start and decrement at end in order to
2082 * be sure to free objects if there is no slave in a replication state
2083 * able to be feed with commands */
2084 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
2086 while((ln
= listYield(slaves
))) {
2087 redisClient
*slave
= ln
->value
;
2089 /* Don't feed slaves that are still waiting for BGSAVE to start */
2090 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
2092 /* Feed all the other slaves, MONITORs and so on */
2093 if (slave
->slaveseldb
!= dictid
) {
2097 case 0: selectcmd
= shared
.select0
; break;
2098 case 1: selectcmd
= shared
.select1
; break;
2099 case 2: selectcmd
= shared
.select2
; break;
2100 case 3: selectcmd
= shared
.select3
; break;
2101 case 4: selectcmd
= shared
.select4
; break;
2102 case 5: selectcmd
= shared
.select5
; break;
2103 case 6: selectcmd
= shared
.select6
; break;
2104 case 7: selectcmd
= shared
.select7
; break;
2105 case 8: selectcmd
= shared
.select8
; break;
2106 case 9: selectcmd
= shared
.select9
; break;
2108 selectcmd
= createObject(REDIS_STRING
,
2109 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
2110 selectcmd
->refcount
= 0;
2113 addReply(slave
,selectcmd
);
2114 slave
->slaveseldb
= dictid
;
2116 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
2118 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
2119 if (outv
!= static_outv
) zfree(outv
);
2122 static void processInputBuffer(redisClient
*c
) {
2124 /* Before to process the input buffer, make sure the client is not
2125 * waitig for a blocking operation such as BLPOP. Note that the first
2126 * iteration the client is never blocked, otherwise the processInputBuffer
2127 * would not be called at all, but after the execution of the first commands
2128 * in the input buffer the client may be blocked, and the "goto again"
2129 * will try to reiterate. The following line will make it return asap. */
2130 if (c
->flags
& REDIS_BLOCKED
|| c
->flags
& REDIS_IO_WAIT
) return;
2131 if (c
->bulklen
== -1) {
2132 /* Read the first line of the query */
2133 char *p
= strchr(c
->querybuf
,'\n');
2140 query
= c
->querybuf
;
2141 c
->querybuf
= sdsempty();
2142 querylen
= 1+(p
-(query
));
2143 if (sdslen(query
) > querylen
) {
2144 /* leave data after the first line of the query in the buffer */
2145 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
2147 *p
= '\0'; /* remove "\n" */
2148 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
2149 sdsupdatelen(query
);
2151 /* Now we can split the query in arguments */
2152 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
2155 if (c
->argv
) zfree(c
->argv
);
2156 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
2158 for (j
= 0; j
< argc
; j
++) {
2159 if (sdslen(argv
[j
])) {
2160 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
2168 /* Execute the command. If the client is still valid
2169 * after processCommand() return and there is something
2170 * on the query buffer try to process the next command. */
2171 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
2173 /* Nothing to process, argc == 0. Just process the query
2174 * buffer if it's not empty or return to the caller */
2175 if (sdslen(c
->querybuf
)) goto again
;
2178 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
2179 redisLog(REDIS_VERBOSE
, "Client protocol error");
2184 /* Bulk read handling. Note that if we are at this point
2185 the client already sent a command terminated with a newline,
2186 we are reading the bulk data that is actually the last
2187 argument of the command. */
2188 int qbl
= sdslen(c
->querybuf
);
2190 if (c
->bulklen
<= qbl
) {
2191 /* Copy everything but the final CRLF as final argument */
2192 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
2194 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
2195 /* Process the command. If the client is still valid after
2196 * the processing and there is more data in the buffer
2197 * try to parse it. */
2198 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
2204 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2205 redisClient
*c
= (redisClient
*) privdata
;
2206 char buf
[REDIS_IOBUF_LEN
];
2209 REDIS_NOTUSED(mask
);
2211 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
2213 if (errno
== EAGAIN
) {
2216 redisLog(REDIS_VERBOSE
, "Reading from client: %s",strerror(errno
));
2220 } else if (nread
== 0) {
2221 redisLog(REDIS_VERBOSE
, "Client closed connection");
2226 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
2227 c
->lastinteraction
= time(NULL
);
2231 processInputBuffer(c
);
2234 static int selectDb(redisClient
*c
, int id
) {
2235 if (id
< 0 || id
>= server
.dbnum
)
2237 c
->db
= &server
.db
[id
];
2241 static void *dupClientReplyValue(void *o
) {
2242 incrRefCount((robj
*)o
);
2246 static redisClient
*createClient(int fd
) {
2247 redisClient
*c
= zmalloc(sizeof(*c
));
2249 anetNonBlock(NULL
,fd
);
2250 anetTcpNoDelay(NULL
,fd
);
2251 if (!c
) return NULL
;
2254 c
->querybuf
= sdsempty();
2263 c
->lastinteraction
= time(NULL
);
2264 c
->authenticated
= 0;
2265 c
->replstate
= REDIS_REPL_NONE
;
2266 c
->reply
= listCreate();
2267 listSetFreeMethod(c
->reply
,decrRefCount
);
2268 listSetDupMethod(c
->reply
,dupClientReplyValue
);
2269 c
->blockingkeys
= NULL
;
2270 c
->blockingkeysnum
= 0;
2271 c
->io_keys
= listCreate();
2272 listSetFreeMethod(c
->io_keys
,decrRefCount
);
2273 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
2274 readQueryFromClient
, c
) == AE_ERR
) {
2278 listAddNodeTail(server
.clients
,c
);
2279 initClientMultiState(c
);
2283 static void addReply(redisClient
*c
, robj
*obj
) {
2284 if (listLength(c
->reply
) == 0 &&
2285 (c
->replstate
== REDIS_REPL_NONE
||
2286 c
->replstate
== REDIS_REPL_ONLINE
) &&
2287 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
2288 sendReplyToClient
, c
) == AE_ERR
) return;
2290 if (server
.vm_enabled
&& obj
->storage
!= REDIS_VM_MEMORY
) {
2291 obj
= dupStringObject(obj
);
2292 obj
->refcount
= 0; /* getDecodedObject() will increment the refcount */
2294 listAddNodeTail(c
->reply
,getDecodedObject(obj
));
2297 static void addReplySds(redisClient
*c
, sds s
) {
2298 robj
*o
= createObject(REDIS_STRING
,s
);
2303 static void addReplyDouble(redisClient
*c
, double d
) {
2306 snprintf(buf
,sizeof(buf
),"%.17g",d
);
2307 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2308 (unsigned long) strlen(buf
),buf
));
2311 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
2314 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
2315 len
= sdslen(obj
->ptr
);
2317 long n
= (long)obj
->ptr
;
2319 /* Compute how many bytes will take this integer as a radix 10 string */
2325 while((n
= n
/10) != 0) {
2329 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len
));
2332 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2337 REDIS_NOTUSED(mask
);
2338 REDIS_NOTUSED(privdata
);
2340 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
2341 if (cfd
== AE_ERR
) {
2342 redisLog(REDIS_VERBOSE
,"Accepting client connection: %s", server
.neterr
);
2345 redisLog(REDIS_VERBOSE
,"Accepted %s:%d", cip
, cport
);
2346 if ((c
= createClient(cfd
)) == NULL
) {
2347 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
2348 close(cfd
); /* May be already closed, just ingore errors */
2351 /* If maxclient directive is set and this is one client more... close the
2352 * connection. Note that we create the client instead to check before
2353 * for this condition, since now the socket is already set in nonblocking
2354 * mode and we can send an error for free using the Kernel I/O */
2355 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
2356 char *err
= "-ERR max number of clients reached\r\n";
2358 /* That's a best effort error message, don't check write errors */
2359 if (write(c
->fd
,err
,strlen(err
)) == -1) {
2360 /* Nothing to do, Just to avoid the warning... */
2365 server
.stat_numconnections
++;
2368 /* ======================= Redis objects implementation ===================== */
2370 static robj
*createObject(int type
, void *ptr
) {
2373 if (server
.vm_enabled
) pthread_mutex_lock(&server
.obj_freelist_mutex
);
2374 if (listLength(server
.objfreelist
)) {
2375 listNode
*head
= listFirst(server
.objfreelist
);
2376 o
= listNodeValue(head
);
2377 listDelNode(server
.objfreelist
,head
);
2378 if (server
.vm_enabled
) pthread_mutex_unlock(&server
.obj_freelist_mutex
);
2380 if (server
.vm_enabled
) {
2381 pthread_mutex_unlock(&server
.obj_freelist_mutex
);
2382 o
= zmalloc(sizeof(*o
));
2384 o
= zmalloc(sizeof(*o
)-sizeof(struct redisObjectVM
));
2388 o
->encoding
= REDIS_ENCODING_RAW
;
2391 if (server
.vm_enabled
) {
2392 o
->vm
.atime
= server
.unixtime
;
2393 o
->storage
= REDIS_VM_MEMORY
;
2398 static robj
*createStringObject(char *ptr
, size_t len
) {
2399 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
2402 static robj
*dupStringObject(robj
*o
) {
2403 assert(o
->encoding
== REDIS_ENCODING_RAW
);
2404 return createStringObject(o
->ptr
,sdslen(o
->ptr
));
2407 static robj
*createListObject(void) {
2408 list
*l
= listCreate();
2410 listSetFreeMethod(l
,decrRefCount
);
2411 return createObject(REDIS_LIST
,l
);
2414 static robj
*createSetObject(void) {
2415 dict
*d
= dictCreate(&setDictType
,NULL
);
2416 return createObject(REDIS_SET
,d
);
2419 static robj
*createZsetObject(void) {
2420 zset
*zs
= zmalloc(sizeof(*zs
));
2422 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
2423 zs
->zsl
= zslCreate();
2424 return createObject(REDIS_ZSET
,zs
);
2427 static void freeStringObject(robj
*o
) {
2428 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2433 static void freeListObject(robj
*o
) {
2434 listRelease((list
*) o
->ptr
);
2437 static void freeSetObject(robj
*o
) {
2438 dictRelease((dict
*) o
->ptr
);
2441 static void freeZsetObject(robj
*o
) {
2444 dictRelease(zs
->dict
);
2449 static void freeHashObject(robj
*o
) {
2450 dictRelease((dict
*) o
->ptr
);
2453 static void incrRefCount(robj
*o
) {
2454 redisAssert(!server
.vm_enabled
|| o
->storage
== REDIS_VM_MEMORY
);
2458 static void decrRefCount(void *obj
) {
2461 /* Object is swapped out, or in the process of being loaded. */
2462 if (server
.vm_enabled
&&
2463 (o
->storage
== REDIS_VM_SWAPPED
|| o
->storage
== REDIS_VM_LOADING
))
2465 if (o
->storage
== REDIS_VM_SWAPPED
|| o
->storage
== REDIS_VM_LOADING
) {
2466 redisAssert(o
->refcount
== 1);
2468 if (o
->storage
== REDIS_VM_LOADING
) vmCancelThreadedIOJob(obj
);
2469 redisAssert(o
->type
== REDIS_STRING
);
2470 freeStringObject(o
);
2471 vmMarkPagesFree(o
->vm
.page
,o
->vm
.usedpages
);
2472 pthread_mutex_lock(&server
.obj_freelist_mutex
);
2473 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2474 !listAddNodeHead(server
.objfreelist
,o
))
2476 pthread_mutex_unlock(&server
.obj_freelist_mutex
);
2477 server
.vm_stats_swapped_objects
--;
2480 /* Object is in memory, or in the process of being swapped out. */
2481 if (--(o
->refcount
) == 0) {
2482 if (server
.vm_enabled
&& o
->storage
== REDIS_VM_SWAPPING
)
2483 vmCancelThreadedIOJob(obj
);
2485 case REDIS_STRING
: freeStringObject(o
); break;
2486 case REDIS_LIST
: freeListObject(o
); break;
2487 case REDIS_SET
: freeSetObject(o
); break;
2488 case REDIS_ZSET
: freeZsetObject(o
); break;
2489 case REDIS_HASH
: freeHashObject(o
); break;
2490 default: redisAssert(0 != 0); break;
2492 if (server
.vm_enabled
) pthread_mutex_lock(&server
.obj_freelist_mutex
);
2493 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2494 !listAddNodeHead(server
.objfreelist
,o
))
2496 if (server
.vm_enabled
) pthread_mutex_unlock(&server
.obj_freelist_mutex
);
2500 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2501 dictEntry
*de
= dictFind(db
->dict
,key
);
2503 robj
*key
= dictGetEntryKey(de
);
2504 robj
*val
= dictGetEntryVal(de
);
2506 if (server
.vm_enabled
) {
2507 if (key
->storage
== REDIS_VM_MEMORY
||
2508 key
->storage
== REDIS_VM_SWAPPING
)
2510 /* If we were swapping the object out, stop it, this key
2512 if (key
->storage
== REDIS_VM_SWAPPING
)
2513 vmCancelThreadedIOJob(key
);
2514 /* Update the access time of the key for the aging algorithm. */
2515 key
->vm
.atime
= server
.unixtime
;
2517 /* Our value was swapped on disk. Bring it at home. */
2518 redisAssert(val
== NULL
);
2519 val
= vmLoadObject(key
);
2520 dictGetEntryVal(de
) = val
;
2529 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2530 expireIfNeeded(db
,key
);
2531 return lookupKey(db
,key
);
2534 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2535 deleteIfVolatile(db
,key
);
2536 return lookupKey(db
,key
);
2539 static int deleteKey(redisDb
*db
, robj
*key
) {
2542 /* We need to protect key from destruction: after the first dictDelete()
2543 * it may happen that 'key' is no longer valid if we don't increment
2544 * it's count. This may happen when we get the object reference directly
2545 * from the hash table with dictRandomKey() or dict iterators */
2547 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2548 retval
= dictDelete(db
->dict
,key
);
2551 return retval
== DICT_OK
;
2554 /* Try to share an object against the shared objects pool */
2555 static robj
*tryObjectSharing(robj
*o
) {
2556 struct dictEntry
*de
;
2559 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2561 redisAssert(o
->type
== REDIS_STRING
);
2562 de
= dictFind(server
.sharingpool
,o
);
2564 robj
*shared
= dictGetEntryKey(de
);
2566 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2567 dictGetEntryVal(de
) = (void*) c
;
2568 incrRefCount(shared
);
2572 /* Here we are using a stream algorihtm: Every time an object is
2573 * shared we increment its count, everytime there is a miss we
2574 * recrement the counter of a random object. If this object reaches
2575 * zero we remove the object and put the current object instead. */
2576 if (dictSize(server
.sharingpool
) >=
2577 server
.sharingpoolsize
) {
2578 de
= dictGetRandomKey(server
.sharingpool
);
2579 redisAssert(de
!= NULL
);
2580 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2581 dictGetEntryVal(de
) = (void*) c
;
2583 dictDelete(server
.sharingpool
,de
->key
);
2586 c
= 0; /* If the pool is empty we want to add this object */
2591 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2592 redisAssert(retval
== DICT_OK
);
2599 /* Check if the nul-terminated string 's' can be represented by a long
2600 * (that is, is a number that fits into long without any other space or
2601 * character before or after the digits).
2603 * If so, the function returns REDIS_OK and *longval is set to the value
2604 * of the number. Otherwise REDIS_ERR is returned */
2605 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2606 char buf
[32], *endptr
;
2610 value
= strtol(s
, &endptr
, 10);
2611 if (endptr
[0] != '\0') return REDIS_ERR
;
2612 slen
= snprintf(buf
,32,"%ld",value
);
2614 /* If the number converted back into a string is not identical
2615 * then it's not possible to encode the string as integer */
2616 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2617 if (longval
) *longval
= value
;
2621 /* Try to encode a string object in order to save space */
2622 static int tryObjectEncoding(robj
*o
) {
2626 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2627 return REDIS_ERR
; /* Already encoded */
2629 /* It's not save to encode shared objects: shared objects can be shared
2630 * everywhere in the "object space" of Redis. Encoded objects can only
2631 * appear as "values" (and not, for instance, as keys) */
2632 if (o
->refcount
> 1) return REDIS_ERR
;
2634 /* Currently we try to encode only strings */
2635 redisAssert(o
->type
== REDIS_STRING
);
2637 /* Check if we can represent this string as a long integer */
2638 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2640 /* Ok, this object can be encoded */
2641 o
->encoding
= REDIS_ENCODING_INT
;
2643 o
->ptr
= (void*) value
;
2647 /* Get a decoded version of an encoded object (returned as a new object).
2648 * If the object is already raw-encoded just increment the ref count. */
2649 static robj
*getDecodedObject(robj
*o
) {
2652 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2656 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2659 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2660 dec
= createStringObject(buf
,strlen(buf
));
2663 redisAssert(1 != 1);
2667 /* Compare two string objects via strcmp() or alike.
2668 * Note that the objects may be integer-encoded. In such a case we
2669 * use snprintf() to get a string representation of the numbers on the stack
2670 * and compare the strings, it's much faster than calling getDecodedObject().
2672 * Important note: if objects are not integer encoded, but binary-safe strings,
2673 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2675 static int compareStringObjects(robj
*a
, robj
*b
) {
2676 redisAssert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2677 char bufa
[128], bufb
[128], *astr
, *bstr
;
2680 if (a
== b
) return 0;
2681 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2682 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2688 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2689 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2695 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2698 static size_t stringObjectLen(robj
*o
) {
2699 redisAssert(o
->type
== REDIS_STRING
);
2700 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2701 return sdslen(o
->ptr
);
2705 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2709 /*============================ RDB saving/loading =========================== */
2711 static int rdbSaveType(FILE *fp
, unsigned char type
) {
2712 if (fwrite(&type
,1,1,fp
) == 0) return -1;
2716 static int rdbSaveTime(FILE *fp
, time_t t
) {
2717 int32_t t32
= (int32_t) t
;
2718 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
2722 /* check rdbLoadLen() comments for more info */
2723 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2724 unsigned char buf
[2];
2727 /* Save a 6 bit len */
2728 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2729 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2730 } else if (len
< (1<<14)) {
2731 /* Save a 14 bit len */
2732 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2734 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2736 /* Save a 32 bit len */
2737 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2738 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2740 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2745 /* String objects in the form "2391" "-100" without any space and with a
2746 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2747 * encoded as integers to save space */
2748 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2750 char *endptr
, buf
[32];
2752 /* Check if it's possible to encode this value as a number */
2753 value
= strtoll(s
, &endptr
, 10);
2754 if (endptr
[0] != '\0') return 0;
2755 snprintf(buf
,32,"%lld",value
);
2757 /* If the number converted back into a string is not identical
2758 * then it's not possible to encode the string as integer */
2759 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2761 /* Finally check if it fits in our ranges */
2762 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2763 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2764 enc
[1] = value
&0xFF;
2766 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2767 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2768 enc
[1] = value
&0xFF;
2769 enc
[2] = (value
>>8)&0xFF;
2771 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2772 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2773 enc
[1] = value
&0xFF;
2774 enc
[2] = (value
>>8)&0xFF;
2775 enc
[3] = (value
>>16)&0xFF;
2776 enc
[4] = (value
>>24)&0xFF;
2783 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2784 unsigned int comprlen
, outlen
;
2788 /* We require at least four bytes compression for this to be worth it */
2789 outlen
= sdslen(obj
->ptr
)-4;
2790 if (outlen
<= 0) return 0;
2791 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2792 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2793 if (comprlen
== 0) {
2797 /* Data compressed! Let's save it on disk */
2798 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2799 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2800 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2801 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2802 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2811 /* Save a string objet as [len][data] on disk. If the object is a string
2812 * representation of an integer value we try to safe it in a special form */
2813 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2817 len
= sdslen(obj
->ptr
);
2819 /* Try integer encoding */
2821 unsigned char buf
[5];
2822 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2823 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2828 /* Try LZF compression - under 20 bytes it's unable to compress even
2829 * aaaaaaaaaaaaaaaaaa so skip it */
2830 if (server
.rdbcompression
&& len
> 20) {
2833 retval
= rdbSaveLzfStringObject(fp
,obj
);
2834 if (retval
== -1) return -1;
2835 if (retval
> 0) return 0;
2836 /* retval == 0 means data can't be compressed, save the old way */
2839 /* Store verbatim */
2840 if (rdbSaveLen(fp
,len
) == -1) return -1;
2841 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2845 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2846 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2849 if (obj
->storage
== REDIS_VM_MEMORY
&&
2850 obj
->encoding
!= REDIS_ENCODING_RAW
)
2852 obj
= getDecodedObject(obj
);
2853 retval
= rdbSaveStringObjectRaw(fp
,obj
);
2856 /* This is a fast path when we are sure the object is not encoded.
2857 * Note that's any *faster* actually as we needed to add the conditional
2858 * but because this may happen in a background process we don't want
2859 * to touch the object fields with incr/decrRefCount in order to
2860 * preveny copy on write of pages.
2862 * Also incrRefCount() will have a failing assert() if we try to call
2863 * it against an object with storage != REDIS_VM_MEMORY. */
2864 retval
= rdbSaveStringObjectRaw(fp
,obj
);
2869 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
2870 * 8 bit integer specifing the length of the representation.
2871 * This 8 bit integer has special values in order to specify the following
2877 static int rdbSaveDoubleValue(FILE *fp
, double val
) {
2878 unsigned char buf
[128];
2884 } else if (!isfinite(val
)) {
2886 buf
[0] = (val
< 0) ? 255 : 254;
2888 snprintf((char*)buf
+1,sizeof(buf
)-1,"%.17g",val
);
2889 buf
[0] = strlen((char*)buf
+1);
2892 if (fwrite(buf
,len
,1,fp
) == 0) return -1;
2896 /* Save a Redis object. */
2897 static int rdbSaveObject(FILE *fp
, robj
*o
) {
2898 if (o
->type
== REDIS_STRING
) {
2899 /* Save a string value */
2900 if (rdbSaveStringObject(fp
,o
) == -1) return -1;
2901 } else if (o
->type
== REDIS_LIST
) {
2902 /* Save a list value */
2903 list
*list
= o
->ptr
;
2907 if (rdbSaveLen(fp
,listLength(list
)) == -1) return -1;
2908 while((ln
= listYield(list
))) {
2909 robj
*eleobj
= listNodeValue(ln
);
2911 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2913 } else if (o
->type
== REDIS_SET
) {
2914 /* Save a set value */
2916 dictIterator
*di
= dictGetIterator(set
);
2919 if (rdbSaveLen(fp
,dictSize(set
)) == -1) return -1;
2920 while((de
= dictNext(di
)) != NULL
) {
2921 robj
*eleobj
= dictGetEntryKey(de
);
2923 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2925 dictReleaseIterator(di
);
2926 } else if (o
->type
== REDIS_ZSET
) {
2927 /* Save a set value */
2929 dictIterator
*di
= dictGetIterator(zs
->dict
);
2932 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) return -1;
2933 while((de
= dictNext(di
)) != NULL
) {
2934 robj
*eleobj
= dictGetEntryKey(de
);
2935 double *score
= dictGetEntryVal(de
);
2937 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
2938 if (rdbSaveDoubleValue(fp
,*score
) == -1) return -1;
2940 dictReleaseIterator(di
);
2942 redisAssert(0 != 0);
2947 /* Return the length the object will have on disk if saved with
2948 * the rdbSaveObject() function. Currently we use a trick to get
2949 * this length with very little changes to the code. In the future
2950 * we could switch to a faster solution. */
2951 static off_t
rdbSavedObjectLen(robj
*o
, FILE *fp
) {
2952 if (fp
== NULL
) fp
= server
.devnull
;
2954 assert(rdbSaveObject(fp
,o
) != 1);
2958 /* Return the number of pages required to save this object in the swap file */
2959 static off_t
rdbSavedObjectPages(robj
*o
, FILE *fp
) {
2960 off_t bytes
= rdbSavedObjectLen(o
,fp
);
2962 return (bytes
+(server
.vm_page_size
-1))/server
.vm_page_size
;
2965 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2966 static int rdbSave(char *filename
) {
2967 dictIterator
*di
= NULL
;
2972 time_t now
= time(NULL
);
2974 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
2975 fp
= fopen(tmpfile
,"w");
2977 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
2980 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
2981 for (j
= 0; j
< server
.dbnum
; j
++) {
2982 redisDb
*db
= server
.db
+j
;
2984 if (dictSize(d
) == 0) continue;
2985 di
= dictGetIterator(d
);
2991 /* Write the SELECT DB opcode */
2992 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
2993 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
2995 /* Iterate this DB writing every entry */
2996 while((de
= dictNext(di
)) != NULL
) {
2997 robj
*key
= dictGetEntryKey(de
);
2998 robj
*o
= dictGetEntryVal(de
);
2999 time_t expiretime
= getExpire(db
,key
);
3001 /* Save the expire time */
3002 if (expiretime
!= -1) {
3003 /* If this key is already expired skip it */
3004 if (expiretime
< now
) continue;
3005 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
3006 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
3008 /* Save the key and associated value. This requires special
3009 * handling if the value is swapped out. */
3010 if (!server
.vm_enabled
|| key
->storage
== REDIS_VM_MEMORY
||
3011 key
->storage
== REDIS_VM_SWAPPING
) {
3012 /* Save type, key, value */
3013 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
3014 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
3015 if (rdbSaveObject(fp
,o
) == -1) goto werr
;
3017 /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */
3019 /* Get a preview of the object in memory */
3020 po
= vmPreviewObject(key
);
3021 /* Save type, key, value */
3022 if (rdbSaveType(fp
,key
->vtype
) == -1) goto werr
;
3023 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
3024 if (rdbSaveObject(fp
,po
) == -1) goto werr
;
3025 /* Remove the loaded object from memory */
3029 dictReleaseIterator(di
);
3032 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
3034 /* Make sure data will not remain on the OS's output buffers */
3039 /* Use RENAME to make sure the DB file is changed atomically only
3040 * if the generate DB file is ok. */
3041 if (rename(tmpfile
,filename
) == -1) {
3042 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
3046 redisLog(REDIS_NOTICE
,"DB saved on disk");
3048 server
.lastsave
= time(NULL
);
3054 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
3055 if (di
) dictReleaseIterator(di
);
3059 static int rdbSaveBackground(char *filename
) {
3062 if (server
.bgsavechildpid
!= -1) return REDIS_ERR
;
3063 if ((childpid
= fork()) == 0) {
3066 if (rdbSave(filename
) == REDIS_OK
) {
3073 if (childpid
== -1) {
3074 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
3078 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
3079 server
.bgsavechildpid
= childpid
;
3082 return REDIS_OK
; /* unreached */
3085 static void rdbRemoveTempFile(pid_t childpid
) {
3088 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
3092 static int rdbLoadType(FILE *fp
) {
3094 if (fread(&type
,1,1,fp
) == 0) return -1;
3098 static time_t rdbLoadTime(FILE *fp
) {
3100 if (fread(&t32
,4,1,fp
) == 0) return -1;
3101 return (time_t) t32
;
3104 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
3105 * of this file for a description of how this are stored on disk.
3107 * isencoded is set to 1 if the readed length is not actually a length but
3108 * an "encoding type", check the above comments for more info */
3109 static uint32_t rdbLoadLen(FILE *fp
, int *isencoded
) {
3110 unsigned char buf
[2];
3114 if (isencoded
) *isencoded
= 0;
3115 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
3116 type
= (buf
[0]&0xC0)>>6;
3117 if (type
== REDIS_RDB_6BITLEN
) {
3118 /* Read a 6 bit len */
3120 } else if (type
== REDIS_RDB_ENCVAL
) {
3121 /* Read a 6 bit len encoding type */
3122 if (isencoded
) *isencoded
= 1;
3124 } else if (type
== REDIS_RDB_14BITLEN
) {
3125 /* Read a 14 bit len */
3126 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
3127 return ((buf
[0]&0x3F)<<8)|buf
[1];
3129 /* Read a 32 bit len */
3130 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
3135 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
3136 unsigned char enc
[4];
3139 if (enctype
== REDIS_RDB_ENC_INT8
) {
3140 if (fread(enc
,1,1,fp
) == 0) return NULL
;
3141 val
= (signed char)enc
[0];
3142 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
3144 if (fread(enc
,2,1,fp
) == 0) return NULL
;
3145 v
= enc
[0]|(enc
[1]<<8);
3147 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
3149 if (fread(enc
,4,1,fp
) == 0) return NULL
;
3150 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
3153 val
= 0; /* anti-warning */
3156 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
3159 static robj
*rdbLoadLzfStringObject(FILE*fp
) {
3160 unsigned int len
, clen
;
3161 unsigned char *c
= NULL
;
3164 if ((clen
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3165 if ((len
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3166 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
3167 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
3168 if (fread(c
,clen
,1,fp
) == 0) goto err
;
3169 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
3171 return createObject(REDIS_STRING
,val
);
3178 static robj
*rdbLoadStringObject(FILE*fp
) {
3183 len
= rdbLoadLen(fp
,&isencoded
);
3186 case REDIS_RDB_ENC_INT8
:
3187 case REDIS_RDB_ENC_INT16
:
3188 case REDIS_RDB_ENC_INT32
:
3189 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
3190 case REDIS_RDB_ENC_LZF
:
3191 return tryObjectSharing(rdbLoadLzfStringObject(fp
));
3197 if (len
== REDIS_RDB_LENERR
) return NULL
;
3198 val
= sdsnewlen(NULL
,len
);
3199 if (len
&& fread(val
,len
,1,fp
) == 0) {
3203 return tryObjectSharing(createObject(REDIS_STRING
,val
));
3206 /* For information about double serialization check rdbSaveDoubleValue() */
3207 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
3211 if (fread(&len
,1,1,fp
) == 0) return -1;
3213 case 255: *val
= R_NegInf
; return 0;
3214 case 254: *val
= R_PosInf
; return 0;
3215 case 253: *val
= R_Nan
; return 0;
3217 if (fread(buf
,len
,1,fp
) == 0) return -1;
3219 sscanf(buf
, "%lg", val
);
3224 /* Load a Redis object of the specified type from the specified file.
3225 * On success a newly allocated object is returned, otherwise NULL. */
3226 static robj
*rdbLoadObject(int type
, FILE *fp
) {
3229 if (type
== REDIS_STRING
) {
3230 /* Read string value */
3231 if ((o
= rdbLoadStringObject(fp
)) == NULL
) return NULL
;
3232 tryObjectEncoding(o
);
3233 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
3234 /* Read list/set value */
3237 if ((listlen
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3238 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
3239 /* Load every single element of the list/set */
3243 if ((ele
= rdbLoadStringObject(fp
)) == NULL
) return NULL
;
3244 tryObjectEncoding(ele
);
3245 if (type
== REDIS_LIST
) {
3246 listAddNodeTail((list
*)o
->ptr
,ele
);
3248 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
3251 } else if (type
== REDIS_ZSET
) {
3252 /* Read list/set value */
3256 if ((zsetlen
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3257 o
= createZsetObject();
3259 /* Load every single element of the list/set */
3262 double *score
= zmalloc(sizeof(double));
3264 if ((ele
= rdbLoadStringObject(fp
)) == NULL
) return NULL
;
3265 tryObjectEncoding(ele
);
3266 if (rdbLoadDoubleValue(fp
,score
) == -1) return NULL
;
3267 dictAdd(zs
->dict
,ele
,score
);
3268 zslInsert(zs
->zsl
,*score
,ele
);
3269 incrRefCount(ele
); /* added to skiplist */
3272 redisAssert(0 != 0);
3277 static int rdbLoad(char *filename
) {
3279 robj
*keyobj
= NULL
;
3281 int type
, retval
, rdbver
;
3282 dict
*d
= server
.db
[0].dict
;
3283 redisDb
*db
= server
.db
+0;
3285 time_t expiretime
= -1, now
= time(NULL
);
3286 long long loadedkeys
= 0;
3288 fp
= fopen(filename
,"r");
3289 if (!fp
) return REDIS_ERR
;
3290 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
3292 if (memcmp(buf
,"REDIS",5) != 0) {
3294 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
3297 rdbver
= atoi(buf
+5);
3300 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
3307 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
3308 if (type
== REDIS_EXPIRETIME
) {
3309 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
3310 /* We read the time so we need to read the object type again */
3311 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
3313 if (type
== REDIS_EOF
) break;
3314 /* Handle SELECT DB opcode as a special case */
3315 if (type
== REDIS_SELECTDB
) {
3316 if ((dbid
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
)
3318 if (dbid
>= (unsigned)server
.dbnum
) {
3319 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
3322 db
= server
.db
+dbid
;
3327 if ((keyobj
= rdbLoadStringObject(fp
)) == NULL
) goto eoferr
;
3329 if ((o
= rdbLoadObject(type
,fp
)) == NULL
) goto eoferr
;
3330 /* Add the new object in the hash table */
3331 retval
= dictAdd(d
,keyobj
,o
);
3332 if (retval
== DICT_ERR
) {
3333 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
3336 /* Set the expire time if needed */
3337 if (expiretime
!= -1) {
3338 setExpire(db
,keyobj
,expiretime
);
3339 /* Delete this key if already expired */
3340 if (expiretime
< now
) deleteKey(db
,keyobj
);
3344 /* Handle swapping while loading big datasets when VM is on */
3346 if (server
.vm_enabled
&& (loadedkeys
% 5000) == 0) {
3347 while (zmalloc_used_memory() > server
.vm_max_memory
) {
3348 if (vmSwapOneObjectBlocking() == REDIS_ERR
) break;
3355 eoferr
: /* unexpected end of file is handled here with a fatal exit */
3356 if (keyobj
) decrRefCount(keyobj
);
3357 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
3359 return REDIS_ERR
; /* Just to avoid warning */
3362 /*================================== Commands =============================== */
3364 static void authCommand(redisClient
*c
) {
3365 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
3366 c
->authenticated
= 1;
3367 addReply(c
,shared
.ok
);
3369 c
->authenticated
= 0;
3370 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3374 static void pingCommand(redisClient
*c
) {
3375 addReply(c
,shared
.pong
);
3378 static void echoCommand(redisClient
*c
) {
3379 addReplyBulkLen(c
,c
->argv
[1]);
3380 addReply(c
,c
->argv
[1]);
3381 addReply(c
,shared
.crlf
);
3384 /*=================================== Strings =============================== */
3386 static void setGenericCommand(redisClient
*c
, int nx
) {
3389 if (nx
) deleteIfVolatile(c
->db
,c
->argv
[1]);
3390 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3391 if (retval
== DICT_ERR
) {
3393 /* If the key is about a swapped value, we want a new key object
3394 * to overwrite the old. So we delete the old key in the database.
3395 * This will also make sure that swap pages about the old object
3396 * will be marked as free. */
3397 if (deleteIfSwapped(c
->db
,c
->argv
[1]))
3398 incrRefCount(c
->argv
[1]);
3399 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3400 incrRefCount(c
->argv
[2]);
3402 addReply(c
,shared
.czero
);
3406 incrRefCount(c
->argv
[1]);
3407 incrRefCount(c
->argv
[2]);
3410 removeExpire(c
->db
,c
->argv
[1]);
3411 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3414 static void setCommand(redisClient
*c
) {
3415 setGenericCommand(c
,0);
3418 static void setnxCommand(redisClient
*c
) {
3419 setGenericCommand(c
,1);
3422 static int getGenericCommand(redisClient
*c
) {
3423 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3426 addReply(c
,shared
.nullbulk
);
3429 if (o
->type
!= REDIS_STRING
) {
3430 addReply(c
,shared
.wrongtypeerr
);
3433 addReplyBulkLen(c
,o
);
3435 addReply(c
,shared
.crlf
);
3441 static void getCommand(redisClient
*c
) {
3442 getGenericCommand(c
);
3445 static void getsetCommand(redisClient
*c
) {
3446 if (getGenericCommand(c
) == REDIS_ERR
) return;
3447 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
3448 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3450 incrRefCount(c
->argv
[1]);
3452 incrRefCount(c
->argv
[2]);
3454 removeExpire(c
->db
,c
->argv
[1]);
3457 static void mgetCommand(redisClient
*c
) {
3460 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
3461 for (j
= 1; j
< c
->argc
; j
++) {
3462 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
3464 addReply(c
,shared
.nullbulk
);
3466 if (o
->type
!= REDIS_STRING
) {
3467 addReply(c
,shared
.nullbulk
);
3469 addReplyBulkLen(c
,o
);
3471 addReply(c
,shared
.crlf
);
3477 static void msetGenericCommand(redisClient
*c
, int nx
) {
3478 int j
, busykeys
= 0;
3480 if ((c
->argc
% 2) == 0) {
3481 addReplySds(c
,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3484 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3485 * set nothing at all if at least one already key exists. */
3487 for (j
= 1; j
< c
->argc
; j
+= 2) {
3488 if (lookupKeyWrite(c
->db
,c
->argv
[j
]) != NULL
) {
3494 addReply(c
, shared
.czero
);
3498 for (j
= 1; j
< c
->argc
; j
+= 2) {
3501 tryObjectEncoding(c
->argv
[j
+1]);
3502 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3503 if (retval
== DICT_ERR
) {
3504 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3505 incrRefCount(c
->argv
[j
+1]);
3507 incrRefCount(c
->argv
[j
]);
3508 incrRefCount(c
->argv
[j
+1]);
3510 removeExpire(c
->db
,c
->argv
[j
]);
3512 server
.dirty
+= (c
->argc
-1)/2;
3513 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3516 static void msetCommand(redisClient
*c
) {
3517 msetGenericCommand(c
,0);
3520 static void msetnxCommand(redisClient
*c
) {
3521 msetGenericCommand(c
,1);
3524 static void incrDecrCommand(redisClient
*c
, long long incr
) {
3529 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3533 if (o
->type
!= REDIS_STRING
) {
3538 if (o
->encoding
== REDIS_ENCODING_RAW
)
3539 value
= strtoll(o
->ptr
, &eptr
, 10);
3540 else if (o
->encoding
== REDIS_ENCODING_INT
)
3541 value
= (long)o
->ptr
;
3543 redisAssert(1 != 1);
3548 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
3549 tryObjectEncoding(o
);
3550 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
3551 if (retval
== DICT_ERR
) {
3552 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
3553 removeExpire(c
->db
,c
->argv
[1]);
3555 incrRefCount(c
->argv
[1]);
3558 addReply(c
,shared
.colon
);
3560 addReply(c
,shared
.crlf
);
3563 static void incrCommand(redisClient
*c
) {
3564 incrDecrCommand(c
,1);
3567 static void decrCommand(redisClient
*c
) {
3568 incrDecrCommand(c
,-1);
3571 static void incrbyCommand(redisClient
*c
) {
3572 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3573 incrDecrCommand(c
,incr
);
3576 static void decrbyCommand(redisClient
*c
) {
3577 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3578 incrDecrCommand(c
,-incr
);
3581 /* ========================= Type agnostic commands ========================= */
3583 static void delCommand(redisClient
*c
) {
3586 for (j
= 1; j
< c
->argc
; j
++) {
3587 if (deleteKey(c
->db
,c
->argv
[j
])) {
3594 addReply(c
,shared
.czero
);
3597 addReply(c
,shared
.cone
);
3600 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
3605 static void existsCommand(redisClient
*c
) {
3606 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
3609 static void selectCommand(redisClient
*c
) {
3610 int id
= atoi(c
->argv
[1]->ptr
);
3612 if (selectDb(c
,id
) == REDIS_ERR
) {
3613 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
3615 addReply(c
,shared
.ok
);
3619 static void randomkeyCommand(redisClient
*c
) {
3623 de
= dictGetRandomKey(c
->db
->dict
);
3624 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
3627 addReply(c
,shared
.plus
);
3628 addReply(c
,shared
.crlf
);
3630 addReply(c
,shared
.plus
);
3631 addReply(c
,dictGetEntryKey(de
));
3632 addReply(c
,shared
.crlf
);
3636 static void keysCommand(redisClient
*c
) {
3639 sds pattern
= c
->argv
[1]->ptr
;
3640 int plen
= sdslen(pattern
);
3641 unsigned long numkeys
= 0, keyslen
= 0;
3642 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3644 di
= dictGetIterator(c
->db
->dict
);
3646 decrRefCount(lenobj
);
3647 while((de
= dictNext(di
)) != NULL
) {
3648 robj
*keyobj
= dictGetEntryKey(de
);
3650 sds key
= keyobj
->ptr
;
3651 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3652 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3653 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3655 addReply(c
,shared
.space
);
3658 keyslen
+= sdslen(key
);
3662 dictReleaseIterator(di
);
3663 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3664 addReply(c
,shared
.crlf
);
3667 static void dbsizeCommand(redisClient
*c
) {
3669 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
3672 static void lastsaveCommand(redisClient
*c
) {
3674 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
3677 static void typeCommand(redisClient
*c
) {
3681 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3686 case REDIS_STRING
: type
= "+string"; break;
3687 case REDIS_LIST
: type
= "+list"; break;
3688 case REDIS_SET
: type
= "+set"; break;
3689 case REDIS_ZSET
: type
= "+zset"; break;
3690 default: type
= "unknown"; break;
3693 addReplySds(c
,sdsnew(type
));
3694 addReply(c
,shared
.crlf
);
3697 static void saveCommand(redisClient
*c
) {
3698 if (server
.bgsavechildpid
!= -1) {
3699 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3702 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3703 addReply(c
,shared
.ok
);
3705 addReply(c
,shared
.err
);
3709 static void bgsaveCommand(redisClient
*c
) {
3710 if (server
.bgsavechildpid
!= -1) {
3711 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3714 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3715 char *status
= "+Background saving started\r\n";
3716 addReplySds(c
,sdsnew(status
));
3718 addReply(c
,shared
.err
);
3722 static void shutdownCommand(redisClient
*c
) {
3723 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
3724 /* Kill the saving child if there is a background saving in progress.
3725 We want to avoid race conditions, for instance our saving child may
3726 overwrite the synchronous saving did by SHUTDOWN. */
3727 if (server
.bgsavechildpid
!= -1) {
3728 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3729 kill(server
.bgsavechildpid
,SIGKILL
);
3730 rdbRemoveTempFile(server
.bgsavechildpid
);
3732 if (server
.appendonly
) {
3733 /* Append only file: fsync() the AOF and exit */
3734 fsync(server
.appendfd
);
3737 /* Snapshotting. Perform a SYNC SAVE and exit */
3738 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3739 if (server
.daemonize
)
3740 unlink(server
.pidfile
);
3741 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3742 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3745 /* Ooops.. error saving! The best we can do is to continue operating.
3746 * Note that if there was a background saving process, in the next
3747 * cron() Redis will be notified that the background saving aborted,
3748 * handling special stuff like slaves pending for synchronization... */
3749 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3750 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3755 static void renameGenericCommand(redisClient
*c
, int nx
) {
3758 /* To use the same key as src and dst is probably an error */
3759 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3760 addReply(c
,shared
.sameobjecterr
);
3764 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3766 addReply(c
,shared
.nokeyerr
);
3770 deleteIfVolatile(c
->db
,c
->argv
[2]);
3771 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3774 addReply(c
,shared
.czero
);
3777 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3779 incrRefCount(c
->argv
[2]);
3781 deleteKey(c
->db
,c
->argv
[1]);
3783 addReply(c
,nx
? shared
.cone
: shared
.ok
);
3786 static void renameCommand(redisClient
*c
) {
3787 renameGenericCommand(c
,0);
3790 static void renamenxCommand(redisClient
*c
) {
3791 renameGenericCommand(c
,1);
3794 static void moveCommand(redisClient
*c
) {
3799 /* Obtain source and target DB pointers */
3802 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3803 addReply(c
,shared
.outofrangeerr
);
3807 selectDb(c
,srcid
); /* Back to the source DB */
3809 /* If the user is moving using as target the same
3810 * DB as the source DB it is probably an error. */
3812 addReply(c
,shared
.sameobjecterr
);
3816 /* Check if the element exists and get a reference */
3817 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3819 addReply(c
,shared
.czero
);
3823 /* Try to add the element to the target DB */
3824 deleteIfVolatile(dst
,c
->argv
[1]);
3825 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3826 addReply(c
,shared
.czero
);
3829 incrRefCount(c
->argv
[1]);
3832 /* OK! key moved, free the entry in the source DB */
3833 deleteKey(src
,c
->argv
[1]);
3835 addReply(c
,shared
.cone
);
3838 /* =================================== Lists ================================ */
3839 static void pushGenericCommand(redisClient
*c
, int where
) {
3843 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3845 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3846 addReply(c
,shared
.ok
);
3849 lobj
= createListObject();
3851 if (where
== REDIS_HEAD
) {
3852 listAddNodeHead(list
,c
->argv
[2]);
3854 listAddNodeTail(list
,c
->argv
[2]);
3856 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
3857 incrRefCount(c
->argv
[1]);
3858 incrRefCount(c
->argv
[2]);
3860 if (lobj
->type
!= REDIS_LIST
) {
3861 addReply(c
,shared
.wrongtypeerr
);
3864 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3865 addReply(c
,shared
.ok
);
3869 if (where
== REDIS_HEAD
) {
3870 listAddNodeHead(list
,c
->argv
[2]);
3872 listAddNodeTail(list
,c
->argv
[2]);
3874 incrRefCount(c
->argv
[2]);
3877 addReply(c
,shared
.ok
);
3880 static void lpushCommand(redisClient
*c
) {
3881 pushGenericCommand(c
,REDIS_HEAD
);
3884 static void rpushCommand(redisClient
*c
) {
3885 pushGenericCommand(c
,REDIS_TAIL
);
3888 static void llenCommand(redisClient
*c
) {
3892 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3894 addReply(c
,shared
.czero
);
3897 if (o
->type
!= REDIS_LIST
) {
3898 addReply(c
,shared
.wrongtypeerr
);
3901 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
3906 static void lindexCommand(redisClient
*c
) {
3908 int index
= atoi(c
->argv
[2]->ptr
);
3910 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3912 addReply(c
,shared
.nullbulk
);
3914 if (o
->type
!= REDIS_LIST
) {
3915 addReply(c
,shared
.wrongtypeerr
);
3917 list
*list
= o
->ptr
;
3920 ln
= listIndex(list
, index
);
3922 addReply(c
,shared
.nullbulk
);
3924 robj
*ele
= listNodeValue(ln
);
3925 addReplyBulkLen(c
,ele
);
3927 addReply(c
,shared
.crlf
);
3933 static void lsetCommand(redisClient
*c
) {
3935 int index
= atoi(c
->argv
[2]->ptr
);
3937 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3939 addReply(c
,shared
.nokeyerr
);
3941 if (o
->type
!= REDIS_LIST
) {
3942 addReply(c
,shared
.wrongtypeerr
);
3944 list
*list
= o
->ptr
;
3947 ln
= listIndex(list
, index
);
3949 addReply(c
,shared
.outofrangeerr
);
3951 robj
*ele
= listNodeValue(ln
);
3954 listNodeValue(ln
) = c
->argv
[3];
3955 incrRefCount(c
->argv
[3]);
3956 addReply(c
,shared
.ok
);
3963 static void popGenericCommand(redisClient
*c
, int where
) {
3966 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3968 addReply(c
,shared
.nullbulk
);
3970 if (o
->type
!= REDIS_LIST
) {
3971 addReply(c
,shared
.wrongtypeerr
);
3973 list
*list
= o
->ptr
;
3976 if (where
== REDIS_HEAD
)
3977 ln
= listFirst(list
);
3979 ln
= listLast(list
);
3982 addReply(c
,shared
.nullbulk
);
3984 robj
*ele
= listNodeValue(ln
);
3985 addReplyBulkLen(c
,ele
);
3987 addReply(c
,shared
.crlf
);
3988 listDelNode(list
,ln
);
3995 static void lpopCommand(redisClient
*c
) {
3996 popGenericCommand(c
,REDIS_HEAD
);
3999 static void rpopCommand(redisClient
*c
) {
4000 popGenericCommand(c
,REDIS_TAIL
);
4003 static void lrangeCommand(redisClient
*c
) {
4005 int start
= atoi(c
->argv
[2]->ptr
);
4006 int end
= atoi(c
->argv
[3]->ptr
);
4008 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4010 addReply(c
,shared
.nullmultibulk
);
4012 if (o
->type
!= REDIS_LIST
) {
4013 addReply(c
,shared
.wrongtypeerr
);
4015 list
*list
= o
->ptr
;
4017 int llen
= listLength(list
);
4021 /* convert negative indexes */
4022 if (start
< 0) start
= llen
+start
;
4023 if (end
< 0) end
= llen
+end
;
4024 if (start
< 0) start
= 0;
4025 if (end
< 0) end
= 0;
4027 /* indexes sanity checks */
4028 if (start
> end
|| start
>= llen
) {
4029 /* Out of range start or start > end result in empty list */
4030 addReply(c
,shared
.emptymultibulk
);
4033 if (end
>= llen
) end
= llen
-1;
4034 rangelen
= (end
-start
)+1;
4036 /* Return the result in form of a multi-bulk reply */
4037 ln
= listIndex(list
, start
);
4038 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
4039 for (j
= 0; j
< rangelen
; j
++) {
4040 ele
= listNodeValue(ln
);
4041 addReplyBulkLen(c
,ele
);
4043 addReply(c
,shared
.crlf
);
4050 static void ltrimCommand(redisClient
*c
) {
4052 int start
= atoi(c
->argv
[2]->ptr
);
4053 int end
= atoi(c
->argv
[3]->ptr
);
4055 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4057 addReply(c
,shared
.ok
);
4059 if (o
->type
!= REDIS_LIST
) {
4060 addReply(c
,shared
.wrongtypeerr
);
4062 list
*list
= o
->ptr
;
4064 int llen
= listLength(list
);
4065 int j
, ltrim
, rtrim
;
4067 /* convert negative indexes */
4068 if (start
< 0) start
= llen
+start
;
4069 if (end
< 0) end
= llen
+end
;
4070 if (start
< 0) start
= 0;
4071 if (end
< 0) end
= 0;
4073 /* indexes sanity checks */
4074 if (start
> end
|| start
>= llen
) {
4075 /* Out of range start or start > end result in empty list */
4079 if (end
>= llen
) end
= llen
-1;
4084 /* Remove list elements to perform the trim */
4085 for (j
= 0; j
< ltrim
; j
++) {
4086 ln
= listFirst(list
);
4087 listDelNode(list
,ln
);
4089 for (j
= 0; j
< rtrim
; j
++) {
4090 ln
= listLast(list
);
4091 listDelNode(list
,ln
);
4094 addReply(c
,shared
.ok
);
4099 static void lremCommand(redisClient
*c
) {
4102 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4104 addReply(c
,shared
.czero
);
4106 if (o
->type
!= REDIS_LIST
) {
4107 addReply(c
,shared
.wrongtypeerr
);
4109 list
*list
= o
->ptr
;
4110 listNode
*ln
, *next
;
4111 int toremove
= atoi(c
->argv
[2]->ptr
);
4116 toremove
= -toremove
;
4119 ln
= fromtail
? list
->tail
: list
->head
;
4121 robj
*ele
= listNodeValue(ln
);
4123 next
= fromtail
? ln
->prev
: ln
->next
;
4124 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
4125 listDelNode(list
,ln
);
4128 if (toremove
&& removed
== toremove
) break;
4132 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
4137 /* This is the semantic of this command:
4138 * RPOPLPUSH srclist dstlist:
4139 * IF LLEN(srclist) > 0
4140 * element = RPOP srclist
4141 * LPUSH dstlist element
4148 * The idea is to be able to get an element from a list in a reliable way
4149 * since the element is not just returned but pushed against another list
4150 * as well. This command was originally proposed by Ezra Zygmuntowicz.
4152 static void rpoplpushcommand(redisClient
*c
) {
4155 sobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4157 addReply(c
,shared
.nullbulk
);
4159 if (sobj
->type
!= REDIS_LIST
) {
4160 addReply(c
,shared
.wrongtypeerr
);
4162 list
*srclist
= sobj
->ptr
;
4163 listNode
*ln
= listLast(srclist
);
4166 addReply(c
,shared
.nullbulk
);
4168 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
4169 robj
*ele
= listNodeValue(ln
);
4172 if (dobj
&& dobj
->type
!= REDIS_LIST
) {
4173 addReply(c
,shared
.wrongtypeerr
);
4177 /* Add the element to the target list (unless it's directly
4178 * passed to some BLPOP-ing client */
4179 if (!handleClientsWaitingListPush(c
,c
->argv
[2],ele
)) {
4181 /* Create the list if the key does not exist */
4182 dobj
= createListObject();
4183 dictAdd(c
->db
->dict
,c
->argv
[2],dobj
);
4184 incrRefCount(c
->argv
[2]);
4186 dstlist
= dobj
->ptr
;
4187 listAddNodeHead(dstlist
,ele
);
4191 /* Send the element to the client as reply as well */
4192 addReplyBulkLen(c
,ele
);
4194 addReply(c
,shared
.crlf
);
4196 /* Finally remove the element from the source list */
4197 listDelNode(srclist
,ln
);
4205 /* ==================================== Sets ================================ */
4207 static void saddCommand(redisClient
*c
) {
4210 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4212 set
= createSetObject();
4213 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
4214 incrRefCount(c
->argv
[1]);
4216 if (set
->type
!= REDIS_SET
) {
4217 addReply(c
,shared
.wrongtypeerr
);
4221 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
4222 incrRefCount(c
->argv
[2]);
4224 addReply(c
,shared
.cone
);
4226 addReply(c
,shared
.czero
);
4230 static void sremCommand(redisClient
*c
) {
4233 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4235 addReply(c
,shared
.czero
);
4237 if (set
->type
!= REDIS_SET
) {
4238 addReply(c
,shared
.wrongtypeerr
);
4241 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
4243 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
4244 addReply(c
,shared
.cone
);
4246 addReply(c
,shared
.czero
);
4251 static void smoveCommand(redisClient
*c
) {
4252 robj
*srcset
, *dstset
;
4254 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4255 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
4257 /* If the source key does not exist return 0, if it's of the wrong type
4259 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
4260 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
4263 /* Error if the destination key is not a set as well */
4264 if (dstset
&& dstset
->type
!= REDIS_SET
) {
4265 addReply(c
,shared
.wrongtypeerr
);
4268 /* Remove the element from the source set */
4269 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
4270 /* Key not found in the src set! return zero */
4271 addReply(c
,shared
.czero
);
4275 /* Add the element to the destination set */
4277 dstset
= createSetObject();
4278 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
4279 incrRefCount(c
->argv
[2]);
4281 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
4282 incrRefCount(c
->argv
[3]);
4283 addReply(c
,shared
.cone
);
4286 static void sismemberCommand(redisClient
*c
) {
4289 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4291 addReply(c
,shared
.czero
);
4293 if (set
->type
!= REDIS_SET
) {
4294 addReply(c
,shared
.wrongtypeerr
);
4297 if (dictFind(set
->ptr
,c
->argv
[2]))
4298 addReply(c
,shared
.cone
);
4300 addReply(c
,shared
.czero
);
4304 static void scardCommand(redisClient
*c
) {
4308 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4310 addReply(c
,shared
.czero
);
4313 if (o
->type
!= REDIS_SET
) {
4314 addReply(c
,shared
.wrongtypeerr
);
4317 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4323 static void spopCommand(redisClient
*c
) {
4327 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4329 addReply(c
,shared
.nullbulk
);
4331 if (set
->type
!= REDIS_SET
) {
4332 addReply(c
,shared
.wrongtypeerr
);
4335 de
= dictGetRandomKey(set
->ptr
);
4337 addReply(c
,shared
.nullbulk
);
4339 robj
*ele
= dictGetEntryKey(de
);
4341 addReplyBulkLen(c
,ele
);
4343 addReply(c
,shared
.crlf
);
4344 dictDelete(set
->ptr
,ele
);
4345 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
4351 static void srandmemberCommand(redisClient
*c
) {
4355 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4357 addReply(c
,shared
.nullbulk
);
4359 if (set
->type
!= REDIS_SET
) {
4360 addReply(c
,shared
.wrongtypeerr
);
4363 de
= dictGetRandomKey(set
->ptr
);
4365 addReply(c
,shared
.nullbulk
);
4367 robj
*ele
= dictGetEntryKey(de
);
4369 addReplyBulkLen(c
,ele
);
4371 addReply(c
,shared
.crlf
);
4376 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
4377 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
4379 return dictSize(*d1
)-dictSize(*d2
);
4382 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, unsigned long setsnum
, robj
*dstkey
) {
4383 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4386 robj
*lenobj
= NULL
, *dstset
= NULL
;
4387 unsigned long j
, cardinality
= 0;
4389 for (j
= 0; j
< setsnum
; j
++) {
4393 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4394 lookupKeyRead(c
->db
,setskeys
[j
]);
4398 if (deleteKey(c
->db
,dstkey
))
4400 addReply(c
,shared
.czero
);
4402 addReply(c
,shared
.nullmultibulk
);
4406 if (setobj
->type
!= REDIS_SET
) {
4408 addReply(c
,shared
.wrongtypeerr
);
4411 dv
[j
] = setobj
->ptr
;
4413 /* Sort sets from the smallest to largest, this will improve our
4414 * algorithm's performace */
4415 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
4417 /* The first thing we should output is the total number of elements...
4418 * since this is a multi-bulk write, but at this stage we don't know
4419 * the intersection set size, so we use a trick, append an empty object
4420 * to the output list and save the pointer to later modify it with the
4423 lenobj
= createObject(REDIS_STRING
,NULL
);
4425 decrRefCount(lenobj
);
4427 /* If we have a target key where to store the resulting set
4428 * create this key with an empty set inside */
4429 dstset
= createSetObject();
4432 /* Iterate all the elements of the first (smallest) set, and test
4433 * the element against all the other sets, if at least one set does
4434 * not include the element it is discarded */
4435 di
= dictGetIterator(dv
[0]);
4437 while((de
= dictNext(di
)) != NULL
) {
4440 for (j
= 1; j
< setsnum
; j
++)
4441 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
4443 continue; /* at least one set does not contain the member */
4444 ele
= dictGetEntryKey(de
);
4446 addReplyBulkLen(c
,ele
);
4448 addReply(c
,shared
.crlf
);
4451 dictAdd(dstset
->ptr
,ele
,NULL
);
4455 dictReleaseIterator(di
);
4458 /* Store the resulting set into the target */
4459 deleteKey(c
->db
,dstkey
);
4460 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4461 incrRefCount(dstkey
);
4465 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%lu\r\n",cardinality
);
4467 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4468 dictSize((dict
*)dstset
->ptr
)));
4474 static void sinterCommand(redisClient
*c
) {
4475 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
4478 static void sinterstoreCommand(redisClient
*c
) {
4479 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
4482 #define REDIS_OP_UNION 0
4483 #define REDIS_OP_DIFF 1
4485 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
4486 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4489 robj
*dstset
= NULL
;
4490 int j
, cardinality
= 0;
4492 for (j
= 0; j
< setsnum
; j
++) {
4496 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4497 lookupKeyRead(c
->db
,setskeys
[j
]);
4502 if (setobj
->type
!= REDIS_SET
) {
4504 addReply(c
,shared
.wrongtypeerr
);
4507 dv
[j
] = setobj
->ptr
;
4510 /* We need a temp set object to store our union. If the dstkey
4511 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4512 * this set object will be the resulting object to set into the target key*/
4513 dstset
= createSetObject();
4515 /* Iterate all the elements of all the sets, add every element a single
4516 * time to the result set */
4517 for (j
= 0; j
< setsnum
; j
++) {
4518 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
4519 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
4521 di
= dictGetIterator(dv
[j
]);
4523 while((de
= dictNext(di
)) != NULL
) {
4526 /* dictAdd will not add the same element multiple times */
4527 ele
= dictGetEntryKey(de
);
4528 if (op
== REDIS_OP_UNION
|| j
== 0) {
4529 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
4533 } else if (op
== REDIS_OP_DIFF
) {
4534 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
4539 dictReleaseIterator(di
);
4541 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
4544 /* Output the content of the resulting set, if not in STORE mode */
4546 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
4547 di
= dictGetIterator(dstset
->ptr
);
4548 while((de
= dictNext(di
)) != NULL
) {
4551 ele
= dictGetEntryKey(de
);
4552 addReplyBulkLen(c
,ele
);
4554 addReply(c
,shared
.crlf
);
4556 dictReleaseIterator(di
);
4558 /* If we have a target key where to store the resulting set
4559 * create this key with the result set inside */
4560 deleteKey(c
->db
,dstkey
);
4561 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4562 incrRefCount(dstkey
);
4567 decrRefCount(dstset
);
4569 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4570 dictSize((dict
*)dstset
->ptr
)));
4576 static void sunionCommand(redisClient
*c
) {
4577 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
4580 static void sunionstoreCommand(redisClient
*c
) {
4581 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
4584 static void sdiffCommand(redisClient
*c
) {
4585 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
4588 static void sdiffstoreCommand(redisClient
*c
) {
4589 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
4592 /* ==================================== ZSets =============================== */
4594 /* ZSETs are ordered sets using two data structures to hold the same elements
4595 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4598 * The elements are added to an hash table mapping Redis objects to scores.
4599 * At the same time the elements are added to a skip list mapping scores
4600 * to Redis objects (so objects are sorted by scores in this "view"). */
4602 /* This skiplist implementation is almost a C translation of the original
4603 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4604 * Alternative to Balanced Trees", modified in three ways:
4605 * a) this implementation allows for repeated values.
4606 * b) the comparison is not just by key (our 'score') but by satellite data.
4607 * c) there is a back pointer, so it's a doubly linked list with the back
4608 * pointers being only at "level 1". This allows to traverse the list
4609 * from tail to head, useful for ZREVRANGE. */
4611 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
4612 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
4614 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
4620 static zskiplist
*zslCreate(void) {
4624 zsl
= zmalloc(sizeof(*zsl
));
4627 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
4628 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
4629 zsl
->header
->forward
[j
] = NULL
;
4630 zsl
->header
->backward
= NULL
;
4635 static void zslFreeNode(zskiplistNode
*node
) {
4636 decrRefCount(node
->obj
);
4637 zfree(node
->forward
);
4641 static void zslFree(zskiplist
*zsl
) {
4642 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
4644 zfree(zsl
->header
->forward
);
4647 next
= node
->forward
[0];
4654 static int zslRandomLevel(void) {
4656 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
4661 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
4662 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4666 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4667 while (x
->forward
[i
] &&
4668 (x
->forward
[i
]->score
< score
||
4669 (x
->forward
[i
]->score
== score
&&
4670 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4674 /* we assume the key is not already inside, since we allow duplicated
4675 * scores, and the re-insertion of score and redis object should never
4676 * happpen since the caller of zslInsert() should test in the hash table
4677 * if the element is already inside or not. */
4678 level
= zslRandomLevel();
4679 if (level
> zsl
->level
) {
4680 for (i
= zsl
->level
; i
< level
; i
++)
4681 update
[i
] = zsl
->header
;
4684 x
= zslCreateNode(level
,score
,obj
);
4685 for (i
= 0; i
< level
; i
++) {
4686 x
->forward
[i
] = update
[i
]->forward
[i
];
4687 update
[i
]->forward
[i
] = x
;
4689 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
4691 x
->forward
[0]->backward
= x
;
4697 /* Delete an element with matching score/object from the skiplist. */
4698 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
4699 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4703 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4704 while (x
->forward
[i
] &&
4705 (x
->forward
[i
]->score
< score
||
4706 (x
->forward
[i
]->score
== score
&&
4707 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4711 /* We may have multiple elements with the same score, what we need
4712 * is to find the element with both the right score and object. */
4714 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
4715 for (i
= 0; i
< zsl
->level
; i
++) {
4716 if (update
[i
]->forward
[i
] != x
) break;
4717 update
[i
]->forward
[i
] = x
->forward
[i
];
4719 if (x
->forward
[0]) {
4720 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4723 zsl
->tail
= x
->backward
;
4726 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4731 return 0; /* not found */
4733 return 0; /* not found */
4736 /* Delete all the elements with score between min and max from the skiplist.
4737 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4738 * Note that this function takes the reference to the hash table view of the
4739 * sorted set, in order to remove the elements from the hash table too. */
4740 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4741 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4742 unsigned long removed
= 0;
4746 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4747 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
4751 /* We may have multiple elements with the same score, what we need
4752 * is to find the element with both the right score and object. */
4754 while (x
&& x
->score
<= max
) {
4755 zskiplistNode
*next
;
4757 for (i
= 0; i
< zsl
->level
; i
++) {
4758 if (update
[i
]->forward
[i
] != x
) break;
4759 update
[i
]->forward
[i
] = x
->forward
[i
];
4761 if (x
->forward
[0]) {
4762 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4765 zsl
->tail
= x
->backward
;
4767 next
= x
->forward
[0];
4768 dictDelete(dict
,x
->obj
);
4770 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4776 return removed
; /* not found */
4779 /* Find the first node having a score equal or greater than the specified one.
4780 * Returns NULL if there is no match. */
4781 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4786 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4787 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
4790 /* We may have multiple elements with the same score, what we need
4791 * is to find the element with both the right score and object. */
4792 return x
->forward
[0];
4795 /* The actual Z-commands implementations */
4797 /* This generic command implements both ZADD and ZINCRBY.
4798 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4799 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4800 static void zaddGenericCommand(redisClient
*c
, robj
*key
, robj
*ele
, double scoreval
, int doincrement
) {
4805 zsetobj
= lookupKeyWrite(c
->db
,key
);
4806 if (zsetobj
== NULL
) {
4807 zsetobj
= createZsetObject();
4808 dictAdd(c
->db
->dict
,key
,zsetobj
);
4811 if (zsetobj
->type
!= REDIS_ZSET
) {
4812 addReply(c
,shared
.wrongtypeerr
);
4818 /* Ok now since we implement both ZADD and ZINCRBY here the code
4819 * needs to handle the two different conditions. It's all about setting
4820 * '*score', that is, the new score to set, to the right value. */
4821 score
= zmalloc(sizeof(double));
4825 /* Read the old score. If the element was not present starts from 0 */
4826 de
= dictFind(zs
->dict
,ele
);
4828 double *oldscore
= dictGetEntryVal(de
);
4829 *score
= *oldscore
+ scoreval
;
4837 /* What follows is a simple remove and re-insert operation that is common
4838 * to both ZADD and ZINCRBY... */
4839 if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) {
4840 /* case 1: New element */
4841 incrRefCount(ele
); /* added to hash */
4842 zslInsert(zs
->zsl
,*score
,ele
);
4843 incrRefCount(ele
); /* added to skiplist */
4846 addReplyDouble(c
,*score
);
4848 addReply(c
,shared
.cone
);
4853 /* case 2: Score update operation */
4854 de
= dictFind(zs
->dict
,ele
);
4855 redisAssert(de
!= NULL
);
4856 oldscore
= dictGetEntryVal(de
);
4857 if (*score
!= *oldscore
) {
4860 /* Remove and insert the element in the skip list with new score */
4861 deleted
= zslDelete(zs
->zsl
,*oldscore
,ele
);
4862 redisAssert(deleted
!= 0);
4863 zslInsert(zs
->zsl
,*score
,ele
);
4865 /* Update the score in the hash table */
4866 dictReplace(zs
->dict
,ele
,score
);
4872 addReplyDouble(c
,*score
);
4874 addReply(c
,shared
.czero
);
4878 static void zaddCommand(redisClient
*c
) {
4881 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4882 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0);
4885 static void zincrbyCommand(redisClient
*c
) {
4888 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4889 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1);
4892 static void zremCommand(redisClient
*c
) {
4896 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4897 if (zsetobj
== NULL
) {
4898 addReply(c
,shared
.czero
);
4904 if (zsetobj
->type
!= REDIS_ZSET
) {
4905 addReply(c
,shared
.wrongtypeerr
);
4909 de
= dictFind(zs
->dict
,c
->argv
[2]);
4911 addReply(c
,shared
.czero
);
4914 /* Delete from the skiplist */
4915 oldscore
= dictGetEntryVal(de
);
4916 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
4917 redisAssert(deleted
!= 0);
4919 /* Delete from the hash table */
4920 dictDelete(zs
->dict
,c
->argv
[2]);
4921 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4923 addReply(c
,shared
.cone
);
4927 static void zremrangebyscoreCommand(redisClient
*c
) {
4928 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
4929 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
4933 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4934 if (zsetobj
== NULL
) {
4935 addReply(c
,shared
.czero
);
4939 if (zsetobj
->type
!= REDIS_ZSET
) {
4940 addReply(c
,shared
.wrongtypeerr
);
4944 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
4945 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
4946 server
.dirty
+= deleted
;
4947 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
4951 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
4953 int start
= atoi(c
->argv
[2]->ptr
);
4954 int end
= atoi(c
->argv
[3]->ptr
);
4957 if (c
->argc
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) {
4959 } else if (c
->argc
>= 5) {
4960 addReply(c
,shared
.syntaxerr
);
4964 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4966 addReply(c
,shared
.nullmultibulk
);
4968 if (o
->type
!= REDIS_ZSET
) {
4969 addReply(c
,shared
.wrongtypeerr
);
4971 zset
*zsetobj
= o
->ptr
;
4972 zskiplist
*zsl
= zsetobj
->zsl
;
4975 int llen
= zsl
->length
;
4979 /* convert negative indexes */
4980 if (start
< 0) start
= llen
+start
;
4981 if (end
< 0) end
= llen
+end
;
4982 if (start
< 0) start
= 0;
4983 if (end
< 0) end
= 0;
4985 /* indexes sanity checks */
4986 if (start
> end
|| start
>= llen
) {
4987 /* Out of range start or start > end result in empty list */
4988 addReply(c
,shared
.emptymultibulk
);
4991 if (end
>= llen
) end
= llen
-1;
4992 rangelen
= (end
-start
)+1;
4994 /* Return the result in form of a multi-bulk reply */
5000 ln
= zsl
->header
->forward
[0];
5002 ln
= ln
->forward
[0];
5005 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",
5006 withscores
? (rangelen
*2) : rangelen
));
5007 for (j
= 0; j
< rangelen
; j
++) {
5009 addReplyBulkLen(c
,ele
);
5011 addReply(c
,shared
.crlf
);
5013 addReplyDouble(c
,ln
->score
);
5014 ln
= reverse
? ln
->backward
: ln
->forward
[0];
5020 static void zrangeCommand(redisClient
*c
) {
5021 zrangeGenericCommand(c
,0);
5024 static void zrevrangeCommand(redisClient
*c
) {
5025 zrangeGenericCommand(c
,1);
5028 static void zrangebyscoreCommand(redisClient
*c
) {
5030 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
5031 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
5032 int offset
= 0, limit
= -1;
5034 if (c
->argc
!= 4 && c
->argc
!= 7) {
5036 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
5038 } else if (c
->argc
== 7 && strcasecmp(c
->argv
[4]->ptr
,"limit")) {
5039 addReply(c
,shared
.syntaxerr
);
5041 } else if (c
->argc
== 7) {
5042 offset
= atoi(c
->argv
[5]->ptr
);
5043 limit
= atoi(c
->argv
[6]->ptr
);
5044 if (offset
< 0) offset
= 0;
5047 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
5049 addReply(c
,shared
.nullmultibulk
);
5051 if (o
->type
!= REDIS_ZSET
) {
5052 addReply(c
,shared
.wrongtypeerr
);
5054 zset
*zsetobj
= o
->ptr
;
5055 zskiplist
*zsl
= zsetobj
->zsl
;
5058 unsigned int rangelen
= 0;
5060 /* Get the first node with the score >= min */
5061 ln
= zslFirstWithScore(zsl
,min
);
5063 /* No element matching the speciifed interval */
5064 addReply(c
,shared
.emptymultibulk
);
5068 /* We don't know in advance how many matching elements there
5069 * are in the list, so we push this object that will represent
5070 * the multi-bulk length in the output buffer, and will "fix"
5072 lenobj
= createObject(REDIS_STRING
,NULL
);
5074 decrRefCount(lenobj
);
5076 while(ln
&& ln
->score
<= max
) {
5079 ln
= ln
->forward
[0];
5082 if (limit
== 0) break;
5084 addReplyBulkLen(c
,ele
);
5086 addReply(c
,shared
.crlf
);
5087 ln
= ln
->forward
[0];
5089 if (limit
> 0) limit
--;
5091 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
5096 static void zcardCommand(redisClient
*c
) {
5100 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
5102 addReply(c
,shared
.czero
);
5105 if (o
->type
!= REDIS_ZSET
) {
5106 addReply(c
,shared
.wrongtypeerr
);
5109 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",zs
->zsl
->length
));
5114 static void zscoreCommand(redisClient
*c
) {
5118 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
5120 addReply(c
,shared
.nullbulk
);
5123 if (o
->type
!= REDIS_ZSET
) {
5124 addReply(c
,shared
.wrongtypeerr
);
5129 de
= dictFind(zs
->dict
,c
->argv
[2]);
5131 addReply(c
,shared
.nullbulk
);
5133 double *score
= dictGetEntryVal(de
);
5135 addReplyDouble(c
,*score
);
5141 /* ========================= Non type-specific commands ==================== */
5143 static void flushdbCommand(redisClient
*c
) {
5144 server
.dirty
+= dictSize(c
->db
->dict
);
5145 dictEmpty(c
->db
->dict
);
5146 dictEmpty(c
->db
->expires
);
5147 addReply(c
,shared
.ok
);
5150 static void flushallCommand(redisClient
*c
) {
5151 server
.dirty
+= emptyDb();
5152 addReply(c
,shared
.ok
);
5153 rdbSave(server
.dbfilename
);
5157 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
5158 redisSortOperation
*so
= zmalloc(sizeof(*so
));
5160 so
->pattern
= pattern
;
5164 /* Return the value associated to the key with a name obtained
5165 * substituting the first occurence of '*' in 'pattern' with 'subst' */
5166 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
5170 int prefixlen
, sublen
, postfixlen
;
5171 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
5175 char buf
[REDIS_SORTKEY_MAX
+1];
5178 /* If the pattern is "#" return the substitution object itself in order
5179 * to implement the "SORT ... GET #" feature. */
5180 spat
= pattern
->ptr
;
5181 if (spat
[0] == '#' && spat
[1] == '\0') {
5185 /* The substitution object may be specially encoded. If so we create
5186 * a decoded object on the fly. Otherwise getDecodedObject will just
5187 * increment the ref count, that we'll decrement later. */
5188 subst
= getDecodedObject(subst
);
5191 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
5192 p
= strchr(spat
,'*');
5194 decrRefCount(subst
);
5199 sublen
= sdslen(ssub
);
5200 postfixlen
= sdslen(spat
)-(prefixlen
+1);
5201 memcpy(keyname
.buf
,spat
,prefixlen
);
5202 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
5203 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
5204 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
5205 keyname
.len
= prefixlen
+sublen
+postfixlen
;
5207 initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(long)*2))
5208 decrRefCount(subst
);
5210 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
5211 return lookupKeyRead(db
,&keyobj
);
5214 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
5215 * the additional parameter is not standard but a BSD-specific we have to
5216 * pass sorting parameters via the global 'server' structure */
5217 static int sortCompare(const void *s1
, const void *s2
) {
5218 const redisSortObject
*so1
= s1
, *so2
= s2
;
5221 if (!server
.sort_alpha
) {
5222 /* Numeric sorting. Here it's trivial as we precomputed scores */
5223 if (so1
->u
.score
> so2
->u
.score
) {
5225 } else if (so1
->u
.score
< so2
->u
.score
) {
5231 /* Alphanumeric sorting */
5232 if (server
.sort_bypattern
) {
5233 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
5234 /* At least one compare object is NULL */
5235 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
5237 else if (so1
->u
.cmpobj
== NULL
)
5242 /* We have both the objects, use strcoll */
5243 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
5246 /* Compare elements directly */
5249 dec1
= getDecodedObject(so1
->obj
);
5250 dec2
= getDecodedObject(so2
->obj
);
5251 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
5256 return server
.sort_desc
? -cmp
: cmp
;
5259 /* The SORT command is the most complex command in Redis. Warning: this code
5260 * is optimized for speed and a bit less for readability */
5261 static void sortCommand(redisClient
*c
) {
5264 int desc
= 0, alpha
= 0;
5265 int limit_start
= 0, limit_count
= -1, start
, end
;
5266 int j
, dontsort
= 0, vectorlen
;
5267 int getop
= 0; /* GET operation counter */
5268 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
5269 redisSortObject
*vector
; /* Resulting vector to sort */
5271 /* Lookup the key to sort. It must be of the right types */
5272 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
5273 if (sortval
== NULL
) {
5274 addReply(c
,shared
.nullmultibulk
);
5277 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
5278 sortval
->type
!= REDIS_ZSET
)
5280 addReply(c
,shared
.wrongtypeerr
);
5284 /* Create a list of operations to perform for every sorted element.
5285 * Operations can be GET/DEL/INCR/DECR */
5286 operations
= listCreate();
5287 listSetFreeMethod(operations
,zfree
);
5290 /* Now we need to protect sortval incrementing its count, in the future
5291 * SORT may have options able to overwrite/delete keys during the sorting
5292 * and the sorted key itself may get destroied */
5293 incrRefCount(sortval
);
5295 /* The SORT command has an SQL-alike syntax, parse it */
5296 while(j
< c
->argc
) {
5297 int leftargs
= c
->argc
-j
-1;
5298 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
5300 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
5302 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
5304 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
5305 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
5306 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
5308 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
5309 storekey
= c
->argv
[j
+1];
5311 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
5312 sortby
= c
->argv
[j
+1];
5313 /* If the BY pattern does not contain '*', i.e. it is constant,
5314 * we don't need to sort nor to lookup the weight keys. */
5315 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
5317 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
5318 listAddNodeTail(operations
,createSortOperation(
5319 REDIS_SORT_GET
,c
->argv
[j
+1]));
5323 decrRefCount(sortval
);
5324 listRelease(operations
);
5325 addReply(c
,shared
.syntaxerr
);
5331 /* Load the sorting vector with all the objects to sort */
5332 switch(sortval
->type
) {
5333 case REDIS_LIST
: vectorlen
= listLength((list
*)sortval
->ptr
); break;
5334 case REDIS_SET
: vectorlen
= dictSize((dict
*)sortval
->ptr
); break;
5335 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
5336 default: vectorlen
= 0; redisAssert(0); /* Avoid GCC warning */
5338 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
5341 if (sortval
->type
== REDIS_LIST
) {
5342 list
*list
= sortval
->ptr
;
5346 while((ln
= listYield(list
))) {
5347 robj
*ele
= ln
->value
;
5348 vector
[j
].obj
= ele
;
5349 vector
[j
].u
.score
= 0;
5350 vector
[j
].u
.cmpobj
= NULL
;
5358 if (sortval
->type
== REDIS_SET
) {
5361 zset
*zs
= sortval
->ptr
;
5365 di
= dictGetIterator(set
);
5366 while((setele
= dictNext(di
)) != NULL
) {
5367 vector
[j
].obj
= dictGetEntryKey(setele
);
5368 vector
[j
].u
.score
= 0;
5369 vector
[j
].u
.cmpobj
= NULL
;
5372 dictReleaseIterator(di
);
5374 redisAssert(j
== vectorlen
);
5376 /* Now it's time to load the right scores in the sorting vector */
5377 if (dontsort
== 0) {
5378 for (j
= 0; j
< vectorlen
; j
++) {
5382 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
5383 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
5385 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
5387 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
5388 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
5390 /* Don't need to decode the object if it's
5391 * integer-encoded (the only encoding supported) so
5392 * far. We can just cast it */
5393 if (byval
->encoding
== REDIS_ENCODING_INT
) {
5394 vector
[j
].u
.score
= (long)byval
->ptr
;
5396 redisAssert(1 != 1);
5401 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
5402 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
5404 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
5405 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
5407 redisAssert(1 != 1);
5414 /* We are ready to sort the vector... perform a bit of sanity check
5415 * on the LIMIT option too. We'll use a partial version of quicksort. */
5416 start
= (limit_start
< 0) ? 0 : limit_start
;
5417 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
5418 if (start
>= vectorlen
) {
5419 start
= vectorlen
-1;
5422 if (end
>= vectorlen
) end
= vectorlen
-1;
5424 if (dontsort
== 0) {
5425 server
.sort_desc
= desc
;
5426 server
.sort_alpha
= alpha
;
5427 server
.sort_bypattern
= sortby
? 1 : 0;
5428 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
5429 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
5431 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
5434 /* Send command output to the output buffer, performing the specified
5435 * GET/DEL/INCR/DECR operations if any. */
5436 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
5437 if (storekey
== NULL
) {
5438 /* STORE option not specified, sent the sorting result to client */
5439 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
5440 for (j
= start
; j
<= end
; j
++) {
5443 addReplyBulkLen(c
,vector
[j
].obj
);
5444 addReply(c
,vector
[j
].obj
);
5445 addReply(c
,shared
.crlf
);
5447 listRewind(operations
);
5448 while((ln
= listYield(operations
))) {
5449 redisSortOperation
*sop
= ln
->value
;
5450 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5453 if (sop
->type
== REDIS_SORT_GET
) {
5454 if (!val
|| val
->type
!= REDIS_STRING
) {
5455 addReply(c
,shared
.nullbulk
);
5457 addReplyBulkLen(c
,val
);
5459 addReply(c
,shared
.crlf
);
5462 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5467 robj
*listObject
= createListObject();
5468 list
*listPtr
= (list
*) listObject
->ptr
;
5470 /* STORE option specified, set the sorting result as a List object */
5471 for (j
= start
; j
<= end
; j
++) {
5474 listAddNodeTail(listPtr
,vector
[j
].obj
);
5475 incrRefCount(vector
[j
].obj
);
5477 listRewind(operations
);
5478 while((ln
= listYield(operations
))) {
5479 redisSortOperation
*sop
= ln
->value
;
5480 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5483 if (sop
->type
== REDIS_SORT_GET
) {
5484 if (!val
|| val
->type
!= REDIS_STRING
) {
5485 listAddNodeTail(listPtr
,createStringObject("",0));
5487 listAddNodeTail(listPtr
,val
);
5491 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5495 if (dictReplace(c
->db
->dict
,storekey
,listObject
)) {
5496 incrRefCount(storekey
);
5498 /* Note: we add 1 because the DB is dirty anyway since even if the
5499 * SORT result is empty a new key is set and maybe the old content
5501 server
.dirty
+= 1+outputlen
;
5502 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",outputlen
));
5506 decrRefCount(sortval
);
5507 listRelease(operations
);
5508 for (j
= 0; j
< vectorlen
; j
++) {
5509 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
5510 decrRefCount(vector
[j
].u
.cmpobj
);
5515 /* Convert an amount of bytes into a human readable string in the form
5516 * of 100B, 2G, 100M, 4K, and so forth. */
5517 static void bytesToHuman(char *s
, unsigned long long n
) {
5522 sprintf(s
,"%lluB",n
);
5524 } else if (n
< (1024*1024)) {
5525 d
= (double)n
/(1024);
5526 sprintf(s
,"%.2fK",d
);
5527 } else if (n
< (1024LL*1024*1024)) {
5528 d
= (double)n
/(1024*1024);
5529 sprintf(s
,"%.2fM",d
);
5530 } else if (n
< (1024LL*1024*1024*1024)) {
5531 d
= (double)n
/(1024LL*1024*1024);
5532 sprintf(s
,"%.2fM",d
);
5536 /* Create the string returned by the INFO command. This is decoupled
5537 * by the INFO command itself as we need to report the same information
5538 * on memory corruption problems. */
5539 static sds
genRedisInfoString(void) {
5541 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5545 bytesToHuman(hmem
,server
.usedmemory
);
5546 info
= sdscatprintf(sdsempty(),
5547 "redis_version:%s\r\n"
5549 "multiplexing_api:%s\r\n"
5550 "process_id:%ld\r\n"
5551 "uptime_in_seconds:%ld\r\n"
5552 "uptime_in_days:%ld\r\n"
5553 "connected_clients:%d\r\n"
5554 "connected_slaves:%d\r\n"
5555 "blocked_clients:%d\r\n"
5556 "used_memory:%zu\r\n"
5557 "used_memory_human:%s\r\n"
5558 "changes_since_last_save:%lld\r\n"
5559 "bgsave_in_progress:%d\r\n"
5560 "last_save_time:%ld\r\n"
5561 "bgrewriteaof_in_progress:%d\r\n"
5562 "total_connections_received:%lld\r\n"
5563 "total_commands_processed:%lld\r\n"
5567 (sizeof(long) == 8) ? "64" : "32",
5572 listLength(server
.clients
)-listLength(server
.slaves
),
5573 listLength(server
.slaves
),
5574 server
.blockedclients
,
5578 server
.bgsavechildpid
!= -1,
5580 server
.bgrewritechildpid
!= -1,
5581 server
.stat_numconnections
,
5582 server
.stat_numcommands
,
5583 server
.vm_enabled
!= 0,
5584 server
.masterhost
== NULL
? "master" : "slave"
5586 if (server
.masterhost
) {
5587 info
= sdscatprintf(info
,
5588 "master_host:%s\r\n"
5589 "master_port:%d\r\n"
5590 "master_link_status:%s\r\n"
5591 "master_last_io_seconds_ago:%d\r\n"
5594 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
5596 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
5599 if (server
.vm_enabled
) {
5600 info
= sdscatprintf(info
,
5601 "vm_conf_max_memory:%llu\r\n"
5602 "vm_conf_page_size:%llu\r\n"
5603 "vm_conf_pages:%llu\r\n"
5604 "vm_stats_used_pages:%llu\r\n"
5605 "vm_stats_swapped_objects:%llu\r\n"
5606 "vm_stats_swappin_count:%llu\r\n"
5607 "vm_stats_swappout_count:%llu\r\n"
5608 "vm_stats_io_newjobs_len:%lu\r\n"
5609 "vm_stats_io_processing_len:%lu\r\n"
5610 "vm_stats_io_processed_len:%lu\r\n"
5611 "vm_stats_io_waiting_clients:%lu\r\n"
5612 ,(unsigned long long) server
.vm_max_memory
,
5613 (unsigned long long) server
.vm_page_size
,
5614 (unsigned long long) server
.vm_pages
,
5615 (unsigned long long) server
.vm_stats_used_pages
,
5616 (unsigned long long) server
.vm_stats_swapped_objects
,
5617 (unsigned long long) server
.vm_stats_swapins
,
5618 (unsigned long long) server
.vm_stats_swapouts
,
5619 (unsigned long) listLength(server
.io_newjobs
),
5620 (unsigned long) listLength(server
.io_processing
),
5621 (unsigned long) listLength(server
.io_processed
),
5622 (unsigned long) listLength(server
.io_clients
)
5625 for (j
= 0; j
< server
.dbnum
; j
++) {
5626 long long keys
, vkeys
;
5628 keys
= dictSize(server
.db
[j
].dict
);
5629 vkeys
= dictSize(server
.db
[j
].expires
);
5630 if (keys
|| vkeys
) {
5631 info
= sdscatprintf(info
, "db%d:keys=%lld,expires=%lld\r\n",
5638 static void infoCommand(redisClient
*c
) {
5639 sds info
= genRedisInfoString();
5640 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
5641 (unsigned long)sdslen(info
)));
5642 addReplySds(c
,info
);
5643 addReply(c
,shared
.crlf
);
5646 static void monitorCommand(redisClient
*c
) {
5647 /* ignore MONITOR if aleady slave or in monitor mode */
5648 if (c
->flags
& REDIS_SLAVE
) return;
5650 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
5652 listAddNodeTail(server
.monitors
,c
);
5653 addReply(c
,shared
.ok
);
5656 /* ================================= Expire ================================= */
5657 static int removeExpire(redisDb
*db
, robj
*key
) {
5658 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
5665 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
5666 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
5674 /* Return the expire time of the specified key, or -1 if no expire
5675 * is associated with this key (i.e. the key is non volatile) */
5676 static time_t getExpire(redisDb
*db
, robj
*key
) {
5679 /* No expire? return ASAP */
5680 if (dictSize(db
->expires
) == 0 ||
5681 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
5683 return (time_t) dictGetEntryVal(de
);
5686 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
5690 /* No expire? return ASAP */
5691 if (dictSize(db
->expires
) == 0 ||
5692 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5694 /* Lookup the expire */
5695 when
= (time_t) dictGetEntryVal(de
);
5696 if (time(NULL
) <= when
) return 0;
5698 /* Delete the key */
5699 dictDelete(db
->expires
,key
);
5700 return dictDelete(db
->dict
,key
) == DICT_OK
;
5703 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
5706 /* No expire? return ASAP */
5707 if (dictSize(db
->expires
) == 0 ||
5708 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5710 /* Delete the key */
5712 dictDelete(db
->expires
,key
);
5713 return dictDelete(db
->dict
,key
) == DICT_OK
;
5716 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
5719 de
= dictFind(c
->db
->dict
,key
);
5721 addReply(c
,shared
.czero
);
5725 if (deleteKey(c
->db
,key
)) server
.dirty
++;
5726 addReply(c
, shared
.cone
);
5729 time_t when
= time(NULL
)+seconds
;
5730 if (setExpire(c
->db
,key
,when
)) {
5731 addReply(c
,shared
.cone
);
5734 addReply(c
,shared
.czero
);
5740 static void expireCommand(redisClient
*c
) {
5741 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
5744 static void expireatCommand(redisClient
*c
) {
5745 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
5748 static void ttlCommand(redisClient
*c
) {
5752 expire
= getExpire(c
->db
,c
->argv
[1]);
5754 ttl
= (int) (expire
-time(NULL
));
5755 if (ttl
< 0) ttl
= -1;
5757 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
5760 /* ================================ MULTI/EXEC ============================== */
5762 /* Client state initialization for MULTI/EXEC */
5763 static void initClientMultiState(redisClient
*c
) {
5764 c
->mstate
.commands
= NULL
;
5765 c
->mstate
.count
= 0;
5768 /* Release all the resources associated with MULTI/EXEC state */
5769 static void freeClientMultiState(redisClient
*c
) {
5772 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5774 multiCmd
*mc
= c
->mstate
.commands
+j
;
5776 for (i
= 0; i
< mc
->argc
; i
++)
5777 decrRefCount(mc
->argv
[i
]);
5780 zfree(c
->mstate
.commands
);
5783 /* Add a new command into the MULTI commands queue */
5784 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
) {
5788 c
->mstate
.commands
= zrealloc(c
->mstate
.commands
,
5789 sizeof(multiCmd
)*(c
->mstate
.count
+1));
5790 mc
= c
->mstate
.commands
+c
->mstate
.count
;
5793 mc
->argv
= zmalloc(sizeof(robj
*)*c
->argc
);
5794 memcpy(mc
->argv
,c
->argv
,sizeof(robj
*)*c
->argc
);
5795 for (j
= 0; j
< c
->argc
; j
++)
5796 incrRefCount(mc
->argv
[j
]);
5800 static void multiCommand(redisClient
*c
) {
5801 c
->flags
|= REDIS_MULTI
;
5802 addReply(c
,shared
.ok
);
5805 static void execCommand(redisClient
*c
) {
5810 if (!(c
->flags
& REDIS_MULTI
)) {
5811 addReplySds(c
,sdsnew("-ERR EXEC without MULTI\r\n"));
5815 orig_argv
= c
->argv
;
5816 orig_argc
= c
->argc
;
5817 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->mstate
.count
));
5818 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5819 c
->argc
= c
->mstate
.commands
[j
].argc
;
5820 c
->argv
= c
->mstate
.commands
[j
].argv
;
5821 call(c
,c
->mstate
.commands
[j
].cmd
);
5823 c
->argv
= orig_argv
;
5824 c
->argc
= orig_argc
;
5825 freeClientMultiState(c
);
5826 initClientMultiState(c
);
5827 c
->flags
&= (~REDIS_MULTI
);
5830 /* =========================== Blocking Operations ========================= */
5832 /* Currently Redis blocking operations support is limited to list POP ops,
5833 * so the current implementation is not fully generic, but it is also not
5834 * completely specific so it will not require a rewrite to support new
5835 * kind of blocking operations in the future.
5837 * Still it's important to note that list blocking operations can be already
5838 * used as a notification mechanism in order to implement other blocking
5839 * operations at application level, so there must be a very strong evidence
5840 * of usefulness and generality before new blocking operations are implemented.
5842 * This is how the current blocking POP works, using BLPOP as an example:
5843 * - If the user calls BLPOP and the key exists and contains a non empty list
5844 * then LPOP is called instead. So BLPOP is semantically the same as LPOP
5845 * if there is no need to block.
5846 * - If instead BLPOP is called and the key does not exist or the list is
5847 * empty we need to block. In order to do so we remove the notification for
5848 * new data to read in the client socket (so that we'll not serve new
5849 * requests if the blocking request is not served). Also we put the client
5850 * in a dictionary (db->blockingkeys) mapping keys to a list of clients
5851 * blocking for these keys.
5852 * - If a PUSH operation against a key with blocked clients waiting is
5853 * performed, we serve the first in the list: basically instead of pushing
5854 * the new element inside the list we return it to the (first / oldest)
5855 * blocking client, unblock the client, and remove it from the list.
5857 * The above comment and the source code should be enough in order to understand
5858 * the implementation and modify / fix it later.
5861 /* Set a client in blocking mode for the specified key, with the specified
5863 static void blockForKeys(redisClient
*c
, robj
**keys
, int numkeys
, time_t timeout
) {
5868 c
->blockingkeys
= zmalloc(sizeof(robj
*)*numkeys
);
5869 c
->blockingkeysnum
= numkeys
;
5870 c
->blockingto
= timeout
;
5871 for (j
= 0; j
< numkeys
; j
++) {
5872 /* Add the key in the client structure, to map clients -> keys */
5873 c
->blockingkeys
[j
] = keys
[j
];
5874 incrRefCount(keys
[j
]);
5876 /* And in the other "side", to map keys -> clients */
5877 de
= dictFind(c
->db
->blockingkeys
,keys
[j
]);
5881 /* For every key we take a list of clients blocked for it */
5883 retval
= dictAdd(c
->db
->blockingkeys
,keys
[j
],l
);
5884 incrRefCount(keys
[j
]);
5885 assert(retval
== DICT_OK
);
5887 l
= dictGetEntryVal(de
);
5889 listAddNodeTail(l
,c
);
5891 /* Mark the client as a blocked client */
5892 c
->flags
|= REDIS_BLOCKED
;
5893 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
5894 server
.blockedclients
++;
5897 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
5898 static void unblockClient(redisClient
*c
) {
5903 assert(c
->blockingkeys
!= NULL
);
5904 /* The client may wait for multiple keys, so unblock it for every key. */
5905 for (j
= 0; j
< c
->blockingkeysnum
; j
++) {
5906 /* Remove this client from the list of clients waiting for this key. */
5907 de
= dictFind(c
->db
->blockingkeys
,c
->blockingkeys
[j
]);
5909 l
= dictGetEntryVal(de
);
5910 listDelNode(l
,listSearchKey(l
,c
));
5911 /* If the list is empty we need to remove it to avoid wasting memory */
5912 if (listLength(l
) == 0)
5913 dictDelete(c
->db
->blockingkeys
,c
->blockingkeys
[j
]);
5914 decrRefCount(c
->blockingkeys
[j
]);
5916 /* Cleanup the client structure */
5917 zfree(c
->blockingkeys
);
5918 c
->blockingkeys
= NULL
;
5919 c
->flags
&= (~REDIS_BLOCKED
);
5920 server
.blockedclients
--;
5921 /* Ok now we are ready to get read events from socket, note that we
5922 * can't trap errors here as it's possible that unblockClients() is
5923 * called from freeClient() itself, and the only thing we can do
5924 * if we failed to register the READABLE event is to kill the client.
5925 * Still the following function should never fail in the real world as
5926 * we are sure the file descriptor is sane, and we exit on out of mem. */
5927 aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
, readQueryFromClient
, c
);
5928 /* As a final step we want to process data if there is some command waiting
5929 * in the input buffer. Note that this is safe even if unblockClient()
5930 * gets called from freeClient() because freeClient() will be smart
5931 * enough to call this function *after* c->querybuf was set to NULL. */
5932 if (c
->querybuf
&& sdslen(c
->querybuf
) > 0) processInputBuffer(c
);
5935 /* This should be called from any function PUSHing into lists.
5936 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5937 * 'ele' is the element pushed.
5939 * If the function returns 0 there was no client waiting for a list push
5942 * If the function returns 1 there was a client waiting for a list push
5943 * against this key, the element was passed to this client thus it's not
5944 * needed to actually add it to the list and the caller should return asap. */
5945 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
) {
5946 struct dictEntry
*de
;
5947 redisClient
*receiver
;
5951 de
= dictFind(c
->db
->blockingkeys
,key
);
5952 if (de
== NULL
) return 0;
5953 l
= dictGetEntryVal(de
);
5956 receiver
= ln
->value
;
5958 addReplySds(receiver
,sdsnew("*2\r\n"));
5959 addReplyBulkLen(receiver
,key
);
5960 addReply(receiver
,key
);
5961 addReply(receiver
,shared
.crlf
);
5962 addReplyBulkLen(receiver
,ele
);
5963 addReply(receiver
,ele
);
5964 addReply(receiver
,shared
.crlf
);
5965 unblockClient(receiver
);
5969 /* Blocking RPOP/LPOP */
5970 static void blockingPopGenericCommand(redisClient
*c
, int where
) {
5975 for (j
= 1; j
< c
->argc
-1; j
++) {
5976 o
= lookupKeyWrite(c
->db
,c
->argv
[j
]);
5978 if (o
->type
!= REDIS_LIST
) {
5979 addReply(c
,shared
.wrongtypeerr
);
5982 list
*list
= o
->ptr
;
5983 if (listLength(list
) != 0) {
5984 /* If the list contains elements fall back to the usual
5985 * non-blocking POP operation */
5986 robj
*argv
[2], **orig_argv
;
5989 /* We need to alter the command arguments before to call
5990 * popGenericCommand() as the command takes a single key. */
5991 orig_argv
= c
->argv
;
5992 orig_argc
= c
->argc
;
5993 argv
[1] = c
->argv
[j
];
5997 /* Also the return value is different, we need to output
5998 * the multi bulk reply header and the key name. The
5999 * "real" command will add the last element (the value)
6000 * for us. If this souds like an hack to you it's just
6001 * because it is... */
6002 addReplySds(c
,sdsnew("*2\r\n"));
6003 addReplyBulkLen(c
,argv
[1]);
6004 addReply(c
,argv
[1]);
6005 addReply(c
,shared
.crlf
);
6006 popGenericCommand(c
,where
);
6008 /* Fix the client structure with the original stuff */
6009 c
->argv
= orig_argv
;
6010 c
->argc
= orig_argc
;
6016 /* If the list is empty or the key does not exists we must block */
6017 timeout
= strtol(c
->argv
[c
->argc
-1]->ptr
,NULL
,10);
6018 if (timeout
> 0) timeout
+= time(NULL
);
6019 blockForKeys(c
,c
->argv
+1,c
->argc
-2,timeout
);
6022 static void blpopCommand(redisClient
*c
) {
6023 blockingPopGenericCommand(c
,REDIS_HEAD
);
6026 static void brpopCommand(redisClient
*c
) {
6027 blockingPopGenericCommand(c
,REDIS_TAIL
);
6030 /* =============================== Replication ============================= */
6032 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
6033 ssize_t nwritten
, ret
= size
;
6034 time_t start
= time(NULL
);
6038 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
6039 nwritten
= write(fd
,ptr
,size
);
6040 if (nwritten
== -1) return -1;
6044 if ((time(NULL
)-start
) > timeout
) {
6052 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
6053 ssize_t nread
, totread
= 0;
6054 time_t start
= time(NULL
);
6058 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
6059 nread
= read(fd
,ptr
,size
);
6060 if (nread
== -1) return -1;
6065 if ((time(NULL
)-start
) > timeout
) {
6073 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
6080 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
6083 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
6094 static void syncCommand(redisClient
*c
) {
6095 /* ignore SYNC if aleady slave or in monitor mode */
6096 if (c
->flags
& REDIS_SLAVE
) return;
6098 /* SYNC can't be issued when the server has pending data to send to
6099 * the client about already issued commands. We need a fresh reply
6100 * buffer registering the differences between the BGSAVE and the current
6101 * dataset, so that we can copy to other slaves if needed. */
6102 if (listLength(c
->reply
) != 0) {
6103 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
6107 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
6108 /* Here we need to check if there is a background saving operation
6109 * in progress, or if it is required to start one */
6110 if (server
.bgsavechildpid
!= -1) {
6111 /* Ok a background save is in progress. Let's check if it is a good
6112 * one for replication, i.e. if there is another slave that is
6113 * registering differences since the server forked to save */
6117 listRewind(server
.slaves
);
6118 while((ln
= listYield(server
.slaves
))) {
6120 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
6123 /* Perfect, the server is already registering differences for
6124 * another slave. Set the right state, and copy the buffer. */
6125 listRelease(c
->reply
);
6126 c
->reply
= listDup(slave
->reply
);
6127 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
6128 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
6130 /* No way, we need to wait for the next BGSAVE in order to
6131 * register differences */
6132 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
6133 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
6136 /* Ok we don't have a BGSAVE in progress, let's start one */
6137 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
6138 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
6139 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
6140 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
6143 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
6146 c
->flags
|= REDIS_SLAVE
;
6148 listAddNodeTail(server
.slaves
,c
);
6152 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
6153 redisClient
*slave
= privdata
;
6155 REDIS_NOTUSED(mask
);
6156 char buf
[REDIS_IOBUF_LEN
];
6157 ssize_t nwritten
, buflen
;
6159 if (slave
->repldboff
== 0) {
6160 /* Write the bulk write count before to transfer the DB. In theory here
6161 * we don't know how much room there is in the output buffer of the
6162 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
6163 * operations) will never be smaller than the few bytes we need. */
6166 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
6168 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
6176 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
6177 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
6179 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
6180 (buflen
== 0) ? "premature EOF" : strerror(errno
));
6184 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
6185 redisLog(REDIS_VERBOSE
,"Write error sending DB to slave: %s",
6190 slave
->repldboff
+= nwritten
;
6191 if (slave
->repldboff
== slave
->repldbsize
) {
6192 close(slave
->repldbfd
);
6193 slave
->repldbfd
= -1;
6194 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
6195 slave
->replstate
= REDIS_REPL_ONLINE
;
6196 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
6197 sendReplyToClient
, slave
) == AE_ERR
) {
6201 addReplySds(slave
,sdsempty());
6202 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
6206 /* This function is called at the end of every background saving.
6207 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
6208 * otherwise REDIS_ERR is passed to the function.
6210 * The goal of this function is to handle slaves waiting for a successful
6211 * background saving in order to perform non-blocking synchronization. */
6212 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
6214 int startbgsave
= 0;
6216 listRewind(server
.slaves
);
6217 while((ln
= listYield(server
.slaves
))) {
6218 redisClient
*slave
= ln
->value
;
6220 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
6222 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
6223 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
6224 struct redis_stat buf
;
6226 if (bgsaveerr
!= REDIS_OK
) {
6228 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
6231 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
6232 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
6234 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
6237 slave
->repldboff
= 0;
6238 slave
->repldbsize
= buf
.st_size
;
6239 slave
->replstate
= REDIS_REPL_SEND_BULK
;
6240 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
6241 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
6248 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
6249 listRewind(server
.slaves
);
6250 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
6251 while((ln
= listYield(server
.slaves
))) {
6252 redisClient
*slave
= ln
->value
;
6254 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
6261 static int syncWithMaster(void) {
6262 char buf
[1024], tmpfile
[256], authcmd
[1024];
6264 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
6268 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
6273 /* AUTH with the master if required. */
6274 if(server
.masterauth
) {
6275 snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
);
6276 if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) {
6278 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",
6282 /* Read the AUTH result. */
6283 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
6285 redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s",
6289 if (buf
[0] != '+') {
6291 redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?");
6296 /* Issue the SYNC command */
6297 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
6299 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
6303 /* Read the bulk write count */
6304 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
6306 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
6310 if (buf
[0] != '$') {
6312 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
6315 dumpsize
= atoi(buf
+1);
6316 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
6317 /* Read the bulk write data on a temp file */
6318 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
6319 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
6322 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
6326 int nread
, nwritten
;
6328 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
6330 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
6336 nwritten
= write(dfd
,buf
,nread
);
6337 if (nwritten
== -1) {
6338 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
6346 if (rename(tmpfile
,server
.dbfilename
) == -1) {
6347 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
6353 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6354 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
6358 server
.master
= createClient(fd
);
6359 server
.master
->flags
|= REDIS_MASTER
;
6360 server
.master
->authenticated
= 1;
6361 server
.replstate
= REDIS_REPL_CONNECTED
;
6365 static void slaveofCommand(redisClient
*c
) {
6366 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
6367 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
6368 if (server
.masterhost
) {
6369 sdsfree(server
.masterhost
);
6370 server
.masterhost
= NULL
;
6371 if (server
.master
) freeClient(server
.master
);
6372 server
.replstate
= REDIS_REPL_NONE
;
6373 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
6376 sdsfree(server
.masterhost
);
6377 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
6378 server
.masterport
= atoi(c
->argv
[2]->ptr
);
6379 if (server
.master
) freeClient(server
.master
);
6380 server
.replstate
= REDIS_REPL_CONNECT
;
6381 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
6382 server
.masterhost
, server
.masterport
);
6384 addReply(c
,shared
.ok
);
6387 /* ============================ Maxmemory directive ======================== */
6389 /* Try to free one object from the pre-allocated objects free list.
6390 * This is useful under low mem conditions as by default we take 1 million
6391 * free objects allocated. On success REDIS_OK is returned, otherwise
6393 static int tryFreeOneObjectFromFreelist(void) {
6396 if (server
.vm_enabled
) pthread_mutex_lock(&server
.obj_freelist_mutex
);
6397 if (listLength(server
.objfreelist
)) {
6398 listNode
*head
= listFirst(server
.objfreelist
);
6399 o
= listNodeValue(head
);
6400 listDelNode(server
.objfreelist
,head
);
6401 if (server
.vm_enabled
) pthread_mutex_unlock(&server
.obj_freelist_mutex
);
6405 if (server
.vm_enabled
) pthread_mutex_unlock(&server
.obj_freelist_mutex
);
6410 /* This function gets called when 'maxmemory' is set on the config file to limit
6411 * the max memory used by the server, and we are out of memory.
6412 * This function will try to, in order:
6414 * - Free objects from the free list
6415 * - Try to remove keys with an EXPIRE set
6417 * If it is not possible to free enough memory to reach used-memory < maxmemory
6418 * the server will start refusing commands that will enlarge even more the
6421 static void freeMemoryIfNeeded(void) {
6422 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
6423 int j
, k
, freed
= 0;
6425 if (tryFreeOneObjectFromFreelist() == REDIS_OK
) continue;
6426 for (j
= 0; j
< server
.dbnum
; j
++) {
6428 robj
*minkey
= NULL
;
6429 struct dictEntry
*de
;
6431 if (dictSize(server
.db
[j
].expires
)) {
6433 /* From a sample of three keys drop the one nearest to
6434 * the natural expire */
6435 for (k
= 0; k
< 3; k
++) {
6438 de
= dictGetRandomKey(server
.db
[j
].expires
);
6439 t
= (time_t) dictGetEntryVal(de
);
6440 if (minttl
== -1 || t
< minttl
) {
6441 minkey
= dictGetEntryKey(de
);
6445 deleteKey(server
.db
+j
,minkey
);
6448 if (!freed
) return; /* nothing to free... */
6452 /* ============================== Append Only file ========================== */
6454 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
6455 sds buf
= sdsempty();
6461 /* The DB this command was targetting is not the same as the last command
6462 * we appendend. To issue a SELECT command is needed. */
6463 if (dictid
!= server
.appendseldb
) {
6466 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
6467 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
6468 (unsigned long)strlen(seldb
),seldb
);
6469 server
.appendseldb
= dictid
;
6472 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6473 * EXPIREs into EXPIREATs calls */
6474 if (cmd
->proc
== expireCommand
) {
6477 tmpargv
[0] = createStringObject("EXPIREAT",8);
6478 tmpargv
[1] = argv
[1];
6479 incrRefCount(argv
[1]);
6480 when
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10);
6481 tmpargv
[2] = createObject(REDIS_STRING
,
6482 sdscatprintf(sdsempty(),"%ld",when
));
6486 /* Append the actual command */
6487 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
6488 for (j
= 0; j
< argc
; j
++) {
6491 o
= getDecodedObject(o
);
6492 buf
= sdscatprintf(buf
,"$%lu\r\n",(unsigned long)sdslen(o
->ptr
));
6493 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
6494 buf
= sdscatlen(buf
,"\r\n",2);
6498 /* Free the objects from the modified argv for EXPIREAT */
6499 if (cmd
->proc
== expireCommand
) {
6500 for (j
= 0; j
< 3; j
++)
6501 decrRefCount(argv
[j
]);
6504 /* We want to perform a single write. This should be guaranteed atomic
6505 * at least if the filesystem we are writing is a real physical one.
6506 * While this will save us against the server being killed I don't think
6507 * there is much to do about the whole server stopping for power problems
6509 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
6510 if (nwritten
!= (signed)sdslen(buf
)) {
6511 /* Ooops, we are in troubles. The best thing to do for now is
6512 * to simply exit instead to give the illusion that everything is
6513 * working as expected. */
6514 if (nwritten
== -1) {
6515 redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
));
6517 redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
));
6521 /* If a background append only file rewriting is in progress we want to
6522 * accumulate the differences between the child DB and the current one
6523 * in a buffer, so that when the child process will do its work we
6524 * can append the differences to the new append only file. */
6525 if (server
.bgrewritechildpid
!= -1)
6526 server
.bgrewritebuf
= sdscatlen(server
.bgrewritebuf
,buf
,sdslen(buf
));
6530 if (server
.appendfsync
== APPENDFSYNC_ALWAYS
||
6531 (server
.appendfsync
== APPENDFSYNC_EVERYSEC
&&
6532 now
-server
.lastfsync
> 1))
6534 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
6535 server
.lastfsync
= now
;
6539 /* In Redis commands are always executed in the context of a client, so in
6540 * order to load the append only file we need to create a fake client. */
6541 static struct redisClient
*createFakeClient(void) {
6542 struct redisClient
*c
= zmalloc(sizeof(*c
));
6546 c
->querybuf
= sdsempty();
6550 /* We set the fake client as a slave waiting for the synchronization
6551 * so that Redis will not try to send replies to this client. */
6552 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
6553 c
->reply
= listCreate();
6554 listSetFreeMethod(c
->reply
,decrRefCount
);
6555 listSetDupMethod(c
->reply
,dupClientReplyValue
);
6559 static void freeFakeClient(struct redisClient
*c
) {
6560 sdsfree(c
->querybuf
);
6561 listRelease(c
->reply
);
6565 /* Replay the append log file. On success REDIS_OK is returned. On non fatal
6566 * error (the append only file is zero-length) REDIS_ERR is returned. On
6567 * fatal error an error message is logged and the program exits. */
6568 int loadAppendOnlyFile(char *filename
) {
6569 struct redisClient
*fakeClient
;
6570 FILE *fp
= fopen(filename
,"r");
6571 struct redis_stat sb
;
6572 unsigned long long loadedkeys
= 0;
6574 if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size
== 0)
6578 redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
));
6582 fakeClient
= createFakeClient();
6589 struct redisCommand
*cmd
;
6591 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) {
6597 if (buf
[0] != '*') goto fmterr
;
6599 argv
= zmalloc(sizeof(robj
*)*argc
);
6600 for (j
= 0; j
< argc
; j
++) {
6601 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
;
6602 if (buf
[0] != '$') goto fmterr
;
6603 len
= strtol(buf
+1,NULL
,10);
6604 argsds
= sdsnewlen(NULL
,len
);
6605 if (len
&& fread(argsds
,len
,1,fp
) == 0) goto fmterr
;
6606 argv
[j
] = createObject(REDIS_STRING
,argsds
);
6607 if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */
6610 /* Command lookup */
6611 cmd
= lookupCommand(argv
[0]->ptr
);
6613 redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
);
6616 /* Try object sharing and encoding */
6617 if (server
.shareobjects
) {
6619 for(j
= 1; j
< argc
; j
++)
6620 argv
[j
] = tryObjectSharing(argv
[j
]);
6622 if (cmd
->flags
& REDIS_CMD_BULK
)
6623 tryObjectEncoding(argv
[argc
-1]);
6624 /* Run the command in the context of a fake client */
6625 fakeClient
->argc
= argc
;
6626 fakeClient
->argv
= argv
;
6627 cmd
->proc(fakeClient
);
6628 /* Discard the reply objects list from the fake client */
6629 while(listLength(fakeClient
->reply
))
6630 listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
));
6631 /* Clean up, ready for the next command */
6632 for (j
= 0; j
< argc
; j
++) decrRefCount(argv
[j
]);
6634 /* Handle swapping while loading big datasets when VM is on */
6636 if (server
.vm_enabled
&& (loadedkeys
% 5000) == 0) {
6637 while (zmalloc_used_memory() > server
.vm_max_memory
) {
6638 if (vmSwapOneObjectBlocking() == REDIS_ERR
) break;
6643 freeFakeClient(fakeClient
);
6648 redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file");
6650 redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
));
6654 redisLog(REDIS_WARNING
,"Bad file format reading the append only file");
6658 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6659 static int fwriteBulk(FILE *fp
, robj
*obj
) {
6663 if (obj
->storage
== REDIS_VM_MEMORY
&& obj
->encoding
!= REDIS_ENCODING_RAW
){
6664 obj
= getDecodedObject(obj
);
6667 snprintf(buf
,sizeof(buf
),"$%ld\r\n",(long)sdslen(obj
->ptr
));
6668 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) goto err
;
6669 if (sdslen(obj
->ptr
) && fwrite(obj
->ptr
,sdslen(obj
->ptr
),1,fp
) == 0)
6671 if (fwrite("\r\n",2,1,fp
) == 0) goto err
;
6672 if (decrrc
) decrRefCount(obj
);
6675 if (decrrc
) decrRefCount(obj
);
6679 /* Write a double value in bulk format $<count>\r\n<payload>\r\n */
6680 static int fwriteBulkDouble(FILE *fp
, double d
) {
6681 char buf
[128], dbuf
[128];
6683 snprintf(dbuf
,sizeof(dbuf
),"%.17g\r\n",d
);
6684 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(dbuf
)-2);
6685 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
6686 if (fwrite(dbuf
,strlen(dbuf
),1,fp
) == 0) return 0;
6690 /* Write a long value in bulk format $<count>\r\n<payload>\r\n */
6691 static int fwriteBulkLong(FILE *fp
, long l
) {
6692 char buf
[128], lbuf
[128];
6694 snprintf(lbuf
,sizeof(lbuf
),"%ld\r\n",l
);
6695 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(lbuf
)-2);
6696 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
6697 if (fwrite(lbuf
,strlen(lbuf
),1,fp
) == 0) return 0;
6701 /* Write a sequence of commands able to fully rebuild the dataset into
6702 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6703 static int rewriteAppendOnlyFile(char *filename
) {
6704 dictIterator
*di
= NULL
;
6709 time_t now
= time(NULL
);
6711 /* Note that we have to use a different temp name here compared to the
6712 * one used by rewriteAppendOnlyFileBackground() function. */
6713 snprintf(tmpfile
,256,"temp-rewriteaof-%d.aof", (int) getpid());
6714 fp
= fopen(tmpfile
,"w");
6716 redisLog(REDIS_WARNING
, "Failed rewriting the append only file: %s", strerror(errno
));
6719 for (j
= 0; j
< server
.dbnum
; j
++) {
6720 char selectcmd
[] = "*2\r\n$6\r\nSELECT\r\n";
6721 redisDb
*db
= server
.db
+j
;
6723 if (dictSize(d
) == 0) continue;
6724 di
= dictGetIterator(d
);
6730 /* SELECT the new DB */
6731 if (fwrite(selectcmd
,sizeof(selectcmd
)-1,1,fp
) == 0) goto werr
;
6732 if (fwriteBulkLong(fp
,j
) == 0) goto werr
;
6734 /* Iterate this DB writing every entry */
6735 while((de
= dictNext(di
)) != NULL
) {
6740 key
= dictGetEntryKey(de
);
6741 /* If the value for this key is swapped, load a preview in memory.
6742 * We use a "swapped" flag to remember if we need to free the
6743 * value object instead to just increment the ref count anyway
6744 * in order to avoid copy-on-write of pages if we are forked() */
6745 if (!server
.vm_enabled
|| key
->storage
== REDIS_VM_MEMORY
||
6746 key
->storage
== REDIS_VM_SWAPPING
) {
6747 o
= dictGetEntryVal(de
);
6750 o
= vmPreviewObject(key
);
6753 expiretime
= getExpire(db
,key
);
6755 /* Save the key and associated value */
6756 if (o
->type
== REDIS_STRING
) {
6757 /* Emit a SET command */
6758 char cmd
[]="*3\r\n$3\r\nSET\r\n";
6759 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6761 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6762 if (fwriteBulk(fp
,o
) == 0) goto werr
;
6763 } else if (o
->type
== REDIS_LIST
) {
6764 /* Emit the RPUSHes needed to rebuild the list */
6765 list
*list
= o
->ptr
;
6769 while((ln
= listYield(list
))) {
6770 char cmd
[]="*3\r\n$5\r\nRPUSH\r\n";
6771 robj
*eleobj
= listNodeValue(ln
);
6773 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6774 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6775 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6777 } else if (o
->type
== REDIS_SET
) {
6778 /* Emit the SADDs needed to rebuild the set */
6780 dictIterator
*di
= dictGetIterator(set
);
6783 while((de
= dictNext(di
)) != NULL
) {
6784 char cmd
[]="*3\r\n$4\r\nSADD\r\n";
6785 robj
*eleobj
= dictGetEntryKey(de
);
6787 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6788 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6789 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6791 dictReleaseIterator(di
);
6792 } else if (o
->type
== REDIS_ZSET
) {
6793 /* Emit the ZADDs needed to rebuild the sorted set */
6795 dictIterator
*di
= dictGetIterator(zs
->dict
);
6798 while((de
= dictNext(di
)) != NULL
) {
6799 char cmd
[]="*4\r\n$4\r\nZADD\r\n";
6800 robj
*eleobj
= dictGetEntryKey(de
);
6801 double *score
= dictGetEntryVal(de
);
6803 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6804 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6805 if (fwriteBulkDouble(fp
,*score
) == 0) goto werr
;
6806 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6808 dictReleaseIterator(di
);
6810 redisAssert(0 != 0);
6812 /* Save the expire time */
6813 if (expiretime
!= -1) {
6814 char cmd
[]="*3\r\n$8\r\nEXPIREAT\r\n";
6815 /* If this key is already expired skip it */
6816 if (expiretime
< now
) continue;
6817 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6818 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6819 if (fwriteBulkLong(fp
,expiretime
) == 0) goto werr
;
6821 if (swapped
) decrRefCount(o
);
6823 dictReleaseIterator(di
);
6826 /* Make sure data will not remain on the OS's output buffers */
6831 /* Use RENAME to make sure the DB file is changed atomically only
6832 * if the generated DB file is ok. */
6833 if (rename(tmpfile
,filename
) == -1) {
6834 redisLog(REDIS_WARNING
,"Error moving temp append only file on the final destination: %s", strerror(errno
));
6838 redisLog(REDIS_NOTICE
,"SYNC append only file rewrite performed");
6844 redisLog(REDIS_WARNING
,"Write error writing append only file on disk: %s", strerror(errno
));
6845 if (di
) dictReleaseIterator(di
);
6849 /* This is how rewriting of the append only file in background works:
6851 * 1) The user calls BGREWRITEAOF
6852 * 2) Redis calls this function, that forks():
6853 * 2a) the child rewrites the append only file in a temp file.
6854 * 2b) the parent accumulates differences in server.bgrewritebuf.
6855 * 3) When the child has finished '2a', it exits.
6856 * 4) The parent will trap the exit code and, if it's OK, will append the
6857 * data accumulated into server.bgrewritebuf into the temp file, and
6858 * finally will rename(2) the temp file to the actual file name.
6859 * Then the new file is reopened as the new append only file. Profit!
6861 static int rewriteAppendOnlyFileBackground(void) {
6864 if (server
.bgrewritechildpid
!= -1) return REDIS_ERR
;
6865 if ((childpid
= fork()) == 0) {
6870 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6871 if (rewriteAppendOnlyFile(tmpfile
) == REDIS_OK
) {
6878 if (childpid
== -1) {
6879 redisLog(REDIS_WARNING
,
6880 "Can't rewrite append only file in background: fork: %s",
6884 redisLog(REDIS_NOTICE
,
6885 "Background append only file rewriting started by pid %d",childpid
);
6886 server
.bgrewritechildpid
= childpid
;
6887 /* We set appendseldb to -1 in order to force the next call to the
6888 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6889 * accumulated by the parent into server.bgrewritebuf will start
6890 * with a SELECT statement and it will be safe to merge. */
6891 server
.appendseldb
= -1;
6894 return REDIS_OK
; /* unreached */
6897 static void bgrewriteaofCommand(redisClient
*c
) {
6898 if (server
.bgrewritechildpid
!= -1) {
6899 addReplySds(c
,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6902 if (rewriteAppendOnlyFileBackground() == REDIS_OK
) {
6903 char *status
= "+Background append only file rewriting started\r\n";
6904 addReplySds(c
,sdsnew(status
));
6906 addReply(c
,shared
.err
);
6910 static void aofRemoveTempFile(pid_t childpid
) {
6913 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) childpid
);
6917 /* Virtual Memory is composed mainly of two subsystems:
6918 * - Blocking Virtual Memory
6919 * - Threaded Virtual Memory I/O
6920 * The two parts are not fully decoupled, but functions are split among two
6921 * different sections of the source code (delimited by comments) in order to
6922 * make more clear what functionality is about the blocking VM and what about
6923 * the threaded (not blocking) VM.
6927 * Redis VM is a blocking VM (one that blocks reading swapped values from
6928 * disk into memory when a value swapped out is needed in memory) that is made
6929 * unblocking by trying to examine the command argument vector in order to
6930 * load in background values that will likely be needed in order to exec
6931 * the command. The command is executed only once all the relevant keys
6932 * are loaded into memory.
6934 * This is basically almost as simple as a blocking VM, but almost as parallel
6935 * as a fully non-blocking VM.
6938 /* =================== Virtual Memory - Blocking Side ====================== */
6939 static void vmInit(void) {
6943 server
.vm_fp
= fopen("/tmp/redisvm","w+b");
6944 if (server
.vm_fp
== NULL
) {
6945 redisLog(REDIS_WARNING
,"Impossible to open the swap file. Exiting.");
6948 server
.vm_fd
= fileno(server
.vm_fp
);
6949 server
.vm_next_page
= 0;
6950 server
.vm_near_pages
= 0;
6951 server
.vm_stats_used_pages
= 0;
6952 server
.vm_stats_swapped_objects
= 0;
6953 server
.vm_stats_swapouts
= 0;
6954 server
.vm_stats_swapins
= 0;
6955 totsize
= server
.vm_pages
*server
.vm_page_size
;
6956 redisLog(REDIS_NOTICE
,"Allocating %lld bytes of swap file",totsize
);
6957 if (ftruncate(server
.vm_fd
,totsize
) == -1) {
6958 redisLog(REDIS_WARNING
,"Can't ftruncate swap file: %s. Exiting.",
6962 redisLog(REDIS_NOTICE
,"Swap file allocated with success");
6964 server
.vm_bitmap
= zmalloc((server
.vm_pages
+7)/8);
6965 redisLog(REDIS_VERBOSE
,"Allocated %lld bytes page table for %lld pages",
6966 (long long) (server
.vm_pages
+7)/8, server
.vm_pages
);
6967 memset(server
.vm_bitmap
,0,(server
.vm_pages
+7)/8);
6968 /* Try to remove the swap file, so the OS will really delete it from the
6969 * file system when Redis exits. */
6970 unlink("/tmp/redisvm");
6972 /* Initialize threaded I/O (used by Virtual Memory) */
6973 server
.io_newjobs
= listCreate();
6974 server
.io_processing
= listCreate();
6975 server
.io_processed
= listCreate();
6976 server
.io_clients
= listCreate();
6977 pthread_mutex_init(&server
.io_mutex
,NULL
);
6978 pthread_mutex_init(&server
.obj_freelist_mutex
,NULL
);
6979 pthread_mutex_init(&server
.io_swapfile_mutex
,NULL
);
6980 server
.io_active_threads
= 0;
6981 if (pipe(pipefds
) == -1) {
6982 redisLog(REDIS_WARNING
,"Unable to intialized VM: pipe(2): %s. Exiting."
6986 server
.io_ready_pipe_read
= pipefds
[0];
6987 server
.io_ready_pipe_write
= pipefds
[1];
6988 redisAssert(anetNonBlock(NULL
,server
.io_ready_pipe_read
) != ANET_ERR
);
6989 /* Listen for events in the threaded I/O pipe */
6990 if (aeCreateFileEvent(server
.el
, server
.io_ready_pipe_read
, AE_READABLE
,
6991 vmThreadedIOCompletedJob
, NULL
) == AE_ERR
)
6992 oom("creating file event");
6995 /* Mark the page as used */
6996 static void vmMarkPageUsed(off_t page
) {
6997 off_t byte
= page
/8;
6999 server
.vm_bitmap
[byte
] |= 1<<bit
;
7000 redisLog(REDIS_DEBUG
,"Mark used: %lld (byte:%lld bit:%d)\n",
7001 (long long)page
, (long long)byte
, bit
);
7004 /* Mark N contiguous pages as used, with 'page' being the first. */
7005 static void vmMarkPagesUsed(off_t page
, off_t count
) {
7008 for (j
= 0; j
< count
; j
++)
7009 vmMarkPageUsed(page
+j
);
7010 server
.vm_stats_used_pages
+= count
;
7013 /* Mark the page as free */
7014 static void vmMarkPageFree(off_t page
) {
7015 off_t byte
= page
/8;
7017 server
.vm_bitmap
[byte
] &= ~(1<<bit
);
7020 /* Mark N contiguous pages as free, with 'page' being the first. */
7021 static void vmMarkPagesFree(off_t page
, off_t count
) {
7024 for (j
= 0; j
< count
; j
++)
7025 vmMarkPageFree(page
+j
);
7026 server
.vm_stats_used_pages
-= count
;
7029 /* Test if the page is free */
7030 static int vmFreePage(off_t page
) {
7031 off_t byte
= page
/8;
7033 return (server
.vm_bitmap
[byte
] & (1<<bit
)) == 0;
7036 /* Find N contiguous free pages storing the first page of the cluster in *first.
7037 * Returns REDIS_OK if it was able to find N contiguous pages, otherwise
7038 * REDIS_ERR is returned.
7040 * This function uses a simple algorithm: we try to allocate
7041 * REDIS_VM_MAX_NEAR_PAGES sequentially, when we reach this limit we start
7042 * again from the start of the swap file searching for free spaces.
7044 * If it looks pretty clear that there are no free pages near our offset
7045 * we try to find less populated places doing a forward jump of
7046 * REDIS_VM_MAX_RANDOM_JUMP, then we start scanning again a few pages
7047 * without hurry, and then we jump again and so forth...
7049 * This function can be improved using a free list to avoid to guess
7050 * too much, since we could collect data about freed pages.
7052 * note: I implemented this function just after watching an episode of
7053 * Battlestar Galactica, where the hybrid was continuing to say "JUMP!"
7055 static int vmFindContiguousPages(off_t
*first
, int n
) {
7056 off_t base
, offset
= 0, since_jump
= 0, numfree
= 0;
7058 if (server
.vm_near_pages
== REDIS_VM_MAX_NEAR_PAGES
) {
7059 server
.vm_near_pages
= 0;
7060 server
.vm_next_page
= 0;
7062 server
.vm_near_pages
++; /* Yet another try for pages near to the old ones */
7063 base
= server
.vm_next_page
;
7065 while(offset
< server
.vm_pages
) {
7066 off_t
this = base
+offset
;
7068 redisLog(REDIS_DEBUG
, "THIS: %lld (%c)\n", (long long) this, vmFreePage(this) ? 'F' : 'X');
7069 /* If we overflow, restart from page zero */
7070 if (this >= server
.vm_pages
) {
7071 this -= server
.vm_pages
;
7073 /* Just overflowed, what we found on tail is no longer
7074 * interesting, as it's no longer contiguous. */
7078 if (vmFreePage(this)) {
7079 /* This is a free page */
7081 /* Already got N free pages? Return to the caller, with success */
7083 *first
= this-(n
-1);
7084 server
.vm_next_page
= this+1;
7088 /* The current one is not a free page */
7092 /* Fast-forward if the current page is not free and we already
7093 * searched enough near this place. */
7095 if (!numfree
&& since_jump
>= REDIS_VM_MAX_RANDOM_JUMP
/4) {
7096 offset
+= random() % REDIS_VM_MAX_RANDOM_JUMP
;
7098 /* Note that even if we rewind after the jump, we don't need
7099 * to make sure numfree is set to zero as we only jump *if* it
7100 * is set to zero. */
7102 /* Otherwise just check the next page */
7109 /* Write the specified object at the specified page of the swap file */
7110 static int vmWriteObjectOnSwap(robj
*o
, off_t page
) {
7111 if (server
.vm_enabled
) pthread_mutex_lock(&server
.io_swapfile_mutex
);
7112 if (fseeko(server
.vm_fp
,page
*server
.vm_page_size
,SEEK_SET
) == -1) {
7113 if (server
.vm_enabled
) pthread_mutex_unlock(&server
.io_swapfile_mutex
);
7114 redisLog(REDIS_WARNING
,
7115 "Critical VM problem in vmSwapObjectBlocking(): can't seek: %s",
7119 rdbSaveObject(server
.vm_fp
,o
);
7120 if (server
.vm_enabled
) pthread_mutex_unlock(&server
.io_swapfile_mutex
);
7124 /* Swap the 'val' object relative to 'key' into disk. Store all the information
7125 * needed to later retrieve the object into the key object.
7126 * If we can't find enough contiguous empty pages to swap the object on disk
7127 * REDIS_ERR is returned. */
7128 static int vmSwapObjectBlocking(robj
*key
, robj
*val
) {
7129 off_t pages
= rdbSavedObjectPages(val
,NULL
);
7132 assert(key
->storage
== REDIS_VM_MEMORY
);
7133 assert(key
->refcount
== 1);
7134 if (vmFindContiguousPages(&page
,pages
) == REDIS_ERR
) return REDIS_ERR
;
7135 if (vmWriteObjectOnSwap(val
,page
) == REDIS_ERR
) return REDIS_ERR
;
7136 key
->vm
.page
= page
;
7137 key
->vm
.usedpages
= pages
;
7138 key
->storage
= REDIS_VM_SWAPPED
;
7139 key
->vtype
= val
->type
;
7140 decrRefCount(val
); /* Deallocate the object from memory. */
7141 vmMarkPagesUsed(page
,pages
);
7142 redisLog(REDIS_DEBUG
,"VM: object %s swapped out at %lld (%lld pages)",
7143 (unsigned char*) key
->ptr
,
7144 (unsigned long long) page
, (unsigned long long) pages
);
7145 server
.vm_stats_swapped_objects
++;
7146 server
.vm_stats_swapouts
++;
7147 fflush(server
.vm_fp
);
7151 static robj
*vmReadObjectFromSwap(off_t page
, int type
) {
7154 if (server
.vm_enabled
) pthread_mutex_lock(&server
.io_swapfile_mutex
);
7155 if (fseeko(server
.vm_fp
,page
*server
.vm_page_size
,SEEK_SET
) == -1) {
7156 redisLog(REDIS_WARNING
,
7157 "Unrecoverable VM problem in vmLoadObject(): can't seek: %s",
7161 o
= rdbLoadObject(type
,server
.vm_fp
);
7163 redisLog(REDIS_WARNING
, "Unrecoverable VM problem in vmLoadObject(): can't load object from swap file: %s", strerror(errno
));
7166 if (server
.vm_enabled
) pthread_mutex_unlock(&server
.io_swapfile_mutex
);
7170 /* Load the value object relative to the 'key' object from swap to memory.
7171 * The newly allocated object is returned.
7173 * If preview is true the unserialized object is returned to the caller but
7174 * no changes are made to the key object, nor the pages are marked as freed */
7175 static robj
*vmGenericLoadObject(robj
*key
, int preview
) {
7178 redisAssert(key
->storage
== REDIS_VM_SWAPPED
);
7179 val
= vmReadObjectFromSwap(key
->vm
.page
,key
->vtype
);
7181 key
->storage
= REDIS_VM_MEMORY
;
7182 key
->vm
.atime
= server
.unixtime
;
7183 vmMarkPagesFree(key
->vm
.page
,key
->vm
.usedpages
);
7184 redisLog(REDIS_DEBUG
, "VM: object %s loaded from disk",
7185 (unsigned char*) key
->ptr
);
7186 server
.vm_stats_swapped_objects
--;
7188 redisLog(REDIS_DEBUG
, "VM: object %s previewed from disk",
7189 (unsigned char*) key
->ptr
);
7191 server
.vm_stats_swapins
++;
7195 /* Plain object loading, from swap to memory */
7196 static robj
*vmLoadObject(robj
*key
) {
7197 /* If we are loading the object in background, stop it, we
7198 * need to load this object synchronously ASAP. */
7199 if (key
->storage
== REDIS_VM_LOADING
)
7200 vmCancelThreadedIOJob(key
);
7201 return vmGenericLoadObject(key
,0);
7204 /* Just load the value from disk, without modifying the key.
7205 * This is useful when we want to perform some operation on the value
7206 * without really bringing it from swap to memory, like while saving the
7207 * dataset or rewriting the append only log. */
7208 static robj
*vmPreviewObject(robj
*key
) {
7209 return vmGenericLoadObject(key
,1);
7212 /* How good a candidate is this object for swapping?
7213 * The better candidate it is, the greater the returned value.
7215 * Currently we try to perform a fast estimation of the object size in
7216 * memory, and combine it with aging information.
7218 * Basically swappability = idle-time * log(estimated size)
7220 * Bigger objects are preferred over smaller objects, but not
7221 * proportionally, this is why we use the logarithm. This algorithm is
7222 * just a first try and will probably be tuned later. */
7223 static double computeObjectSwappability(robj
*o
) {
7224 time_t age
= server
.unixtime
- o
->vm
.atime
;
7228 struct dictEntry
*de
;
7231 if (age
<= 0) return 0;
7234 if (o
->encoding
!= REDIS_ENCODING_RAW
) {
7237 asize
= sdslen(o
->ptr
)+sizeof(*o
)+sizeof(long)*2;
7242 listNode
*ln
= listFirst(l
);
7244 asize
= sizeof(list
);
7246 robj
*ele
= ln
->value
;
7249 elesize
= (ele
->encoding
== REDIS_ENCODING_RAW
) ?
7250 (sizeof(*o
)+sdslen(ele
->ptr
)) :
7252 asize
+= (sizeof(listNode
)+elesize
)*listLength(l
);
7257 z
= (o
->type
== REDIS_ZSET
);
7258 d
= z
? ((zset
*)o
->ptr
)->dict
: o
->ptr
;
7260 asize
= sizeof(dict
)+(sizeof(struct dictEntry
*)*dictSlots(d
));
7261 if (z
) asize
+= sizeof(zset
)-sizeof(dict
);
7266 de
= dictGetRandomKey(d
);
7267 ele
= dictGetEntryKey(de
);
7268 elesize
= (ele
->encoding
== REDIS_ENCODING_RAW
) ?
7269 (sizeof(*o
)+sdslen(ele
->ptr
)) :
7271 asize
+= (sizeof(struct dictEntry
)+elesize
)*dictSize(d
);
7272 if (z
) asize
+= sizeof(zskiplistNode
)*dictSize(d
);
7276 return (double)asize
*log(1+asize
);
7279 /* Try to swap an object that's a good candidate for swapping.
7280 * Returns REDIS_OK if the object was swapped, REDIS_ERR if it's not possible
7281 * to swap any object at all.
7283 * If 'usethreads' is true, Redis will try to swap the object in background
7284 * using I/O threads. */
7285 static int vmSwapOneObject(int usethreads
) {
7287 struct dictEntry
*best
= NULL
;
7288 double best_swappability
= 0;
7289 redisDb
*best_db
= NULL
;
7292 for (j
= 0; j
< server
.dbnum
; j
++) {
7293 redisDb
*db
= server
.db
+j
;
7294 int maxtries
= 1000;
7296 if (dictSize(db
->dict
) == 0) continue;
7297 for (i
= 0; i
< 5; i
++) {
7299 double swappability
;
7301 if (maxtries
) maxtries
--;
7302 de
= dictGetRandomKey(db
->dict
);
7303 key
= dictGetEntryKey(de
);
7304 val
= dictGetEntryVal(de
);
7305 if (key
->storage
!= REDIS_VM_MEMORY
) {
7306 if (maxtries
) i
--; /* don't count this try */
7309 swappability
= computeObjectSwappability(val
);
7310 if (!best
|| swappability
> best_swappability
) {
7312 best_swappability
= swappability
;
7318 redisLog(REDIS_DEBUG
,"No swappable key found!");
7321 key
= dictGetEntryKey(best
);
7322 val
= dictGetEntryVal(best
);
7324 redisLog(REDIS_DEBUG
,"Key with best swappability: %s, %f",
7325 key
->ptr
, best_swappability
);
7327 /* Unshare the key if needed */
7328 if (key
->refcount
> 1) {
7329 robj
*newkey
= dupStringObject(key
);
7331 key
= dictGetEntryKey(best
) = newkey
;
7335 vmSwapObjectThreaded(key
,val
,best_db
);
7338 if (vmSwapObjectBlocking(key
,val
) == REDIS_OK
) {
7339 dictGetEntryVal(best
) = NULL
;
7347 static int vmSwapOneObjectBlocking() {
7348 return vmSwapOneObject(0);
7351 static int vmSwapOneObjectThreaded() {
7352 return vmSwapOneObject(1);
7355 /* Return true if it's safe to swap out objects in a given moment.
7356 * Basically we don't want to swap objects out while there is a BGSAVE
7357 * or a BGREWRITEAOF running in background. */
7358 static int vmCanSwapOut(void) {
7359 return (server
.bgsavechildpid
== -1 && server
.bgrewritechildpid
== -1);
7362 /* Delete a key if swapped. Returns 1 if the key was found, was swapped
7363 * and was deleted. Otherwise 0 is returned. */
7364 static int deleteIfSwapped(redisDb
*db
, robj
*key
) {
7368 if ((de
= dictFind(db
->dict
,key
)) == NULL
) return 0;
7369 foundkey
= dictGetEntryKey(de
);
7370 if (foundkey
->storage
== REDIS_VM_MEMORY
) return 0;
7375 /* =================== Virtual Memory - Threaded I/O ======================= */
7377 static void freeIOJob(iojob
*j
) {
7378 if (j
->type
== REDIS_IOJOB_PREPARE_SWAP
||
7379 j
->type
== REDIS_IOJOB_DO_SWAP
)
7380 decrRefCount(j
->val
);
7381 decrRefCount(j
->key
);
7385 /* Every time a thread finishes a Job, it writes a byte into the write side
7386 * of an unix pipe in order to "awake" the main thread, and this function
7388 static void vmThreadedIOCompletedJob(aeEventLoop
*el
, int fd
, void *privdata
,
7394 REDIS_NOTUSED(mask
);
7395 REDIS_NOTUSED(privdata
);
7397 /* For every byte we read in the read side of the pipe, there is one
7398 * I/O job completed to process. */
7399 while((retval
= read(fd
,buf
,1)) == 1) {
7403 struct dictEntry
*de
;
7405 redisLog(REDIS_DEBUG
,"Processing I/O completed job");
7406 assert(listLength(server
.io_processed
) != 0);
7408 /* Get the processed element (the oldest one) */
7410 ln
= listFirst(server
.io_processed
);
7412 listDelNode(server
.io_processed
,ln
);
7414 /* If this job is marked as canceled, just ignore it */
7419 /* Post process it in the main thread, as there are things we
7420 * can do just here to avoid race conditions and/or invasive locks */
7421 redisLog(REDIS_DEBUG
,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j
, j
->type
, (void*)j
->key
, (char*)j
->key
->ptr
, j
->key
->refcount
);
7422 de
= dictFind(j
->db
->dict
,j
->key
);
7424 key
= dictGetEntryKey(de
);
7425 if (j
->type
== REDIS_IOJOB_LOAD
) {
7426 /* Key loaded, bring it at home */
7427 key
->storage
= REDIS_VM_MEMORY
;
7428 key
->vm
.atime
= server
.unixtime
;
7429 vmMarkPagesFree(key
->vm
.page
,key
->vm
.usedpages
);
7430 redisLog(REDIS_DEBUG
, "VM: object %s loaded from disk (threaded)",
7431 (unsigned char*) key
->ptr
);
7432 server
.vm_stats_swapped_objects
--;
7433 server
.vm_stats_swapins
++;
7435 } else if (j
->type
== REDIS_IOJOB_PREPARE_SWAP
) {
7436 /* Now we know the amount of pages required to swap this object.
7437 * Let's find some space for it, and queue this task again
7438 * rebranded as REDIS_IOJOB_DO_SWAP. */
7439 if (vmFindContiguousPages(&j
->page
,j
->pages
) == REDIS_ERR
) {
7440 /* Ooops... no space! */
7443 j
->type
= REDIS_IOJOB_DO_SWAP
;
7448 } else if (j
->type
== REDIS_IOJOB_DO_SWAP
) {
7451 /* Key swapped. We can finally free some memory. */
7452 if (key
->storage
!= REDIS_VM_SWAPPING
) {
7453 printf("key->storage: %d\n",key
->storage
);
7454 printf("key->name: %s\n",(char*)key
->ptr
);
7455 printf("key->refcount: %d\n",key
->refcount
);
7456 printf("val: %p\n",(void*)j
->val
);
7457 printf("val->type: %d\n",j
->val
->type
);
7458 printf("val->ptr: %s\n",(char*)j
->val
->ptr
);
7460 redisAssert(key
->storage
== REDIS_VM_SWAPPING
);
7461 val
= dictGetEntryVal(de
);
7462 key
->vm
.page
= j
->page
;
7463 key
->vm
.usedpages
= j
->pages
;
7464 key
->storage
= REDIS_VM_SWAPPED
;
7465 key
->vtype
= j
->val
->type
;
7466 decrRefCount(val
); /* Deallocate the object from memory. */
7467 dictGetEntryVal(de
) = NULL
;
7468 vmMarkPagesUsed(j
->page
,j
->pages
);
7469 redisLog(REDIS_DEBUG
,
7470 "VM: object %s swapped out at %lld (%lld pages) (threaded)",
7471 (unsigned char*) key
->ptr
,
7472 (unsigned long long) j
->page
, (unsigned long long) j
->pages
);
7473 server
.vm_stats_swapped_objects
++;
7474 server
.vm_stats_swapouts
++;
7476 /* Put a few more swap requests in queue if we are still
7478 if (zmalloc_used_memory() > server
.vm_max_memory
) {
7482 more
= listLength(server
.io_newjobs
) <
7483 (unsigned) server
.vm_max_threads
;
7485 /* Don't waste CPU time if swappable objects are rare. */
7486 if (vmSwapOneObjectThreaded() == REDIS_ERR
) break;
7490 return; /* XXX REMOVE ME */
7492 if (retval
< 0 && errno
!= EAGAIN
) {
7493 redisLog(REDIS_WARNING
,
7494 "WARNING: read(2) error in vmThreadedIOCompletedJob() %s",
7499 static void lockThreadedIO(void) {
7500 pthread_mutex_lock(&server
.io_mutex
);
7503 static void unlockThreadedIO(void) {
7504 pthread_mutex_unlock(&server
.io_mutex
);
7507 /* Remove the specified object from the threaded I/O queue if still not
7508 * processed, otherwise make sure to flag it as canceled. */
7509 static void vmCancelThreadedIOJob(robj
*o
) {
7511 server
.io_newjobs
, /* 0 */
7512 server
.io_processing
, /* 1 */
7513 server
.io_processed
/* 2 */
7517 assert(o
->storage
== REDIS_VM_LOADING
|| o
->storage
== REDIS_VM_SWAPPING
);
7519 /* Search for a matching key in one of the queues */
7520 for (i
= 0; i
< 3; i
++) {
7523 listRewind(lists
[i
]);
7524 while ((ln
= listYield(lists
[i
])) != NULL
) {
7525 iojob
*job
= ln
->value
;
7527 if (job
->canceled
) continue; /* Skip this, already canceled. */
7528 if (compareStringObjects(job
->key
,o
) == 0) {
7529 redisLog(REDIS_DEBUG
,"*** CANCELED %p (%s)\n",
7530 (void*)job
, (char*)o
->ptr
);
7532 case 0: /* io_newjobs */
7533 /* If the job was yet not processed the best thing to do
7534 * is to remove it from the queue at all */
7536 listDelNode(lists
[i
],ln
);
7538 case 1: /* io_processing */
7539 case 2: /* io_processed */
7543 if (o
->storage
== REDIS_VM_LOADING
)
7544 o
->storage
= REDIS_VM_SWAPPED
;
7545 else if (o
->storage
== REDIS_VM_SWAPPING
)
7546 o
->storage
= REDIS_VM_MEMORY
;
7553 assert(1 != 1); /* We should never reach this */
7556 static void *IOThreadEntryPoint(void *arg
) {
7561 pthread_detach(pthread_self());
7563 /* Get a new job to process */
7565 if (listLength(server
.io_newjobs
) == 0) {
7566 /* No new jobs in queue, exit. */
7567 redisLog(REDIS_DEBUG
,"Thread %lld exiting, nothing to do\n",
7568 (long long) pthread_self());
7569 server
.io_active_threads
--;
7573 ln
= listFirst(server
.io_newjobs
);
7575 listDelNode(server
.io_newjobs
,ln
);
7576 /* Add the job in the processing queue */
7577 j
->thread
= pthread_self();
7578 listAddNodeTail(server
.io_processing
,j
);
7579 ln
= listLast(server
.io_processing
); /* We use ln later to remove it */
7581 redisLog(REDIS_DEBUG
,"Thread %lld got a new job (type %d): %p about key '%s'\n",
7582 (long long) pthread_self(), j
->type
, (void*)j
, (char*)j
->key
->ptr
);
7584 /* Process the Job */
7585 if (j
->type
== REDIS_IOJOB_LOAD
) {
7586 } else if (j
->type
== REDIS_IOJOB_PREPARE_SWAP
) {
7587 FILE *fp
= fopen("/dev/null","w+");
7588 j
->pages
= rdbSavedObjectPages(j
->val
,fp
);
7590 } else if (j
->type
== REDIS_IOJOB_DO_SWAP
) {
7591 if (vmWriteObjectOnSwap(j
->val
,j
->page
) == REDIS_ERR
)
7595 /* Done: insert the job into the processed queue */
7596 redisLog(REDIS_DEBUG
,"Thread %lld completed the job: %p (key %s)\n",
7597 (long long) pthread_self(), (void*)j
, (char*)j
->key
->ptr
);
7599 listDelNode(server
.io_processing
,ln
);
7600 listAddNodeTail(server
.io_processed
,j
);
7603 /* Signal the main thread there is new stuff to process */
7604 assert(write(server
.io_ready_pipe_write
,"x",1) == 1);
7606 return NULL
; /* never reached */
7609 static void spawnIOThread(void) {
7612 pthread_create(&thread
,NULL
,IOThreadEntryPoint
,NULL
);
7613 server
.io_active_threads
++;
7616 /* This function must be called while with threaded IO locked */
7617 static void queueIOJob(iojob
*j
) {
7618 redisLog(REDIS_DEBUG
,"Queued IO Job %p type %d about key '%s'\n",
7619 (void*)j
, j
->type
, (char*)j
->key
->ptr
);
7620 listAddNodeTail(server
.io_newjobs
,j
);
7621 if (server
.io_active_threads
< server
.vm_max_threads
)
7625 static int vmSwapObjectThreaded(robj
*key
, robj
*val
, redisDb
*db
) {
7628 assert(key
->storage
== REDIS_VM_MEMORY
);
7629 assert(key
->refcount
== 1);
7631 j
= zmalloc(sizeof(*j
));
7632 j
->type
= REDIS_IOJOB_PREPARE_SWAP
;
7634 j
->key
= dupStringObject(key
);
7638 j
->thread
= (pthread_t
) -1;
7639 key
->storage
= REDIS_VM_SWAPPING
;
7647 /* ================================= Debugging ============================== */
7649 static void debugCommand(redisClient
*c
) {
7650 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
7652 } else if (!strcasecmp(c
->argv
[1]->ptr
,"reload")) {
7653 if (rdbSave(server
.dbfilename
) != REDIS_OK
) {
7654 addReply(c
,shared
.err
);
7658 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
7659 addReply(c
,shared
.err
);
7662 redisLog(REDIS_WARNING
,"DB reloaded by DEBUG RELOAD");
7663 addReply(c
,shared
.ok
);
7664 } else if (!strcasecmp(c
->argv
[1]->ptr
,"loadaof")) {
7666 if (loadAppendOnlyFile(server
.appendfilename
) != REDIS_OK
) {
7667 addReply(c
,shared
.err
);
7670 redisLog(REDIS_WARNING
,"Append Only File loaded by DEBUG LOADAOF");
7671 addReply(c
,shared
.ok
);
7672 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
7673 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
7677 addReply(c
,shared
.nokeyerr
);
7680 key
= dictGetEntryKey(de
);
7681 val
= dictGetEntryVal(de
);
7682 if (server
.vm_enabled
&& (key
->storage
== REDIS_VM_MEMORY
||
7683 key
->storage
== REDIS_VM_SWAPPING
)) {
7684 addReplySds(c
,sdscatprintf(sdsempty(),
7685 "+Key at:%p refcount:%d, value at:%p refcount:%d "
7686 "encoding:%d serializedlength:%lld\r\n",
7687 (void*)key
, key
->refcount
, (void*)val
, val
->refcount
,
7688 val
->encoding
, rdbSavedObjectLen(val
,NULL
)));
7690 addReplySds(c
,sdscatprintf(sdsempty(),
7691 "+Key at:%p refcount:%d, value swapped at: page %llu "
7692 "using %llu pages\r\n",
7693 (void*)key
, key
->refcount
, (unsigned long long) key
->vm
.page
,
7694 (unsigned long long) key
->vm
.usedpages
));
7696 } else if (!strcasecmp(c
->argv
[1]->ptr
,"swapout") && c
->argc
== 3) {
7697 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
7700 if (!server
.vm_enabled
) {
7701 addReplySds(c
,sdsnew("-ERR Virtual Memory is disabled\r\n"));
7705 addReply(c
,shared
.nokeyerr
);
7708 key
= dictGetEntryKey(de
);
7709 val
= dictGetEntryVal(de
);
7710 /* If the key is shared we want to create a copy */
7711 if (key
->refcount
> 1) {
7712 robj
*newkey
= dupStringObject(key
);
7714 key
= dictGetEntryKey(de
) = newkey
;
7717 if (key
->storage
!= REDIS_VM_MEMORY
) {
7718 addReplySds(c
,sdsnew("-ERR This key is not in memory\r\n"));
7719 } else if (vmSwapObjectBlocking(key
,val
) == REDIS_OK
) {
7720 dictGetEntryVal(de
) = NULL
;
7721 addReply(c
,shared
.ok
);
7723 addReply(c
,shared
.err
);
7726 addReplySds(c
,sdsnew(
7727 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPOUT <key>|RELOAD]\r\n"));
7731 static void _redisAssert(char *estr
, char *file
, int line
) {
7732 redisLog(REDIS_WARNING
,"=== ASSERTION FAILED ===");
7733 redisLog(REDIS_WARNING
,"==> %s:%d '%s' is not true\n",file
,line
,estr
);
7734 #ifdef HAVE_BACKTRACE
7735 redisLog(REDIS_WARNING
,"(forcing SIGSEGV in order to print the stack trace)");
7740 /* =================================== Main! ================================ */
7743 int linuxOvercommitMemoryValue(void) {
7744 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
7748 if (fgets(buf
,64,fp
) == NULL
) {
7757 void linuxOvercommitMemoryWarning(void) {
7758 if (linuxOvercommitMemoryValue() == 0) {
7759 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
7762 #endif /* __linux__ */
7764 static void daemonize(void) {
7768 if (fork() != 0) exit(0); /* parent exits */
7769 setsid(); /* create a new session */
7771 /* Every output goes to /dev/null. If Redis is daemonized but
7772 * the 'logfile' is set to 'stdout' in the configuration file
7773 * it will not log at all. */
7774 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
7775 dup2(fd
, STDIN_FILENO
);
7776 dup2(fd
, STDOUT_FILENO
);
7777 dup2(fd
, STDERR_FILENO
);
7778 if (fd
> STDERR_FILENO
) close(fd
);
7780 /* Try to write the pid file */
7781 fp
= fopen(server
.pidfile
,"w");
7783 fprintf(fp
,"%d\n",getpid());
7788 int main(int argc
, char **argv
) {
7791 resetServerSaveParams();
7792 loadServerConfig(argv
[1]);
7793 } else if (argc
> 2) {
7794 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
7797 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
7799 if (server
.daemonize
) daemonize();
7801 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
7803 linuxOvercommitMemoryWarning();
7805 if (server
.appendonly
) {
7806 if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
)
7807 redisLog(REDIS_NOTICE
,"DB loaded from append only file");
7809 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
7810 redisLog(REDIS_NOTICE
,"DB loaded from disk");
7812 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
7814 aeDeleteEventLoop(server
.el
);
7818 /* ============================= Backtrace support ========================= */
7820 #ifdef HAVE_BACKTRACE
7821 static char *findFuncName(void *pointer
, unsigned long *offset
);
7823 static void *getMcontextEip(ucontext_t
*uc
) {
7824 #if defined(__FreeBSD__)
7825 return (void*) uc
->uc_mcontext
.mc_eip
;
7826 #elif defined(__dietlibc__)
7827 return (void*) uc
->uc_mcontext
.eip
;
7828 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
7830 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
7832 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
7834 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
7835 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
7836 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
7838 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
7840 #elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
7841 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
]; /* Linux 32/64 bit */
7842 #elif defined(__ia64__) /* Linux IA64 */
7843 return (void*) uc
->uc_mcontext
.sc_ip
;
7849 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
7851 char **messages
= NULL
;
7852 int i
, trace_size
= 0;
7853 unsigned long offset
=0;
7854 ucontext_t
*uc
= (ucontext_t
*) secret
;
7856 REDIS_NOTUSED(info
);
7858 redisLog(REDIS_WARNING
,
7859 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
7860 infostring
= genRedisInfoString();
7861 redisLog(REDIS_WARNING
, "%s",infostring
);
7862 /* It's not safe to sdsfree() the returned string under memory
7863 * corruption conditions. Let it leak as we are going to abort */
7865 trace_size
= backtrace(trace
, 100);
7866 /* overwrite sigaction with caller's address */
7867 if (getMcontextEip(uc
) != NULL
) {
7868 trace
[1] = getMcontextEip(uc
);
7870 messages
= backtrace_symbols(trace
, trace_size
);
7872 for (i
=1; i
<trace_size
; ++i
) {
7873 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
7875 p
= strchr(messages
[i
],'+');
7876 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
7877 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
7879 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
7882 /* free(messages); Don't call free() with possibly corrupted memory. */
7886 static void setupSigSegvAction(void) {
7887 struct sigaction act
;
7889 sigemptyset (&act
.sa_mask
);
7890 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
7891 * is used. Otherwise, sa_handler is used */
7892 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
7893 act
.sa_sigaction
= segvHandler
;
7894 sigaction (SIGSEGV
, &act
, NULL
);
7895 sigaction (SIGBUS
, &act
, NULL
);
7896 sigaction (SIGFPE
, &act
, NULL
);
7897 sigaction (SIGILL
, &act
, NULL
);
7898 sigaction (SIGBUS
, &act
, NULL
);
7902 #include "staticsymbols.h"
7903 /* This function try to convert a pointer into a function name. It's used in
7904 * oreder to provide a backtrace under segmentation fault that's able to
7905 * display functions declared as static (otherwise the backtrace is useless). */
7906 static char *findFuncName(void *pointer
, unsigned long *offset
){
7908 unsigned long off
, minoff
= 0;
7910 /* Try to match against the Symbol with the smallest offset */
7911 for (i
=0; symsTable
[i
].pointer
; i
++) {
7912 unsigned long lp
= (unsigned long) pointer
;
7914 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
7915 off
=lp
-symsTable
[i
].pointer
;
7916 if (ret
< 0 || off
< minoff
) {
7922 if (ret
== -1) return NULL
;
7924 return symsTable
[ret
].name
;
7926 #else /* HAVE_BACKTRACE */
7927 static void setupSigSegvAction(void) {
7929 #endif /* HAVE_BACKTRACE */