2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
30 #define REDIS_VERSION "1.3.3"
40 #define __USE_POSIX199309
46 #endif /* HAVE_BACKTRACE */
54 #include <arpa/inet.h>
58 #include <sys/resource.h>
65 #include "solarisfixes.h"
69 #include "ae.h" /* Event driven programming library */
70 #include "sds.h" /* Dynamic safe strings */
71 #include "anet.h" /* Networking the easy way */
72 #include "dict.h" /* Hash tables */
73 #include "adlist.h" /* Linked lists */
74 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
75 #include "lzf.h" /* LZF compression library */
76 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
82 /* Static server configuration */
83 #define REDIS_SERVERPORT 6379 /* TCP port */
84 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
85 #define REDIS_IOBUF_LEN 1024
86 #define REDIS_LOADBUF_LEN 1024
87 #define REDIS_STATIC_ARGS 4
88 #define REDIS_DEFAULT_DBNUM 16
89 #define REDIS_CONFIGLINE_MAX 1024
90 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
91 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
92 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
93 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
94 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
96 /* If more than REDIS_WRITEV_THRESHOLD write packets are pending, use writev */
97 #define REDIS_WRITEV_THRESHOLD 3
98 /* Max number of iovecs used for each writev call */
99 #define REDIS_WRITEV_IOVEC_COUNT 256
101 /* Hash table parameters */
102 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
105 #define REDIS_CMD_BULK 1 /* Bulk write command */
106 #define REDIS_CMD_INLINE 2 /* Inline command */
107 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
108 this flag will return an error when the 'maxmemory' option is set in the
109 config file and the server is using more than maxmemory bytes of memory.
110 In short, these commands are denied in low memory conditions. */
111 #define REDIS_CMD_DENYOOM 4
114 #define REDIS_STRING 0
120 /* Objects encoding */
121 #define REDIS_ENCODING_RAW 0 /* Raw representation */
122 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
124 /* Object types only used for dumping to disk */
125 #define REDIS_EXPIRETIME 253
126 #define REDIS_SELECTDB 254
127 #define REDIS_EOF 255
129 /* Defines related to the dump file format. Storing 32 bit lengths for short
130 * keys requires a lot of space, so we check the most significant 2 bits of
131 * the first byte to interpret the length:
133 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
134 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
135 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
136 * 11|000000 this means: a specially encoded object will follow. The six bits
137 * number specifies the kind of object that follows.
138 * See the REDIS_RDB_ENC_* defines.
140 * Lengths up to 63 are stored using a single byte; most DB keys, and many
141 * values, will fit inside. */
142 #define REDIS_RDB_6BITLEN 0
143 #define REDIS_RDB_14BITLEN 1
144 #define REDIS_RDB_32BITLEN 2
145 #define REDIS_RDB_ENCVAL 3
146 #define REDIS_RDB_LENERR UINT_MAX
148 /* When a length of a string object stored on disk has the first two bits
149 * set, the remaining six bits specify a special encoding for the object
150 * according to the following defines: */
151 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
152 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
153 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
154 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
156 /* Virtual memory object->storage field. */
157 #define REDIS_VM_MEMORY 0 /* The object is on memory */
158 #define REDIS_VM_SWAPPED 1 /* The object is on disk */
159 #define REDIS_VM_SWAPPING 2 /* Redis is swapping this object on disk */
160 #define REDIS_VM_LOADING 3 /* Redis is loading this object from disk */
162 /* Virtual memory static configuration stuff.
163 * Check vmFindContiguousPages() to know more about these magic numbers. */
164 #define REDIS_VM_MAX_NEAR_PAGES 65536
165 #define REDIS_VM_MAX_RANDOM_JUMP 4096
166 #define REDIS_VM_MAX_THREADS 32
167 #define REDIS_THREAD_STACK_SIZE (1024*1024*4)
168 /* The following is the *percentage* of completed I/O jobs to process when the
169 * handler is called. While Virtual Memory I/O operations are performed by
170 * threads, these operations must be processed by the main thread when completed
171 * in order to take effect. */
172 #define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1
175 #define REDIS_SLAVE 1 /* This client is a slave server */
176 #define REDIS_MASTER 2 /* This client is a master server */
177 #define REDIS_MONITOR 4 /* This client is a slave monitor, see MONITOR */
178 #define REDIS_MULTI 8 /* This client is in a MULTI context */
179 #define REDIS_BLOCKED 16 /* The client is waiting in a blocking operation */
180 #define REDIS_IO_WAIT 32 /* The client is waiting for Virtual Memory I/O */
182 /* Slave replication state - slave side */
183 #define REDIS_REPL_NONE 0 /* No active replication */
184 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
185 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
187 /* Slave replication state - from the point of view of master
188 * Note that in SEND_BULK and ONLINE state the slave receives new updates
189 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
190 * to start the next background saving in order to send updates to it. */
191 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
192 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
193 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
194 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
196 /* List related stuff */
200 /* Sort operations */
201 #define REDIS_SORT_GET 0
202 #define REDIS_SORT_ASC 1
203 #define REDIS_SORT_DESC 2
204 #define REDIS_SORTKEY_MAX 1024
207 #define REDIS_DEBUG 0
208 #define REDIS_VERBOSE 1
209 #define REDIS_NOTICE 2
210 #define REDIS_WARNING 3
212 /* Anti-warning macro... */
213 #define REDIS_NOTUSED(V) ((void) V)
215 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
216 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
218 /* Append only defines */
219 #define APPENDFSYNC_NO 0
220 #define APPENDFSYNC_ALWAYS 1
221 #define APPENDFSYNC_EVERYSEC 2
223 /* We can print the stacktrace, so our assert is defined this way: */
224 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1)))
225 static void _redisAssert(char *estr
, char *file
, int line
);
227 /*================================= Data types ============================== */
229 /* A redis object, that is a type able to hold a string / list / set */
231 /* The VM object structure */
232 struct redisObjectVM
{
233 off_t page
; /* the page at witch the object is stored on disk */
234 off_t usedpages
; /* number of pages used on disk */
235 time_t atime
; /* Last access time */
238 /* The actual Redis Object */
239 typedef struct redisObject
{
242 unsigned char encoding
;
243 unsigned char storage
; /* If this object is a key, where is the value?
244 * REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
245 unsigned char vtype
; /* If this object is a key, and value is swapped out,
246 * this is the type of the swapped out object. */
248 /* VM fields, these are only allocated if VM is active, otherwise the
249 * object allocation function will just allocate
250 * sizeof(redisObject) minus sizeof(redisObjectVM), so using
251 * Redis without VM active will not have any overhead. */
252 struct redisObjectVM vm
;
255 /* Macro used to initialize a Redis object allocated on the stack.
256 * Note that this macro is kept near the structure definition to make sure
257 * we'll update it when the structure is changed, to avoid bugs like
258 * bug #85 introduced exactly in this way. */
259 #define initStaticStringObject(_var,_ptr) do { \
261 _var.type = REDIS_STRING; \
262 _var.encoding = REDIS_ENCODING_RAW; \
264 if (server.vm_enabled) _var.storage = REDIS_VM_MEMORY; \
267 typedef struct redisDb
{
268 dict
*dict
; /* The keyspace for this DB */
269 dict
*expires
; /* Timeout of keys with a timeout set */
270 dict
*blockingkeys
; /* Keys with clients waiting for data (BLPOP) */
271 dict
*io_keys
; /* Keys with clients waiting for VM I/O */
275 /* Client MULTI/EXEC state */
276 typedef struct multiCmd
{
279 struct redisCommand
*cmd
;
282 typedef struct multiState
{
283 multiCmd
*commands
; /* Array of MULTI commands */
284 int count
; /* Total number of MULTI commands */
287 /* With multiplexing we need to keep per-client state.
288 * Clients are kept in a linked list. */
289 typedef struct redisClient
{
294 robj
**argv
, **mbargv
;
296 int bulklen
; /* bulk read len. -1 if not in bulk read mode */
297 int multibulk
; /* multi bulk command format active */
300 time_t lastinteraction
; /* time of the last interaction, used for timeout */
301 int flags
; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
302 int slaveseldb
; /* slave selected db, if this client is a slave */
303 int authenticated
; /* when requirepass is non-NULL */
304 int replstate
; /* replication state if this is a slave */
305 int repldbfd
; /* replication DB file descriptor */
306 long repldboff
; /* replication DB file offset */
307 off_t repldbsize
; /* replication DB file size */
308 multiState mstate
; /* MULTI/EXEC state */
309 robj
**blockingkeys
; /* The key we are waiting to terminate a blocking
310 * operation such as BLPOP. Otherwise NULL. */
311 int blockingkeysnum
; /* Number of blocking keys */
312 time_t blockingto
; /* Blocking operation timeout. If UNIX current time
313 * is >= blockingto then the operation timed out. */
314 list
*io_keys
; /* Keys this client is waiting to be loaded from the
315 * swap file in order to continue. */
323 /* Global server state structure */
328 dict
*sharingpool
; /* Poll used for object sharing */
329 unsigned int sharingpoolsize
;
330 long long dirty
; /* changes to DB from the last save */
332 list
*slaves
, *monitors
;
333 char neterr
[ANET_ERR_LEN
];
335 int cronloops
; /* number of times the cron function run */
336 list
*objfreelist
; /* A list of freed objects to avoid malloc() */
337 time_t lastsave
; /* Unix time of last save succeeede */
338 /* Fields used only for stats */
339 time_t stat_starttime
; /* server start time */
340 long long stat_numcommands
; /* number of processed commands */
341 long long stat_numconnections
; /* number of connections received */
354 pid_t bgsavechildpid
;
355 pid_t bgrewritechildpid
;
356 sds bgrewritebuf
; /* buffer taken by parent during oppend only rewrite */
357 struct saveparam
*saveparams
;
362 char *appendfilename
;
366 /* Replication related */
371 redisClient
*master
; /* client that is master for this slave */
373 unsigned int maxclients
;
374 unsigned long long maxmemory
;
375 unsigned int blpop_blocked_clients
;
376 unsigned int vm_blocked_clients
;
377 /* Sort parameters - qsort_r() is only available under BSD so we
378 * have to keep this state global, in order to pass it to sortCompare() */
382 /* Virtual memory configuration */
387 unsigned long long vm_max_memory
;
388 /* Virtual memory state */
391 off_t vm_next_page
; /* Next probably empty page */
392 off_t vm_near_pages
; /* Number of pages allocated sequentially */
393 unsigned char *vm_bitmap
; /* Bitmap of free/used pages */
394 time_t unixtime
; /* Unix time sampled every second. */
395 /* Virtual memory I/O threads stuff */
396 /* An I/O thread processes an element taken from the io_newjobs queue and
397 * puts the result of the operation in the io_processed list. While the
398 * job is being processed, it's put on the io_processing queue. */
399 list
*io_newjobs
; /* List of VM I/O jobs yet to be processed */
400 list
*io_processing
; /* List of VM I/O jobs being processed */
401 list
*io_processed
; /* List of VM I/O jobs already processed */
402 list
*io_ready_clients
; /* Clients ready to be unblocked. All keys loaded */
403 pthread_mutex_t io_mutex
; /* lock to access io_jobs/io_done/io_thread_job */
404 pthread_mutex_t obj_freelist_mutex
; /* safe redis objects creation/free */
405 pthread_mutex_t io_swapfile_mutex
; /* So we can lseek + write */
406 pthread_attr_t io_threads_attr
; /* attributes for threads creation */
407 int io_active_threads
; /* Number of running I/O threads */
408 int vm_max_threads
; /* Max number of I/O threads running at the same time */
409 /* Our main thread is blocked on the event loop, looking for sockets ready
410 * to be read or written, so when a threaded I/O operation is ready to be
411 * processed by the main thread, the I/O thread will use a unix pipe to
412 * awake the main thread. The following are the two pipe FDs. */
413 int io_ready_pipe_read
;
414 int io_ready_pipe_write
;
415 /* Virtual memory stats */
416 unsigned long long vm_stats_used_pages
;
417 unsigned long long vm_stats_swapped_objects
;
418 unsigned long long vm_stats_swapouts
;
419 unsigned long long vm_stats_swapins
;
423 typedef void redisCommandProc(redisClient
*c
);
424 struct redisCommand
{
426 redisCommandProc
*proc
;
431 struct redisFunctionSym
{
433 unsigned long pointer
;
436 typedef struct _redisSortObject
{
444 typedef struct _redisSortOperation
{
447 } redisSortOperation
;
449 /* ZSETs use a specialized version of Skiplists */
451 typedef struct zskiplistNode
{
452 struct zskiplistNode
**forward
;
453 struct zskiplistNode
*backward
;
458 typedef struct zskiplist
{
459 struct zskiplistNode
*header
, *tail
;
460 unsigned long length
;
464 typedef struct zset
{
469 /* Our shared "common" objects */
471 struct sharedObjectsStruct
{
472 robj
*crlf
, *ok
, *err
, *emptybulk
, *czero
, *cone
, *pong
, *space
,
473 *colon
, *nullbulk
, *nullmultibulk
, *queued
,
474 *emptymultibulk
, *wrongtypeerr
, *nokeyerr
, *syntaxerr
, *sameobjecterr
,
475 *outofrangeerr
, *plus
,
476 *select0
, *select1
, *select2
, *select3
, *select4
,
477 *select5
, *select6
, *select7
, *select8
, *select9
;
480 /* Global vars that are actually used as constants. The following double
481 * values are used for double on-disk serialization, and are initialized
482 * at runtime to avoid strange compiler optimizations. */
484 static double R_Zero
, R_PosInf
, R_NegInf
, R_Nan
;
486 /* VM threaded I/O request message */
487 #define REDIS_IOJOB_LOAD 0 /* Load from disk to memory */
488 #define REDIS_IOJOB_PREPARE_SWAP 1 /* Compute needed pages */
489 #define REDIS_IOJOB_DO_SWAP 2 /* Swap from memory to disk */
490 typedef struct iojob
{
491 int type
; /* Request type, REDIS_IOJOB_* */
492 redisDb
*db
;/* Redis database */
493 robj
*key
; /* This I/O request is about swapping this key */
494 robj
*val
; /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this
495 * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */
496 off_t page
; /* Swap page where to read/write the object */
497 off_t pages
; /* Swap pages needed to safe object. PREPARE_SWAP return val */
498 int canceled
; /* True if this command was canceled by blocking side of VM */
499 pthread_t thread
; /* ID of the thread processing this entry */
502 /*================================ Prototypes =============================== */
504 static void freeStringObject(robj
*o
);
505 static void freeListObject(robj
*o
);
506 static void freeSetObject(robj
*o
);
507 static void decrRefCount(void *o
);
508 static robj
*createObject(int type
, void *ptr
);
509 static void freeClient(redisClient
*c
);
510 static int rdbLoad(char *filename
);
511 static void addReply(redisClient
*c
, robj
*obj
);
512 static void addReplySds(redisClient
*c
, sds s
);
513 static void incrRefCount(robj
*o
);
514 static int rdbSaveBackground(char *filename
);
515 static robj
*createStringObject(char *ptr
, size_t len
);
516 static robj
*dupStringObject(robj
*o
);
517 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
518 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
);
519 static int syncWithMaster(void);
520 static robj
*tryObjectSharing(robj
*o
);
521 static int tryObjectEncoding(robj
*o
);
522 static robj
*getDecodedObject(robj
*o
);
523 static int removeExpire(redisDb
*db
, robj
*key
);
524 static int expireIfNeeded(redisDb
*db
, robj
*key
);
525 static int deleteIfVolatile(redisDb
*db
, robj
*key
);
526 static int deleteIfSwapped(redisDb
*db
, robj
*key
);
527 static int deleteKey(redisDb
*db
, robj
*key
);
528 static time_t getExpire(redisDb
*db
, robj
*key
);
529 static int setExpire(redisDb
*db
, robj
*key
, time_t when
);
530 static void updateSlavesWaitingBgsave(int bgsaveerr
);
531 static void freeMemoryIfNeeded(void);
532 static int processCommand(redisClient
*c
);
533 static void setupSigSegvAction(void);
534 static void rdbRemoveTempFile(pid_t childpid
);
535 static void aofRemoveTempFile(pid_t childpid
);
536 static size_t stringObjectLen(robj
*o
);
537 static void processInputBuffer(redisClient
*c
);
538 static zskiplist
*zslCreate(void);
539 static void zslFree(zskiplist
*zsl
);
540 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
);
541 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
542 static void initClientMultiState(redisClient
*c
);
543 static void freeClientMultiState(redisClient
*c
);
544 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
);
545 static void unblockClientWaitingData(redisClient
*c
);
546 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
);
547 static void vmInit(void);
548 static void vmMarkPagesFree(off_t page
, off_t count
);
549 static robj
*vmLoadObject(robj
*key
);
550 static robj
*vmPreviewObject(robj
*key
);
551 static int vmSwapOneObjectBlocking(void);
552 static int vmSwapOneObjectThreaded(void);
553 static int vmCanSwapOut(void);
554 static int tryFreeOneObjectFromFreelist(void);
555 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
556 static void vmThreadedIOCompletedJob(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
557 static void vmCancelThreadedIOJob(robj
*o
);
558 static void lockThreadedIO(void);
559 static void unlockThreadedIO(void);
560 static int vmSwapObjectThreaded(robj
*key
, robj
*val
, redisDb
*db
);
561 static void freeIOJob(iojob
*j
);
562 static void queueIOJob(iojob
*j
);
563 static int vmWriteObjectOnSwap(robj
*o
, off_t page
);
564 static robj
*vmReadObjectFromSwap(off_t page
, int type
);
565 static void waitEmptyIOJobsQueue(void);
566 static void vmReopenSwapFile(void);
567 static int vmFreePage(off_t page
);
568 static int blockClientOnSwappedKeys(struct redisCommand
*cmd
, redisClient
*c
);
569 static int dontWaitForSwappedKey(redisClient
*c
, robj
*key
);
570 static void handleClientsBlockedOnSwappedKey(redisDb
*db
, robj
*key
);
571 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
572 static struct redisCommand
*lookupCommand(char *name
);
573 static void call(redisClient
*c
, struct redisCommand
*cmd
);
574 static void resetClient(redisClient
*c
);
576 static void authCommand(redisClient
*c
);
577 static void pingCommand(redisClient
*c
);
578 static void echoCommand(redisClient
*c
);
579 static void setCommand(redisClient
*c
);
580 static void setnxCommand(redisClient
*c
);
581 static void getCommand(redisClient
*c
);
582 static void delCommand(redisClient
*c
);
583 static void existsCommand(redisClient
*c
);
584 static void incrCommand(redisClient
*c
);
585 static void decrCommand(redisClient
*c
);
586 static void incrbyCommand(redisClient
*c
);
587 static void decrbyCommand(redisClient
*c
);
588 static void selectCommand(redisClient
*c
);
589 static void randomkeyCommand(redisClient
*c
);
590 static void keysCommand(redisClient
*c
);
591 static void dbsizeCommand(redisClient
*c
);
592 static void lastsaveCommand(redisClient
*c
);
593 static void saveCommand(redisClient
*c
);
594 static void bgsaveCommand(redisClient
*c
);
595 static void bgrewriteaofCommand(redisClient
*c
);
596 static void shutdownCommand(redisClient
*c
);
597 static void moveCommand(redisClient
*c
);
598 static void renameCommand(redisClient
*c
);
599 static void renamenxCommand(redisClient
*c
);
600 static void lpushCommand(redisClient
*c
);
601 static void rpushCommand(redisClient
*c
);
602 static void lpopCommand(redisClient
*c
);
603 static void rpopCommand(redisClient
*c
);
604 static void llenCommand(redisClient
*c
);
605 static void lindexCommand(redisClient
*c
);
606 static void lrangeCommand(redisClient
*c
);
607 static void ltrimCommand(redisClient
*c
);
608 static void typeCommand(redisClient
*c
);
609 static void lsetCommand(redisClient
*c
);
610 static void saddCommand(redisClient
*c
);
611 static void sremCommand(redisClient
*c
);
612 static void smoveCommand(redisClient
*c
);
613 static void sismemberCommand(redisClient
*c
);
614 static void scardCommand(redisClient
*c
);
615 static void spopCommand(redisClient
*c
);
616 static void srandmemberCommand(redisClient
*c
);
617 static void sinterCommand(redisClient
*c
);
618 static void sinterstoreCommand(redisClient
*c
);
619 static void sunionCommand(redisClient
*c
);
620 static void sunionstoreCommand(redisClient
*c
);
621 static void sdiffCommand(redisClient
*c
);
622 static void sdiffstoreCommand(redisClient
*c
);
623 static void syncCommand(redisClient
*c
);
624 static void flushdbCommand(redisClient
*c
);
625 static void flushallCommand(redisClient
*c
);
626 static void sortCommand(redisClient
*c
);
627 static void lremCommand(redisClient
*c
);
628 static void rpoplpushcommand(redisClient
*c
);
629 static void infoCommand(redisClient
*c
);
630 static void mgetCommand(redisClient
*c
);
631 static void monitorCommand(redisClient
*c
);
632 static void expireCommand(redisClient
*c
);
633 static void expireatCommand(redisClient
*c
);
634 static void getsetCommand(redisClient
*c
);
635 static void ttlCommand(redisClient
*c
);
636 static void slaveofCommand(redisClient
*c
);
637 static void debugCommand(redisClient
*c
);
638 static void msetCommand(redisClient
*c
);
639 static void msetnxCommand(redisClient
*c
);
640 static void zaddCommand(redisClient
*c
);
641 static void zincrbyCommand(redisClient
*c
);
642 static void zrangeCommand(redisClient
*c
);
643 static void zrangebyscoreCommand(redisClient
*c
);
644 static void zrevrangeCommand(redisClient
*c
);
645 static void zcardCommand(redisClient
*c
);
646 static void zremCommand(redisClient
*c
);
647 static void zscoreCommand(redisClient
*c
);
648 static void zremrangebyscoreCommand(redisClient
*c
);
649 static void multiCommand(redisClient
*c
);
650 static void execCommand(redisClient
*c
);
651 static void blpopCommand(redisClient
*c
);
652 static void brpopCommand(redisClient
*c
);
654 /*================================= Globals ================================= */
657 static struct redisServer server
; /* server global state */
658 static struct redisCommand cmdTable
[] = {
659 {"get",getCommand
,2,REDIS_CMD_INLINE
},
660 {"set",setCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
661 {"setnx",setnxCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
662 {"del",delCommand
,-2,REDIS_CMD_INLINE
},
663 {"exists",existsCommand
,2,REDIS_CMD_INLINE
},
664 {"incr",incrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
665 {"decr",decrCommand
,2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
666 {"mget",mgetCommand
,-2,REDIS_CMD_INLINE
},
667 {"rpush",rpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
668 {"lpush",lpushCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
669 {"rpop",rpopCommand
,2,REDIS_CMD_INLINE
},
670 {"lpop",lpopCommand
,2,REDIS_CMD_INLINE
},
671 {"brpop",brpopCommand
,-3,REDIS_CMD_INLINE
},
672 {"blpop",blpopCommand
,-3,REDIS_CMD_INLINE
},
673 {"llen",llenCommand
,2,REDIS_CMD_INLINE
},
674 {"lindex",lindexCommand
,3,REDIS_CMD_INLINE
},
675 {"lset",lsetCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
676 {"lrange",lrangeCommand
,4,REDIS_CMD_INLINE
},
677 {"ltrim",ltrimCommand
,4,REDIS_CMD_INLINE
},
678 {"lrem",lremCommand
,4,REDIS_CMD_BULK
},
679 {"rpoplpush",rpoplpushcommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
680 {"sadd",saddCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
681 {"srem",sremCommand
,3,REDIS_CMD_BULK
},
682 {"smove",smoveCommand
,4,REDIS_CMD_BULK
},
683 {"sismember",sismemberCommand
,3,REDIS_CMD_BULK
},
684 {"scard",scardCommand
,2,REDIS_CMD_INLINE
},
685 {"spop",spopCommand
,2,REDIS_CMD_INLINE
},
686 {"srandmember",srandmemberCommand
,2,REDIS_CMD_INLINE
},
687 {"sinter",sinterCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
688 {"sinterstore",sinterstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
689 {"sunion",sunionCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
690 {"sunionstore",sunionstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
691 {"sdiff",sdiffCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
692 {"sdiffstore",sdiffstoreCommand
,-3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
693 {"smembers",sinterCommand
,2,REDIS_CMD_INLINE
},
694 {"zadd",zaddCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
695 {"zincrby",zincrbyCommand
,4,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
696 {"zrem",zremCommand
,3,REDIS_CMD_BULK
},
697 {"zremrangebyscore",zremrangebyscoreCommand
,4,REDIS_CMD_INLINE
},
698 {"zrange",zrangeCommand
,-4,REDIS_CMD_INLINE
},
699 {"zrangebyscore",zrangebyscoreCommand
,-4,REDIS_CMD_INLINE
},
700 {"zrevrange",zrevrangeCommand
,-4,REDIS_CMD_INLINE
},
701 {"zcard",zcardCommand
,2,REDIS_CMD_INLINE
},
702 {"zscore",zscoreCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
703 {"incrby",incrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
704 {"decrby",decrbyCommand
,3,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
705 {"getset",getsetCommand
,3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
706 {"mset",msetCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
707 {"msetnx",msetnxCommand
,-3,REDIS_CMD_BULK
|REDIS_CMD_DENYOOM
},
708 {"randomkey",randomkeyCommand
,1,REDIS_CMD_INLINE
},
709 {"select",selectCommand
,2,REDIS_CMD_INLINE
},
710 {"move",moveCommand
,3,REDIS_CMD_INLINE
},
711 {"rename",renameCommand
,3,REDIS_CMD_INLINE
},
712 {"renamenx",renamenxCommand
,3,REDIS_CMD_INLINE
},
713 {"expire",expireCommand
,3,REDIS_CMD_INLINE
},
714 {"expireat",expireatCommand
,3,REDIS_CMD_INLINE
},
715 {"keys",keysCommand
,2,REDIS_CMD_INLINE
},
716 {"dbsize",dbsizeCommand
,1,REDIS_CMD_INLINE
},
717 {"auth",authCommand
,2,REDIS_CMD_INLINE
},
718 {"ping",pingCommand
,1,REDIS_CMD_INLINE
},
719 {"echo",echoCommand
,2,REDIS_CMD_BULK
},
720 {"save",saveCommand
,1,REDIS_CMD_INLINE
},
721 {"bgsave",bgsaveCommand
,1,REDIS_CMD_INLINE
},
722 {"bgrewriteaof",bgrewriteaofCommand
,1,REDIS_CMD_INLINE
},
723 {"shutdown",shutdownCommand
,1,REDIS_CMD_INLINE
},
724 {"lastsave",lastsaveCommand
,1,REDIS_CMD_INLINE
},
725 {"type",typeCommand
,2,REDIS_CMD_INLINE
},
726 {"multi",multiCommand
,1,REDIS_CMD_INLINE
},
727 {"exec",execCommand
,1,REDIS_CMD_INLINE
},
728 {"sync",syncCommand
,1,REDIS_CMD_INLINE
},
729 {"flushdb",flushdbCommand
,1,REDIS_CMD_INLINE
},
730 {"flushall",flushallCommand
,1,REDIS_CMD_INLINE
},
731 {"sort",sortCommand
,-2,REDIS_CMD_INLINE
|REDIS_CMD_DENYOOM
},
732 {"info",infoCommand
,1,REDIS_CMD_INLINE
},
733 {"monitor",monitorCommand
,1,REDIS_CMD_INLINE
},
734 {"ttl",ttlCommand
,2,REDIS_CMD_INLINE
},
735 {"slaveof",slaveofCommand
,3,REDIS_CMD_INLINE
},
736 {"debug",debugCommand
,-2,REDIS_CMD_INLINE
},
740 /*============================ Utility functions ============================ */
742 /* Glob-style pattern matching. */
743 int stringmatchlen(const char *pattern
, int patternLen
,
744 const char *string
, int stringLen
, int nocase
)
749 while (pattern
[1] == '*') {
754 return 1; /* match */
756 if (stringmatchlen(pattern
+1, patternLen
-1,
757 string
, stringLen
, nocase
))
758 return 1; /* match */
762 return 0; /* no match */
766 return 0; /* no match */
776 not = pattern
[0] == '^';
783 if (pattern
[0] == '\\') {
786 if (pattern
[0] == string
[0])
788 } else if (pattern
[0] == ']') {
790 } else if (patternLen
== 0) {
794 } else if (pattern
[1] == '-' && patternLen
>= 3) {
795 int start
= pattern
[0];
796 int end
= pattern
[2];
804 start
= tolower(start
);
810 if (c
>= start
&& c
<= end
)
814 if (pattern
[0] == string
[0])
817 if (tolower((int)pattern
[0]) == tolower((int)string
[0]))
827 return 0; /* no match */
833 if (patternLen
>= 2) {
840 if (pattern
[0] != string
[0])
841 return 0; /* no match */
843 if (tolower((int)pattern
[0]) != tolower((int)string
[0]))
844 return 0; /* no match */
852 if (stringLen
== 0) {
853 while(*pattern
== '*') {
860 if (patternLen
== 0 && stringLen
== 0)
865 static void redisLog(int level
, const char *fmt
, ...) {
869 fp
= (server
.logfile
== NULL
) ? stdout
: fopen(server
.logfile
,"a");
873 if (level
>= server
.verbosity
) {
879 strftime(buf
,64,"%d %b %H:%M:%S",localtime(&now
));
880 fprintf(fp
,"[%d] %s %c ",(int)getpid(),buf
,c
[level
]);
881 vfprintf(fp
, fmt
, ap
);
887 if (server
.logfile
) fclose(fp
);
890 /*====================== Hash table type implementation ==================== */
892 /* This is a hash table type that uses the SDS dynamic strings library as
893 * keys and redis objects as values (objects can hold SDS strings,
, void *val
)
898 DICT_NOTUSED(privdata
);
902 static void dictListDestructor(void *privdata
, void *val
)
904 DICT_NOTUSED(privdata
);
905 listRelease((list
*)val
);
908 static int sdsDictKeyCompare(void *privdata
, const void *key1
,
912 DICT_NOTUSED(privdata
);
914 l1
= sdslen((sds
)key1
);
915 l2
= sdslen((sds
)key2
);
916 if (l1
!= l2
) return 0;
917 return memcmp(key1
, key2
, l1
) == 0;
920 static void dictRedisObjectDestructor(void *privdata
, void *val
)
922 DICT_NOTUSED(privdata
);
924 if (val
== NULL
) return; /* Values of swapped out keys as set to NULL */
928 static int dictObjKeyCompare(void *privdata
, const void *key1
,
931 const robj
*o1
= key1
, *o2
= key2
;
932 return sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
935 static unsigned int dictObjHash(const void *key
) {
937 return dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
940 static int dictEncObjKeyCompare(void *privdata
, const void *key1
,
943 robj
*o1
= (robj
*) key1
, *o2
= (robj
*) key2
;
946 o1
= getDecodedObject(o1
);
947 o2
= getDecodedObject(o2
);
948 cmp
= sdsDictKeyCompare(privdata
,o1
->ptr
,o2
->ptr
);
954 static unsigned int dictEncObjHash(const void *key
) {
955 robj
*o
= (robj
*) key
;
957 o
= getDecodedObject(o
);
958 unsigned int hash
= dictGenHashFunction(o
->ptr
, sdslen((sds
)o
->ptr
));
963 /* Sets type and expires */
964 static dictType setDictType
= {
965 dictEncObjHash
, /* hash function */
968 dictEncObjKeyCompare
, /* key compare */
969 dictRedisObjectDestructor
, /* key destructor */
970 NULL
/* val destructor */
973 /* Sorted sets hash (note: a skiplist is used in addition to the hash table) */
974 static dictType zsetDictType
= {
975 dictEncObjHash
, /* hash function */
978 dictEncObjKeyCompare
, /* key compare */
979 dictRedisObjectDestructor
, /* key destructor */
980 dictVanillaFree
/* val destructor of malloc(sizeof(double)) */
984 static dictType hashDictType
= {
985 dictObjHash
, /* hash function */
988 dictObjKeyCompare
, /* key compare */
989 dictRedisObjectDestructor
, /* key destructor */
990 dictRedisObjectDestructor
/* val destructor */
994 static dictType keyptrDictType
= {
995 dictObjHash
, /* hash function */
998 dictObjKeyCompare
, /* key compare */
999 dictRedisObjectDestructor
, /* key destructor */
1000 NULL
/* val destructor */
1003 /* Keylist hash table type has unencoded redis objects as keys and
1004 * lists as values. It's used for blocking operations (BLPOP) and to
1005 * map swapped keys to a list of clients waiting for this keys to be loaded. */
1006 static dictType keylistDictType
= {
1007 dictObjHash
, /* hash function */
1010 dictObjKeyCompare
, /* key compare */
1011 dictRedisObjectDestructor
, /* key destructor */
1012 dictListDestructor
/* val destructor */
1015 /* ========================= Random utility functions ======================= */
1017 /* Redis generally does not try to recover from out of memory conditions
1018 * when allocating objects or strings, it is not clear if it will be possible
1019 * to report this condition to the client since the networking layer itself
1020 * is based on heap allocation for send buffers, so we simply abort.
1021 * At least the code will be simpler to read... */
1022 static void oom(const char *msg
) {
1023 redisLog(REDIS_WARNING
, "%s: Out of memory\n",msg
);
1028 /* ====================== Redis server networking stuff ===================== */
1029 static void closeTimedoutClients(void) {
1032 time_t now
= time(NULL
);
1035 listRewind(server
.clients
,&li
);
1036 while ((ln
= listNext(&li
)) != NULL
) {
1037 c
= listNodeValue(ln
);
1038 if (server
.maxidletime
&&
1039 !(c
->flags
& REDIS_SLAVE
) && /* no timeout for slaves */
1040 !(c
->flags
& REDIS_MASTER
) && /* no timeout for masters */
1041 (now
- c
->lastinteraction
> server
.maxidletime
))
1043 redisLog(REDIS_VERBOSE
,"Closing idle client");
1045 } else if (c
->flags
& REDIS_BLOCKED
) {
1046 if (c
->blockingto
!= 0 && c
->blockingto
< now
) {
1047 addReply(c
,shared
.nullmultibulk
);
1048 unblockClientWaitingData(c
);
1054 static int htNeedsResize(dict
*dict
) {
1055 long long size
, used
;
1057 size
= dictSlots(dict
);
1058 used
= dictSize(dict
);
1059 return (size
&& used
&& size
> DICT_HT_INITIAL_SIZE
&&
1060 (used
*100/size
< REDIS_HT_MINFILL
));
1063 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
1064 * we resize the hash table to save memory */
1065 static void tryResizeHashTables(void) {
1068 for (j
= 0; j
< server
.dbnum
; j
++) {
1069 if (htNeedsResize(server
.db
[j
].dict
)) {
1070 redisLog(REDIS_VERBOSE
,"The hash table %d is too sparse, resize it...",j
);
1071 dictResize(server
.db
[j
].dict
);
1072 redisLog(REDIS_VERBOSE
,"Hash table %d resized.",j
);
1074 if (htNeedsResize(server
.db
[j
].expires
))
1075 dictResize(server
.db
[j
].expires
);
1079 /* A background saving child (BGSAVE) terminated its work. Handle this. */
1080 void backgroundSaveDoneHandler(int statloc
) {
1081 int exitcode
= WEXITSTATUS(statloc
);
1082 int bysignal
= WIFSIGNALED(statloc
);
1084 if (!bysignal
&& exitcode
== 0) {
1085 redisLog(REDIS_NOTICE
,
1086 "Background saving terminated with success");
1088 server
.lastsave
= time(NULL
);
1089 } else if (!bysignal
&& exitcode
!= 0) {
1090 redisLog(REDIS_WARNING
, "Background saving error");
1092 redisLog(REDIS_WARNING
,
1093 "Background saving terminated by signal");
1094 rdbRemoveTempFile(server
.bgsavechildpid
);
1096 server
.bgsavechildpid
= -1;
1097 /* Possibly there are slaves waiting for a BGSAVE in order to be served
1098 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
1099 updateSlavesWaitingBgsave(exitcode
== 0 ? REDIS_OK
: REDIS_ERR
);
1102 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
1104 void backgroundRewriteDoneHandler(int statloc
) {
1105 int exitcode
= WEXITSTATUS(statloc
);
1106 int bysignal
= WIFSIGNALED(statloc
);
1108 if (!bysignal
&& exitcode
== 0) {
1112 redisLog(REDIS_NOTICE
,
1113 "Background append only file rewriting terminated with success");
1114 /* Now it's time to flush the differences accumulated by the parent */
1115 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) server
.bgrewritechildpid
);
1116 fd
= open(tmpfile
,O_WRONLY
|O_APPEND
);
1118 redisLog(REDIS_WARNING
, "Not able to open the temp append only file produced by the child: %s", strerror(errno
));
1121 /* Flush our data... */
1122 if (write(fd
,server
.bgrewritebuf
,sdslen(server
.bgrewritebuf
)) !=
1123 (signed) sdslen(server
.bgrewritebuf
)) {
1124 redisLog(REDIS_WARNING
, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno
));
1128 redisLog(REDIS_NOTICE
,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server
.bgrewritebuf
));
1129 /* Now our work is to rename the temp file into the stable file. And
1130 * switch the file descriptor used by the server for append only. */
1131 if (rename(tmpfile
,server
.appendfilename
) == -1) {
1132 redisLog(REDIS_WARNING
,"Can't rename the temp append only file into the stable one: %s", strerror(errno
));
1136 /* Mission completed... almost */
1137 redisLog(REDIS_NOTICE
,"Append only file successfully rewritten.");
1138 if (server
.appendfd
!= -1) {
1139 /* If append only is actually enabled... */
1140 close(server
.appendfd
);
1141 server
.appendfd
= fd
;
1143 server
.appendseldb
= -1; /* Make sure it will issue SELECT */
1144 redisLog(REDIS_NOTICE
,"The new append only file was selected for future appends.");
1146 /* If append only is disabled we just generate a dump in this
1147 * format. Why not? */
1150 } else if (!bysignal
&& exitcode
!= 0) {
1151 redisLog(REDIS_WARNING
, "Background append only file rewriting error");
1153 redisLog(REDIS_WARNING
,
1154 "Background append only file rewriting terminated by signal");
1157 sdsfree(server
.bgrewritebuf
);
1158 server
.bgrewritebuf
= sdsempty();
1159 aofRemoveTempFile(server
.bgrewritechildpid
);
1160 server
.bgrewritechildpid
= -1;
1163 static int serverCron(struct aeEventLoop
*eventLoop
, long long id
, void *clientData
) {
1164 int j
, loops
= server
.cronloops
++;
1165 REDIS_NOTUSED(eventLoop
);
1167 REDIS_NOTUSED(clientData
);
1169 /* We take a cached value of the unix time in the global state because
1170 * with virtual memory and aging there is to store the current time
1171 * in objects at every object access, and accuracy is not needed.
1172 * To access a global var is faster than calling time(NULL) */
1173 server
.unixtime
= time(NULL
);
1175 /* Show some info about non-empty databases */
1176 for (j
= 0; j
< server
.dbnum
; j
++) {
1177 long long size
, used
, vkeys
;
1179 size
= dictSlots(server
.db
[j
].dict
);
1180 used
= dictSize(server
.db
[j
].dict
);
1181 vkeys
= dictSize(server
.db
[j
].expires
);
1182 if (!(loops
% 5) && (used
|| vkeys
)) {
1183 redisLog(REDIS_VERBOSE
,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j
,used
,vkeys
,size
);
1184 /* dictPrintStats(server.dict); */
1188 /* We don't want to resize the hash tables while a bacground saving
1189 * is in progress: the saving child is created using fork() that is
1190 * implemented with a copy-on-write semantic in most modern systems, so
1191 * if we resize the HT while there is the saving child at work actually
1192 * a lot of memory movements in the parent will cause a lot of pages
1194 if (server
.bgsavechildpid
== -1) tryResizeHashTables();
1196 /* Show information about connected clients */
1198 redisLog(REDIS_VERBOSE
,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1199 listLength(server
.clients
)-listLength(server
.slaves
),
1200 listLength(server
.slaves
),
1201 zmalloc_used_memory(),
1202 dictSize(server
.sharingpool
));
1205 /* Close connections of timedout clients */
1206 if ((server
.maxidletime
&& !(loops
% 10)) || server
.blpop_blocked_clients
)
1207 closeTimedoutClients();
1209 /* Check if a background saving or AOF rewrite in progress terminated */
1210 if (server
.bgsavechildpid
!= -1 || server
.bgrewritechildpid
!= -1) {
1214 if ((pid
= wait3(&statloc
,WNOHANG
,NULL
)) != 0) {
1215 if (pid
== server
.bgsavechildpid
) {
1216 backgroundSaveDoneHandler(statloc
);
1218 backgroundRewriteDoneHandler(statloc
);
1222 /* If there is not a background saving in progress check if
1223 * we have to save now */
1224 time_t now
= time(NULL
);
1225 for (j
= 0; j
< server
.saveparamslen
; j
++) {
1226 struct saveparam
*sp
= server
.saveparams
+j
;
1228 if (server
.dirty
>= sp
->changes
&&
1229 now
-server
.lastsave
> sp
->seconds
) {
1230 redisLog(REDIS_NOTICE
,"%d changes in %d seconds. Saving...",
1231 sp
->changes
, sp
->seconds
);
1232 rdbSaveBackground(server
.dbfilename
);
1238 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1239 * will use few CPU cycles if there are few expiring keys, otherwise
1240 * it will get more aggressive to avoid that too much memory is used by
1241 * keys that can be removed from the keyspace. */
1242 for (j
= 0; j
< server
.dbnum
; j
++) {
1244 redisDb
*db
= server
.db
+j
;
1246 /* Continue to expire if at the end of the cycle more than 25%
1247 * of the keys were expired. */
1249 long num
= dictSize(db
->expires
);
1250 time_t now
= time(NULL
);
1253 if (num
> REDIS_EXPIRELOOKUPS_PER_CRON
)
1254 num
= REDIS_EXPIRELOOKUPS_PER_CRON
;
1259 if ((de
= dictGetRandomKey(db
->expires
)) == NULL
) break;
1260 t
= (time_t) dictGetEntryVal(de
);
1262 deleteKey(db
,dictGetEntryKey(de
));
1266 } while (expired
> REDIS_EXPIRELOOKUPS_PER_CRON
/4);
1269 /* Swap a few keys on disk if we are over the memory limit and VM
1270 * is enbled. Try to free objects from the free list first. */
1271 if (vmCanSwapOut()) {
1272 while (server
.vm_enabled
&& zmalloc_used_memory() >
1273 server
.vm_max_memory
)
1277 if (tryFreeOneObjectFromFreelist() == REDIS_OK
) continue;
1278 retval
= (server
.vm_max_threads
== 0) ?
1279 vmSwapOneObjectBlocking() :
1280 vmSwapOneObjectThreaded();
1281 if (retval
== REDIS_ERR
&& (loops
% 30) == 0 &&
1282 zmalloc_used_memory() >
1283 (server
.vm_max_memory
+server
.vm_max_memory
/10))
1285 redisLog(REDIS_WARNING
,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");
1287 /* Note that when using threade I/O we free just one object,
1288 * because anyway when the I/O thread in charge to swap this
1289 * object out will finish, the handler of completed jobs
1290 * will try to swap more objects if we are still out of memory. */
1291 if (retval
== REDIS_ERR
|| server
.vm_max_threads
> 0) break;
1295 /* Check if we should connect to a MASTER */
1296 if (server
.replstate
== REDIS_REPL_CONNECT
) {
1297 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
1298 if (syncWithMaster() == REDIS_OK
) {
1299 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync succeeded");
1305 /* This function gets called every time Redis is entering the
1306 * main loop of the event driven library, that is, before to sleep
1307 * for ready file descriptors. */
1308 static void beforeSleep(struct aeEventLoop
*eventLoop
) {
1309 REDIS_NOTUSED(eventLoop
);
1311 if (server
.vm_enabled
&& listLength(server
.io_ready_clients
)) {
1315 listRewind(server
.io_ready_clients
,&li
);
1316 while((ln
= listNext(&li
))) {
1317 redisClient
*c
= ln
->value
;
1318 struct redisCommand
*cmd
;
1320 /* Resume the client. */
1321 listDelNode(server
.io_ready_clients
,ln
);
1322 c
->flags
&= (~REDIS_IO_WAIT
);
1323 server
.vm_blocked_clients
--;
1324 aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
1325 readQueryFromClient
, c
);
1326 cmd
= lookupCommand(c
->argv
[0]->ptr
);
1327 assert(cmd
!= NULL
);
1330 /* There may be more data to process in the input buffer. */
1331 if (c
->querybuf
&& sdslen(c
->querybuf
) > 0)
1332 processInputBuffer(c
);
1337 static void createSharedObjects(void) {
1338 shared
.crlf
= createObject(REDIS_STRING
,sdsnew("\r\n"));
1339 shared
.ok
= createObject(REDIS_STRING
,sdsnew("+OK\r\n"));
1340 shared
.err
= createObject(REDIS_STRING
,sdsnew("-ERR\r\n"));
1341 shared
.emptybulk
= createObject(REDIS_STRING
,sdsnew("$0\r\n\r\n"));
1342 shared
.czero
= createObject(REDIS_STRING
,sdsnew(":0\r\n"));
1343 shared
.cone
= createObject(REDIS_STRING
,sdsnew(":1\r\n"));
1344 shared
.nullbulk
= createObject(REDIS_STRING
,sdsnew("$-1\r\n"));
1345 shared
.nullmultibulk
= createObject(REDIS_STRING
,sdsnew("*-1\r\n"));
1346 shared
.emptymultibulk
= createObject(REDIS_STRING
,sdsnew("*0\r\n"));
1347 shared
.pong
= createObject(REDIS_STRING
,sdsnew("+PONG\r\n"));
1348 shared
.queued
= createObject(REDIS_STRING
,sdsnew("+QUEUED\r\n"));
1349 shared
.wrongtypeerr
= createObject(REDIS_STRING
,sdsnew(
1350 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1351 shared
.nokeyerr
= createObject(REDIS_STRING
,sdsnew(
1352 "-ERR no such key\r\n"));
1353 shared
.syntaxerr
= createObject(REDIS_STRING
,sdsnew(
1354 "-ERR syntax error\r\n"));
1355 shared
.sameobjecterr
= createObject(REDIS_STRING
,sdsnew(
1356 "-ERR source and destination objects are the same\r\n"));
1357 shared
.outofrangeerr
= createObject(REDIS_STRING
,sdsnew(
1358 "-ERR index out of range\r\n"));
1359 shared
.space
= createObject(REDIS_STRING
,sdsnew(" "));
1360 shared
.colon
= createObject(REDIS_STRING
,sdsnew(":"));
1361 shared
.plus
= createObject(REDIS_STRING
,sdsnew("+"));
1362 shared
.select0
= createStringObject("select 0\r\n",10);
1363 shared
.select1
= createStringObject("select 1\r\n",10);
1364 shared
.select2
= createStringObject("select 2\r\n",10);
1365 shared
.select3
= createStringObject("select 3\r\n",10);
1366 shared
.select4
= createStringObject("select 4\r\n",10);
1367 shared
.select5
= createStringObject("select 5\r\n",10);
1368 shared
.select6
= createStringObject("select 6\r\n",10);
1369 shared
.select7
= createStringObject("select 7\r\n",10);
1370 shared
.select8
= createStringObject("select 8\r\n",10);
1371 shared
.select9
= createStringObject("select 9\r\n",10);
1374 static void appendServerSaveParams(time_t seconds
, int changes
) {
1375 server
.saveparams
= zrealloc(server
.saveparams
,sizeof(struct saveparam
)*(server
.saveparamslen
+1));
1376 server
.saveparams
[server
.saveparamslen
].seconds
= seconds
;
1377 server
.saveparams
[server
.saveparamslen
].changes
= changes
;
1378 server
.saveparamslen
++;
1381 static void resetServerSaveParams() {
1382 zfree(server
.saveparams
);
1383 server
.saveparams
= NULL
;
1384 server
.saveparamslen
= 0;
1387 static void initServerConfig() {
1388 server
.dbnum
= REDIS_DEFAULT_DBNUM
;
1389 server
.port
= REDIS_SERVERPORT
;
1390 server
.verbosity
= REDIS_VERBOSE
;
1391 server
.maxidletime
= REDIS_MAXIDLETIME
;
1392 server
.saveparams
= NULL
;
1393 server
.logfile
= NULL
; /* NULL = log on standard output */
1394 server
.bindaddr
= NULL
;
1395 server
.glueoutputbuf
= 1;
1396 server
.daemonize
= 0;
1397 server
.appendonly
= 0;
1398 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1399 server
.lastfsync
= time(NULL
);
1400 server
.appendfd
= -1;
1401 server
.appendseldb
= -1; /* Make sure the first time will not match */
1402 server
.pidfile
= "/var/run/redis.pid";
1403 server
.dbfilename
= "dump.rdb";
1404 server
.appendfilename
= "appendonly.aof";
1405 server
.requirepass
= NULL
;
1406 server
.shareobjects
= 0;
1407 server
.rdbcompression
= 1;
1408 server
.sharingpoolsize
= 1024;
1409 server
.maxclients
= 0;
1410 server
.blpop_blocked_clients
= 0;
1411 server
.maxmemory
= 0;
1412 server
.vm_enabled
= 0;
1413 server
.vm_swap_file
= zstrdup("/tmp/redis-%p.vm");
1414 server
.vm_page_size
= 256; /* 256 bytes per page */
1415 server
.vm_pages
= 1024*1024*100; /* 104 millions of pages */
1416 server
.vm_max_memory
= 1024LL*1024*1024*1; /* 1 GB of RAM */
1417 server
.vm_max_threads
= 4;
1418 server
.vm_blocked_clients
= 0;
1420 resetServerSaveParams();
1422 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1423 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1424 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1425 /* Replication related */
1427 server
.masterauth
= NULL
;
1428 server
.masterhost
= NULL
;
1429 server
.masterport
= 6379;
1430 server
.master
= NULL
;
1431 server
.replstate
= REDIS_REPL_NONE
;
1433 /* Double constants initialization */
1435 R_PosInf
= 1.0/R_Zero
;
1436 R_NegInf
= -1.0/R_Zero
;
1437 R_Nan
= R_Zero
/R_Zero
;
1440 static void initServer() {
1443 signal(SIGHUP
, SIG_IGN
);
1444 signal(SIGPIPE
, SIG_IGN
);
1445 setupSigSegvAction();
1447 server
.devnull
= fopen("/dev/null","w");
1448 if (server
.devnull
== NULL
) {
1449 redisLog(REDIS_WARNING
, "Can't open /dev/null: %s", server
.neterr
);
1452 server
.clients
= listCreate();
1453 server
.slaves
= listCreate();
1454 server
.monitors
= listCreate();
1455 server
.objfreelist
= listCreate();
1456 createSharedObjects();
1457 server
.el
= aeCreateEventLoop();
1458 server
.db
= zmalloc(sizeof(redisDb
)*server
.dbnum
);
1459 server
.sharingpool
= dictCreate(&setDictType
,NULL
);
1460 server
.fd
= anetTcpServer(server
.neterr
, server
.port
, server
.bindaddr
);
1461 if (server
.fd
== -1) {
1462 redisLog(REDIS_WARNING
, "Opening TCP port: %s", server
.neterr
);
1465 for (j
= 0; j
< server
.dbnum
; j
++) {
1466 server
.db
[j
].dict
= dictCreate(&hashDictType
,NULL
);
1467 server
.db
[j
].expires
= dictCreate(&keyptrDictType
,NULL
);
1468 server
.db
[j
].blockingkeys
= dictCreate(&keylistDictType
,NULL
);
1469 if (server
.vm_enabled
)
1470 server
.db
[j
].io_keys
= dictCreate(&keylistDictType
,NULL
);
1471 server
.db
[j
].id
= j
;
1473 server
.cronloops
= 0;
1474 server
.bgsavechildpid
= -1;
1475 server
.bgrewritechildpid
= -1;
1476 server
.bgrewritebuf
= sdsempty();
1477 server
.lastsave
= time(NULL
);
1479 server
.stat_numcommands
= 0;
1480 server
.stat_numconnections
= 0;
1481 server
.stat_starttime
= time(NULL
);
1482 server
.unixtime
= time(NULL
);
1483 aeCreateTimeEvent(server
.el
, 1, serverCron
, NULL
, NULL
);
1484 if (aeCreateFileEvent(server
.el
, server
.fd
, AE_READABLE
,
1485 acceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
1487 if (server
.appendonly
) {
1488 server
.appendfd
= open(server
.appendfilename
,O_WRONLY
|O_APPEND
|O_CREAT
,0644);
1489 if (server
.appendfd
== -1) {
1490 redisLog(REDIS_WARNING
, "Can't open the append-only file: %s",
1496 if (server
.vm_enabled
) vmInit();
1499 /* Empty the whole database */
1500 static long long emptyDb() {
1502 long long removed
= 0;
1504 for (j
= 0; j
< server
.dbnum
; j
++) {
1505 removed
+= dictSize(server
.db
[j
].dict
);
1506 dictEmpty(server
.db
[j
].dict
);
1507 dictEmpty(server
.db
[j
].expires
);
1512 static int yesnotoi(char *s
) {
1513 if (!strcasecmp(s
,"yes")) return 1;
1514 else if (!strcasecmp(s
,"no")) return 0;
1518 /* I agree, this is a very rudimental way to load a configuration...
1519 will improve later if the config gets more complex */
1520 static void loadServerConfig(char *filename
) {
1522 char buf
[REDIS_CONFIGLINE_MAX
+1], *err
= NULL
;
1526 if (filename
[0] == '-' && filename
[1] == '\0')
1529 if ((fp
= fopen(filename
,"r")) == NULL
) {
1530 redisLog(REDIS_WARNING
,"Fatal error, can't open config file");
1535 while(fgets(buf
,REDIS_CONFIGLINE_MAX
+1,fp
) != NULL
) {
1541 line
= sdstrim(line
," \t\r\n");
1543 /* Skip comments and blank lines*/
1544 if (line
[0] == '#' || line
[0] == '\0') {
1549 /* Split into arguments */
1550 argv
= sdssplitlen(line
,sdslen(line
)," ",1,&argc
);
1551 sdstolower(argv
[0]);
1553 /* Execute config directives */
1554 if (!strcasecmp(argv
[0],"timeout") && argc
== 2) {
1555 server
.maxidletime
= atoi(argv
[1]);
1556 if (server
.maxidletime
< 0) {
1557 err
= "Invalid timeout value"; goto loaderr
;
1559 } else if (!strcasecmp(argv
[0],"port") && argc
== 2) {
1560 server
.port
= atoi(argv
[1]);
1561 if (server
.port
< 1 || server
.port
> 65535) {
1562 err
= "Invalid port"; goto loaderr
;
1564 } else if (!strcasecmp(argv
[0],"bind") && argc
== 2) {
1565 server
.bindaddr
= zstrdup(argv
[1]);
1566 } else if (!strcasecmp(argv
[0],"save") && argc
== 3) {
1567 int seconds
= atoi(argv
[1]);
1568 int changes
= atoi(argv
[2]);
1569 if (seconds
< 1 || changes
< 0) {
1570 err
= "Invalid save parameters"; goto loaderr
;
1572 appendServerSaveParams(seconds
,changes
);
1573 } else if (!strcasecmp(argv
[0],"dir") && argc
== 2) {
1574 if (chdir(argv
[1]) == -1) {
1575 redisLog(REDIS_WARNING
,"Can't chdir to '%s': %s",
1576 argv
[1], strerror(errno
));
1579 } else if (!strcasecmp(argv
[0],"loglevel") && argc
== 2) {
1580 if (!strcasecmp(argv
[1],"debug")) server
.verbosity
= REDIS_DEBUG
;
1581 else if (!strcasecmp(argv
[1],"verbose")) server
.verbosity
= REDIS_VERBOSE
;
1582 else if (!strcasecmp(argv
[1],"notice")) server
.verbosity
= REDIS_NOTICE
;
1583 else if (!strcasecmp(argv
[1],"warning")) server
.verbosity
= REDIS_WARNING
;
1585 err
= "Invalid log level. Must be one of debug, notice, warning";
1588 } else if (!strcasecmp(argv
[0],"logfile") && argc
== 2) {
1591 server
.logfile
= zstrdup(argv
[1]);
1592 if (!strcasecmp(server
.logfile
,"stdout")) {
1593 zfree(server
.logfile
);
1594 server
.logfile
= NULL
;
1596 if (server
.logfile
) {
1597 /* Test if we are able to open the file. The server will not
1598 * be able to abort just for this problem later... */
1599 logfp
= fopen(server
.logfile
,"a");
1600 if (logfp
== NULL
) {
1601 err
= sdscatprintf(sdsempty(),
1602 "Can't open the log file: %s", strerror(errno
));
1607 } else if (!strcasecmp(argv
[0],"databases") && argc
== 2) {
1608 server
.dbnum
= atoi(argv
[1]);
1609 if (server
.dbnum
< 1) {
1610 err
= "Invalid number of databases"; goto loaderr
;
1612 } else if (!strcasecmp(argv
[0],"maxclients") && argc
== 2) {
1613 server
.maxclients
= atoi(argv
[1]);
1614 } else if (!strcasecmp(argv
[0],"maxmemory") && argc
== 2) {
1615 server
.maxmemory
= strtoll(argv
[1], NULL
, 10);
1616 } else if (!strcasecmp(argv
[0],"slaveof") && argc
== 3) {
1617 server
.masterhost
= sdsnew(argv
[1]);
1618 server
.masterport
= atoi(argv
[2]);
1619 server
.replstate
= REDIS_REPL_CONNECT
;
1620 } else if (!strcasecmp(argv
[0],"masterauth") && argc
== 2) {
1621 server
.masterauth
= zstrdup(argv
[1]);
1622 } else if (!strcasecmp(argv
[0],"glueoutputbuf") && argc
== 2) {
1623 if ((server
.glueoutputbuf
= yesnotoi(argv
[1])) == -1) {
1624 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1626 } else if (!strcasecmp(argv
[0],"shareobjects") && argc
== 2) {
1627 if ((server
.shareobjects
= yesnotoi(argv
[1])) == -1) {
1628 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1630 } else if (!strcasecmp(argv
[0],"rdbcompression") && argc
== 2) {
1631 if ((server
.rdbcompression
= yesnotoi(argv
[1])) == -1) {
1632 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1634 } else if (!strcasecmp(argv
[0],"shareobjectspoolsize") && argc
== 2) {
1635 server
.sharingpoolsize
= atoi(argv
[1]);
1636 if (server
.sharingpoolsize
< 1) {
1637 err
= "invalid object sharing pool size"; goto loaderr
;
1639 } else if (!strcasecmp(argv
[0],"daemonize") && argc
== 2) {
1640 if ((server
.daemonize
= yesnotoi(argv
[1])) == -1) {
1641 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1643 } else if (!strcasecmp(argv
[0],"appendonly") && argc
== 2) {
1644 if ((server
.appendonly
= yesnotoi(argv
[1])) == -1) {
1645 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1647 } else if (!strcasecmp(argv
[0],"appendfsync") && argc
== 2) {
1648 if (!strcasecmp(argv
[1],"no")) {
1649 server
.appendfsync
= APPENDFSYNC_NO
;
1650 } else if (!strcasecmp(argv
[1],"always")) {
1651 server
.appendfsync
= APPENDFSYNC_ALWAYS
;
1652 } else if (!strcasecmp(argv
[1],"everysec")) {
1653 server
.appendfsync
= APPENDFSYNC_EVERYSEC
;
1655 err
= "argument must be 'no', 'always' or 'everysec'";
1658 } else if (!strcasecmp(argv
[0],"requirepass") && argc
== 2) {
1659 server
.requirepass
= zstrdup(argv
[1]);
1660 } else if (!strcasecmp(argv
[0],"pidfile") && argc
== 2) {
1661 server
.pidfile
= zstrdup(argv
[1]);
1662 } else if (!strcasecmp(argv
[0],"dbfilename") && argc
== 2) {
1663 server
.dbfilename
= zstrdup(argv
[1]);
1664 } else if (!strcasecmp(argv
[0],"vm-enabled") && argc
== 2) {
1665 if ((server
.vm_enabled
= yesnotoi(argv
[1])) == -1) {
1666 err
= "argument must be 'yes' or 'no'"; goto loaderr
;
1668 } else if (!strcasecmp(argv
[0],"vm-swap-file") && argc
== 2) {
1669 zfree(server
.vm_swap_file
);
1670 server
.vm_swap_file
= zstrdup(argv
[1]);
1671 } else if (!strcasecmp(argv
[0],"vm-max-memory") && argc
== 2) {
1672 server
.vm_max_memory
= strtoll(argv
[1], NULL
, 10);
1673 } else if (!strcasecmp(argv
[0],"vm-page-size") && argc
== 2) {
1674 server
.vm_page_size
= strtoll(argv
[1], NULL
, 10);
1675 } else if (!strcasecmp(argv
[0],"vm-pages") && argc
== 2) {
1676 server
.vm_pages
= strtoll(argv
[1], NULL
, 10);
1677 } else if (!strcasecmp(argv
[0],"vm-max-threads") && argc
== 2) {
1678 server
.vm_max_threads
= strtoll(argv
[1], NULL
, 10);
1680 err
= "Bad directive or wrong number of arguments"; goto loaderr
;
1682 for (j
= 0; j
< argc
; j
++)
1687 if (fp
!= stdin
) fclose(fp
);
1691 fprintf(stderr
, "\n*** FATAL CONFIG FILE ERROR ***\n");
1692 fprintf(stderr
, "Reading the configuration file, at line %d\n", linenum
);
1693 fprintf(stderr
, ">>> '%s'\n", line
);
1694 fprintf(stderr
, "%s\n", err
);
1698 static void freeClientArgv(redisClient
*c
) {
1701 for (j
= 0; j
< c
->argc
; j
++)
1702 decrRefCount(c
->argv
[j
]);
1703 for (j
= 0; j
< c
->mbargc
; j
++)
1704 decrRefCount(c
->mbargv
[j
]);
1709 static void freeClient(redisClient
*c
) {
1712 /* Note that if the client we are freeing is blocked into a blocking
1713 * call, we have to set querybuf to NULL *before* to call
1714 * unblockClientWaitingData() to avoid processInputBuffer() will get
1715 * called. Also it is important to remove the file events after
1716 * this, because this call adds the READABLE event. */
1717 sdsfree(c
->querybuf
);
1719 if (c
->flags
& REDIS_BLOCKED
)
1720 unblockClientWaitingData(c
);
1722 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
1723 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1724 listRelease(c
->reply
);
1727 /* Remove from the list of clients */
1728 ln
= listSearchKey(server
.clients
,c
);
1729 redisAssert(ln
!= NULL
);
1730 listDelNode(server
.clients
,ln
);
1731 /* Remove from the list of clients waiting for swapped keys */
1732 if (c
->flags
& REDIS_IO_WAIT
&& listLength(c
->io_keys
) == 0) {
1733 ln
= listSearchKey(server
.io_ready_clients
,c
);
1735 listDelNode(server
.io_ready_clients
,ln
);
1736 server
.vm_blocked_clients
--;
1739 while (server
.vm_enabled
&& listLength(c
->io_keys
)) {
1740 ln
= listFirst(c
->io_keys
);
1741 dontWaitForSwappedKey(c
,ln
->value
);
1743 listRelease(c
->io_keys
);
1745 if (c
->flags
& REDIS_SLAVE
) {
1746 if (c
->replstate
== REDIS_REPL_SEND_BULK
&& c
->repldbfd
!= -1)
1748 list
*l
= (c
->flags
& REDIS_MONITOR
) ? server
.monitors
: server
.slaves
;
1749 ln
= listSearchKey(l
,c
);
1750 redisAssert(ln
!= NULL
);
1753 if (c
->flags
& REDIS_MASTER
) {
1754 server
.master
= NULL
;
1755 server
.replstate
= REDIS_REPL_CONNECT
;
1759 freeClientMultiState(c
);
1763 #define GLUEREPLY_UP_TO (1024)
1764 static void glueReplyBuffersIfNeeded(redisClient
*c
) {
1766 char buf
[GLUEREPLY_UP_TO
];
1771 listRewind(c
->reply
,&li
);
1772 while((ln
= listNext(&li
))) {
1776 objlen
= sdslen(o
->ptr
);
1777 if (copylen
+ objlen
<= GLUEREPLY_UP_TO
) {
1778 memcpy(buf
+copylen
,o
->ptr
,objlen
);
1780 listDelNode(c
->reply
,ln
);
1782 if (copylen
== 0) return;
1786 /* Now the output buffer is empty, add the new single element */
1787 o
= createObject(REDIS_STRING
,sdsnewlen(buf
,copylen
));
1788 listAddNodeHead(c
->reply
,o
);
1791 static void sendReplyToClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
1792 redisClient
*c
= privdata
;
1793 int nwritten
= 0, totwritten
= 0, objlen
;
1796 REDIS_NOTUSED(mask
);
1798 /* Use writev() if we have enough buffers to send */
1799 if (!server
.glueoutputbuf
&&
1800 listLength(c
->reply
) > REDIS_WRITEV_THRESHOLD
&&
1801 !(c
->flags
& REDIS_MASTER
))
1803 sendReplyToClientWritev(el
, fd
, privdata
, mask
);
1807 while(listLength(c
->reply
)) {
1808 if (server
.glueoutputbuf
&& listLength(c
->reply
) > 1)
1809 glueReplyBuffersIfNeeded(c
);
1811 o
= listNodeValue(listFirst(c
->reply
));
1812 objlen
= sdslen(o
->ptr
);
1815 listDelNode(c
->reply
,listFirst(c
->reply
));
1819 if (c
->flags
& REDIS_MASTER
) {
1820 /* Don't reply to a master */
1821 nwritten
= objlen
- c
->sentlen
;
1823 nwritten
= write(fd
, ((char*)o
->ptr
)+c
->sentlen
, objlen
- c
->sentlen
);
1824 if (nwritten
<= 0) break;
1826 c
->sentlen
+= nwritten
;
1827 totwritten
+= nwritten
;
1828 /* If we fully sent the object on head go to the next one */
1829 if (c
->sentlen
== objlen
) {
1830 listDelNode(c
->reply
,listFirst(c
->reply
));
1833 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1834 * bytes, in a single threaded server it's a good idea to serve
1835 * other clients as well, even if a very large request comes from
1836 * super fast link that is always able to accept data (in real world
1837 * scenario think about 'KEYS *' against the loopback interfae) */
1838 if (totwritten
> REDIS_MAX_WRITE_PER_EVENT
) break;
1840 if (nwritten
== -1) {
1841 if (errno
== EAGAIN
) {
1844 redisLog(REDIS_VERBOSE
,
1845 "Error writing to client: %s", strerror(errno
));
1850 if (totwritten
> 0) c
->lastinteraction
= time(NULL
);
1851 if (listLength(c
->reply
) == 0) {
1853 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1857 static void sendReplyToClientWritev(aeEventLoop
*el
, int fd
, void *privdata
, int mask
)
1859 redisClient
*c
= privdata
;
1860 int nwritten
= 0, totwritten
= 0, objlen
, willwrite
;
1862 struct iovec iov
[REDIS_WRITEV_IOVEC_COUNT
];
1863 int offset
, ion
= 0;
1865 REDIS_NOTUSED(mask
);
1868 while (listLength(c
->reply
)) {
1869 offset
= c
->sentlen
;
1873 /* fill-in the iov[] array */
1874 for(node
= listFirst(c
->reply
); node
; node
= listNextNode(node
)) {
1875 o
= listNodeValue(node
);
1876 objlen
= sdslen(o
->ptr
);
1878 if (totwritten
+ objlen
- offset
> REDIS_MAX_WRITE_PER_EVENT
)
1881 if(ion
== REDIS_WRITEV_IOVEC_COUNT
)
1882 break; /* no more iovecs */
1884 iov
[ion
].iov_base
= ((char*)o
->ptr
) + offset
;
1885 iov
[ion
].iov_len
= objlen
- offset
;
1886 willwrite
+= objlen
- offset
;
1887 offset
= 0; /* just for the first item */
1894 /* write all collected blocks at once */
1895 if((nwritten
= writev(fd
, iov
, ion
)) < 0) {
1896 if (errno
!= EAGAIN
) {
1897 redisLog(REDIS_VERBOSE
,
1898 "Error writing to client: %s", strerror(errno
));
1905 totwritten
+= nwritten
;
1906 offset
= c
->sentlen
;
1908 /* remove written robjs from c->reply */
1909 while (nwritten
&& listLength(c
->reply
)) {
1910 o
= listNodeValue(listFirst(c
->reply
));
1911 objlen
= sdslen(o
->ptr
);
1913 if(nwritten
>= objlen
- offset
) {
1914 listDelNode(c
->reply
, listFirst(c
->reply
));
1915 nwritten
-= objlen
- offset
;
1919 c
->sentlen
+= nwritten
;
1927 c
->lastinteraction
= time(NULL
);
1929 if (listLength(c
->reply
) == 0) {
1931 aeDeleteFileEvent(server
.el
,c
->fd
,AE_WRITABLE
);
1935 static struct redisCommand
*lookupCommand(char *name
) {
1937 while(cmdTable
[j
].name
!= NULL
) {
1938 if (!strcasecmp(name
,cmdTable
[j
].name
)) return &cmdTable
[j
];
1944 /* resetClient prepare the client to process the next command */
1945 static void resetClient(redisClient
*c
) {
1951 /* Call() is the core of Redis execution of a command */
1952 static void call(redisClient
*c
, struct redisCommand
*cmd
) {
1955 dirty
= server
.dirty
;
1957 if (server
.appendonly
&& server
.dirty
-dirty
)
1958 feedAppendOnlyFile(cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1959 if (server
.dirty
-dirty
&& listLength(server
.slaves
))
1960 replicationFeedSlaves(server
.slaves
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1961 if (listLength(server
.monitors
))
1962 replicationFeedSlaves(server
.monitors
,cmd
,c
->db
->id
,c
->argv
,c
->argc
);
1963 server
.stat_numcommands
++;
1966 /* If this function gets called we already read a whole
1967 * command, argments are in the client argv/argc fields.
1968 * processCommand() execute the command or prepare the
1969 * server for a bulk read from the client.
1971 * If 1 is returned the client is still alive and valid and
1972 * and other operations can be performed by the caller. Otherwise
1973 * if 0 is returned the client was destroied (i.e. after QUIT). */
1974 static int processCommand(redisClient
*c
) {
1975 struct redisCommand
*cmd
;
1977 /* Free some memory if needed (maxmemory setting) */
1978 if (server
.maxmemory
) freeMemoryIfNeeded();
1980 /* Handle the multi bulk command type. This is an alternative protocol
1981 * supported by Redis in order to receive commands that are composed of
1982 * multiple binary-safe "bulk" arguments. The latency of processing is
1983 * a bit higher but this allows things like multi-sets, so if this
1984 * protocol is used only for MSET and similar commands this is a big win. */
1985 if (c
->multibulk
== 0 && c
->argc
== 1 && ((char*)(c
->argv
[0]->ptr
))[0] == '*') {
1986 c
->multibulk
= atoi(((char*)c
->argv
[0]->ptr
)+1);
1987 if (c
->multibulk
<= 0) {
1991 decrRefCount(c
->argv
[c
->argc
-1]);
1995 } else if (c
->multibulk
) {
1996 if (c
->bulklen
== -1) {
1997 if (((char*)c
->argv
[0]->ptr
)[0] != '$') {
1998 addReplySds(c
,sdsnew("-ERR multi bulk protocol error\r\n"));
2002 int bulklen
= atoi(((char*)c
->argv
[0]->ptr
)+1);
2003 decrRefCount(c
->argv
[0]);
2004 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
2006 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
2011 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
2015 c
->mbargv
= zrealloc(c
->mbargv
,(sizeof(robj
*))*(c
->mbargc
+1));
2016 c
->mbargv
[c
->mbargc
] = c
->argv
[0];
2020 if (c
->multibulk
== 0) {
2024 /* Here we need to swap the multi-bulk argc/argv with the
2025 * normal argc/argv of the client structure. */
2027 c
->argv
= c
->mbargv
;
2028 c
->mbargv
= auxargv
;
2031 c
->argc
= c
->mbargc
;
2032 c
->mbargc
= auxargc
;
2034 /* We need to set bulklen to something different than -1
2035 * in order for the code below to process the command without
2036 * to try to read the last argument of a bulk command as
2037 * a special argument. */
2039 /* continue below and process the command */
2046 /* -- end of multi bulk commands processing -- */
2048 /* The QUIT command is handled as a special case. Normal command
2049 * procs are unable to close the client connection safely */
2050 if (!strcasecmp(c
->argv
[0]->ptr
,"quit")) {
2055 /* Now lookup the command and check ASAP about trivial error conditions
2056 * such wrong arity, bad command name and so forth. */
2057 cmd
= lookupCommand(c
->argv
[0]->ptr
);
2060 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
2061 (char*)c
->argv
[0]->ptr
));
2064 } else if ((cmd
->arity
> 0 && cmd
->arity
!= c
->argc
) ||
2065 (c
->argc
< -cmd
->arity
)) {
2067 sdscatprintf(sdsempty(),
2068 "-ERR wrong number of arguments for '%s' command\r\n",
2072 } else if (server
.maxmemory
&& cmd
->flags
& REDIS_CMD_DENYOOM
&& zmalloc_used_memory() > server
.maxmemory
) {
2073 addReplySds(c
,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
2076 } else if (cmd
->flags
& REDIS_CMD_BULK
&& c
->bulklen
== -1) {
2077 /* This is a bulk command, we have to read the last argument yet. */
2078 int bulklen
= atoi(c
->argv
[c
->argc
-1]->ptr
);
2080 decrRefCount(c
->argv
[c
->argc
-1]);
2081 if (bulklen
< 0 || bulklen
> 1024*1024*1024) {
2083 addReplySds(c
,sdsnew("-ERR invalid bulk write count\r\n"));
2088 c
->bulklen
= bulklen
+2; /* add two bytes for CR+LF */
2089 /* It is possible that the bulk read is already in the
2090 * buffer. Check this condition and handle it accordingly.
2091 * This is just a fast path, alternative to call processInputBuffer().
2092 * It's a good idea since the code is small and this condition
2093 * happens most of the times. */
2094 if ((signed)sdslen(c
->querybuf
) >= c
->bulklen
) {
2095 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
2097 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
2099 /* Otherwise return... there is to read the last argument
2100 * from the socket. */
2104 /* Let's try to share objects on the command arguments vector */
2105 if (server
.shareobjects
) {
2107 for(j
= 1; j
< c
->argc
; j
++)
2108 c
->argv
[j
] = tryObjectSharing(c
->argv
[j
]);
2110 /* Let's try to encode the bulk object to save space. */
2111 if (cmd
->flags
& REDIS_CMD_BULK
)
2112 tryObjectEncoding(c
->argv
[c
->argc
-1]);
2114 /* Check if the user is authenticated */
2115 if (server
.requirepass
&& !c
->authenticated
&& cmd
->proc
!= authCommand
) {
2116 addReplySds(c
,sdsnew("-ERR operation not permitted\r\n"));
2121 /* Exec the command */
2122 if (c
->flags
& REDIS_MULTI
&& cmd
->proc
!= execCommand
) {
2123 queueMultiCommand(c
,cmd
);
2124 addReply(c
,shared
.queued
);
2126 if (server
.vm_enabled
&& server
.vm_max_threads
> 0 &&
2127 blockClientOnSwappedKeys(cmd
,c
)) return 1;
2131 /* Prepare the client for the next command */
2136 static void replicationFeedSlaves(list
*slaves
, struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
2141 /* (args*2)+1 is enough room for args, spaces, newlines */
2142 robj
*static_outv
[REDIS_STATIC_ARGS
*2+1];
2144 if (argc
<= REDIS_STATIC_ARGS
) {
2147 outv
= zmalloc(sizeof(robj
*)*(argc
*2+1));
2150 for (j
= 0; j
< argc
; j
++) {
2151 if (j
!= 0) outv
[outc
++] = shared
.space
;
2152 if ((cmd
->flags
& REDIS_CMD_BULK
) && j
== argc
-1) {
2155 lenobj
= createObject(REDIS_STRING
,
2156 sdscatprintf(sdsempty(),"%lu\r\n",
2157 (unsigned long) stringObjectLen(argv
[j
])));
2158 lenobj
->refcount
= 0;
2159 outv
[outc
++] = lenobj
;
2161 outv
[outc
++] = argv
[j
];
2163 outv
[outc
++] = shared
.crlf
;
2165 /* Increment all the refcounts at start and decrement at end in order to
2166 * be sure to free objects if there is no slave in a replication state
2167 * able to be feed with commands */
2168 for (j
= 0; j
< outc
; j
++) incrRefCount(outv
[j
]);
2169 listRewind(slaves
,&li
);
2170 while((ln
= listNext(&li
))) {
2171 redisClient
*slave
= ln
->value
;
2173 /* Don't feed slaves that are still waiting for BGSAVE to start */
2174 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
2176 /* Feed all the other slaves, MONITORs and so on */
2177 if (slave
->slaveseldb
!= dictid
) {
2181 case 0: selectcmd
= shared
.select0
; break;
2182 case 1: selectcmd
= shared
.select1
; break;
2183 case 2: selectcmd
= shared
.select2
; break;
2184 case 3: selectcmd
= shared
.select3
; break;
2185 case 4: selectcmd
= shared
.select4
; break;
2186 case 5: selectcmd
= shared
.select5
; break;
2187 case 6: selectcmd
= shared
.select6
; break;
2188 case 7: selectcmd
= shared
.select7
; break;
2189 case 8: selectcmd
= shared
.select8
; break;
2190 case 9: selectcmd
= shared
.select9
; break;
2192 selectcmd
= createObject(REDIS_STRING
,
2193 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
2194 selectcmd
->refcount
= 0;
2197 addReply(slave
,selectcmd
);
2198 slave
->slaveseldb
= dictid
;
2200 for (j
= 0; j
< outc
; j
++) addReply(slave
,outv
[j
]);
2202 for (j
= 0; j
< outc
; j
++) decrRefCount(outv
[j
]);
2203 if (outv
!= static_outv
) zfree(outv
);
2206 static void processInputBuffer(redisClient
*c
) {
2208 /* Before to process the input buffer, make sure the client is not
2209 * waitig for a blocking operation such as BLPOP. Note that the first
2210 * iteration the client is never blocked, otherwise the processInputBuffer
2211 * would not be called at all, but after the execution of the first commands
2212 * in the input buffer the client may be blocked, and the "goto again"
2213 * will try to reiterate. The following line will make it return asap. */
2214 if (c
->flags
& REDIS_BLOCKED
|| c
->flags
& REDIS_IO_WAIT
) return;
2215 if (c
->bulklen
== -1) {
2216 /* Read the first line of the query */
2217 char *p
= strchr(c
->querybuf
,'\n');
2224 query
= c
->querybuf
;
2225 c
->querybuf
= sdsempty();
2226 querylen
= 1+(p
-(query
));
2227 if (sdslen(query
) > querylen
) {
2228 /* leave data after the first line of the query in the buffer */
2229 c
->querybuf
= sdscatlen(c
->querybuf
,query
+querylen
,sdslen(query
)-querylen
);
2231 *p
= '\0'; /* remove "\n" */
2232 if (*(p
-1) == '\r') *(p
-1) = '\0'; /* and "\r" if any */
2233 sdsupdatelen(query
);
2235 /* Now we can split the query in arguments */
2236 argv
= sdssplitlen(query
,sdslen(query
)," ",1,&argc
);
2239 if (c
->argv
) zfree(c
->argv
);
2240 c
->argv
= zmalloc(sizeof(robj
*)*argc
);
2242 for (j
= 0; j
< argc
; j
++) {
2243 if (sdslen(argv
[j
])) {
2244 c
->argv
[c
->argc
] = createObject(REDIS_STRING
,argv
[j
]);
2252 /* Execute the command. If the client is still valid
2253 * after processCommand() return and there is something
2254 * on the query buffer try to process the next command. */
2255 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
2257 /* Nothing to process, argc == 0. Just process the query
2258 * buffer if it's not empty or return to the caller */
2259 if (sdslen(c
->querybuf
)) goto again
;
2262 } else if (sdslen(c
->querybuf
) >= REDIS_REQUEST_MAX_SIZE
) {
2263 redisLog(REDIS_VERBOSE
, "Client protocol error");
2268 /* Bulk read handling. Note that if we are at this point
2269 the client already sent a command terminated with a newline,
2270 we are reading the bulk data that is actually the last
2271 argument of the command. */
2272 int qbl
= sdslen(c
->querybuf
);
2274 if (c
->bulklen
<= qbl
) {
2275 /* Copy everything but the final CRLF as final argument */
2276 c
->argv
[c
->argc
] = createStringObject(c
->querybuf
,c
->bulklen
-2);
2278 c
->querybuf
= sdsrange(c
->querybuf
,c
->bulklen
,-1);
2279 /* Process the command. If the client is still valid after
2280 * the processing and there is more data in the buffer
2281 * try to parse it. */
2282 if (processCommand(c
) && sdslen(c
->querybuf
)) goto again
;
2288 static void readQueryFromClient(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2289 redisClient
*c
= (redisClient
*) privdata
;
2290 char buf
[REDIS_IOBUF_LEN
];
2293 REDIS_NOTUSED(mask
);
2295 nread
= read(fd
, buf
, REDIS_IOBUF_LEN
);
2297 if (errno
== EAGAIN
) {
2300 redisLog(REDIS_VERBOSE
, "Reading from client: %s",strerror(errno
));
2304 } else if (nread
== 0) {
2305 redisLog(REDIS_VERBOSE
, "Client closed connection");
2310 c
->querybuf
= sdscatlen(c
->querybuf
, buf
, nread
);
2311 c
->lastinteraction
= time(NULL
);
2315 processInputBuffer(c
);
2318 static int selectDb(redisClient
*c
, int id
) {
2319 if (id
< 0 || id
>= server
.dbnum
)
2321 c
->db
= &server
.db
[id
];
2325 static void *dupClientReplyValue(void *o
) {
2326 incrRefCount((robj
*)o
);
2330 static redisClient
*createClient(int fd
) {
2331 redisClient
*c
= zmalloc(sizeof(*c
));
2333 anetNonBlock(NULL
,fd
);
2334 anetTcpNoDelay(NULL
,fd
);
2335 if (!c
) return NULL
;
2338 c
->querybuf
= sdsempty();
2347 c
->lastinteraction
= time(NULL
);
2348 c
->authenticated
= 0;
2349 c
->replstate
= REDIS_REPL_NONE
;
2350 c
->reply
= listCreate();
2351 listSetFreeMethod(c
->reply
,decrRefCount
);
2352 listSetDupMethod(c
->reply
,dupClientReplyValue
);
2353 c
->blockingkeys
= NULL
;
2354 c
->blockingkeysnum
= 0;
2355 c
->io_keys
= listCreate();
2356 listSetFreeMethod(c
->io_keys
,decrRefCount
);
2357 if (aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
,
2358 readQueryFromClient
, c
) == AE_ERR
) {
2362 listAddNodeTail(server
.clients
,c
);
2363 initClientMultiState(c
);
2367 static void addReply(redisClient
*c
, robj
*obj
) {
2368 if (listLength(c
->reply
) == 0 &&
2369 (c
->replstate
== REDIS_REPL_NONE
||
2370 c
->replstate
== REDIS_REPL_ONLINE
) &&
2371 aeCreateFileEvent(server
.el
, c
->fd
, AE_WRITABLE
,
2372 sendReplyToClient
, c
) == AE_ERR
) return;
2374 if (server
.vm_enabled
&& obj
->storage
!= REDIS_VM_MEMORY
) {
2375 obj
= dupStringObject(obj
);
2376 obj
->refcount
= 0; /* getDecodedObject() will increment the refcount */
2378 listAddNodeTail(c
->reply
,getDecodedObject(obj
));
2381 static void addReplySds(redisClient
*c
, sds s
) {
2382 robj
*o
= createObject(REDIS_STRING
,s
);
2387 static void addReplyDouble(redisClient
*c
, double d
) {
2390 snprintf(buf
,sizeof(buf
),"%.17g",d
);
2391 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2392 (unsigned long) strlen(buf
),buf
));
2395 static void addReplyBulkLen(redisClient
*c
, robj
*obj
) {
2398 if (obj
->encoding
== REDIS_ENCODING_RAW
) {
2399 len
= sdslen(obj
->ptr
);
2401 long n
= (long)obj
->ptr
;
2403 /* Compute how many bytes will take this integer as a radix 10 string */
2409 while((n
= n
/10) != 0) {
2413 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len
));
2416 static void acceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
2421 REDIS_NOTUSED(mask
);
2422 REDIS_NOTUSED(privdata
);
2424 cfd
= anetAccept(server
.neterr
, fd
, cip
, &cport
);
2425 if (cfd
== AE_ERR
) {
2426 redisLog(REDIS_VERBOSE
,"Accepting client connection: %s", server
.neterr
);
2429 redisLog(REDIS_VERBOSE
,"Accepted %s:%d", cip
, cport
);
2430 if ((c
= createClient(cfd
)) == NULL
) {
2431 redisLog(REDIS_WARNING
,"Error allocating resoures for the client");
2432 close(cfd
); /* May be already closed, just ingore errors */
2435 /* If maxclient directive is set and this is one client more... close the
2436 * connection. Note that we create the client instead to check before
2437 * for this condition, since now the socket is already set in nonblocking
2438 * mode and we can send an error for free using the Kernel I/O */
2439 if (server
.maxclients
&& listLength(server
.clients
) > server
.maxclients
) {
2440 char *err
= "-ERR max number of clients reached\r\n";
2442 /* That's a best effort error message, don't check write errors */
2443 if (write(c
->fd
,err
,strlen(err
)) == -1) {
2444 /* Nothing to do, Just to avoid the warning... */
2449 server
.stat_numconnections
++;
2452 /* ======================= Redis objects implementation ===================== */
2454 static robj
*createObject(int type
, void *ptr
) {
2457 if (server
.vm_enabled
) pthread_mutex_lock(&server
.obj_freelist_mutex
);
2458 if (listLength(server
.objfreelist
)) {
2459 listNode
*head
= listFirst(server
.objfreelist
);
2460 o
= listNodeValue(head
);
2461 listDelNode(server
.objfreelist
,head
);
2462 if (server
.vm_enabled
) pthread_mutex_unlock(&server
.obj_freelist_mutex
);
2464 if (server
.vm_enabled
) {
2465 pthread_mutex_unlock(&server
.obj_freelist_mutex
);
2466 o
= zmalloc(sizeof(*o
));
2468 o
= zmalloc(sizeof(*o
)-sizeof(struct redisObjectVM
));
2472 o
->encoding
= REDIS_ENCODING_RAW
;
2475 if (server
.vm_enabled
) {
2476 /* Note that this code may run in the context of an I/O thread
2477 * and accessing to server.unixtime in theory is an error
2478 * (no locks). But in practice this is safe, and even if we read
2479 * garbage Redis will not fail, as it's just a statistical info */
2480 o
->vm
.atime
= server
.unixtime
;
2481 o
->storage
= REDIS_VM_MEMORY
;
2486 static robj
*createStringObject(char *ptr
, size_t len
) {
2487 return createObject(REDIS_STRING
,sdsnewlen(ptr
,len
));
2490 static robj
*dupStringObject(robj
*o
) {
2491 assert(o
->encoding
== REDIS_ENCODING_RAW
);
2492 return createStringObject(o
->ptr
,sdslen(o
->ptr
));
2495 static robj
*createListObject(void) {
2496 list
*l
= listCreate();
2498 listSetFreeMethod(l
,decrRefCount
);
2499 return createObject(REDIS_LIST
,l
);
2502 static robj
*createSetObject(void) {
2503 dict
*d
= dictCreate(&setDictType
,NULL
);
2504 return createObject(REDIS_SET
,d
);
2507 static robj
*createZsetObject(void) {
2508 zset
*zs
= zmalloc(sizeof(*zs
));
2510 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
2511 zs
->zsl
= zslCreate();
2512 return createObject(REDIS_ZSET
,zs
);
2515 static void freeStringObject(robj
*o
) {
2516 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2521 static void freeListObject(robj
*o
) {
2522 listRelease((list
*) o
->ptr
);
2525 static void freeSetObject(robj
*o
) {
2526 dictRelease((dict
*) o
->ptr
);
2529 static void freeZsetObject(robj
*o
) {
2532 dictRelease(zs
->dict
);
2537 static void freeHashObject(robj
*o
) {
2538 dictRelease((dict
*) o
->ptr
);
2541 static void incrRefCount(robj
*o
) {
2542 redisAssert(!server
.vm_enabled
|| o
->storage
== REDIS_VM_MEMORY
);
2546 static void decrRefCount(void *obj
) {
2549 /* Object is a key of a swapped out value, or in the process of being
2551 if (server
.vm_enabled
&&
2552 (o
->storage
== REDIS_VM_SWAPPED
|| o
->storage
== REDIS_VM_LOADING
))
2554 if (o
->storage
== REDIS_VM_SWAPPED
|| o
->storage
== REDIS_VM_LOADING
) {
2555 redisAssert(o
->refcount
== 1);
2557 if (o
->storage
== REDIS_VM_LOADING
) vmCancelThreadedIOJob(obj
);
2558 redisAssert(o
->type
== REDIS_STRING
);
2559 freeStringObject(o
);
2560 vmMarkPagesFree(o
->vm
.page
,o
->vm
.usedpages
);
2561 pthread_mutex_lock(&server
.obj_freelist_mutex
);
2562 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2563 !listAddNodeHead(server
.objfreelist
,o
))
2565 pthread_mutex_unlock(&server
.obj_freelist_mutex
);
2566 server
.vm_stats_swapped_objects
--;
2569 /* Object is in memory, or in the process of being swapped out. */
2570 if (--(o
->refcount
) == 0) {
2571 if (server
.vm_enabled
&& o
->storage
== REDIS_VM_SWAPPING
)
2572 vmCancelThreadedIOJob(obj
);
2574 case REDIS_STRING
: freeStringObject(o
); break;
2575 case REDIS_LIST
: freeListObject(o
); break;
2576 case REDIS_SET
: freeSetObject(o
); break;
2577 case REDIS_ZSET
: freeZsetObject(o
); break;
2578 case REDIS_HASH
: freeHashObject(o
); break;
2579 default: redisAssert(0 != 0); break;
2581 if (server
.vm_enabled
) pthread_mutex_lock(&server
.obj_freelist_mutex
);
2582 if (listLength(server
.objfreelist
) > REDIS_OBJFREELIST_MAX
||
2583 !listAddNodeHead(server
.objfreelist
,o
))
2585 if (server
.vm_enabled
) pthread_mutex_unlock(&server
.obj_freelist_mutex
);
2589 static robj
*lookupKey(redisDb
*db
, robj
*key
) {
2590 dictEntry
*de
= dictFind(db
->dict
,key
);
2592 robj
*key
= dictGetEntryKey(de
);
2593 robj
*val
= dictGetEntryVal(de
);
2595 if (server
.vm_enabled
) {
2596 if (key
->storage
== REDIS_VM_MEMORY
||
2597 key
->storage
== REDIS_VM_SWAPPING
)
2599 /* If we were swapping the object out, stop it, this key
2601 if (key
->storage
== REDIS_VM_SWAPPING
)
2602 vmCancelThreadedIOJob(key
);
2603 /* Update the access time of the key for the aging algorithm. */
2604 key
->vm
.atime
= server
.unixtime
;
2606 int notify
= (key
->storage
== REDIS_VM_LOADING
);
2608 /* Our value was swapped on disk. Bring it at home. */
2609 redisAssert(val
== NULL
);
2610 val
= vmLoadObject(key
);
2611 dictGetEntryVal(de
) = val
;
2613 /* Clients blocked by the VM subsystem may be waiting for
2615 if (notify
) handleClientsBlockedOnSwappedKey(db
,key
);
2624 static robj
*lookupKeyRead(redisDb
*db
, robj
*key
) {
2625 expireIfNeeded(db
,key
);
2626 return lookupKey(db
,key
);
2629 static robj
*lookupKeyWrite(redisDb
*db
, robj
*key
) {
2630 deleteIfVolatile(db
,key
);
2631 return lookupKey(db
,key
);
2634 static int deleteKey(redisDb
*db
, robj
*key
) {
2637 /* We need to protect key from destruction: after the first dictDelete()
2638 * it may happen that 'key' is no longer valid if we don't increment
2639 * it's count. This may happen when we get the object reference directly
2640 * from the hash table with dictRandomKey() or dict iterators */
2642 if (dictSize(db
->expires
)) dictDelete(db
->expires
,key
);
2643 retval
= dictDelete(db
->dict
,key
);
2646 return retval
== DICT_OK
;
2649 /* Try to share an object against the shared objects pool */
2650 static robj
*tryObjectSharing(robj
*o
) {
2651 struct dictEntry
*de
;
2654 if (o
== NULL
|| server
.shareobjects
== 0) return o
;
2656 redisAssert(o
->type
== REDIS_STRING
);
2657 de
= dictFind(server
.sharingpool
,o
);
2659 robj
*shared
= dictGetEntryKey(de
);
2661 c
= ((unsigned long) dictGetEntryVal(de
))+1;
2662 dictGetEntryVal(de
) = (void*) c
;
2663 incrRefCount(shared
);
2667 /* Here we are using a stream algorihtm: Every time an object is
2668 * shared we increment its count, everytime there is a miss we
2669 * recrement the counter of a random object. If this object reaches
2670 * zero we remove the object and put the current object instead. */
2671 if (dictSize(server
.sharingpool
) >=
2672 server
.sharingpoolsize
) {
2673 de
= dictGetRandomKey(server
.sharingpool
);
2674 redisAssert(de
!= NULL
);
2675 c
= ((unsigned long) dictGetEntryVal(de
))-1;
2676 dictGetEntryVal(de
) = (void*) c
;
2678 dictDelete(server
.sharingpool
,de
->key
);
2681 c
= 0; /* If the pool is empty we want to add this object */
2686 retval
= dictAdd(server
.sharingpool
,o
,(void*)1);
2687 redisAssert(retval
== DICT_OK
);
2694 /* Check if the nul-terminated string 's' can be represented by a long
2695 * (that is, is a number that fits into long without any other space or
2696 * character before or after the digits).
2698 * If so, the function returns REDIS_OK and *longval is set to the value
2699 * of the number. Otherwise REDIS_ERR is returned */
2700 static int isStringRepresentableAsLong(sds s
, long *longval
) {
2701 char buf
[32], *endptr
;
2705 value
= strtol(s
, &endptr
, 10);
2706 if (endptr
[0] != '\0') return REDIS_ERR
;
2707 slen
= snprintf(buf
,32,"%ld",value
);
2709 /* If the number converted back into a string is not identical
2710 * then it's not possible to encode the string as integer */
2711 if (sdslen(s
) != (unsigned)slen
|| memcmp(buf
,s
,slen
)) return REDIS_ERR
;
2712 if (longval
) *longval
= value
;
2716 /* Try to encode a string object in order to save space */
2717 static int tryObjectEncoding(robj
*o
) {
2721 if (o
->encoding
!= REDIS_ENCODING_RAW
)
2722 return REDIS_ERR
; /* Already encoded */
2724 /* It's not save to encode shared objects: shared objects can be shared
2725 * everywhere in the "object space" of Redis. Encoded objects can only
2726 * appear as "values" (and not, for instance, as keys) */
2727 if (o
->refcount
> 1) return REDIS_ERR
;
2729 /* Currently we try to encode only strings */
2730 redisAssert(o
->type
== REDIS_STRING
);
2732 /* Check if we can represent this string as a long integer */
2733 if (isStringRepresentableAsLong(s
,&value
) == REDIS_ERR
) return REDIS_ERR
;
2735 /* Ok, this object can be encoded */
2736 o
->encoding
= REDIS_ENCODING_INT
;
2738 o
->ptr
= (void*) value
;
2742 /* Get a decoded version of an encoded object (returned as a new object).
2743 * If the object is already raw-encoded just increment the ref count. */
2744 static robj
*getDecodedObject(robj
*o
) {
2747 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2751 if (o
->type
== REDIS_STRING
&& o
->encoding
== REDIS_ENCODING_INT
) {
2754 snprintf(buf
,32,"%ld",(long)o
->ptr
);
2755 dec
= createStringObject(buf
,strlen(buf
));
2758 redisAssert(1 != 1);
2762 /* Compare two string objects via strcmp() or alike.
2763 * Note that the objects may be integer-encoded. In such a case we
2764 * use snprintf() to get a string representation of the numbers on the stack
2765 * and compare the strings, it's much faster than calling getDecodedObject().
2767 * Important note: if objects are not integer encoded, but binary-safe strings,
2768 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2770 static int compareStringObjects(robj
*a
, robj
*b
) {
2771 redisAssert(a
->type
== REDIS_STRING
&& b
->type
== REDIS_STRING
);
2772 char bufa
[128], bufb
[128], *astr
, *bstr
;
2775 if (a
== b
) return 0;
2776 if (a
->encoding
!= REDIS_ENCODING_RAW
) {
2777 snprintf(bufa
,sizeof(bufa
),"%ld",(long) a
->ptr
);
2783 if (b
->encoding
!= REDIS_ENCODING_RAW
) {
2784 snprintf(bufb
,sizeof(bufb
),"%ld",(long) b
->ptr
);
2790 return bothsds
? sdscmp(astr
,bstr
) : strcmp(astr
,bstr
);
2793 static size_t stringObjectLen(robj
*o
) {
2794 redisAssert(o
->type
== REDIS_STRING
);
2795 if (o
->encoding
== REDIS_ENCODING_RAW
) {
2796 return sdslen(o
->ptr
);
2800 return snprintf(buf
,32,"%ld",(long)o
->ptr
);
2804 /*============================ RDB saving/loading =========================== */
2806 static int rdbSaveType(FILE *fp
, unsigned char type
) {
2807 if (fwrite(&type
,1,1,fp
) == 0) return -1;
2811 static int rdbSaveTime(FILE *fp
, time_t t
) {
2812 int32_t t32
= (int32_t) t
;
2813 if (fwrite(&t32
,4,1,fp
) == 0) return -1;
2817 /* check rdbLoadLen() comments for more info */
2818 static int rdbSaveLen(FILE *fp
, uint32_t len
) {
2819 unsigned char buf
[2];
2822 /* Save a 6 bit len */
2823 buf
[0] = (len
&0xFF)|(REDIS_RDB_6BITLEN
<<6);
2824 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2825 } else if (len
< (1<<14)) {
2826 /* Save a 14 bit len */
2827 buf
[0] = ((len
>>8)&0xFF)|(REDIS_RDB_14BITLEN
<<6);
2829 if (fwrite(buf
,2,1,fp
) == 0) return -1;
2831 /* Save a 32 bit len */
2832 buf
[0] = (REDIS_RDB_32BITLEN
<<6);
2833 if (fwrite(buf
,1,1,fp
) == 0) return -1;
2835 if (fwrite(&len
,4,1,fp
) == 0) return -1;
2840 /* String objects in the form "2391" "-100" without any space and with a
2841 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2842 * encoded as integers to save space */
2843 static int rdbTryIntegerEncoding(sds s
, unsigned char *enc
) {
2845 char *endptr
, buf
[32];
2847 /* Check if it's possible to encode this value as a number */
2848 value
= strtoll(s
, &endptr
, 10);
2849 if (endptr
[0] != '\0') return 0;
2850 snprintf(buf
,32,"%lld",value
);
2852 /* If the number converted back into a string is not identical
2853 * then it's not possible to encode the string as integer */
2854 if (strlen(buf
) != sdslen(s
) || memcmp(buf
,s
,sdslen(s
))) return 0;
2856 /* Finally check if it fits in our ranges */
2857 if (value
>= -(1<<7) && value
<= (1<<7)-1) {
2858 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT8
;
2859 enc
[1] = value
&0xFF;
2861 } else if (value
>= -(1<<15) && value
<= (1<<15)-1) {
2862 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT16
;
2863 enc
[1] = value
&0xFF;
2864 enc
[2] = (value
>>8)&0xFF;
2866 } else if (value
>= -((long long)1<<31) && value
<= ((long long)1<<31)-1) {
2867 enc
[0] = (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_INT32
;
2868 enc
[1] = value
&0xFF;
2869 enc
[2] = (value
>>8)&0xFF;
2870 enc
[3] = (value
>>16)&0xFF;
2871 enc
[4] = (value
>>24)&0xFF;
2878 static int rdbSaveLzfStringObject(FILE *fp
, robj
*obj
) {
2879 unsigned int comprlen
, outlen
;
2883 /* We require at least four bytes compression for this to be worth it */
2884 outlen
= sdslen(obj
->ptr
)-4;
2885 if (outlen
<= 0) return 0;
2886 if ((out
= zmalloc(outlen
+1)) == NULL
) return 0;
2887 comprlen
= lzf_compress(obj
->ptr
, sdslen(obj
->ptr
), out
, outlen
);
2888 if (comprlen
== 0) {
2892 /* Data compressed! Let's save it on disk */
2893 byte
= (REDIS_RDB_ENCVAL
<<6)|REDIS_RDB_ENC_LZF
;
2894 if (fwrite(&byte
,1,1,fp
) == 0) goto writeerr
;
2895 if (rdbSaveLen(fp
,comprlen
) == -1) goto writeerr
;
2896 if (rdbSaveLen(fp
,sdslen(obj
->ptr
)) == -1) goto writeerr
;
2897 if (fwrite(out
,comprlen
,1,fp
) == 0) goto writeerr
;
2906 /* Save a string objet as [len][data] on disk. If the object is a string
2907 * representation of an integer value we try to safe it in a special form */
2908 static int rdbSaveStringObjectRaw(FILE *fp
, robj
*obj
) {
2912 len
= sdslen(obj
->ptr
);
2914 /* Try integer encoding */
2916 unsigned char buf
[5];
2917 if ((enclen
= rdbTryIntegerEncoding(obj
->ptr
,buf
)) > 0) {
2918 if (fwrite(buf
,enclen
,1,fp
) == 0) return -1;
2923 /* Try LZF compression - under 20 bytes it's unable to compress even
2924 * aaaaaaaaaaaaaaaaaa so skip it */
2925 if (server
.rdbcompression
&& len
> 20) {
2928 retval
= rdbSaveLzfStringObject(fp
,obj
);
2929 if (retval
== -1) return -1;
2930 if (retval
> 0) return 0;
2931 /* retval == 0 means data can't be compressed, save the old way */
2934 /* Store verbatim */
2935 if (rdbSaveLen(fp
,len
) == -1) return -1;
2936 if (len
&& fwrite(obj
->ptr
,len
,1,fp
) == 0) return -1;
2940 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2941 static int rdbSaveStringObject(FILE *fp
, robj
*obj
) {
2944 /* Avoid incr/decr ref count business when possible.
2945 * This plays well with copy-on-write given that we are probably
2946 * in a child process (BGSAVE). Also this makes sure key objects
2947 * of swapped objects are not incRefCount-ed (an assert does not allow
2948 * this in order to avoid bugs) */
2949 if (obj
->encoding
!= REDIS_ENCODING_RAW
) {
2950 obj
= getDecodedObject(obj
);
2951 retval
= rdbSaveStringObjectRaw(fp
,obj
);
2954 retval
= rdbSaveStringObjectRaw(fp
,obj
);
2959 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
2960 * 8 bit integer specifing the length of the representation.
2961 * This 8 bit integer has special values in order to specify the following
2967 static int rdbSaveDoubleValue(FILE *fp
, double val
) {
2968 unsigned char buf
[128];
2974 } else if (!isfinite(val
)) {
2976 buf
[0] = (val
< 0) ? 255 : 254;
2978 snprintf((char*)buf
+1,sizeof(buf
)-1,"%.17g",val
);
2979 buf
[0] = strlen((char*)buf
+1);
2982 if (fwrite(buf
,len
,1,fp
) == 0) return -1;
2986 /* Save a Redis object. */
2987 static int rdbSaveObject(FILE *fp
, robj
*o
) {
2988 if (o
->type
== REDIS_STRING
) {
2989 /* Save a string value */
2990 if (rdbSaveStringObject(fp
,o
) == -1) return -1;
2991 } else if (o
->type
== REDIS_LIST
) {
2992 /* Save a list value */
2993 list
*list
= o
->ptr
;
2997 if (rdbSaveLen(fp
,listLength(list
)) == -1) return -1;
2998 listRewind(list
,&li
);
2999 while((ln
= listNext(&li
))) {
3000 robj
*eleobj
= listNodeValue(ln
);
3002 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
3004 } else if (o
->type
== REDIS_SET
) {
3005 /* Save a set value */
3007 dictIterator
*di
= dictGetIterator(set
);
3010 if (rdbSaveLen(fp
,dictSize(set
)) == -1) return -1;
3011 while((de
= dictNext(di
)) != NULL
) {
3012 robj
*eleobj
= dictGetEntryKey(de
);
3014 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
3016 dictReleaseIterator(di
);
3017 } else if (o
->type
== REDIS_ZSET
) {
3018 /* Save a set value */
3020 dictIterator
*di
= dictGetIterator(zs
->dict
);
3023 if (rdbSaveLen(fp
,dictSize(zs
->dict
)) == -1) return -1;
3024 while((de
= dictNext(di
)) != NULL
) {
3025 robj
*eleobj
= dictGetEntryKey(de
);
3026 double *score
= dictGetEntryVal(de
);
3028 if (rdbSaveStringObject(fp
,eleobj
) == -1) return -1;
3029 if (rdbSaveDoubleValue(fp
,*score
) == -1) return -1;
3031 dictReleaseIterator(di
);
3033 redisAssert(0 != 0);
3038 /* Return the length the object will have on disk if saved with
3039 * the rdbSaveObject() function. Currently we use a trick to get
3040 * this length with very little changes to the code. In the future
3041 * we could switch to a faster solution. */
3042 static off_t
rdbSavedObjectLen(robj
*o
, FILE *fp
) {
3043 if (fp
== NULL
) fp
= server
.devnull
;
3045 assert(rdbSaveObject(fp
,o
) != 1);
3049 /* Return the number of pages required to save this object in the swap file */
3050 static off_t
rdbSavedObjectPages(robj
*o
, FILE *fp
) {
3051 off_t bytes
= rdbSavedObjectLen(o
,fp
);
3053 return (bytes
+(server
.vm_page_size
-1))/server
.vm_page_size
;
3056 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
3057 static int rdbSave(char *filename
) {
3058 dictIterator
*di
= NULL
;
3063 time_t now
= time(NULL
);
3065 /* Wait for I/O therads to terminate, just in case this is a
3066 * foreground-saving, to avoid seeking the swap file descriptor at the
3068 if (server
.vm_enabled
)
3069 waitEmptyIOJobsQueue();
3071 snprintf(tmpfile
,256,"temp-%d.rdb", (int) getpid());
3072 fp
= fopen(tmpfile
,"w");
3074 redisLog(REDIS_WARNING
, "Failed saving the DB: %s", strerror(errno
));
3077 if (fwrite("REDIS0001",9,1,fp
) == 0) goto werr
;
3078 for (j
= 0; j
< server
.dbnum
; j
++) {
3079 redisDb
*db
= server
.db
+j
;
3081 if (dictSize(d
) == 0) continue;
3082 di
= dictGetIterator(d
);
3088 /* Write the SELECT DB opcode */
3089 if (rdbSaveType(fp
,REDIS_SELECTDB
) == -1) goto werr
;
3090 if (rdbSaveLen(fp
,j
) == -1) goto werr
;
3092 /* Iterate this DB writing every entry */
3093 while((de
= dictNext(di
)) != NULL
) {
3094 robj
*key
= dictGetEntryKey(de
);
3095 robj
*o
= dictGetEntryVal(de
);
3096 time_t expiretime
= getExpire(db
,key
);
3098 /* Save the expire time */
3099 if (expiretime
!= -1) {
3100 /* If this key is already expired skip it */
3101 if (expiretime
< now
) continue;
3102 if (rdbSaveType(fp
,REDIS_EXPIRETIME
) == -1) goto werr
;
3103 if (rdbSaveTime(fp
,expiretime
) == -1) goto werr
;
3105 /* Save the key and associated value. This requires special
3106 * handling if the value is swapped out. */
3107 if (!server
.vm_enabled
|| key
->storage
== REDIS_VM_MEMORY
||
3108 key
->storage
== REDIS_VM_SWAPPING
) {
3109 /* Save type, key, value */
3110 if (rdbSaveType(fp
,o
->type
) == -1) goto werr
;
3111 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
3112 if (rdbSaveObject(fp
,o
) == -1) goto werr
;
3114 /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */
3116 /* Get a preview of the object in memory */
3117 po
= vmPreviewObject(key
);
3118 /* Save type, key, value */
3119 if (rdbSaveType(fp
,key
->vtype
) == -1) goto werr
;
3120 if (rdbSaveStringObject(fp
,key
) == -1) goto werr
;
3121 if (rdbSaveObject(fp
,po
) == -1) goto werr
;
3122 /* Remove the loaded object from memory */
3126 dictReleaseIterator(di
);
3129 if (rdbSaveType(fp
,REDIS_EOF
) == -1) goto werr
;
3131 /* Make sure data will not remain on the OS's output buffers */
3136 /* Use RENAME to make sure the DB file is changed atomically only
3137 * if the generate DB file is ok. */
3138 if (rename(tmpfile
,filename
) == -1) {
3139 redisLog(REDIS_WARNING
,"Error moving temp DB file on the final destination: %s", strerror(errno
));
3143 redisLog(REDIS_NOTICE
,"DB saved on disk");
3145 server
.lastsave
= time(NULL
);
3151 redisLog(REDIS_WARNING
,"Write error saving DB on disk: %s", strerror(errno
));
3152 if (di
) dictReleaseIterator(di
);
3156 static int rdbSaveBackground(char *filename
) {
3159 if (server
.bgsavechildpid
!= -1) return REDIS_ERR
;
3160 if (server
.vm_enabled
) waitEmptyIOJobsQueue();
3161 if ((childpid
= fork()) == 0) {
3163 if (server
.vm_enabled
) vmReopenSwapFile();
3165 if (rdbSave(filename
) == REDIS_OK
) {
3172 if (childpid
== -1) {
3173 redisLog(REDIS_WARNING
,"Can't save in background: fork: %s",
3177 redisLog(REDIS_NOTICE
,"Background saving started by pid %d",childpid
);
3178 server
.bgsavechildpid
= childpid
;
3181 return REDIS_OK
; /* unreached */
3184 static void rdbRemoveTempFile(pid_t childpid
) {
3187 snprintf(tmpfile
,256,"temp-%d.rdb", (int) childpid
);
3191 static int rdbLoadType(FILE *fp
) {
3193 if (fread(&type
,1,1,fp
) == 0) return -1;
3197 static time_t rdbLoadTime(FILE *fp
) {
3199 if (fread(&t32
,4,1,fp
) == 0) return -1;
3200 return (time_t) t32
;
3203 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
3204 * of this file for a description of how this are stored on disk.
3206 * isencoded is set to 1 if the readed length is not actually a length but
3207 * an "encoding type", check the above comments for more info */
3208 static uint32_t rdbLoadLen(FILE *fp
, int *isencoded
) {
3209 unsigned char buf
[2];
3213 if (isencoded
) *isencoded
= 0;
3214 if (fread(buf
,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
3215 type
= (buf
[0]&0xC0)>>6;
3216 if (type
== REDIS_RDB_6BITLEN
) {
3217 /* Read a 6 bit len */
3219 } else if (type
== REDIS_RDB_ENCVAL
) {
3220 /* Read a 6 bit len encoding type */
3221 if (isencoded
) *isencoded
= 1;
3223 } else if (type
== REDIS_RDB_14BITLEN
) {
3224 /* Read a 14 bit len */
3225 if (fread(buf
+1,1,1,fp
) == 0) return REDIS_RDB_LENERR
;
3226 return ((buf
[0]&0x3F)<<8)|buf
[1];
3228 /* Read a 32 bit len */
3229 if (fread(&len
,4,1,fp
) == 0) return REDIS_RDB_LENERR
;
3234 static robj
*rdbLoadIntegerObject(FILE *fp
, int enctype
) {
3235 unsigned char enc
[4];
3238 if (enctype
== REDIS_RDB_ENC_INT8
) {
3239 if (fread(enc
,1,1,fp
) == 0) return NULL
;
3240 val
= (signed char)enc
[0];
3241 } else if (enctype
== REDIS_RDB_ENC_INT16
) {
3243 if (fread(enc
,2,1,fp
) == 0) return NULL
;
3244 v
= enc
[0]|(enc
[1]<<8);
3246 } else if (enctype
== REDIS_RDB_ENC_INT32
) {
3248 if (fread(enc
,4,1,fp
) == 0) return NULL
;
3249 v
= enc
[0]|(enc
[1]<<8)|(enc
[2]<<16)|(enc
[3]<<24);
3252 val
= 0; /* anti-warning */
3255 return createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",val
));
3258 static robj
*rdbLoadLzfStringObject(FILE*fp
) {
3259 unsigned int len
, clen
;
3260 unsigned char *c
= NULL
;
3263 if ((clen
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3264 if ((len
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3265 if ((c
= zmalloc(clen
)) == NULL
) goto err
;
3266 if ((val
= sdsnewlen(NULL
,len
)) == NULL
) goto err
;
3267 if (fread(c
,clen
,1,fp
) == 0) goto err
;
3268 if (lzf_decompress(c
,clen
,val
,len
) == 0) goto err
;
3270 return createObject(REDIS_STRING
,val
);
3277 static robj
*rdbLoadStringObject(FILE*fp
) {
3282 len
= rdbLoadLen(fp
,&isencoded
);
3285 case REDIS_RDB_ENC_INT8
:
3286 case REDIS_RDB_ENC_INT16
:
3287 case REDIS_RDB_ENC_INT32
:
3288 return tryObjectSharing(rdbLoadIntegerObject(fp
,len
));
3289 case REDIS_RDB_ENC_LZF
:
3290 return tryObjectSharing(rdbLoadLzfStringObject(fp
));
3296 if (len
== REDIS_RDB_LENERR
) return NULL
;
3297 val
= sdsnewlen(NULL
,len
);
3298 if (len
&& fread(val
,len
,1,fp
) == 0) {
3302 return tryObjectSharing(createObject(REDIS_STRING
,val
));
3305 /* For information about double serialization check rdbSaveDoubleValue() */
3306 static int rdbLoadDoubleValue(FILE *fp
, double *val
) {
3310 if (fread(&len
,1,1,fp
) == 0) return -1;
3312 case 255: *val
= R_NegInf
; return 0;
3313 case 254: *val
= R_PosInf
; return 0;
3314 case 253: *val
= R_Nan
; return 0;
3316 if (fread(buf
,len
,1,fp
) == 0) return -1;
3318 sscanf(buf
, "%lg", val
);
3323 /* Load a Redis object of the specified type from the specified file.
3324 * On success a newly allocated object is returned, otherwise NULL. */
3325 static robj
*rdbLoadObject(int type
, FILE *fp
) {
3328 if (type
== REDIS_STRING
) {
3329 /* Read string value */
3330 if ((o
= rdbLoadStringObject(fp
)) == NULL
) return NULL
;
3331 tryObjectEncoding(o
);
3332 } else if (type
== REDIS_LIST
|| type
== REDIS_SET
) {
3333 /* Read list/set value */
3336 if ((listlen
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3337 o
= (type
== REDIS_LIST
) ? createListObject() : createSetObject();
3338 /* It's faster to expand the dict to the right size asap in order
3339 * to avoid rehashing */
3340 if (type
== REDIS_SET
&& listlen
> DICT_HT_INITIAL_SIZE
)
3341 dictExpand(o
->ptr
,listlen
);
3342 /* Load every single element of the list/set */
3346 if ((ele
= rdbLoadStringObject(fp
)) == NULL
) return NULL
;
3347 tryObjectEncoding(ele
);
3348 if (type
== REDIS_LIST
) {
3349 listAddNodeTail((list
*)o
->ptr
,ele
);
3351 dictAdd((dict
*)o
->ptr
,ele
,NULL
);
3354 } else if (type
== REDIS_ZSET
) {
3355 /* Read list/set value */
3359 if ((zsetlen
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
) return NULL
;
3360 o
= createZsetObject();
3362 /* Load every single element of the list/set */
3365 double *score
= zmalloc(sizeof(double));
3367 if ((ele
= rdbLoadStringObject(fp
)) == NULL
) return NULL
;
3368 tryObjectEncoding(ele
);
3369 if (rdbLoadDoubleValue(fp
,score
) == -1) return NULL
;
3370 dictAdd(zs
->dict
,ele
,score
);
3371 zslInsert(zs
->zsl
,*score
,ele
);
3372 incrRefCount(ele
); /* added to skiplist */
3375 redisAssert(0 != 0);
3380 static int rdbLoad(char *filename
) {
3382 robj
*keyobj
= NULL
;
3384 int type
, retval
, rdbver
;
3385 dict
*d
= server
.db
[0].dict
;
3386 redisDb
*db
= server
.db
+0;
3388 time_t expiretime
= -1, now
= time(NULL
);
3389 long long loadedkeys
= 0;
3391 fp
= fopen(filename
,"r");
3392 if (!fp
) return REDIS_ERR
;
3393 if (fread(buf
,9,1,fp
) == 0) goto eoferr
;
3395 if (memcmp(buf
,"REDIS",5) != 0) {
3397 redisLog(REDIS_WARNING
,"Wrong signature trying to load DB from file");
3400 rdbver
= atoi(buf
+5);
3403 redisLog(REDIS_WARNING
,"Can't handle RDB format version %d",rdbver
);
3410 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
3411 if (type
== REDIS_EXPIRETIME
) {
3412 if ((expiretime
= rdbLoadTime(fp
)) == -1) goto eoferr
;
3413 /* We read the time so we need to read the object type again */
3414 if ((type
= rdbLoadType(fp
)) == -1) goto eoferr
;
3416 if (type
== REDIS_EOF
) break;
3417 /* Handle SELECT DB opcode as a special case */
3418 if (type
== REDIS_SELECTDB
) {
3419 if ((dbid
= rdbLoadLen(fp
,NULL
)) == REDIS_RDB_LENERR
)
3421 if (dbid
>= (unsigned)server
.dbnum
) {
3422 redisLog(REDIS_WARNING
,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server
.dbnum
);
3425 db
= server
.db
+dbid
;
3430 if ((keyobj
= rdbLoadStringObject(fp
)) == NULL
) goto eoferr
;
3432 if ((o
= rdbLoadObject(type
,fp
)) == NULL
) goto eoferr
;
3433 /* Add the new object in the hash table */
3434 retval
= dictAdd(d
,keyobj
,o
);
3435 if (retval
== DICT_ERR
) {
3436 redisLog(REDIS_WARNING
,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj
->ptr
);
3439 /* Set the expire time if needed */
3440 if (expiretime
!= -1) {
3441 setExpire(db
,keyobj
,expiretime
);
3442 /* Delete this key if already expired */
3443 if (expiretime
< now
) deleteKey(db
,keyobj
);
3447 /* Handle swapping while loading big datasets when VM is on */
3449 if (server
.vm_enabled
&& (loadedkeys
% 5000) == 0) {
3450 while (zmalloc_used_memory() > server
.vm_max_memory
) {
3451 if (vmSwapOneObjectBlocking() == REDIS_ERR
) break;
3458 eoferr
: /* unexpected end of file is handled here with a fatal exit */
3459 if (keyobj
) decrRefCount(keyobj
);
3460 redisLog(REDIS_WARNING
,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
3462 return REDIS_ERR
; /* Just to avoid warning */
3465 /*================================== Commands =============================== */
3467 static void authCommand(redisClient
*c
) {
3468 if (!server
.requirepass
|| !strcmp(c
->argv
[1]->ptr
, server
.requirepass
)) {
3469 c
->authenticated
= 1;
3470 addReply(c
,shared
.ok
);
3472 c
->authenticated
= 0;
3473 addReplySds(c
,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3477 static void pingCommand(redisClient
*c
) {
3478 addReply(c
,shared
.pong
);
3481 static void echoCommand(redisClient
*c
) {
3482 addReplyBulkLen(c
,c
->argv
[1]);
3483 addReply(c
,c
->argv
[1]);
3484 addReply(c
,shared
.crlf
);
3487 /*=================================== Strings =============================== */
3489 static void setGenericCommand(redisClient
*c
, int nx
) {
3492 if (nx
) deleteIfVolatile(c
->db
,c
->argv
[1]);
3493 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3494 if (retval
== DICT_ERR
) {
3496 /* If the key is about a swapped value, we want a new key object
3497 * to overwrite the old. So we delete the old key in the database.
3498 * This will also make sure that swap pages about the old object
3499 * will be marked as free. */
3500 if (deleteIfSwapped(c
->db
,c
->argv
[1]))
3501 incrRefCount(c
->argv
[1]);
3502 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3503 incrRefCount(c
->argv
[2]);
3505 addReply(c
,shared
.czero
);
3509 incrRefCount(c
->argv
[1]);
3510 incrRefCount(c
->argv
[2]);
3513 removeExpire(c
->db
,c
->argv
[1]);
3514 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3517 static void setCommand(redisClient
*c
) {
3518 setGenericCommand(c
,0);
3521 static void setnxCommand(redisClient
*c
) {
3522 setGenericCommand(c
,1);
3525 static int getGenericCommand(redisClient
*c
) {
3526 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3529 addReply(c
,shared
.nullbulk
);
3532 if (o
->type
!= REDIS_STRING
) {
3533 addReply(c
,shared
.wrongtypeerr
);
3536 addReplyBulkLen(c
,o
);
3538 addReply(c
,shared
.crlf
);
3544 static void getCommand(redisClient
*c
) {
3545 getGenericCommand(c
);
3548 static void getsetCommand(redisClient
*c
) {
3549 if (getGenericCommand(c
) == REDIS_ERR
) return;
3550 if (dictAdd(c
->db
->dict
,c
->argv
[1],c
->argv
[2]) == DICT_ERR
) {
3551 dictReplace(c
->db
->dict
,c
->argv
[1],c
->argv
[2]);
3553 incrRefCount(c
->argv
[1]);
3555 incrRefCount(c
->argv
[2]);
3557 removeExpire(c
->db
,c
->argv
[1]);
3560 static void mgetCommand(redisClient
*c
) {
3563 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->argc
-1));
3564 for (j
= 1; j
< c
->argc
; j
++) {
3565 robj
*o
= lookupKeyRead(c
->db
,c
->argv
[j
]);
3567 addReply(c
,shared
.nullbulk
);
3569 if (o
->type
!= REDIS_STRING
) {
3570 addReply(c
,shared
.nullbulk
);
3572 addReplyBulkLen(c
,o
);
3574 addReply(c
,shared
.crlf
);
3580 static void msetGenericCommand(redisClient
*c
, int nx
) {
3581 int j
, busykeys
= 0;
3583 if ((c
->argc
% 2) == 0) {
3584 addReplySds(c
,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3587 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3588 * set nothing at all if at least one already key exists. */
3590 for (j
= 1; j
< c
->argc
; j
+= 2) {
3591 if (lookupKeyWrite(c
->db
,c
->argv
[j
]) != NULL
) {
3597 addReply(c
, shared
.czero
);
3601 for (j
= 1; j
< c
->argc
; j
+= 2) {
3604 tryObjectEncoding(c
->argv
[j
+1]);
3605 retval
= dictAdd(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3606 if (retval
== DICT_ERR
) {
3607 dictReplace(c
->db
->dict
,c
->argv
[j
],c
->argv
[j
+1]);
3608 incrRefCount(c
->argv
[j
+1]);
3610 incrRefCount(c
->argv
[j
]);
3611 incrRefCount(c
->argv
[j
+1]);
3613 removeExpire(c
->db
,c
->argv
[j
]);
3615 server
.dirty
+= (c
->argc
-1)/2;
3616 addReply(c
, nx
? shared
.cone
: shared
.ok
);
3619 static void msetCommand(redisClient
*c
) {
3620 msetGenericCommand(c
,0);
3623 static void msetnxCommand(redisClient
*c
) {
3624 msetGenericCommand(c
,1);
3627 static void incrDecrCommand(redisClient
*c
, long long incr
) {
3632 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3636 if (o
->type
!= REDIS_STRING
) {
3641 if (o
->encoding
== REDIS_ENCODING_RAW
)
3642 value
= strtoll(o
->ptr
, &eptr
, 10);
3643 else if (o
->encoding
== REDIS_ENCODING_INT
)
3644 value
= (long)o
->ptr
;
3646 redisAssert(1 != 1);
3651 o
= createObject(REDIS_STRING
,sdscatprintf(sdsempty(),"%lld",value
));
3652 tryObjectEncoding(o
);
3653 retval
= dictAdd(c
->db
->dict
,c
->argv
[1],o
);
3654 if (retval
== DICT_ERR
) {
3655 dictReplace(c
->db
->dict
,c
->argv
[1],o
);
3656 removeExpire(c
->db
,c
->argv
[1]);
3658 incrRefCount(c
->argv
[1]);
3661 addReply(c
,shared
.colon
);
3663 addReply(c
,shared
.crlf
);
3666 static void incrCommand(redisClient
*c
) {
3667 incrDecrCommand(c
,1);
3670 static void decrCommand(redisClient
*c
) {
3671 incrDecrCommand(c
,-1);
3674 static void incrbyCommand(redisClient
*c
) {
3675 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3676 incrDecrCommand(c
,incr
);
3679 static void decrbyCommand(redisClient
*c
) {
3680 long long incr
= strtoll(c
->argv
[2]->ptr
, NULL
, 10);
3681 incrDecrCommand(c
,-incr
);
3684 /* ========================= Type agnostic commands ========================= */
3686 static void delCommand(redisClient
*c
) {
3689 for (j
= 1; j
< c
->argc
; j
++) {
3690 if (deleteKey(c
->db
,c
->argv
[j
])) {
3697 addReply(c
,shared
.czero
);
3700 addReply(c
,shared
.cone
);
3703 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",deleted
));
3708 static void existsCommand(redisClient
*c
) {
3709 addReply(c
,lookupKeyRead(c
->db
,c
->argv
[1]) ? shared
.cone
: shared
.czero
);
3712 static void selectCommand(redisClient
*c
) {
3713 int id
= atoi(c
->argv
[1]->ptr
);
3715 if (selectDb(c
,id
) == REDIS_ERR
) {
3716 addReplySds(c
,sdsnew("-ERR invalid DB index\r\n"));
3718 addReply(c
,shared
.ok
);
3722 static void randomkeyCommand(redisClient
*c
) {
3726 de
= dictGetRandomKey(c
->db
->dict
);
3727 if (!de
|| expireIfNeeded(c
->db
,dictGetEntryKey(de
)) == 0) break;
3730 addReply(c
,shared
.plus
);
3731 addReply(c
,shared
.crlf
);
3733 addReply(c
,shared
.plus
);
3734 addReply(c
,dictGetEntryKey(de
));
3735 addReply(c
,shared
.crlf
);
3739 static void keysCommand(redisClient
*c
) {
3742 sds pattern
= c
->argv
[1]->ptr
;
3743 int plen
= sdslen(pattern
);
3744 unsigned long numkeys
= 0, keyslen
= 0;
3745 robj
*lenobj
= createObject(REDIS_STRING
,NULL
);
3747 di
= dictGetIterator(c
->db
->dict
);
3749 decrRefCount(lenobj
);
3750 while((de
= dictNext(di
)) != NULL
) {
3751 robj
*keyobj
= dictGetEntryKey(de
);
3753 sds key
= keyobj
->ptr
;
3754 if ((pattern
[0] == '*' && pattern
[1] == '\0') ||
3755 stringmatchlen(pattern
,plen
,key
,sdslen(key
),0)) {
3756 if (expireIfNeeded(c
->db
,keyobj
) == 0) {
3758 addReply(c
,shared
.space
);
3761 keyslen
+= sdslen(key
);
3765 dictReleaseIterator(di
);
3766 lenobj
->ptr
= sdscatprintf(sdsempty(),"$%lu\r\n",keyslen
+(numkeys
? (numkeys
-1) : 0));
3767 addReply(c
,shared
.crlf
);
3770 static void dbsizeCommand(redisClient
*c
) {
3772 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c
->db
->dict
)));
3775 static void lastsaveCommand(redisClient
*c
) {
3777 sdscatprintf(sdsempty(),":%lu\r\n",server
.lastsave
));
3780 static void typeCommand(redisClient
*c
) {
3784 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3789 case REDIS_STRING
: type
= "+string"; break;
3790 case REDIS_LIST
: type
= "+list"; break;
3791 case REDIS_SET
: type
= "+set"; break;
3792 case REDIS_ZSET
: type
= "+zset"; break;
3793 default: type
= "unknown"; break;
3796 addReplySds(c
,sdsnew(type
));
3797 addReply(c
,shared
.crlf
);
3800 static void saveCommand(redisClient
*c
) {
3801 if (server
.bgsavechildpid
!= -1) {
3802 addReplySds(c
,sdsnew("-ERR background save in progress\r\n"));
3805 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3806 addReply(c
,shared
.ok
);
3808 addReply(c
,shared
.err
);
3812 static void bgsaveCommand(redisClient
*c
) {
3813 if (server
.bgsavechildpid
!= -1) {
3814 addReplySds(c
,sdsnew("-ERR background save already in progress\r\n"));
3817 if (rdbSaveBackground(server
.dbfilename
) == REDIS_OK
) {
3818 char *status
= "+Background saving started\r\n";
3819 addReplySds(c
,sdsnew(status
));
3821 addReply(c
,shared
.err
);
3825 static void shutdownCommand(redisClient
*c
) {
3826 redisLog(REDIS_WARNING
,"User requested shutdown, saving DB...");
3827 /* Kill the saving child if there is a background saving in progress.
3828 We want to avoid race conditions, for instance our saving child may
3829 overwrite the synchronous saving did by SHUTDOWN. */
3830 if (server
.bgsavechildpid
!= -1) {
3831 redisLog(REDIS_WARNING
,"There is a live saving child. Killing it!");
3832 kill(server
.bgsavechildpid
,SIGKILL
);
3833 rdbRemoveTempFile(server
.bgsavechildpid
);
3835 if (server
.appendonly
) {
3836 /* Append only file: fsync() the AOF and exit */
3837 fsync(server
.appendfd
);
3838 if (server
.vm_enabled
) unlink(server
.vm_swap_file
);
3841 /* Snapshotting. Perform a SYNC SAVE and exit */
3842 if (rdbSave(server
.dbfilename
) == REDIS_OK
) {
3843 if (server
.daemonize
)
3844 unlink(server
.pidfile
);
3845 redisLog(REDIS_WARNING
,"%zu bytes used at exit",zmalloc_used_memory());
3846 redisLog(REDIS_WARNING
,"Server exit now, bye bye...");
3847 if (server
.vm_enabled
) unlink(server
.vm_swap_file
);
3850 /* Ooops.. error saving! The best we can do is to continue operating.
3851 * Note that if there was a background saving process, in the next
3852 * cron() Redis will be notified that the background saving aborted,
3853 * handling special stuff like slaves pending for synchronization... */
3854 redisLog(REDIS_WARNING
,"Error trying to save the DB, can't exit");
3855 addReplySds(c
,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3860 static void renameGenericCommand(redisClient
*c
, int nx
) {
3863 /* To use the same key as src and dst is probably an error */
3864 if (sdscmp(c
->argv
[1]->ptr
,c
->argv
[2]->ptr
) == 0) {
3865 addReply(c
,shared
.sameobjecterr
);
3869 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3871 addReply(c
,shared
.nokeyerr
);
3875 deleteIfVolatile(c
->db
,c
->argv
[2]);
3876 if (dictAdd(c
->db
->dict
,c
->argv
[2],o
) == DICT_ERR
) {
3879 addReply(c
,shared
.czero
);
3882 dictReplace(c
->db
->dict
,c
->argv
[2],o
);
3884 incrRefCount(c
->argv
[2]);
3886 deleteKey(c
->db
,c
->argv
[1]);
3888 addReply(c
,nx
? shared
.cone
: shared
.ok
);
3891 static void renameCommand(redisClient
*c
) {
3892 renameGenericCommand(c
,0);
3895 static void renamenxCommand(redisClient
*c
) {
3896 renameGenericCommand(c
,1);
3899 static void moveCommand(redisClient
*c
) {
3904 /* Obtain source and target DB pointers */
3907 if (selectDb(c
,atoi(c
->argv
[2]->ptr
)) == REDIS_ERR
) {
3908 addReply(c
,shared
.outofrangeerr
);
3912 selectDb(c
,srcid
); /* Back to the source DB */
3914 /* If the user is moving using as target the same
3915 * DB as the source DB it is probably an error. */
3917 addReply(c
,shared
.sameobjecterr
);
3921 /* Check if the element exists and get a reference */
3922 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3924 addReply(c
,shared
.czero
);
3928 /* Try to add the element to the target DB */
3929 deleteIfVolatile(dst
,c
->argv
[1]);
3930 if (dictAdd(dst
->dict
,c
->argv
[1],o
) == DICT_ERR
) {
3931 addReply(c
,shared
.czero
);
3934 incrRefCount(c
->argv
[1]);
3937 /* OK! key moved, free the entry in the source DB */
3938 deleteKey(src
,c
->argv
[1]);
3940 addReply(c
,shared
.cone
);
3943 /* =================================== Lists ================================ */
3944 static void pushGenericCommand(redisClient
*c
, int where
) {
3948 lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
3950 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3951 addReply(c
,shared
.ok
);
3954 lobj
= createListObject();
3956 if (where
== REDIS_HEAD
) {
3957 listAddNodeHead(list
,c
->argv
[2]);
3959 listAddNodeTail(list
,c
->argv
[2]);
3961 dictAdd(c
->db
->dict
,c
->argv
[1],lobj
);
3962 incrRefCount(c
->argv
[1]);
3963 incrRefCount(c
->argv
[2]);
3965 if (lobj
->type
!= REDIS_LIST
) {
3966 addReply(c
,shared
.wrongtypeerr
);
3969 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
3970 addReply(c
,shared
.ok
);
3974 if (where
== REDIS_HEAD
) {
3975 listAddNodeHead(list
,c
->argv
[2]);
3977 listAddNodeTail(list
,c
->argv
[2]);
3979 incrRefCount(c
->argv
[2]);
3982 addReply(c
,shared
.ok
);
3985 static void lpushCommand(redisClient
*c
) {
3986 pushGenericCommand(c
,REDIS_HEAD
);
3989 static void rpushCommand(redisClient
*c
) {
3990 pushGenericCommand(c
,REDIS_TAIL
);
3993 static void llenCommand(redisClient
*c
) {
3997 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
3999 addReply(c
,shared
.czero
);
4002 if (o
->type
!= REDIS_LIST
) {
4003 addReply(c
,shared
.wrongtypeerr
);
4006 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",listLength(l
)));
4011 static void lindexCommand(redisClient
*c
) {
4013 int index
= atoi(c
->argv
[2]->ptr
);
4015 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4017 addReply(c
,shared
.nullbulk
);
4019 if (o
->type
!= REDIS_LIST
) {
4020 addReply(c
,shared
.wrongtypeerr
);
4022 list
*list
= o
->ptr
;
4025 ln
= listIndex(list
, index
);
4027 addReply(c
,shared
.nullbulk
);
4029 robj
*ele
= listNodeValue(ln
);
4030 addReplyBulkLen(c
,ele
);
4032 addReply(c
,shared
.crlf
);
4038 static void lsetCommand(redisClient
*c
) {
4040 int index
= atoi(c
->argv
[2]->ptr
);
4042 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4044 addReply(c
,shared
.nokeyerr
);
4046 if (o
->type
!= REDIS_LIST
) {
4047 addReply(c
,shared
.wrongtypeerr
);
4049 list
*list
= o
->ptr
;
4052 ln
= listIndex(list
, index
);
4054 addReply(c
,shared
.outofrangeerr
);
4056 robj
*ele
= listNodeValue(ln
);
4059 listNodeValue(ln
) = c
->argv
[3];
4060 incrRefCount(c
->argv
[3]);
4061 addReply(c
,shared
.ok
);
4068 static void popGenericCommand(redisClient
*c
, int where
) {
4071 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4073 addReply(c
,shared
.nullbulk
);
4075 if (o
->type
!= REDIS_LIST
) {
4076 addReply(c
,shared
.wrongtypeerr
);
4078 list
*list
= o
->ptr
;
4081 if (where
== REDIS_HEAD
)
4082 ln
= listFirst(list
);
4084 ln
= listLast(list
);
4087 addReply(c
,shared
.nullbulk
);
4089 robj
*ele
= listNodeValue(ln
);
4090 addReplyBulkLen(c
,ele
);
4092 addReply(c
,shared
.crlf
);
4093 listDelNode(list
,ln
);
4100 static void lpopCommand(redisClient
*c
) {
4101 popGenericCommand(c
,REDIS_HEAD
);
4104 static void rpopCommand(redisClient
*c
) {
4105 popGenericCommand(c
,REDIS_TAIL
);
4108 static void lrangeCommand(redisClient
*c
) {
4110 int start
= atoi(c
->argv
[2]->ptr
);
4111 int end
= atoi(c
->argv
[3]->ptr
);
4113 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4115 addReply(c
,shared
.nullmultibulk
);
4117 if (o
->type
!= REDIS_LIST
) {
4118 addReply(c
,shared
.wrongtypeerr
);
4120 list
*list
= o
->ptr
;
4122 int llen
= listLength(list
);
4126 /* convert negative indexes */
4127 if (start
< 0) start
= llen
+start
;
4128 if (end
< 0) end
= llen
+end
;
4129 if (start
< 0) start
= 0;
4130 if (end
< 0) end
= 0;
4132 /* indexes sanity checks */
4133 if (start
> end
|| start
>= llen
) {
4134 /* Out of range start or start > end result in empty list */
4135 addReply(c
,shared
.emptymultibulk
);
4138 if (end
>= llen
) end
= llen
-1;
4139 rangelen
= (end
-start
)+1;
4141 /* Return the result in form of a multi-bulk reply */
4142 ln
= listIndex(list
, start
);
4143 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
4144 for (j
= 0; j
< rangelen
; j
++) {
4145 ele
= listNodeValue(ln
);
4146 addReplyBulkLen(c
,ele
);
4148 addReply(c
,shared
.crlf
);
4155 static void ltrimCommand(redisClient
*c
) {
4157 int start
= atoi(c
->argv
[2]->ptr
);
4158 int end
= atoi(c
->argv
[3]->ptr
);
4160 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4162 addReply(c
,shared
.ok
);
4164 if (o
->type
!= REDIS_LIST
) {
4165 addReply(c
,shared
.wrongtypeerr
);
4167 list
*list
= o
->ptr
;
4169 int llen
= listLength(list
);
4170 int j
, ltrim
, rtrim
;
4172 /* convert negative indexes */
4173 if (start
< 0) start
= llen
+start
;
4174 if (end
< 0) end
= llen
+end
;
4175 if (start
< 0) start
= 0;
4176 if (end
< 0) end
= 0;
4178 /* indexes sanity checks */
4179 if (start
> end
|| start
>= llen
) {
4180 /* Out of range start or start > end result in empty list */
4184 if (end
>= llen
) end
= llen
-1;
4189 /* Remove list elements to perform the trim */
4190 for (j
= 0; j
< ltrim
; j
++) {
4191 ln
= listFirst(list
);
4192 listDelNode(list
,ln
);
4194 for (j
= 0; j
< rtrim
; j
++) {
4195 ln
= listLast(list
);
4196 listDelNode(list
,ln
);
4199 addReply(c
,shared
.ok
);
4204 static void lremCommand(redisClient
*c
) {
4207 o
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4209 addReply(c
,shared
.czero
);
4211 if (o
->type
!= REDIS_LIST
) {
4212 addReply(c
,shared
.wrongtypeerr
);
4214 list
*list
= o
->ptr
;
4215 listNode
*ln
, *next
;
4216 int toremove
= atoi(c
->argv
[2]->ptr
);
4221 toremove
= -toremove
;
4224 ln
= fromtail
? list
->tail
: list
->head
;
4226 robj
*ele
= listNodeValue(ln
);
4228 next
= fromtail
? ln
->prev
: ln
->next
;
4229 if (compareStringObjects(ele
,c
->argv
[3]) == 0) {
4230 listDelNode(list
,ln
);
4233 if (toremove
&& removed
== toremove
) break;
4237 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
4242 /* This is the semantic of this command:
4243 * RPOPLPUSH srclist dstlist:
4244 * IF LLEN(srclist) > 0
4245 * element = RPOP srclist
4246 * LPUSH dstlist element
4253 * The idea is to be able to get an element from a list in a reliable way
4254 * since the element is not just returned but pushed against another list
4255 * as well. This command was originally proposed by Ezra Zygmuntowicz.
4257 static void rpoplpushcommand(redisClient
*c
) {
4260 sobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4262 addReply(c
,shared
.nullbulk
);
4264 if (sobj
->type
!= REDIS_LIST
) {
4265 addReply(c
,shared
.wrongtypeerr
);
4267 list
*srclist
= sobj
->ptr
;
4268 listNode
*ln
= listLast(srclist
);
4271 addReply(c
,shared
.nullbulk
);
4273 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
4274 robj
*ele
= listNodeValue(ln
);
4277 if (dobj
&& dobj
->type
!= REDIS_LIST
) {
4278 addReply(c
,shared
.wrongtypeerr
);
4282 /* Add the element to the target list (unless it's directly
4283 * passed to some BLPOP-ing client */
4284 if (!handleClientsWaitingListPush(c
,c
->argv
[2],ele
)) {
4286 /* Create the list if the key does not exist */
4287 dobj
= createListObject();
4288 dictAdd(c
->db
->dict
,c
->argv
[2],dobj
);
4289 incrRefCount(c
->argv
[2]);
4291 dstlist
= dobj
->ptr
;
4292 listAddNodeHead(dstlist
,ele
);
4296 /* Send the element to the client as reply as well */
4297 addReplyBulkLen(c
,ele
);
4299 addReply(c
,shared
.crlf
);
4301 /* Finally remove the element from the source list */
4302 listDelNode(srclist
,ln
);
4310 /* ==================================== Sets ================================ */
4312 static void saddCommand(redisClient
*c
) {
4315 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4317 set
= createSetObject();
4318 dictAdd(c
->db
->dict
,c
->argv
[1],set
);
4319 incrRefCount(c
->argv
[1]);
4321 if (set
->type
!= REDIS_SET
) {
4322 addReply(c
,shared
.wrongtypeerr
);
4326 if (dictAdd(set
->ptr
,c
->argv
[2],NULL
) == DICT_OK
) {
4327 incrRefCount(c
->argv
[2]);
4329 addReply(c
,shared
.cone
);
4331 addReply(c
,shared
.czero
);
4335 static void sremCommand(redisClient
*c
) {
4338 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4340 addReply(c
,shared
.czero
);
4342 if (set
->type
!= REDIS_SET
) {
4343 addReply(c
,shared
.wrongtypeerr
);
4346 if (dictDelete(set
->ptr
,c
->argv
[2]) == DICT_OK
) {
4348 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
4349 addReply(c
,shared
.cone
);
4351 addReply(c
,shared
.czero
);
4356 static void smoveCommand(redisClient
*c
) {
4357 robj
*srcset
, *dstset
;
4359 srcset
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4360 dstset
= lookupKeyWrite(c
->db
,c
->argv
[2]);
4362 /* If the source key does not exist return 0, if it's of the wrong type
4364 if (srcset
== NULL
|| srcset
->type
!= REDIS_SET
) {
4365 addReply(c
, srcset
? shared
.wrongtypeerr
: shared
.czero
);
4368 /* Error if the destination key is not a set as well */
4369 if (dstset
&& dstset
->type
!= REDIS_SET
) {
4370 addReply(c
,shared
.wrongtypeerr
);
4373 /* Remove the element from the source set */
4374 if (dictDelete(srcset
->ptr
,c
->argv
[3]) == DICT_ERR
) {
4375 /* Key not found in the src set! return zero */
4376 addReply(c
,shared
.czero
);
4380 /* Add the element to the destination set */
4382 dstset
= createSetObject();
4383 dictAdd(c
->db
->dict
,c
->argv
[2],dstset
);
4384 incrRefCount(c
->argv
[2]);
4386 if (dictAdd(dstset
->ptr
,c
->argv
[3],NULL
) == DICT_OK
)
4387 incrRefCount(c
->argv
[3]);
4388 addReply(c
,shared
.cone
);
4391 static void sismemberCommand(redisClient
*c
) {
4394 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4396 addReply(c
,shared
.czero
);
4398 if (set
->type
!= REDIS_SET
) {
4399 addReply(c
,shared
.wrongtypeerr
);
4402 if (dictFind(set
->ptr
,c
->argv
[2]))
4403 addReply(c
,shared
.cone
);
4405 addReply(c
,shared
.czero
);
4409 static void scardCommand(redisClient
*c
) {
4413 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
4415 addReply(c
,shared
.czero
);
4418 if (o
->type
!= REDIS_SET
) {
4419 addReply(c
,shared
.wrongtypeerr
);
4422 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4428 static void spopCommand(redisClient
*c
) {
4432 set
= lookupKeyWrite(c
->db
,c
->argv
[1]);
4434 addReply(c
,shared
.nullbulk
);
4436 if (set
->type
!= REDIS_SET
) {
4437 addReply(c
,shared
.wrongtypeerr
);
4440 de
= dictGetRandomKey(set
->ptr
);
4442 addReply(c
,shared
.nullbulk
);
4444 robj
*ele
= dictGetEntryKey(de
);
4446 addReplyBulkLen(c
,ele
);
4448 addReply(c
,shared
.crlf
);
4449 dictDelete(set
->ptr
,ele
);
4450 if (htNeedsResize(set
->ptr
)) dictResize(set
->ptr
);
4456 static void srandmemberCommand(redisClient
*c
) {
4460 set
= lookupKeyRead(c
->db
,c
->argv
[1]);
4462 addReply(c
,shared
.nullbulk
);
4464 if (set
->type
!= REDIS_SET
) {
4465 addReply(c
,shared
.wrongtypeerr
);
4468 de
= dictGetRandomKey(set
->ptr
);
4470 addReply(c
,shared
.nullbulk
);
4472 robj
*ele
= dictGetEntryKey(de
);
4474 addReplyBulkLen(c
,ele
);
4476 addReply(c
,shared
.crlf
);
4481 static int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) {
4482 dict
**d1
= (void*) s1
, **d2
= (void*) s2
;
4484 return dictSize(*d1
)-dictSize(*d2
);
4487 static void sinterGenericCommand(redisClient
*c
, robj
**setskeys
, unsigned long setsnum
, robj
*dstkey
) {
4488 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4491 robj
*lenobj
= NULL
, *dstset
= NULL
;
4492 unsigned long j
, cardinality
= 0;
4494 for (j
= 0; j
< setsnum
; j
++) {
4498 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4499 lookupKeyRead(c
->db
,setskeys
[j
]);
4503 if (deleteKey(c
->db
,dstkey
))
4505 addReply(c
,shared
.czero
);
4507 addReply(c
,shared
.nullmultibulk
);
4511 if (setobj
->type
!= REDIS_SET
) {
4513 addReply(c
,shared
.wrongtypeerr
);
4516 dv
[j
] = setobj
->ptr
;
4518 /* Sort sets from the smallest to largest, this will improve our
4519 * algorithm's performace */
4520 qsort(dv
,setsnum
,sizeof(dict
*),qsortCompareSetsByCardinality
);
4522 /* The first thing we should output is the total number of elements...
4523 * since this is a multi-bulk write, but at this stage we don't know
4524 * the intersection set size, so we use a trick, append an empty object
4525 * to the output list and save the pointer to later modify it with the
4528 lenobj
= createObject(REDIS_STRING
,NULL
);
4530 decrRefCount(lenobj
);
4532 /* If we have a target key where to store the resulting set
4533 * create this key with an empty set inside */
4534 dstset
= createSetObject();
4537 /* Iterate all the elements of the first (smallest) set, and test
4538 * the element against all the other sets, if at least one set does
4539 * not include the element it is discarded */
4540 di
= dictGetIterator(dv
[0]);
4542 while((de
= dictNext(di
)) != NULL
) {
4545 for (j
= 1; j
< setsnum
; j
++)
4546 if (dictFind(dv
[j
],dictGetEntryKey(de
)) == NULL
) break;
4548 continue; /* at least one set does not contain the member */
4549 ele
= dictGetEntryKey(de
);
4551 addReplyBulkLen(c
,ele
);
4553 addReply(c
,shared
.crlf
);
4556 dictAdd(dstset
->ptr
,ele
,NULL
);
4560 dictReleaseIterator(di
);
4563 /* Store the resulting set into the target */
4564 deleteKey(c
->db
,dstkey
);
4565 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4566 incrRefCount(dstkey
);
4570 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%lu\r\n",cardinality
);
4572 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4573 dictSize((dict
*)dstset
->ptr
)));
4579 static void sinterCommand(redisClient
*c
) {
4580 sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
);
4583 static void sinterstoreCommand(redisClient
*c
) {
4584 sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]);
4587 #define REDIS_OP_UNION 0
4588 #define REDIS_OP_DIFF 1
4590 static void sunionDiffGenericCommand(redisClient
*c
, robj
**setskeys
, int setsnum
, robj
*dstkey
, int op
) {
4591 dict
**dv
= zmalloc(sizeof(dict
*)*setsnum
);
4594 robj
*dstset
= NULL
;
4595 int j
, cardinality
= 0;
4597 for (j
= 0; j
< setsnum
; j
++) {
4601 lookupKeyWrite(c
->db
,setskeys
[j
]) :
4602 lookupKeyRead(c
->db
,setskeys
[j
]);
4607 if (setobj
->type
!= REDIS_SET
) {
4609 addReply(c
,shared
.wrongtypeerr
);
4612 dv
[j
] = setobj
->ptr
;
4615 /* We need a temp set object to store our union. If the dstkey
4616 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4617 * this set object will be the resulting object to set into the target key*/
4618 dstset
= createSetObject();
4620 /* Iterate all the elements of all the sets, add every element a single
4621 * time to the result set */
4622 for (j
= 0; j
< setsnum
; j
++) {
4623 if (op
== REDIS_OP_DIFF
&& j
== 0 && !dv
[j
]) break; /* result set is empty */
4624 if (!dv
[j
]) continue; /* non existing keys are like empty sets */
4626 di
= dictGetIterator(dv
[j
]);
4628 while((de
= dictNext(di
)) != NULL
) {
4631 /* dictAdd will not add the same element multiple times */
4632 ele
= dictGetEntryKey(de
);
4633 if (op
== REDIS_OP_UNION
|| j
== 0) {
4634 if (dictAdd(dstset
->ptr
,ele
,NULL
) == DICT_OK
) {
4638 } else if (op
== REDIS_OP_DIFF
) {
4639 if (dictDelete(dstset
->ptr
,ele
) == DICT_OK
) {
4644 dictReleaseIterator(di
);
4646 if (op
== REDIS_OP_DIFF
&& cardinality
== 0) break; /* result set is empty */
4649 /* Output the content of the resulting set, if not in STORE mode */
4651 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
));
4652 di
= dictGetIterator(dstset
->ptr
);
4653 while((de
= dictNext(di
)) != NULL
) {
4656 ele
= dictGetEntryKey(de
);
4657 addReplyBulkLen(c
,ele
);
4659 addReply(c
,shared
.crlf
);
4661 dictReleaseIterator(di
);
4663 /* If we have a target key where to store the resulting set
4664 * create this key with the result set inside */
4665 deleteKey(c
->db
,dstkey
);
4666 dictAdd(c
->db
->dict
,dstkey
,dstset
);
4667 incrRefCount(dstkey
);
4672 decrRefCount(dstset
);
4674 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",
4675 dictSize((dict
*)dstset
->ptr
)));
4681 static void sunionCommand(redisClient
*c
) {
4682 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
);
4685 static void sunionstoreCommand(redisClient
*c
) {
4686 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
);
4689 static void sdiffCommand(redisClient
*c
) {
4690 sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
);
4693 static void sdiffstoreCommand(redisClient
*c
) {
4694 sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);
4697 /* ==================================== ZSets =============================== */
4699 /* ZSETs are ordered sets using two data structures to hold the same elements
4700 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4703 * The elements are added to an hash table mapping Redis objects to scores.
4704 * At the same time the elements are added to a skip list mapping scores
4705 * to Redis objects (so objects are sorted by scores in this "view"). */
4707 /* This skiplist implementation is almost a C translation of the original
4708 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4709 * Alternative to Balanced Trees", modified in three ways:
4710 * a) this implementation allows for repeated values.
4711 * b) the comparison is not just by key (our 'score') but by satellite data.
4712 * c) there is a back pointer, so it's a doubly linked list with the back
4713 * pointers being only at "level 1". This allows to traverse the list
4714 * from tail to head, useful for ZREVRANGE. */
4716 static zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
4717 zskiplistNode
*zn
= zmalloc(sizeof(*zn
));
4719 zn
->forward
= zmalloc(sizeof(zskiplistNode
*) * level
);
4725 static zskiplist
*zslCreate(void) {
4729 zsl
= zmalloc(sizeof(*zsl
));
4732 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
4733 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++)
4734 zsl
->header
->forward
[j
] = NULL
;
4735 zsl
->header
->backward
= NULL
;
4740 static void zslFreeNode(zskiplistNode
*node
) {
4741 decrRefCount(node
->obj
);
4742 zfree(node
->forward
);
4746 static void zslFree(zskiplist
*zsl
) {
4747 zskiplistNode
*node
= zsl
->header
->forward
[0], *next
;
4749 zfree(zsl
->header
->forward
);
4752 next
= node
->forward
[0];
4759 static int zslRandomLevel(void) {
4761 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
4766 static void zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
4767 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4771 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4772 while (x
->forward
[i
] &&
4773 (x
->forward
[i
]->score
< score
||
4774 (x
->forward
[i
]->score
== score
&&
4775 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4779 /* we assume the key is not already inside, since we allow duplicated
4780 * scores, and the re-insertion of score and redis object should never
4781 * happpen since the caller of zslInsert() should test in the hash table
4782 * if the element is already inside or not. */
4783 level
= zslRandomLevel();
4784 if (level
> zsl
->level
) {
4785 for (i
= zsl
->level
; i
< level
; i
++)
4786 update
[i
] = zsl
->header
;
4789 x
= zslCreateNode(level
,score
,obj
);
4790 for (i
= 0; i
< level
; i
++) {
4791 x
->forward
[i
] = update
[i
]->forward
[i
];
4792 update
[i
]->forward
[i
] = x
;
4794 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
4796 x
->forward
[0]->backward
= x
;
4802 /* Delete an element with matching score/object from the skiplist. */
4803 static int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
4804 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4808 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4809 while (x
->forward
[i
] &&
4810 (x
->forward
[i
]->score
< score
||
4811 (x
->forward
[i
]->score
== score
&&
4812 compareStringObjects(x
->forward
[i
]->obj
,obj
) < 0)))
4816 /* We may have multiple elements with the same score, what we need
4817 * is to find the element with both the right score and object. */
4819 if (x
&& score
== x
->score
&& compareStringObjects(x
->obj
,obj
) == 0) {
4820 for (i
= 0; i
< zsl
->level
; i
++) {
4821 if (update
[i
]->forward
[i
] != x
) break;
4822 update
[i
]->forward
[i
] = x
->forward
[i
];
4824 if (x
->forward
[0]) {
4825 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4828 zsl
->tail
= x
->backward
;
4831 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4836 return 0; /* not found */
4838 return 0; /* not found */
4841 /* Delete all the elements with score between min and max from the skiplist.
4842 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4843 * Note that this function takes the reference to the hash table view of the
4844 * sorted set, in order to remove the elements from the hash table too. */
4845 static unsigned long zslDeleteRange(zskiplist
*zsl
, double min
, double max
, dict
*dict
) {
4846 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
4847 unsigned long removed
= 0;
4851 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4852 while (x
->forward
[i
] && x
->forward
[i
]->score
< min
)
4856 /* We may have multiple elements with the same score, what we need
4857 * is to find the element with both the right score and object. */
4859 while (x
&& x
->score
<= max
) {
4860 zskiplistNode
*next
;
4862 for (i
= 0; i
< zsl
->level
; i
++) {
4863 if (update
[i
]->forward
[i
] != x
) break;
4864 update
[i
]->forward
[i
] = x
->forward
[i
];
4866 if (x
->forward
[0]) {
4867 x
->forward
[0]->backward
= (x
->backward
== zsl
->header
) ?
4870 zsl
->tail
= x
->backward
;
4872 next
= x
->forward
[0];
4873 dictDelete(dict
,x
->obj
);
4875 while(zsl
->level
> 1 && zsl
->header
->forward
[zsl
->level
-1] == NULL
)
4881 return removed
; /* not found */
4884 /* Find the first node having a score equal or greater than the specified one.
4885 * Returns NULL if there is no match. */
4886 static zskiplistNode
*zslFirstWithScore(zskiplist
*zsl
, double score
) {
4891 for (i
= zsl
->level
-1; i
>= 0; i
--) {
4892 while (x
->forward
[i
] && x
->forward
[i
]->score
< score
)
4895 /* We may have multiple elements with the same score, what we need
4896 * is to find the element with both the right score and object. */
4897 return x
->forward
[0];
4900 /* The actual Z-commands implementations */
4902 /* This generic command implements both ZADD and ZINCRBY.
4903 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4904 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4905 static void zaddGenericCommand(redisClient
*c
, robj
*key
, robj
*ele
, double scoreval
, int doincrement
) {
4910 zsetobj
= lookupKeyWrite(c
->db
,key
);
4911 if (zsetobj
== NULL
) {
4912 zsetobj
= createZsetObject();
4913 dictAdd(c
->db
->dict
,key
,zsetobj
);
4916 if (zsetobj
->type
!= REDIS_ZSET
) {
4917 addReply(c
,shared
.wrongtypeerr
);
4923 /* Ok now since we implement both ZADD and ZINCRBY here the code
4924 * needs to handle the two different conditions. It's all about setting
4925 * '*score', that is, the new score to set, to the right value. */
4926 score
= zmalloc(sizeof(double));
4930 /* Read the old score. If the element was not present starts from 0 */
4931 de
= dictFind(zs
->dict
,ele
);
4933 double *oldscore
= dictGetEntryVal(de
);
4934 *score
= *oldscore
+ scoreval
;
4942 /* What follows is a simple remove and re-insert operation that is common
4943 * to both ZADD and ZINCRBY... */
4944 if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) {
4945 /* case 1: New element */
4946 incrRefCount(ele
); /* added to hash */
4947 zslInsert(zs
->zsl
,*score
,ele
);
4948 incrRefCount(ele
); /* added to skiplist */
4951 addReplyDouble(c
,*score
);
4953 addReply(c
,shared
.cone
);
4958 /* case 2: Score update operation */
4959 de
= dictFind(zs
->dict
,ele
);
4960 redisAssert(de
!= NULL
);
4961 oldscore
= dictGetEntryVal(de
);
4962 if (*score
!= *oldscore
) {
4965 /* Remove and insert the element in the skip list with new score */
4966 deleted
= zslDelete(zs
->zsl
,*oldscore
,ele
);
4967 redisAssert(deleted
!= 0);
4968 zslInsert(zs
->zsl
,*score
,ele
);
4970 /* Update the score in the hash table */
4971 dictReplace(zs
->dict
,ele
,score
);
4977 addReplyDouble(c
,*score
);
4979 addReply(c
,shared
.czero
);
4983 static void zaddCommand(redisClient
*c
) {
4986 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4987 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0);
4990 static void zincrbyCommand(redisClient
*c
) {
4993 scoreval
= strtod(c
->argv
[2]->ptr
,NULL
);
4994 zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1);
4997 static void zremCommand(redisClient
*c
) {
5001 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
5002 if (zsetobj
== NULL
) {
5003 addReply(c
,shared
.czero
);
5009 if (zsetobj
->type
!= REDIS_ZSET
) {
5010 addReply(c
,shared
.wrongtypeerr
);
5014 de
= dictFind(zs
->dict
,c
->argv
[2]);
5016 addReply(c
,shared
.czero
);
5019 /* Delete from the skiplist */
5020 oldscore
= dictGetEntryVal(de
);
5021 deleted
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]);
5022 redisAssert(deleted
!= 0);
5024 /* Delete from the hash table */
5025 dictDelete(zs
->dict
,c
->argv
[2]);
5026 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
5028 addReply(c
,shared
.cone
);
5032 static void zremrangebyscoreCommand(redisClient
*c
) {
5033 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
5034 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
5038 zsetobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
5039 if (zsetobj
== NULL
) {
5040 addReply(c
,shared
.czero
);
5044 if (zsetobj
->type
!= REDIS_ZSET
) {
5045 addReply(c
,shared
.wrongtypeerr
);
5049 deleted
= zslDeleteRange(zs
->zsl
,min
,max
,zs
->dict
);
5050 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
5051 server
.dirty
+= deleted
;
5052 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",deleted
));
5056 static void zrangeGenericCommand(redisClient
*c
, int reverse
) {
5058 int start
= atoi(c
->argv
[2]->ptr
);
5059 int end
= atoi(c
->argv
[3]->ptr
);
5062 if (c
->argc
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) {
5064 } else if (c
->argc
>= 5) {
5065 addReply(c
,shared
.syntaxerr
);
5069 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
5071 addReply(c
,shared
.nullmultibulk
);
5073 if (o
->type
!= REDIS_ZSET
) {
5074 addReply(c
,shared
.wrongtypeerr
);
5076 zset
*zsetobj
= o
->ptr
;
5077 zskiplist
*zsl
= zsetobj
->zsl
;
5080 int llen
= zsl
->length
;
5084 /* convert negative indexes */
5085 if (start
< 0) start
= llen
+start
;
5086 if (end
< 0) end
= llen
+end
;
5087 if (start
< 0) start
= 0;
5088 if (end
< 0) end
= 0;
5090 /* indexes sanity checks */
5091 if (start
> end
|| start
>= llen
) {
5092 /* Out of range start or start > end result in empty list */
5093 addReply(c
,shared
.emptymultibulk
);
5096 if (end
>= llen
) end
= llen
-1;
5097 rangelen
= (end
-start
)+1;
5099 /* Return the result in form of a multi-bulk reply */
5105 ln
= zsl
->header
->forward
[0];
5107 ln
= ln
->forward
[0];
5110 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",
5111 withscores
? (rangelen
*2) : rangelen
));
5112 for (j
= 0; j
< rangelen
; j
++) {
5114 addReplyBulkLen(c
,ele
);
5116 addReply(c
,shared
.crlf
);
5118 addReplyDouble(c
,ln
->score
);
5119 ln
= reverse
? ln
->backward
: ln
->forward
[0];
5125 static void zrangeCommand(redisClient
*c
) {
5126 zrangeGenericCommand(c
,0);
5129 static void zrevrangeCommand(redisClient
*c
) {
5130 zrangeGenericCommand(c
,1);
5133 static void zrangebyscoreCommand(redisClient
*c
) {
5135 double min
= strtod(c
->argv
[2]->ptr
,NULL
);
5136 double max
= strtod(c
->argv
[3]->ptr
,NULL
);
5137 int offset
= 0, limit
= -1;
5139 if (c
->argc
!= 4 && c
->argc
!= 7) {
5141 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
5143 } else if (c
->argc
== 7 && strcasecmp(c
->argv
[4]->ptr
,"limit")) {
5144 addReply(c
,shared
.syntaxerr
);
5146 } else if (c
->argc
== 7) {
5147 offset
= atoi(c
->argv
[5]->ptr
);
5148 limit
= atoi(c
->argv
[6]->ptr
);
5149 if (offset
< 0) offset
= 0;
5152 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
5154 addReply(c
,shared
.nullmultibulk
);
5156 if (o
->type
!= REDIS_ZSET
) {
5157 addReply(c
,shared
.wrongtypeerr
);
5159 zset
*zsetobj
= o
->ptr
;
5160 zskiplist
*zsl
= zsetobj
->zsl
;
5163 unsigned int rangelen
= 0;
5165 /* Get the first node with the score >= min */
5166 ln
= zslFirstWithScore(zsl
,min
);
5168 /* No element matching the speciifed interval */
5169 addReply(c
,shared
.emptymultibulk
);
5173 /* We don't know in advance how many matching elements there
5174 * are in the list, so we push this object that will represent
5175 * the multi-bulk length in the output buffer, and will "fix"
5177 lenobj
= createObject(REDIS_STRING
,NULL
);
5179 decrRefCount(lenobj
);
5181 while(ln
&& ln
->score
<= max
) {
5184 ln
= ln
->forward
[0];
5187 if (limit
== 0) break;
5189 addReplyBulkLen(c
,ele
);
5191 addReply(c
,shared
.crlf
);
5192 ln
= ln
->forward
[0];
5194 if (limit
> 0) limit
--;
5196 lenobj
->ptr
= sdscatprintf(sdsempty(),"*%d\r\n",rangelen
);
5201 static void zcardCommand(redisClient
*c
) {
5205 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
5207 addReply(c
,shared
.czero
);
5210 if (o
->type
!= REDIS_ZSET
) {
5211 addReply(c
,shared
.wrongtypeerr
);
5214 addReplySds(c
,sdscatprintf(sdsempty(),":%lu\r\n",zs
->zsl
->length
));
5219 static void zscoreCommand(redisClient
*c
) {
5223 o
= lookupKeyRead(c
->db
,c
->argv
[1]);
5225 addReply(c
,shared
.nullbulk
);
5228 if (o
->type
!= REDIS_ZSET
) {
5229 addReply(c
,shared
.wrongtypeerr
);
5234 de
= dictFind(zs
->dict
,c
->argv
[2]);
5236 addReply(c
,shared
.nullbulk
);
5238 double *score
= dictGetEntryVal(de
);
5240 addReplyDouble(c
,*score
);
5246 /* ========================= Non type-specific commands ==================== */
5248 static void flushdbCommand(redisClient
*c
) {
5249 server
.dirty
+= dictSize(c
->db
->dict
);
5250 dictEmpty(c
->db
->dict
);
5251 dictEmpty(c
->db
->expires
);
5252 addReply(c
,shared
.ok
);
5255 static void flushallCommand(redisClient
*c
) {
5256 server
.dirty
+= emptyDb();
5257 addReply(c
,shared
.ok
);
5258 rdbSave(server
.dbfilename
);
5262 static redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
5263 redisSortOperation
*so
= zmalloc(sizeof(*so
));
5265 so
->pattern
= pattern
;
5269 /* Return the value associated to the key with a name obtained
5270 * substituting the first occurence of '*' in 'pattern' with 'subst' */
5271 static robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
5275 int prefixlen
, sublen
, postfixlen
;
5276 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
5280 char buf
[REDIS_SORTKEY_MAX
+1];
5283 /* If the pattern is "#" return the substitution object itself in order
5284 * to implement the "SORT ... GET #" feature. */
5285 spat
= pattern
->ptr
;
5286 if (spat
[0] == '#' && spat
[1] == '\0') {
5290 /* The substitution object may be specially encoded. If so we create
5291 * a decoded object on the fly. Otherwise getDecodedObject will just
5292 * increment the ref count, that we'll decrement later. */
5293 subst
= getDecodedObject(subst
);
5296 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
5297 p
= strchr(spat
,'*');
5299 decrRefCount(subst
);
5304 sublen
= sdslen(ssub
);
5305 postfixlen
= sdslen(spat
)-(prefixlen
+1);
5306 memcpy(keyname
.buf
,spat
,prefixlen
);
5307 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
5308 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
5309 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
5310 keyname
.len
= prefixlen
+sublen
+postfixlen
;
5312 initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(long)*2))
5313 decrRefCount(subst
);
5315 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
5316 return lookupKeyRead(db
,&keyobj
);
5319 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
5320 * the additional parameter is not standard but a BSD-specific we have to
5321 * pass sorting parameters via the global 'server' structure */
5322 static int sortCompare(const void *s1
, const void *s2
) {
5323 const redisSortObject
*so1
= s1
, *so2
= s2
;
5326 if (!server
.sort_alpha
) {
5327 /* Numeric sorting. Here it's trivial as we precomputed scores */
5328 if (so1
->u
.score
> so2
->u
.score
) {
5330 } else if (so1
->u
.score
< so2
->u
.score
) {
5336 /* Alphanumeric sorting */
5337 if (server
.sort_bypattern
) {
5338 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
5339 /* At least one compare object is NULL */
5340 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
5342 else if (so1
->u
.cmpobj
== NULL
)
5347 /* We have both the objects, use strcoll */
5348 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
5351 /* Compare elements directly */
5354 dec1
= getDecodedObject(so1
->obj
);
5355 dec2
= getDecodedObject(so2
->obj
);
5356 cmp
= strcoll(dec1
->ptr
,dec2
->ptr
);
5361 return server
.sort_desc
? -cmp
: cmp
;
5364 /* The SORT command is the most complex command in Redis. Warning: this code
5365 * is optimized for speed and a bit less for readability */
5366 static void sortCommand(redisClient
*c
) {
5369 int desc
= 0, alpha
= 0;
5370 int limit_start
= 0, limit_count
= -1, start
, end
;
5371 int j
, dontsort
= 0, vectorlen
;
5372 int getop
= 0; /* GET operation counter */
5373 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
5374 redisSortObject
*vector
; /* Resulting vector to sort */
5376 /* Lookup the key to sort. It must be of the right types */
5377 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
5378 if (sortval
== NULL
) {
5379 addReply(c
,shared
.nullmultibulk
);
5382 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
5383 sortval
->type
!= REDIS_ZSET
)
5385 addReply(c
,shared
.wrongtypeerr
);
5389 /* Create a list of operations to perform for every sorted element.
5390 * Operations can be GET/DEL/INCR/DECR */
5391 operations
= listCreate();
5392 listSetFreeMethod(operations
,zfree
);
5395 /* Now we need to protect sortval incrementing its count, in the future
5396 * SORT may have options able to overwrite/delete keys during the sorting
5397 * and the sorted key itself may get destroied */
5398 incrRefCount(sortval
);
5400 /* The SORT command has an SQL-alike syntax, parse it */
5401 while(j
< c
->argc
) {
5402 int leftargs
= c
->argc
-j
-1;
5403 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
5405 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
5407 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
5409 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
5410 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
5411 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
5413 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
5414 storekey
= c
->argv
[j
+1];
5416 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
5417 sortby
= c
->argv
[j
+1];
5418 /* If the BY pattern does not contain '*', i.e. it is constant,
5419 * we don't need to sort nor to lookup the weight keys. */
5420 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
5422 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
5423 listAddNodeTail(operations
,createSortOperation(
5424 REDIS_SORT_GET
,c
->argv
[j
+1]));
5428 decrRefCount(sortval
);
5429 listRelease(operations
);
5430 addReply(c
,shared
.syntaxerr
);
5436 /* Load the sorting vector with all the objects to sort */
5437 switch(sortval
->type
) {
5438 case REDIS_LIST
: vectorlen
= listLength((list
*)sortval
->ptr
); break;
5439 case REDIS_SET
: vectorlen
= dictSize((dict
*)sortval
->ptr
); break;
5440 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
5441 default: vectorlen
= 0; redisAssert(0); /* Avoid GCC warning */
5443 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
5446 if (sortval
->type
== REDIS_LIST
) {
5447 list
*list
= sortval
->ptr
;
5451 listRewind(list
,&li
);
5452 while((ln
= listNext(&li
))) {
5453 robj
*ele
= ln
->value
;
5454 vector
[j
].obj
= ele
;
5455 vector
[j
].u
.score
= 0;
5456 vector
[j
].u
.cmpobj
= NULL
;
5464 if (sortval
->type
== REDIS_SET
) {
5467 zset
*zs
= sortval
->ptr
;
5471 di
= dictGetIterator(set
);
5472 while((setele
= dictNext(di
)) != NULL
) {
5473 vector
[j
].obj
= dictGetEntryKey(setele
);
5474 vector
[j
].u
.score
= 0;
5475 vector
[j
].u
.cmpobj
= NULL
;
5478 dictReleaseIterator(di
);
5480 redisAssert(j
== vectorlen
);
5482 /* Now it's time to load the right scores in the sorting vector */
5483 if (dontsort
== 0) {
5484 for (j
= 0; j
< vectorlen
; j
++) {
5488 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
5489 if (!byval
|| byval
->type
!= REDIS_STRING
) continue;
5491 vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
5493 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
5494 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
5496 /* Don't need to decode the object if it's
5497 * integer-encoded (the only encoding supported) so
5498 * far. We can just cast it */
5499 if (byval
->encoding
== REDIS_ENCODING_INT
) {
5500 vector
[j
].u
.score
= (long)byval
->ptr
;
5502 redisAssert(1 != 1);
5507 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_RAW
)
5508 vector
[j
].u
.score
= strtod(vector
[j
].obj
->ptr
,NULL
);
5510 if (vector
[j
].obj
->encoding
== REDIS_ENCODING_INT
)
5511 vector
[j
].u
.score
= (long) vector
[j
].obj
->ptr
;
5513 redisAssert(1 != 1);
5520 /* We are ready to sort the vector... perform a bit of sanity check
5521 * on the LIMIT option too. We'll use a partial version of quicksort. */
5522 start
= (limit_start
< 0) ? 0 : limit_start
;
5523 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
5524 if (start
>= vectorlen
) {
5525 start
= vectorlen
-1;
5528 if (end
>= vectorlen
) end
= vectorlen
-1;
5530 if (dontsort
== 0) {
5531 server
.sort_desc
= desc
;
5532 server
.sort_alpha
= alpha
;
5533 server
.sort_bypattern
= sortby
? 1 : 0;
5534 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
5535 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
5537 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
5540 /* Send command output to the output buffer, performing the specified
5541 * GET/DEL/INCR/DECR operations if any. */
5542 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
5543 if (storekey
== NULL
) {
5544 /* STORE option not specified, sent the sorting result to client */
5545 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",outputlen
));
5546 for (j
= start
; j
<= end
; j
++) {
5551 addReplyBulkLen(c
,vector
[j
].obj
);
5552 addReply(c
,vector
[j
].obj
);
5553 addReply(c
,shared
.crlf
);
5555 listRewind(operations
,&li
);
5556 while((ln
= listNext(&li
))) {
5557 redisSortOperation
*sop
= ln
->value
;
5558 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5561 if (sop
->type
== REDIS_SORT_GET
) {
5562 if (!val
|| val
->type
!= REDIS_STRING
) {
5563 addReply(c
,shared
.nullbulk
);
5565 addReplyBulkLen(c
,val
);
5567 addReply(c
,shared
.crlf
);
5570 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5575 robj
*listObject
= createListObject();
5576 list
*listPtr
= (list
*) listObject
->ptr
;
5578 /* STORE option specified, set the sorting result as a List object */
5579 for (j
= start
; j
<= end
; j
++) {
5584 listAddNodeTail(listPtr
,vector
[j
].obj
);
5585 incrRefCount(vector
[j
].obj
);
5587 listRewind(operations
,&li
);
5588 while((ln
= listNext(&li
))) {
5589 redisSortOperation
*sop
= ln
->value
;
5590 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
5593 if (sop
->type
== REDIS_SORT_GET
) {
5594 if (!val
|| val
->type
!= REDIS_STRING
) {
5595 listAddNodeTail(listPtr
,createStringObject("",0));
5597 listAddNodeTail(listPtr
,val
);
5601 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
5605 if (dictReplace(c
->db
->dict
,storekey
,listObject
)) {
5606 incrRefCount(storekey
);
5608 /* Note: we add 1 because the DB is dirty anyway since even if the
5609 * SORT result is empty a new key is set and maybe the old content
5611 server
.dirty
+= 1+outputlen
;
5612 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",outputlen
));
5616 decrRefCount(sortval
);
5617 listRelease(operations
);
5618 for (j
= 0; j
< vectorlen
; j
++) {
5619 if (sortby
&& alpha
&& vector
[j
].u
.cmpobj
)
5620 decrRefCount(vector
[j
].u
.cmpobj
);
5625 /* Convert an amount of bytes into a human readable string in the form
5626 * of 100B, 2G, 100M, 4K, and so forth. */
5627 static void bytesToHuman(char *s
, unsigned long long n
) {
5632 sprintf(s
,"%lluB",n
);
5634 } else if (n
< (1024*1024)) {
5635 d
= (double)n
/(1024);
5636 sprintf(s
,"%.2fK",d
);
5637 } else if (n
< (1024LL*1024*1024)) {
5638 d
= (double)n
/(1024*1024);
5639 sprintf(s
,"%.2fM",d
);
5640 } else if (n
< (1024LL*1024*1024*1024)) {
5641 d
= (double)n
/(1024LL*1024*1024);
5642 sprintf(s
,"%.2fG",d
);
5646 /* Create the string returned by the INFO command. This is decoupled
5647 * by the INFO command itself as we need to report the same information
5648 * on memory corruption problems. */
5649 static sds
genRedisInfoString(void) {
5651 time_t uptime
= time(NULL
)-server
.stat_starttime
;
5655 bytesToHuman(hmem
,zmalloc_used_memory());
5656 info
= sdscatprintf(sdsempty(),
5657 "redis_version:%s\r\n"
5659 "multiplexing_api:%s\r\n"
5660 "process_id:%ld\r\n"
5661 "uptime_in_seconds:%ld\r\n"
5662 "uptime_in_days:%ld\r\n"
5663 "connected_clients:%d\r\n"
5664 "connected_slaves:%d\r\n"
5665 "blocked_clients:%d\r\n"
5666 "used_memory:%zu\r\n"
5667 "used_memory_human:%s\r\n"
5668 "changes_since_last_save:%lld\r\n"
5669 "bgsave_in_progress:%d\r\n"
5670 "last_save_time:%ld\r\n"
5671 "bgrewriteaof_in_progress:%d\r\n"
5672 "total_connections_received:%lld\r\n"
5673 "total_commands_processed:%lld\r\n"
5677 (sizeof(long) == 8) ? "64" : "32",
5682 listLength(server
.clients
)-listLength(server
.slaves
),
5683 listLength(server
.slaves
),
5684 server
.blpop_blocked_clients
,
5685 zmalloc_used_memory(),
5688 server
.bgsavechildpid
!= -1,
5690 server
.bgrewritechildpid
!= -1,
5691 server
.stat_numconnections
,
5692 server
.stat_numcommands
,
5693 server
.vm_enabled
!= 0,
5694 server
.masterhost
== NULL
? "master" : "slave"
5696 if (server
.masterhost
) {
5697 info
= sdscatprintf(info
,
5698 "master_host:%s\r\n"
5699 "master_port:%d\r\n"
5700 "master_link_status:%s\r\n"
5701 "master_last_io_seconds_ago:%d\r\n"
5704 (server
.replstate
== REDIS_REPL_CONNECTED
) ?
5706 server
.master
? ((int)(time(NULL
)-server
.master
->lastinteraction
)) : -1
5709 if (server
.vm_enabled
) {
5711 info
= sdscatprintf(info
,
5712 "vm_conf_max_memory:%llu\r\n"
5713 "vm_conf_page_size:%llu\r\n"
5714 "vm_conf_pages:%llu\r\n"
5715 "vm_stats_used_pages:%llu\r\n"
5716 "vm_stats_swapped_objects:%llu\r\n"
5717 "vm_stats_swappin_count:%llu\r\n"
5718 "vm_stats_swappout_count:%llu\r\n"
5719 "vm_stats_io_newjobs_len:%lu\r\n"
5720 "vm_stats_io_processing_len:%lu\r\n"
5721 "vm_stats_io_processed_len:%lu\r\n"
5722 "vm_stats_io_active_threads:%lu\r\n"
5723 "vm_stats_blocked_clients:%lu\r\n"
5724 ,(unsigned long long) server
.vm_max_memory
,
5725 (unsigned long long) server
.vm_page_size
,
5726 (unsigned long long) server
.vm_pages
,
5727 (unsigned long long) server
.vm_stats_used_pages
,
5728 (unsigned long long) server
.vm_stats_swapped_objects
,
5729 (unsigned long long) server
.vm_stats_swapins
,
5730 (unsigned long long) server
.vm_stats_swapouts
,
5731 (unsigned long) listLength(server
.io_newjobs
),
5732 (unsigned long) listLength(server
.io_processing
),
5733 (unsigned long) listLength(server
.io_processed
),
5734 (unsigned long) server
.io_active_threads
,
5735 (unsigned long) server
.vm_blocked_clients
5739 for (j
= 0; j
< server
.dbnum
; j
++) {
5740 long long keys
, vkeys
;
5742 keys
= dictSize(server
.db
[j
].dict
);
5743 vkeys
= dictSize(server
.db
[j
].expires
);
5744 if (keys
|| vkeys
) {
5745 info
= sdscatprintf(info
, "db%d:keys=%lld,expires=%lld\r\n",
5752 static void infoCommand(redisClient
*c
) {
5753 sds info
= genRedisInfoString();
5754 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
5755 (unsigned long)sdslen(info
)));
5756 addReplySds(c
,info
);
5757 addReply(c
,shared
.crlf
);
5760 static void monitorCommand(redisClient
*c
) {
5761 /* ignore MONITOR if aleady slave or in monitor mode */
5762 if (c
->flags
& REDIS_SLAVE
) return;
5764 c
->flags
|= (REDIS_SLAVE
|REDIS_MONITOR
);
5766 listAddNodeTail(server
.monitors
,c
);
5767 addReply(c
,shared
.ok
);
5770 /* ================================= Expire ================================= */
5771 static int removeExpire(redisDb
*db
, robj
*key
) {
5772 if (dictDelete(db
->expires
,key
) == DICT_OK
) {
5779 static int setExpire(redisDb
*db
, robj
*key
, time_t when
) {
5780 if (dictAdd(db
->expires
,key
,(void*)when
) == DICT_ERR
) {
5788 /* Return the expire time of the specified key, or -1 if no expire
5789 * is associated with this key (i.e. the key is non volatile) */
5790 static time_t getExpire(redisDb
*db
, robj
*key
) {
5793 /* No expire? return ASAP */
5794 if (dictSize(db
->expires
) == 0 ||
5795 (de
= dictFind(db
->expires
,key
)) == NULL
) return -1;
5797 return (time_t) dictGetEntryVal(de
);
5800 static int expireIfNeeded(redisDb
*db
, robj
*key
) {
5804 /* No expire? return ASAP */
5805 if (dictSize(db
->expires
) == 0 ||
5806 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5808 /* Lookup the expire */
5809 when
= (time_t) dictGetEntryVal(de
);
5810 if (time(NULL
) <= when
) return 0;
5812 /* Delete the key */
5813 dictDelete(db
->expires
,key
);
5814 return dictDelete(db
->dict
,key
) == DICT_OK
;
5817 static int deleteIfVolatile(redisDb
*db
, robj
*key
) {
5820 /* No expire? return ASAP */
5821 if (dictSize(db
->expires
) == 0 ||
5822 (de
= dictFind(db
->expires
,key
)) == NULL
) return 0;
5824 /* Delete the key */
5826 dictDelete(db
->expires
,key
);
5827 return dictDelete(db
->dict
,key
) == DICT_OK
;
5830 static void expireGenericCommand(redisClient
*c
, robj
*key
, time_t seconds
) {
5833 de
= dictFind(c
->db
->dict
,key
);
5835 addReply(c
,shared
.czero
);
5839 if (deleteKey(c
->db
,key
)) server
.dirty
++;
5840 addReply(c
, shared
.cone
);
5843 time_t when
= time(NULL
)+seconds
;
5844 if (setExpire(c
->db
,key
,when
)) {
5845 addReply(c
,shared
.cone
);
5848 addReply(c
,shared
.czero
);
5854 static void expireCommand(redisClient
*c
) {
5855 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10));
5858 static void expireatCommand(redisClient
*c
) {
5859 expireGenericCommand(c
,c
->argv
[1],strtol(c
->argv
[2]->ptr
,NULL
,10)-time(NULL
));
5862 static void ttlCommand(redisClient
*c
) {
5866 expire
= getExpire(c
->db
,c
->argv
[1]);
5868 ttl
= (int) (expire
-time(NULL
));
5869 if (ttl
< 0) ttl
= -1;
5871 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",ttl
));
5874 /* ================================ MULTI/EXEC ============================== */
5876 /* Client state initialization for MULTI/EXEC */
5877 static void initClientMultiState(redisClient
*c
) {
5878 c
->mstate
.commands
= NULL
;
5879 c
->mstate
.count
= 0;
5882 /* Release all the resources associated with MULTI/EXEC state */
5883 static void freeClientMultiState(redisClient
*c
) {
5886 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5888 multiCmd
*mc
= c
->mstate
.commands
+j
;
5890 for (i
= 0; i
< mc
->argc
; i
++)
5891 decrRefCount(mc
->argv
[i
]);
5894 zfree(c
->mstate
.commands
);
5897 /* Add a new command into the MULTI commands queue */
5898 static void queueMultiCommand(redisClient
*c
, struct redisCommand
*cmd
) {
5902 c
->mstate
.commands
= zrealloc(c
->mstate
.commands
,
5903 sizeof(multiCmd
)*(c
->mstate
.count
+1));
5904 mc
= c
->mstate
.commands
+c
->mstate
.count
;
5907 mc
->argv
= zmalloc(sizeof(robj
*)*c
->argc
);
5908 memcpy(mc
->argv
,c
->argv
,sizeof(robj
*)*c
->argc
);
5909 for (j
= 0; j
< c
->argc
; j
++)
5910 incrRefCount(mc
->argv
[j
]);
5914 static void multiCommand(redisClient
*c
) {
5915 c
->flags
|= REDIS_MULTI
;
5916 addReply(c
,shared
.ok
);
5919 static void execCommand(redisClient
*c
) {
5924 if (!(c
->flags
& REDIS_MULTI
)) {
5925 addReplySds(c
,sdsnew("-ERR EXEC without MULTI\r\n"));
5929 orig_argv
= c
->argv
;
5930 orig_argc
= c
->argc
;
5931 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",c
->mstate
.count
));
5932 for (j
= 0; j
< c
->mstate
.count
; j
++) {
5933 c
->argc
= c
->mstate
.commands
[j
].argc
;
5934 c
->argv
= c
->mstate
.commands
[j
].argv
;
5935 call(c
,c
->mstate
.commands
[j
].cmd
);
5937 c
->argv
= orig_argv
;
5938 c
->argc
= orig_argc
;
5939 freeClientMultiState(c
);
5940 initClientMultiState(c
);
5941 c
->flags
&= (~REDIS_MULTI
);
5944 /* =========================== Blocking Operations ========================= */
5946 /* Currently Redis blocking operations support is limited to list POP ops,
5947 * so the current implementation is not fully generic, but it is also not
5948 * completely specific so it will not require a rewrite to support new
5949 * kind of blocking operations in the future.
5951 * Still it's important to note that list blocking operations can be already
5952 * used as a notification mechanism in order to implement other blocking
5953 * operations at application level, so there must be a very strong evidence
5954 * of usefulness and generality before new blocking operations are implemented.
5956 * This is how the current blocking POP works, we use BLPOP as example:
5957 * - If the user calls BLPOP and the key exists and contains a non empty list
5958 * then LPOP is called instead. So BLPOP is semantically the same as LPOP
5959 * if there is not to block.
5960 * - If instead BLPOP is called and the key does not exists or the list is
5961 * empty we need to block. In order to do so we remove the notification for
5962 * new data to read in the client socket (so that we'll not serve new
5963 * requests if the blocking request is not served). Also we put the client
5964 * in a dictionary (db->blockingkeys) mapping keys to a list of clients
5965 * blocking for this keys.
5966 * - If a PUSH operation against a key with blocked clients waiting is
5967 * performed, we serve the first in the list: basically instead to push
5968 * the new element inside the list we return it to the (first / oldest)
5969 * blocking client, unblock the client, and remove it form the list.
5971 * The above comment and the source code should be enough in order to understand
5972 * the implementation and modify / fix it later.
5975 /* Set a client in blocking mode for the specified key, with the specified
5977 static void blockForKeys(redisClient
*c
, robj
**keys
, int numkeys
, time_t timeout
) {
5982 c
->blockingkeys
= zmalloc(sizeof(robj
*)*numkeys
);
5983 c
->blockingkeysnum
= numkeys
;
5984 c
->blockingto
= timeout
;
5985 for (j
= 0; j
< numkeys
; j
++) {
5986 /* Add the key in the client structure, to map clients -> keys */
5987 c
->blockingkeys
[j
] = keys
[j
];
5988 incrRefCount(keys
[j
]);
5990 /* And in the other "side", to map keys -> clients */
5991 de
= dictFind(c
->db
->blockingkeys
,keys
[j
]);
5995 /* For every key we take a list of clients blocked for it */
5997 retval
= dictAdd(c
->db
->blockingkeys
,keys
[j
],l
);
5998 incrRefCount(keys
[j
]);
5999 assert(retval
== DICT_OK
);
6001 l
= dictGetEntryVal(de
);
6003 listAddNodeTail(l
,c
);
6005 /* Mark the client as a blocked client */
6006 c
->flags
|= REDIS_BLOCKED
;
6007 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
6008 server
.blpop_blocked_clients
++;
6011 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
6012 static void unblockClientWaitingData(redisClient
*c
) {
6017 assert(c
->blockingkeys
!= NULL
);
6018 /* The client may wait for multiple keys, so unblock it for every key. */
6019 for (j
= 0; j
< c
->blockingkeysnum
; j
++) {
6020 /* Remove this client from the list of clients waiting for this key. */
6021 de
= dictFind(c
->db
->blockingkeys
,c
->blockingkeys
[j
]);
6023 l
= dictGetEntryVal(de
);
6024 listDelNode(l
,listSearchKey(l
,c
));
6025 /* If the list is empty we need to remove it to avoid wasting memory */
6026 if (listLength(l
) == 0)
6027 dictDelete(c
->db
->blockingkeys
,c
->blockingkeys
[j
]);
6028 decrRefCount(c
->blockingkeys
[j
]);
6030 /* Cleanup the client structure */
6031 zfree(c
->blockingkeys
);
6032 c
->blockingkeys
= NULL
;
6033 c
->flags
&= (~REDIS_BLOCKED
);
6034 server
.blpop_blocked_clients
--;
6035 /* Ok now we are ready to get read events from socket, note that we
6036 * can't trap errors here as it's possible that unblockClientWaitingDatas() is
6037 * called from freeClient() itself, and the only thing we can do
6038 * if we failed to register the READABLE event is to kill the client.
6039 * Still the following function should never fail in the real world as
6040 * we are sure the file descriptor is sane, and we exit on out of mem. */
6041 aeCreateFileEvent(server
.el
, c
->fd
, AE_READABLE
, readQueryFromClient
, c
);
6042 /* As a final step we want to process data if there is some command waiting
6043 * in the input buffer. Note that this is safe even if
6044 * unblockClientWaitingData() gets called from freeClient() because
6045 * freeClient() will be smart enough to call this function
6046 * *after* c->querybuf was set to NULL. */
6047 if (c
->querybuf
&& sdslen(c
->querybuf
) > 0) processInputBuffer(c
);
6050 /* This should be called from any function PUSHing into lists.
6051 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
6052 * 'ele' is the element pushed.
6054 * If the function returns 0 there was no client waiting for a list push
6057 * If the function returns 1 there was a client waiting for a list push
6058 * against this key, the element was passed to this client thus it's not
6059 * needed to actually add it to the list and the caller should return asap. */
6060 static int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
) {
6061 struct dictEntry
*de
;
6062 redisClient
*receiver
;
6066 de
= dictFind(c
->db
->blockingkeys
,key
);
6067 if (de
== NULL
) return 0;
6068 l
= dictGetEntryVal(de
);
6071 receiver
= ln
->value
;
6073 addReplySds(receiver
,sdsnew("*2\r\n"));
6074 addReplyBulkLen(receiver
,key
);
6075 addReply(receiver
,key
);
6076 addReply(receiver
,shared
.crlf
);
6077 addReplyBulkLen(receiver
,ele
);
6078 addReply(receiver
,ele
);
6079 addReply(receiver
,shared
.crlf
);
6080 unblockClientWaitingData(receiver
);
6084 /* Blocking RPOP/LPOP */
6085 static void blockingPopGenericCommand(redisClient
*c
, int where
) {
6090 for (j
= 1; j
< c
->argc
-1; j
++) {
6091 o
= lookupKeyWrite(c
->db
,c
->argv
[j
]);
6093 if (o
->type
!= REDIS_LIST
) {
6094 addReply(c
,shared
.wrongtypeerr
);
6097 list
*list
= o
->ptr
;
6098 if (listLength(list
) != 0) {
6099 /* If the list contains elements fall back to the usual
6100 * non-blocking POP operation */
6101 robj
*argv
[2], **orig_argv
;
6104 /* We need to alter the command arguments before to call
6105 * popGenericCommand() as the command takes a single key. */
6106 orig_argv
= c
->argv
;
6107 orig_argc
= c
->argc
;
6108 argv
[1] = c
->argv
[j
];
6112 /* Also the return value is different, we need to output
6113 * the multi bulk reply header and the key name. The
6114 * "real" command will add the last element (the value)
6115 * for us. If this souds like an hack to you it's just
6116 * because it is... */
6117 addReplySds(c
,sdsnew("*2\r\n"));
6118 addReplyBulkLen(c
,argv
[1]);
6119 addReply(c
,argv
[1]);
6120 addReply(c
,shared
.crlf
);
6121 popGenericCommand(c
,where
);
6123 /* Fix the client structure with the original stuff */
6124 c
->argv
= orig_argv
;
6125 c
->argc
= orig_argc
;
6131 /* If the list is empty or the key does not exists we must block */
6132 timeout
= strtol(c
->argv
[c
->argc
-1]->ptr
,NULL
,10);
6133 if (timeout
> 0) timeout
+= time(NULL
);
6134 blockForKeys(c
,c
->argv
+1,c
->argc
-2,timeout
);
6137 static void blpopCommand(redisClient
*c
) {
6138 blockingPopGenericCommand(c
,REDIS_HEAD
);
6141 static void brpopCommand(redisClient
*c
) {
6142 blockingPopGenericCommand(c
,REDIS_TAIL
);
6145 /* =============================== Replication ============================= */
6147 static int syncWrite(int fd
, char *ptr
, ssize_t size
, int timeout
) {
6148 ssize_t nwritten
, ret
= size
;
6149 time_t start
= time(NULL
);
6153 if (aeWait(fd
,AE_WRITABLE
,1000) & AE_WRITABLE
) {
6154 nwritten
= write(fd
,ptr
,size
);
6155 if (nwritten
== -1) return -1;
6159 if ((time(NULL
)-start
) > timeout
) {
6167 static int syncRead(int fd
, char *ptr
, ssize_t size
, int timeout
) {
6168 ssize_t nread
, totread
= 0;
6169 time_t start
= time(NULL
);
6173 if (aeWait(fd
,AE_READABLE
,1000) & AE_READABLE
) {
6174 nread
= read(fd
,ptr
,size
);
6175 if (nread
== -1) return -1;
6180 if ((time(NULL
)-start
) > timeout
) {
6188 static int syncReadLine(int fd
, char *ptr
, ssize_t size
, int timeout
) {
6195 if (syncRead(fd
,&c
,1,timeout
) == -1) return -1;
6198 if (nread
&& *(ptr
-1) == '\r') *(ptr
-1) = '\0';
6209 static void syncCommand(redisClient
*c
) {
6210 /* ignore SYNC if aleady slave or in monitor mode */
6211 if (c
->flags
& REDIS_SLAVE
) return;
6213 /* SYNC can't be issued when the server has pending data to send to
6214 * the client about already issued commands. We need a fresh reply
6215 * buffer registering the differences between the BGSAVE and the current
6216 * dataset, so that we can copy to other slaves if needed. */
6217 if (listLength(c
->reply
) != 0) {
6218 addReplySds(c
,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
6222 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
6223 /* Here we need to check if there is a background saving operation
6224 * in progress, or if it is required to start one */
6225 if (server
.bgsavechildpid
!= -1) {
6226 /* Ok a background save is in progress. Let's check if it is a good
6227 * one for replication, i.e. if there is another slave that is
6228 * registering differences since the server forked to save */
6233 listRewind(server
.slaves
,&li
);
6234 while((ln
= listNext(&li
))) {
6236 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
6239 /* Perfect, the server is already registering differences for
6240 * another slave. Set the right state, and copy the buffer. */
6241 listRelease(c
->reply
);
6242 c
->reply
= listDup(slave
->reply
);
6243 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
6244 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
6246 /* No way, we need to wait for the next BGSAVE in order to
6247 * register differences */
6248 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
6249 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
6252 /* Ok we don't have a BGSAVE in progress, let's start one */
6253 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
6254 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
6255 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
6256 addReplySds(c
,sdsnew("-ERR Unalbe to perform background save\r\n"));
6259 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
6262 c
->flags
|= REDIS_SLAVE
;
6264 listAddNodeTail(server
.slaves
,c
);
6268 static void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
6269 redisClient
*slave
= privdata
;
6271 REDIS_NOTUSED(mask
);
6272 char buf
[REDIS_IOBUF_LEN
];
6273 ssize_t nwritten
, buflen
;
6275 if (slave
->repldboff
== 0) {
6276 /* Write the bulk write count before to transfer the DB. In theory here
6277 * we don't know how much room there is in the output buffer of the
6278 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
6279 * operations) will never be smaller than the few bytes we need. */
6282 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
6284 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
6292 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
6293 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
6295 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
6296 (buflen
== 0) ? "premature EOF" : strerror(errno
));
6300 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
6301 redisLog(REDIS_VERBOSE
,"Write error sending DB to slave: %s",
6306 slave
->repldboff
+= nwritten
;
6307 if (slave
->repldboff
== slave
->repldbsize
) {
6308 close(slave
->repldbfd
);
6309 slave
->repldbfd
= -1;
6310 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
6311 slave
->replstate
= REDIS_REPL_ONLINE
;
6312 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
6313 sendReplyToClient
, slave
) == AE_ERR
) {
6317 addReplySds(slave
,sdsempty());
6318 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
6322 /* This function is called at the end of every backgrond saving.
6323 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
6324 * otherwise REDIS_ERR is passed to the function.
6326 * The goal of this function is to handle slaves waiting for a successful
6327 * background saving in order to perform non-blocking synchronization. */
6328 static void updateSlavesWaitingBgsave(int bgsaveerr
) {
6330 int startbgsave
= 0;
6333 listRewind(server
.slaves
,&li
);
6334 while((ln
= listNext(&li
))) {
6335 redisClient
*slave
= ln
->value
;
6337 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
6339 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
6340 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
6341 struct redis_stat buf
;
6343 if (bgsaveerr
!= REDIS_OK
) {
6345 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
6348 if ((slave
->repldbfd
= open(server
.dbfilename
,O_RDONLY
)) == -1 ||
6349 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
6351 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
6354 slave
->repldboff
= 0;
6355 slave
->repldbsize
= buf
.st_size
;
6356 slave
->replstate
= REDIS_REPL_SEND_BULK
;
6357 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
6358 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
6365 if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) {
6368 listRewind(server
.slaves
,&li
);
6369 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
6370 while((ln
= listNext(&li
))) {
6371 redisClient
*slave
= ln
->value
;
6373 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
6380 static int syncWithMaster(void) {
6381 char buf
[1024], tmpfile
[256], authcmd
[1024];
6383 int fd
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
);
6387 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
6392 /* AUTH with the master if required. */
6393 if(server
.masterauth
) {
6394 snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
);
6395 if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) {
6397 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",
6401 /* Read the AUTH result. */
6402 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
6404 redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s",
6408 if (buf
[0] != '+') {
6410 redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?");
6415 /* Issue the SYNC command */
6416 if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) {
6418 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
6422 /* Read the bulk write count */
6423 if (syncReadLine(fd
,buf
,1024,3600) == -1) {
6425 redisLog(REDIS_WARNING
,"I/O error reading bulk count from MASTER: %s",
6429 if (buf
[0] != '$') {
6431 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
6434 dumpsize
= atoi(buf
+1);
6435 redisLog(REDIS_NOTICE
,"Receiving %d bytes data dump from MASTER",dumpsize
);
6436 /* Read the bulk write data on a temp file */
6437 snprintf(tmpfile
,256,"temp-%d.%ld.rdb",(int)time(NULL
),(long int)random());
6438 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
,0644);
6441 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
6445 int nread
, nwritten
;
6447 nread
= read(fd
,buf
,(dumpsize
< 1024)?dumpsize
:1024);
6449 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
6455 nwritten
= write(dfd
,buf
,nread
);
6456 if (nwritten
== -1) {
6457 redisLog(REDIS_WARNING
,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
));
6465 if (rename(tmpfile
,server
.dbfilename
) == -1) {
6466 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
6472 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
6473 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
6477 server
.master
= createClient(fd
);
6478 server
.master
->flags
|= REDIS_MASTER
;
6479 server
.master
->authenticated
= 1;
6480 server
.replstate
= REDIS_REPL_CONNECTED
;
6484 static void slaveofCommand(redisClient
*c
) {
6485 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
6486 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
6487 if (server
.masterhost
) {
6488 sdsfree(server
.masterhost
);
6489 server
.masterhost
= NULL
;
6490 if (server
.master
) freeClient(server
.master
);
6491 server
.replstate
= REDIS_REPL_NONE
;
6492 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
6495 sdsfree(server
.masterhost
);
6496 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
6497 server
.masterport
= atoi(c
->argv
[2]->ptr
);
6498 if (server
.master
) freeClient(server
.master
);
6499 server
.replstate
= REDIS_REPL_CONNECT
;
6500 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
6501 server
.masterhost
, server
.masterport
);
6503 addReply(c
,shared
.ok
);
6506 /* ============================ Maxmemory directive ======================== */
6508 /* Try to free one object form the pre-allocated objects free list.
6509 * This is useful under low mem conditions as by default we take 1 million
6510 * free objects allocated. On success REDIS_OK is returned, otherwise
6512 static int tryFreeOneObjectFromFreelist(void) {
6515 if (server
.vm_enabled
) pthread_mutex_lock(&server
.obj_freelist_mutex
);
6516 if (listLength(server
.objfreelist
)) {
6517 listNode
*head
= listFirst(server
.objfreelist
);
6518 o
= listNodeValue(head
);
6519 listDelNode(server
.objfreelist
,head
);
6520 if (server
.vm_enabled
) pthread_mutex_unlock(&server
.obj_freelist_mutex
);
6524 if (server
.vm_enabled
) pthread_mutex_unlock(&server
.obj_freelist_mutex
);
6529 /* This function gets called when 'maxmemory' is set on the config file to limit
6530 * the max memory used by the server, and we are out of memory.
6531 * This function will try to, in order:
6533 * - Free objects from the free list
6534 * - Try to remove keys with an EXPIRE set
6536 * It is not possible to free enough memory to reach used-memory < maxmemory
6537 * the server will start refusing commands that will enlarge even more the
6540 static void freeMemoryIfNeeded(void) {
6541 while (server
.maxmemory
&& zmalloc_used_memory() > server
.maxmemory
) {
6542 int j
, k
, freed
= 0;
6544 if (tryFreeOneObjectFromFreelist() == REDIS_OK
) continue;
6545 for (j
= 0; j
< server
.dbnum
; j
++) {
6547 robj
*minkey
= NULL
;
6548 struct dictEntry
*de
;
6550 if (dictSize(server
.db
[j
].expires
)) {
6552 /* From a sample of three keys drop the one nearest to
6553 * the natural expire */
6554 for (k
= 0; k
< 3; k
++) {
6557 de
= dictGetRandomKey(server
.db
[j
].expires
);
6558 t
= (time_t) dictGetEntryVal(de
);
6559 if (minttl
== -1 || t
< minttl
) {
6560 minkey
= dictGetEntryKey(de
);
6564 deleteKey(server
.db
+j
,minkey
);
6567 if (!freed
) return; /* nothing to free... */
6571 /* ============================== Append Only file ========================== */
6573 static void feedAppendOnlyFile(struct redisCommand
*cmd
, int dictid
, robj
**argv
, int argc
) {
6574 sds buf
= sdsempty();
6580 /* The DB this command was targetting is not the same as the last command
6581 * we appendend. To issue a SELECT command is needed. */
6582 if (dictid
!= server
.appendseldb
) {
6585 snprintf(seldb
,sizeof(seldb
),"%d",dictid
);
6586 buf
= sdscatprintf(buf
,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
6587 (unsigned long)strlen(seldb
),seldb
);
6588 server
.appendseldb
= dictid
;
6591 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6592 * EXPIREs into EXPIREATs calls */
6593 if (cmd
->proc
== expireCommand
) {
6596 tmpargv
[0] = createStringObject("EXPIREAT",8);
6597 tmpargv
[1] = argv
[1];
6598 incrRefCount(argv
[1]);
6599 when
= time(NULL
)+strtol(argv
[2]->ptr
,NULL
,10);
6600 tmpargv
[2] = createObject(REDIS_STRING
,
6601 sdscatprintf(sdsempty(),"%ld",when
));
6605 /* Append the actual command */
6606 buf
= sdscatprintf(buf
,"*%d\r\n",argc
);
6607 for (j
= 0; j
< argc
; j
++) {
6610 o
= getDecodedObject(o
);
6611 buf
= sdscatprintf(buf
,"$%lu\r\n",(unsigned long)sdslen(o
->ptr
));
6612 buf
= sdscatlen(buf
,o
->ptr
,sdslen(o
->ptr
));
6613 buf
= sdscatlen(buf
,"\r\n",2);
6617 /* Free the objects from the modified argv for EXPIREAT */
6618 if (cmd
->proc
== expireCommand
) {
6619 for (j
= 0; j
< 3; j
++)
6620 decrRefCount(argv
[j
]);
6623 /* We want to perform a single write. This should be guaranteed atomic
6624 * at least if the filesystem we are writing is a real physical one.
6625 * While this will save us against the server being killed I don't think
6626 * there is much to do about the whole server stopping for power problems
6628 nwritten
= write(server
.appendfd
,buf
,sdslen(buf
));
6629 if (nwritten
!= (signed)sdslen(buf
)) {
6630 /* Ooops, we are in troubles. The best thing to do for now is
6631 * to simply exit instead to give the illusion that everything is
6632 * working as expected. */
6633 if (nwritten
== -1) {
6634 redisLog(REDIS_WARNING
,"Exiting on error writing to the append-only file: %s",strerror(errno
));
6636 redisLog(REDIS_WARNING
,"Exiting on short write while writing to the append-only file: %s",strerror(errno
));
6640 /* If a background append only file rewriting is in progress we want to
6641 * accumulate the differences between the child DB and the current one
6642 * in a buffer, so that when the child process will do its work we
6643 * can append the differences to the new append only file. */
6644 if (server
.bgrewritechildpid
!= -1)
6645 server
.bgrewritebuf
= sdscatlen(server
.bgrewritebuf
,buf
,sdslen(buf
));
6649 if (server
.appendfsync
== APPENDFSYNC_ALWAYS
||
6650 (server
.appendfsync
== APPENDFSYNC_EVERYSEC
&&
6651 now
-server
.lastfsync
> 1))
6653 fsync(server
.appendfd
); /* Let's try to get this data on the disk */
6654 server
.lastfsync
= now
;
6658 /* In Redis commands are always executed in the context of a client, so in
6659 * order to load the append only file we need to create a fake client. */
6660 static struct redisClient
*createFakeClient(void) {
6661 struct redisClient
*c
= zmalloc(sizeof(*c
));
6665 c
->querybuf
= sdsempty();
6669 /* We set the fake client as a slave waiting for the synchronization
6670 * so that Redis will not try to send replies to this client. */
6671 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
6672 c
->reply
= listCreate();
6673 listSetFreeMethod(c
->reply
,decrRefCount
);
6674 listSetDupMethod(c
->reply
,dupClientReplyValue
);
6678 static void freeFakeClient(struct redisClient
*c
) {
6679 sdsfree(c
->querybuf
);
6680 listRelease(c
->reply
);
6684 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
6685 * error (the append only file is zero-length) REDIS_ERR is returned. On
6686 * fatal error an error message is logged and the program exists. */
6687 int loadAppendOnlyFile(char *filename
) {
6688 struct redisClient
*fakeClient
;
6689 FILE *fp
= fopen(filename
,"r");
6690 struct redis_stat sb
;
6691 unsigned long long loadedkeys
= 0;
6693 if (redis_fstat(fileno(fp
),&sb
) != -1 && sb
.st_size
== 0)
6697 redisLog(REDIS_WARNING
,"Fatal error: can't open the append log file for reading: %s",strerror(errno
));
6701 fakeClient
= createFakeClient();
6708 struct redisCommand
*cmd
;
6710 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) {
6716 if (buf
[0] != '*') goto fmterr
;
6718 argv
= zmalloc(sizeof(robj
*)*argc
);
6719 for (j
= 0; j
< argc
; j
++) {
6720 if (fgets(buf
,sizeof(buf
),fp
) == NULL
) goto readerr
;
6721 if (buf
[0] != '$') goto fmterr
;
6722 len
= strtol(buf
+1,NULL
,10);
6723 argsds
= sdsnewlen(NULL
,len
);
6724 if (len
&& fread(argsds
,len
,1,fp
) == 0) goto fmterr
;
6725 argv
[j
] = createObject(REDIS_STRING
,argsds
);
6726 if (fread(buf
,2,1,fp
) == 0) goto fmterr
; /* discard CRLF */
6729 /* Command lookup */
6730 cmd
= lookupCommand(argv
[0]->ptr
);
6732 redisLog(REDIS_WARNING
,"Unknown command '%s' reading the append only file", argv
[0]->ptr
);
6735 /* Try object sharing and encoding */
6736 if (server
.shareobjects
) {
6738 for(j
= 1; j
< argc
; j
++)
6739 argv
[j
] = tryObjectSharing(argv
[j
]);
6741 if (cmd
->flags
& REDIS_CMD_BULK
)
6742 tryObjectEncoding(argv
[argc
-1]);
6743 /* Run the command in the context of a fake client */
6744 fakeClient
->argc
= argc
;
6745 fakeClient
->argv
= argv
;
6746 cmd
->proc(fakeClient
);
6747 /* Discard the reply objects list from the fake client */
6748 while(listLength(fakeClient
->reply
))
6749 listDelNode(fakeClient
->reply
,listFirst(fakeClient
->reply
));
6750 /* Clean up, ready for the next command */
6751 for (j
= 0; j
< argc
; j
++) decrRefCount(argv
[j
]);
6753 /* Handle swapping while loading big datasets when VM is on */
6755 if (server
.vm_enabled
&& (loadedkeys
% 5000) == 0) {
6756 while (zmalloc_used_memory() > server
.vm_max_memory
) {
6757 if (vmSwapOneObjectBlocking() == REDIS_ERR
) break;
6762 freeFakeClient(fakeClient
);
6767 redisLog(REDIS_WARNING
,"Unexpected end of file reading the append only file");
6769 redisLog(REDIS_WARNING
,"Unrecoverable error reading the append only file: %s", strerror(errno
));
6773 redisLog(REDIS_WARNING
,"Bad file format reading the append only file");
6777 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6778 static int fwriteBulk(FILE *fp
, robj
*obj
) {
6782 /* Avoid the incr/decr ref count business if possible to help
6783 * copy-on-write (we are often in a child process when this function
6785 * Also makes sure that key objects don't get incrRefCount-ed when VM
6787 if (obj
->encoding
!= REDIS_ENCODING_RAW
) {
6788 obj
= getDecodedObject(obj
);
6791 snprintf(buf
,sizeof(buf
),"$%ld\r\n",(long)sdslen(obj
->ptr
));
6792 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) goto err
;
6793 if (sdslen(obj
->ptr
) && fwrite(obj
->ptr
,sdslen(obj
->ptr
),1,fp
) == 0)
6795 if (fwrite("\r\n",2,1,fp
) == 0) goto err
;
6796 if (decrrc
) decrRefCount(obj
);
6799 if (decrrc
) decrRefCount(obj
);
6803 /* Write a double value in bulk format $<count>\r\n<payload>\r\n */
6804 static int fwriteBulkDouble(FILE *fp
, double d
) {
6805 char buf
[128], dbuf
[128];
6807 snprintf(dbuf
,sizeof(dbuf
),"%.17g\r\n",d
);
6808 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(dbuf
)-2);
6809 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
6810 if (fwrite(dbuf
,strlen(dbuf
),1,fp
) == 0) return 0;
6814 /* Write a long value in bulk format $<count>\r\n<payload>\r\n */
6815 static int fwriteBulkLong(FILE *fp
, long l
) {
6816 char buf
[128], lbuf
[128];
6818 snprintf(lbuf
,sizeof(lbuf
),"%ld\r\n",l
);
6819 snprintf(buf
,sizeof(buf
),"$%lu\r\n",(unsigned long)strlen(lbuf
)-2);
6820 if (fwrite(buf
,strlen(buf
),1,fp
) == 0) return 0;
6821 if (fwrite(lbuf
,strlen(lbuf
),1,fp
) == 0) return 0;
6825 /* Write a sequence of commands able to fully rebuild the dataset into
6826 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6827 static int rewriteAppendOnlyFile(char *filename
) {
6828 dictIterator
*di
= NULL
;
6833 time_t now
= time(NULL
);
6835 /* Note that we have to use a different temp name here compared to the
6836 * one used by rewriteAppendOnlyFileBackground() function. */
6837 snprintf(tmpfile
,256,"temp-rewriteaof-%d.aof", (int) getpid());
6838 fp
= fopen(tmpfile
,"w");
6840 redisLog(REDIS_WARNING
, "Failed rewriting the append only file: %s", strerror(errno
));
6843 for (j
= 0; j
< server
.dbnum
; j
++) {
6844 char selectcmd
[] = "*2\r\n$6\r\nSELECT\r\n";
6845 redisDb
*db
= server
.db
+j
;
6847 if (dictSize(d
) == 0) continue;
6848 di
= dictGetIterator(d
);
6854 /* SELECT the new DB */
6855 if (fwrite(selectcmd
,sizeof(selectcmd
)-1,1,fp
) == 0) goto werr
;
6856 if (fwriteBulkLong(fp
,j
) == 0) goto werr
;
6858 /* Iterate this DB writing every entry */
6859 while((de
= dictNext(di
)) != NULL
) {
6864 key
= dictGetEntryKey(de
);
6865 /* If the value for this key is swapped, load a preview in memory.
6866 * We use a "swapped" flag to remember if we need to free the
6867 * value object instead to just increment the ref count anyway
6868 * in order to avoid copy-on-write of pages if we are forked() */
6869 if (!server
.vm_enabled
|| key
->storage
== REDIS_VM_MEMORY
||
6870 key
->storage
== REDIS_VM_SWAPPING
) {
6871 o
= dictGetEntryVal(de
);
6874 o
= vmPreviewObject(key
);
6877 expiretime
= getExpire(db
,key
);
6879 /* Save the key and associated value */
6880 if (o
->type
== REDIS_STRING
) {
6881 /* Emit a SET command */
6882 char cmd
[]="*3\r\n$3\r\nSET\r\n";
6883 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6885 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6886 if (fwriteBulk(fp
,o
) == 0) goto werr
;
6887 } else if (o
->type
== REDIS_LIST
) {
6888 /* Emit the RPUSHes needed to rebuild the list */
6889 list
*list
= o
->ptr
;
6893 listRewind(list
,&li
);
6894 while((ln
= listNext(&li
))) {
6895 char cmd
[]="*3\r\n$5\r\nRPUSH\r\n";
6896 robj
*eleobj
= listNodeValue(ln
);
6898 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6899 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6900 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6902 } else if (o
->type
== REDIS_SET
) {
6903 /* Emit the SADDs needed to rebuild the set */
6905 dictIterator
*di
= dictGetIterator(set
);
6908 while((de
= dictNext(di
)) != NULL
) {
6909 char cmd
[]="*3\r\n$4\r\nSADD\r\n";
6910 robj
*eleobj
= dictGetEntryKey(de
);
6912 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6913 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6914 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6916 dictReleaseIterator(di
);
6917 } else if (o
->type
== REDIS_ZSET
) {
6918 /* Emit the ZADDs needed to rebuild the sorted set */
6920 dictIterator
*di
= dictGetIterator(zs
->dict
);
6923 while((de
= dictNext(di
)) != NULL
) {
6924 char cmd
[]="*4\r\n$4\r\nZADD\r\n";
6925 robj
*eleobj
= dictGetEntryKey(de
);
6926 double *score
= dictGetEntryVal(de
);
6928 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6929 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6930 if (fwriteBulkDouble(fp
,*score
) == 0) goto werr
;
6931 if (fwriteBulk(fp
,eleobj
) == 0) goto werr
;
6933 dictReleaseIterator(di
);
6935 redisAssert(0 != 0);
6937 /* Save the expire time */
6938 if (expiretime
!= -1) {
6939 char cmd
[]="*3\r\n$8\r\nEXPIREAT\r\n";
6940 /* If this key is already expired skip it */
6941 if (expiretime
< now
) continue;
6942 if (fwrite(cmd
,sizeof(cmd
)-1,1,fp
) == 0) goto werr
;
6943 if (fwriteBulk(fp
,key
) == 0) goto werr
;
6944 if (fwriteBulkLong(fp
,expiretime
) == 0) goto werr
;
6946 if (swapped
) decrRefCount(o
);
6948 dictReleaseIterator(di
);
6951 /* Make sure data will not remain on the OS's output buffers */
6956 /* Use RENAME to make sure the DB file is changed atomically only
6957 * if the generate DB file is ok. */
6958 if (rename(tmpfile
,filename
) == -1) {
6959 redisLog(REDIS_WARNING
,"Error moving temp append only file on the final destination: %s", strerror(errno
));
6963 redisLog(REDIS_NOTICE
,"SYNC append only file rewrite performed");
6969 redisLog(REDIS_WARNING
,"Write error writing append only file on disk: %s", strerror(errno
));
6970 if (di
) dictReleaseIterator(di
);
6974 /* This is how rewriting of the append only file in background works:
6976 * 1) The user calls BGREWRITEAOF
6977 * 2) Redis calls this function, that forks():
6978 * 2a) the child rewrite the append only file in a temp file.
6979 * 2b) the parent accumulates differences in server.bgrewritebuf.
6980 * 3) When the child finished '2a' exists.
6981 * 4) The parent will trap the exit code, if it's OK, will append the
6982 * data accumulated into server.bgrewritebuf into the temp file, and
6983 * finally will rename(2) the temp file in the actual file name.
6984 * The the new file is reopened as the new append only file. Profit!
6986 static int rewriteAppendOnlyFileBackground(void) {
6989 if (server
.bgrewritechildpid
!= -1) return REDIS_ERR
;
6990 if (server
.vm_enabled
) waitEmptyIOJobsQueue();
6991 if ((childpid
= fork()) == 0) {
6995 if (server
.vm_enabled
) vmReopenSwapFile();
6997 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6998 if (rewriteAppendOnlyFile(tmpfile
) == REDIS_OK
) {
7005 if (childpid
== -1) {
7006 redisLog(REDIS_WARNING
,
7007 "Can't rewrite append only file in background: fork: %s",
7011 redisLog(REDIS_NOTICE
,
7012 "Background append only file rewriting started by pid %d",childpid
);
7013 server
.bgrewritechildpid
= childpid
;
7014 /* We set appendseldb to -1 in order to force the next call to the
7015 * feedAppendOnlyFile() to issue a SELECT command, so the differences
7016 * accumulated by the parent into server.bgrewritebuf will start
7017 * with a SELECT statement and it will be safe to merge. */
7018 server
.appendseldb
= -1;
7021 return REDIS_OK
; /* unreached */
7024 static void bgrewriteaofCommand(redisClient
*c
) {
7025 if (server
.bgrewritechildpid
!= -1) {
7026 addReplySds(c
,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
7029 if (rewriteAppendOnlyFileBackground() == REDIS_OK
) {
7030 char *status
= "+Background append only file rewriting started\r\n";
7031 addReplySds(c
,sdsnew(status
));
7033 addReply(c
,shared
.err
);
7037 static void aofRemoveTempFile(pid_t childpid
) {
7040 snprintf(tmpfile
,256,"temp-rewriteaof-bg-%d.aof", (int) childpid
);
7044 /* Virtual Memory is composed mainly of two subsystems:
7045 * - Blocking Virutal Memory
7046 * - Threaded Virtual Memory I/O
7047 * The two parts are not fully decoupled, but functions are split among two
7048 * different sections of the source code (delimited by comments) in order to
7049 * make more clear what functionality is about the blocking VM and what about
7050 * the threaded (not blocking) VM.
7054 * Redis VM is a blocking VM (one that blocks reading swapped values from
7055 * disk into memory when a value swapped out is needed in memory) that is made
7056 * unblocking by trying to examine the command argument vector in order to
7057 * load in background values that will likely be needed in order to exec
7058 * the command. The command is executed only once all the relevant keys
7059 * are loaded into memory.
7061 * This basically is almost as simple of a blocking VM, but almost as parallel
7062 * as a fully non-blocking VM.
7065 /* =================== Virtual Memory - Blocking Side ====================== */
7067 /* substitute the first occurrence of '%p' with the process pid in the
7068 * swap file name. */
7069 static void expandVmSwapFilename(void) {
7070 char *p
= strstr(server
.vm_swap_file
,"%p");
7076 new = sdscat(new,server
.vm_swap_file
);
7077 new = sdscatprintf(new,"%ld",(long) getpid());
7078 new = sdscat(new,p
+2);
7079 zfree(server
.vm_swap_file
);
7080 server
.vm_swap_file
= new;
7083 static void vmInit(void) {
7088 if (server
.vm_max_threads
!= 0)
7089 zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */
7091 expandVmSwapFilename();
7092 redisLog(REDIS_NOTICE
,"Using '%s' as swap file",server
.vm_swap_file
);
7093 if ((server
.vm_fp
= fopen(server
.vm_swap_file
,"r+b")) == NULL
) {
7094 server
.vm_fp
= fopen(server
.vm_swap_file
,"w+b");
7096 if (server
.vm_fp
== NULL
) {
7097 redisLog(REDIS_WARNING
,
7098 "Impossible to open the swap file: %s. Exiting.",
7102 server
.vm_fd
= fileno(server
.vm_fp
);
7103 server
.vm_next_page
= 0;
7104 server
.vm_near_pages
= 0;
7105 server
.vm_stats_used_pages
= 0;
7106 server
.vm_stats_swapped_objects
= 0;
7107 server
.vm_stats_swapouts
= 0;
7108 server
.vm_stats_swapins
= 0;
7109 totsize
= server
.vm_pages
*server
.vm_page_size
;
7110 redisLog(REDIS_NOTICE
,"Allocating %lld bytes of swap file",totsize
);
7111 if (ftruncate(server
.vm_fd
,totsize
) == -1) {
7112 redisLog(REDIS_WARNING
,"Can't ftruncate swap file: %s. Exiting.",
7116 redisLog(REDIS_NOTICE
,"Swap file allocated with success");
7118 server
.vm_bitmap
= zmalloc((server
.vm_pages
+7)/8);
7119 redisLog(REDIS_VERBOSE
,"Allocated %lld bytes page table for %lld pages",
7120 (long long) (server
.vm_pages
+7)/8, server
.vm_pages
);
7121 memset(server
.vm_bitmap
,0,(server
.vm_pages
+7)/8);
7123 /* Initialize threaded I/O (used by Virtual Memory) */
7124 server
.io_newjobs
= listCreate();
7125 server
.io_processing
= listCreate();
7126 server
.io_processed
= listCreate();
7127 server
.io_ready_clients
= listCreate();
7128 pthread_mutex_init(&server
.io_mutex
,NULL
);
7129 pthread_mutex_init(&server
.obj_freelist_mutex
,NULL
);
7130 pthread_mutex_init(&server
.io_swapfile_mutex
,NULL
);
7131 server
.io_active_threads
= 0;
7132 if (pipe(pipefds
) == -1) {
7133 redisLog(REDIS_WARNING
,"Unable to intialized VM: pipe(2): %s. Exiting."
7137 server
.io_ready_pipe_read
= pipefds
[0];
7138 server
.io_ready_pipe_write
= pipefds
[1];
7139 redisAssert(anetNonBlock(NULL
,server
.io_ready_pipe_read
) != ANET_ERR
);
7140 /* LZF requires a lot of stack */
7141 pthread_attr_init(&server
.io_threads_attr
);
7142 pthread_attr_getstacksize(&server
.io_threads_attr
, &stacksize
);
7143 while (stacksize
< REDIS_THREAD_STACK_SIZE
) stacksize
*= 2;
7144 pthread_attr_setstacksize(&server
.io_threads_attr
, stacksize
);
7145 /* Listen for events in the threaded I/O pipe */
7146 if (aeCreateFileEvent(server
.el
, server
.io_ready_pipe_read
, AE_READABLE
,
7147 vmThreadedIOCompletedJob
, NULL
) == AE_ERR
)
7148 oom("creating file event");
7151 /* Mark the page as used */
7152 static void vmMarkPageUsed(off_t page
) {
7153 off_t byte
= page
/8;
7155 redisAssert(vmFreePage(page
) == 1);
7156 server
.vm_bitmap
[byte
] |= 1<<bit
;
7157 redisLog(REDIS_DEBUG
,"Mark used: %lld (byte:%lld bit:%d)\n",
7158 (long long)page
, (long long)byte
, bit
);
7161 /* Mark N contiguous pages as used, with 'page' being the first. */
7162 static void vmMarkPagesUsed(off_t page
, off_t count
) {
7165 for (j
= 0; j
< count
; j
++)
7166 vmMarkPageUsed(page
+j
);
7167 server
.vm_stats_used_pages
+= count
;
7170 /* Mark the page as free */
7171 static void vmMarkPageFree(off_t page
) {
7172 off_t byte
= page
/8;
7174 redisAssert(vmFreePage(page
) == 0);
7175 server
.vm_bitmap
[byte
] &= ~(1<<bit
);
7176 redisLog(REDIS_DEBUG
,"Mark free: %lld (byte:%lld bit:%d)\n",
7177 (long long)page
, (long long)byte
, bit
);
7180 /* Mark N contiguous pages as free, with 'page' being the first. */
7181 static void vmMarkPagesFree(off_t page
, off_t count
) {
7184 for (j
= 0; j
< count
; j
++)
7185 vmMarkPageFree(page
+j
);
7186 server
.vm_stats_used_pages
-= count
;
7187 if (server
.vm_stats_used_pages
> 100000000) {
7192 /* Test if the page is free */
7193 static int vmFreePage(off_t page
) {
7194 off_t byte
= page
/8;
7196 return (server
.vm_bitmap
[byte
] & (1<<bit
)) == 0;
7199 /* Find N contiguous free pages storing the first page of the cluster in *first.
7200 * Returns REDIS_OK if it was able to find N contiguous pages, otherwise
7201 * REDIS_ERR is returned.
7203 * This function uses a simple algorithm: we try to allocate
7204 * REDIS_VM_MAX_NEAR_PAGES sequentially, when we reach this limit we start
7205 * again from the start of the swap file searching for free spaces.
7207 * If it looks pretty clear that there are no free pages near our offset
7208 * we try to find less populated places doing a forward jump of
7209 * REDIS_VM_MAX_RANDOM_JUMP, then we start scanning again a few pages
7210 * without hurry, and then we jump again and so forth...
7212 * This function can be improved using a free list to avoid to guess
7213 * too much, since we could collect data about freed pages.
7215 * note: I implemented this function just after watching an episode of
7216 * Battlestar Galactica, where the hybrid was continuing to say "JUMP!"
7218 static int vmFindContiguousPages(off_t
*first
, off_t n
) {
7219 off_t base
, offset
= 0, since_jump
= 0, numfree
= 0;
7221 if (server
.vm_near_pages
== REDIS_VM_MAX_NEAR_PAGES
) {
7222 server
.vm_near_pages
= 0;
7223 server
.vm_next_page
= 0;
7225 server
.vm_near_pages
++; /* Yet another try for pages near to the old ones */
7226 base
= server
.vm_next_page
;
7228 while(offset
< server
.vm_pages
) {
7229 off_t
this = base
+offset
;
7231 /* If we overflow, restart from page zero */
7232 if (this >= server
.vm_pages
) {
7233 this -= server
.vm_pages
;
7235 /* Just overflowed, what we found on tail is no longer
7236 * interesting, as it's no longer contiguous. */
7240 redisLog(REDIS_DEBUG
, "THIS: %lld (%c)\n", (long long) this, vmFreePage(this) ? 'F' : 'X');
7241 if (vmFreePage(this)) {
7242 /* This is a free page */
7244 /* Already got N free pages? Return to the caller, with success */
7246 *first
= this-(n
-1);
7247 server
.vm_next_page
= this+1;
7251 /* The current one is not a free page */
7255 /* Fast-forward if the current page is not free and we already
7256 * searched enough near this place. */
7258 if (!numfree
&& since_jump
>= REDIS_VM_MAX_RANDOM_JUMP
/4) {
7259 offset
+= random() % REDIS_VM_MAX_RANDOM_JUMP
;
7261 /* Note that even if we rewind after the jump, we are don't need
7262 * to make sure numfree is set to zero as we only jump *if* it
7263 * is set to zero. */
7265 /* Otherwise just check the next page */
7272 /* Write the specified object at the specified page of the swap file */
7273 static int vmWriteObjectOnSwap(robj
*o
, off_t page
) {
7274 if (server
.vm_enabled
) pthread_mutex_lock(&server
.io_swapfile_mutex
);
7275 if (fseeko(server
.vm_fp
,page
*server
.vm_page_size
,SEEK_SET
) == -1) {
7276 if (server
.vm_enabled
) pthread_mutex_unlock(&server
.io_swapfile_mutex
);
7277 redisLog(REDIS_WARNING
,
7278 "Critical VM problem in vmSwapObjectBlocking(): can't seek: %s",
7282 rdbSaveObject(server
.vm_fp
,o
);
7283 if (server
.vm_enabled
) pthread_mutex_unlock(&server
.io_swapfile_mutex
);
7287 /* Swap the 'val' object relative to 'key' into disk. Store all the information
7288 * needed to later retrieve the object into the key object.
7289 * If we can't find enough contiguous empty pages to swap the object on disk
7290 * REDIS_ERR is returned. */
7291 static int vmSwapObjectBlocking(robj
*key
, robj
*val
) {
7292 off_t pages
= rdbSavedObjectPages(val
,NULL
);
7295 assert(key
->storage
== REDIS_VM_MEMORY
);
7296 assert(key
->refcount
== 1);
7297 if (vmFindContiguousPages(&page
,pages
) == REDIS_ERR
) return REDIS_ERR
;
7298 if (vmWriteObjectOnSwap(val
,page
) == REDIS_ERR
) return REDIS_ERR
;
7299 key
->vm
.page
= page
;
7300 key
->vm
.usedpages
= pages
;
7301 key
->storage
= REDIS_VM_SWAPPED
;
7302 key
->vtype
= val
->type
;
7303 decrRefCount(val
); /* Deallocate the object from memory. */
7304 vmMarkPagesUsed(page
,pages
);
7305 redisLog(REDIS_DEBUG
,"VM: object %s swapped out at %lld (%lld pages)",
7306 (unsigned char*) key
->ptr
,
7307 (unsigned long long) page
, (unsigned long long) pages
);
7308 server
.vm_stats_swapped_objects
++;
7309 server
.vm_stats_swapouts
++;
7310 fflush(server
.vm_fp
);
7314 static robj
*vmReadObjectFromSwap(off_t page
, int type
) {
7317 if (server
.vm_enabled
) pthread_mutex_lock(&server
.io_swapfile_mutex
);
7318 if (fseeko(server
.vm_fp
,page
*server
.vm_page_size
,SEEK_SET
) == -1) {
7319 redisLog(REDIS_WARNING
,
7320 "Unrecoverable VM problem in vmReadObjectFromSwap(): can't seek: %s",
7324 o
= rdbLoadObject(type
,server
.vm_fp
);
7326 redisLog(REDIS_WARNING
, "Unrecoverable VM problem in vmReadObjectFromSwap(): can't load object from swap file: %s", strerror(errno
));
7329 if (server
.vm_enabled
) pthread_mutex_unlock(&server
.io_swapfile_mutex
);
7333 /* Load the value object relative to the 'key' object from swap to memory.
7334 * The newly allocated object is returned.
7336 * If preview is true the unserialized object is returned to the caller but
7337 * no changes are made to the key object, nor the pages are marked as freed */
7338 static robj
*vmGenericLoadObject(robj
*key
, int preview
) {
7341 redisAssert(key
->storage
== REDIS_VM_SWAPPED
|| key
->storage
== REDIS_VM_LOADING
);
7342 val
= vmReadObjectFromSwap(key
->vm
.page
,key
->vtype
);
7344 key
->storage
= REDIS_VM_MEMORY
;
7345 key
->vm
.atime
= server
.unixtime
;
7346 vmMarkPagesFree(key
->vm
.page
,key
->vm
.usedpages
);
7347 redisLog(REDIS_DEBUG
, "VM: object %s loaded from disk",
7348 (unsigned char*) key
->ptr
);
7349 server
.vm_stats_swapped_objects
--;
7351 redisLog(REDIS_DEBUG
, "VM: object %s previewed from disk",
7352 (unsigned char*) key
->ptr
);
7354 server
.vm_stats_swapins
++;
7358 /* Plain object loading, from swap to memory */
7359 static robj
*vmLoadObject(robj
*key
) {
7360 /* If we are loading the object in background, stop it, we
7361 * need to load this object synchronously ASAP. */
7362 if (key
->storage
== REDIS_VM_LOADING
)
7363 vmCancelThreadedIOJob(key
);
7364 return vmGenericLoadObject(key
,0);
7367 /* Just load the value on disk, without to modify the key.
7368 * This is useful when we want to perform some operation on the value
7369 * without to really bring it from swap to memory, like while saving the
7370 * dataset or rewriting the append only log. */
7371 static robj
*vmPreviewObject(robj
*key
) {
7372 return vmGenericLoadObject(key
,1);
7375 /* How a good candidate is this object for swapping?
7376 * The better candidate it is, the greater the returned value.
7378 * Currently we try to perform a fast estimation of the object size in
7379 * memory, and combine it with aging informations.
7381 * Basically swappability = idle-time * log(estimated size)
7383 * Bigger objects are preferred over smaller objects, but not
7384 * proportionally, this is why we use the logarithm. This algorithm is
7385 * just a first try and will probably be tuned later. */
7386 static double computeObjectSwappability(robj
*o
) {
7387 time_t age
= server
.unixtime
- o
->vm
.atime
;
7391 struct dictEntry
*de
;
7394 if (age
<= 0) return 0;
7397 if (o
->encoding
!= REDIS_ENCODING_RAW
) {
7400 asize
= sdslen(o
->ptr
)+sizeof(*o
)+sizeof(long)*2;
7405 listNode
*ln
= listFirst(l
);
7407 asize
= sizeof(list
);
7409 robj
*ele
= ln
->value
;
7412 elesize
= (ele
->encoding
== REDIS_ENCODING_RAW
) ?
7413 (sizeof(*o
)+sdslen(ele
->ptr
)) :
7415 asize
+= (sizeof(listNode
)+elesize
)*listLength(l
);
7420 z
= (o
->type
== REDIS_ZSET
);
7421 d
= z
? ((zset
*)o
->ptr
)->dict
: o
->ptr
;
7423 asize
= sizeof(dict
)+(sizeof(struct dictEntry
*)*dictSlots(d
));
7424 if (z
) asize
+= sizeof(zset
)-sizeof(dict
);
7429 de
= dictGetRandomKey(d
);
7430 ele
= dictGetEntryKey(de
);
7431 elesize
= (ele
->encoding
== REDIS_ENCODING_RAW
) ?
7432 (sizeof(*o
)+sdslen(ele
->ptr
)) :
7434 asize
+= (sizeof(struct dictEntry
)+elesize
)*dictSize(d
);
7435 if (z
) asize
+= sizeof(zskiplistNode
)*dictSize(d
);
7439 return (double)asize
*log(1+asize
);
7442 /* Try to swap an object that's a good candidate for swapping.
7443 * Returns REDIS_OK if the object was swapped, REDIS_ERR if it's not possible
7444 * to swap any object at all.
7446 * If 'usethreaded' is true, Redis will try to swap the object in background
7447 * using I/O threads. */
7448 static int vmSwapOneObject(int usethreads
) {
7450 struct dictEntry
*best
= NULL
;
7451 double best_swappability
= 0;
7452 redisDb
*best_db
= NULL
;
7455 for (j
= 0; j
< server
.dbnum
; j
++) {
7456 redisDb
*db
= server
.db
+j
;
7457 /* Why maxtries is set to 100?
7458 * Because this way (usually) we'll find 1 object even if just 1% - 2%
7459 * are swappable objects */
7462 if (dictSize(db
->dict
) == 0) continue;
7463 for (i
= 0; i
< 5; i
++) {
7465 double swappability
;
7467 if (maxtries
) maxtries
--;
7468 de
= dictGetRandomKey(db
->dict
);
7469 key
= dictGetEntryKey(de
);
7470 val
= dictGetEntryVal(de
);
7471 /* Only swap objects that are currently in memory.
7473 * Also don't swap shared objects if threaded VM is on, as we
7474 * try to ensure that the main thread does not touch the
7475 * object while the I/O thread is using it, but we can't
7476 * control other keys without adding additional mutex. */
7477 if (key
->storage
!= REDIS_VM_MEMORY
||
7478 (server
.vm_max_threads
!= 0 && val
->refcount
!= 1)) {
7479 if (maxtries
) i
--; /* don't count this try */
7482 swappability
= computeObjectSwappability(val
);
7483 if (!best
|| swappability
> best_swappability
) {
7485 best_swappability
= swappability
;
7491 redisLog(REDIS_DEBUG
,"No swappable key found!");
7494 key
= dictGetEntryKey(best
);
7495 val
= dictGetEntryVal(best
);
7497 redisLog(REDIS_DEBUG
,"Key with best swappability: %s, %f",
7498 key
->ptr
, best_swappability
);
7500 /* Unshare the key if needed */
7501 if (key
->refcount
> 1) {
7502 robj
*newkey
= dupStringObject(key
);
7504 key
= dictGetEntryKey(best
) = newkey
;
7508 vmSwapObjectThreaded(key
,val
,best_db
);
7511 if (vmSwapObjectBlocking(key
,val
) == REDIS_OK
) {
7512 dictGetEntryVal(best
) = NULL
;
7520 static int vmSwapOneObjectBlocking() {
7521 return vmSwapOneObject(0);
7524 static int vmSwapOneObjectThreaded() {
7525 return vmSwapOneObject(1);
7528 /* Return true if it's safe to swap out objects in a given moment.
7529 * Basically we don't want to swap objects out while there is a BGSAVE
7530 * or a BGAEOREWRITE running in backgroud. */
7531 static int vmCanSwapOut(void) {
7532 return (server
.bgsavechildpid
== -1 && server
.bgrewritechildpid
== -1);
7535 /* Delete a key if swapped. Returns 1 if the key was found, was swapped
7536 * and was deleted. Otherwise 0 is returned. */
7537 static int deleteIfSwapped(redisDb
*db
, robj
*key
) {
7541 if ((de
= dictFind(db
->dict
,key
)) == NULL
) return 0;
7542 foundkey
= dictGetEntryKey(de
);
7543 if (foundkey
->storage
== REDIS_VM_MEMORY
) return 0;
7548 /* =================== Virtual Memory - Threaded I/O ======================= */
7550 static void freeIOJob(iojob
*j
) {
7551 if ((j
->type
== REDIS_IOJOB_PREPARE_SWAP
||
7552 j
->type
== REDIS_IOJOB_DO_SWAP
||
7553 j
->type
== REDIS_IOJOB_LOAD
) && j
->val
!= NULL
)
7554 decrRefCount(j
->val
);
7555 decrRefCount(j
->key
);
7559 /* Every time a thread finished a Job, it writes a byte into the write side
7560 * of an unix pipe in order to "awake" the main thread, and this function
7562 static void vmThreadedIOCompletedJob(aeEventLoop
*el
, int fd
, void *privdata
,
7566 int retval
, processed
= 0, toprocess
= -1, trytoswap
= 1;
7568 REDIS_NOTUSED(mask
);
7569 REDIS_NOTUSED(privdata
);
7571 /* For every byte we read in the read side of the pipe, there is one
7572 * I/O job completed to process. */
7573 while((retval
= read(fd
,buf
,1)) == 1) {
7577 struct dictEntry
*de
;
7579 redisLog(REDIS_DEBUG
,"Processing I/O completed job");
7581 /* Get the processed element (the oldest one) */
7583 assert(listLength(server
.io_processed
) != 0);
7584 if (toprocess
== -1) {
7585 toprocess
= (listLength(server
.io_processed
)*REDIS_MAX_COMPLETED_JOBS_PROCESSED
)/100;
7586 if (toprocess
<= 0) toprocess
= 1;
7588 ln
= listFirst(server
.io_processed
);
7590 listDelNode(server
.io_processed
,ln
);
7592 /* If this job is marked as canceled, just ignore it */
7597 /* Post process it in the main thread, as there are things we
7598 * can do just here to avoid race conditions and/or invasive locks */
7599 redisLog(REDIS_DEBUG
,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j
, j
->type
, (void*)j
->key
, (char*)j
->key
->ptr
, j
->key
->refcount
);
7600 de
= dictFind(j
->db
->dict
,j
->key
);
7602 key
= dictGetEntryKey(de
);
7603 if (j
->type
== REDIS_IOJOB_LOAD
) {
7606 /* Key loaded, bring it at home */
7607 key
->storage
= REDIS_VM_MEMORY
;
7608 key
->vm
.atime
= server
.unixtime
;
7609 vmMarkPagesFree(key
->vm
.page
,key
->vm
.usedpages
);
7610 redisLog(REDIS_DEBUG
, "VM: object %s loaded from disk (threaded)",
7611 (unsigned char*) key
->ptr
);
7612 server
.vm_stats_swapped_objects
--;
7613 server
.vm_stats_swapins
++;
7614 dictGetEntryVal(de
) = j
->val
;
7615 incrRefCount(j
->val
);
7618 /* Handle clients waiting for this key to be loaded. */
7619 handleClientsBlockedOnSwappedKey(db
,key
);
7620 } else if (j
->type
== REDIS_IOJOB_PREPARE_SWAP
) {
7621 /* Now we know the amount of pages required to swap this object.
7622 * Let's find some space for it, and queue this task again
7623 * rebranded as REDIS_IOJOB_DO_SWAP. */
7624 if (!vmCanSwapOut() ||
7625 vmFindContiguousPages(&j
->page
,j
->pages
) == REDIS_ERR
)
7627 /* Ooops... no space or we can't swap as there is
7628 * a fork()ed Redis trying to save stuff on disk. */
7630 key
->storage
= REDIS_VM_MEMORY
; /* undo operation */
7632 /* Note that we need to mark this pages as used now,
7633 * if the job will be canceled, we'll mark them as freed
7635 vmMarkPagesUsed(j
->page
,j
->pages
);
7636 j
->type
= REDIS_IOJOB_DO_SWAP
;
7641 } else if (j
->type
== REDIS_IOJOB_DO_SWAP
) {
7644 /* Key swapped. We can finally free some memory. */
7645 if (key
->storage
!= REDIS_VM_SWAPPING
) {
7646 printf("key->storage: %d\n",key
->storage
);
7647 printf("key->name: %s\n",(char*)key
->ptr
);
7648 printf("key->refcount: %d\n",key
->refcount
);
7649 printf("val: %p\n",(void*)j
->val
);
7650 printf("val->type: %d\n",j
->val
->type
);
7651 printf("val->ptr: %s\n",(char*)j
->val
->ptr
);
7653 redisAssert(key
->storage
== REDIS_VM_SWAPPING
);
7654 val
= dictGetEntryVal(de
);
7655 key
->vm
.page
= j
->page
;
7656 key
->vm
.usedpages
= j
->pages
;
7657 key
->storage
= REDIS_VM_SWAPPED
;
7658 key
->vtype
= j
->val
->type
;
7659 decrRefCount(val
); /* Deallocate the object from memory. */
7660 dictGetEntryVal(de
) = NULL
;
7661 redisLog(REDIS_DEBUG
,
7662 "VM: object %s swapped out at %lld (%lld pages) (threaded)",
7663 (unsigned char*) key
->ptr
,
7664 (unsigned long long) j
->page
, (unsigned long long) j
->pages
);
7665 server
.vm_stats_swapped_objects
++;
7666 server
.vm_stats_swapouts
++;
7668 /* Put a few more swap requests in queue if we are still
7670 if (trytoswap
&& vmCanSwapOut() &&
7671 zmalloc_used_memory() > server
.vm_max_memory
)
7676 more
= listLength(server
.io_newjobs
) <
7677 (unsigned) server
.vm_max_threads
;
7679 /* Don't waste CPU time if swappable objects are rare. */
7680 if (vmSwapOneObjectThreaded() == REDIS_ERR
) {
7688 if (processed
== toprocess
) return;
7690 if (retval
< 0 && errno
!= EAGAIN
) {
7691 redisLog(REDIS_WARNING
,
7692 "WARNING: read(2) error in vmThreadedIOCompletedJob() %s",
7697 static void lockThreadedIO(void) {
7698 pthread_mutex_lock(&server
.io_mutex
);
7701 static void unlockThreadedIO(void) {
7702 pthread_mutex_unlock(&server
.io_mutex
);
7705 /* Remove the specified object from the threaded I/O queue if still not
7706 * processed, otherwise make sure to flag it as canceled. */
7707 static void vmCancelThreadedIOJob(robj
*o
) {
7709 server
.io_newjobs
, /* 0 */
7710 server
.io_processing
, /* 1 */
7711 server
.io_processed
/* 2 */
7715 assert(o
->storage
== REDIS_VM_LOADING
|| o
->storage
== REDIS_VM_SWAPPING
);
7718 /* Search for a matching key in one of the queues */
7719 for (i
= 0; i
< 3; i
++) {
7723 listRewind(lists
[i
],&li
);
7724 while ((ln
= listNext(&li
)) != NULL
) {
7725 iojob
*job
= ln
->value
;
7727 if (job
->canceled
) continue; /* Skip this, already canceled. */
7728 if (compareStringObjects(job
->key
,o
) == 0) {
7729 redisLog(REDIS_DEBUG
,"*** CANCELED %p (%s) (type %d) (LIST ID %d)\n",
7730 (void*)job
, (char*)o
->ptr
, job
->type
, i
);
7731 /* Mark the pages as free since the swap didn't happened
7732 * or happened but is now discarded. */
7733 if (i
!= 1 && job
->type
== REDIS_IOJOB_DO_SWAP
)
7734 vmMarkPagesFree(job
->page
,job
->pages
);
7735 /* Cancel the job. It depends on the list the job is
7738 case 0: /* io_newjobs */
7739 /* If the job was yet not processed the best thing to do
7740 * is to remove it from the queue at all */
7742 listDelNode(lists
[i
],ln
);
7744 case 1: /* io_processing */
7745 /* Oh Shi- the thread is messing with the Job:
7747 * Probably it's accessing the object if this is a
7748 * PREPARE_SWAP or DO_SWAP job.
7749 * If it's a LOAD job it may be reading from disk and
7750 * if we don't wait for the job to terminate before to
7751 * cancel it, maybe in a few microseconds data can be
7752 * corrupted in this pages. So the short story is:
7754 * Better to wait for the job to move into the
7755 * next queue (processed)... */
7757 /* We try again and again until the job is completed. */
7759 /* But let's wait some time for the I/O thread
7760 * to finish with this job. After all this condition
7761 * should be very rare. */
7764 case 2: /* io_processed */
7765 /* The job was already processed, that's easy...
7766 * just mark it as canceled so that we'll ignore it
7767 * when processing completed jobs. */
7771 /* Finally we have to adjust the storage type of the object
7772 * in order to "UNDO" the operaiton. */
7773 if (o
->storage
== REDIS_VM_LOADING
)
7774 o
->storage
= REDIS_VM_SWAPPED
;
7775 else if (o
->storage
== REDIS_VM_SWAPPING
)
7776 o
->storage
= REDIS_VM_MEMORY
;
7783 assert(1 != 1); /* We should never reach this */
7786 static void *IOThreadEntryPoint(void *arg
) {
7791 pthread_detach(pthread_self());
7793 /* Get a new job to process */
7795 if (listLength(server
.io_newjobs
) == 0) {
7796 /* No new jobs in queue, exit. */
7797 redisLog(REDIS_DEBUG
,"Thread %lld exiting, nothing to do",
7798 (long long) pthread_self());
7799 server
.io_active_threads
--;
7803 ln
= listFirst(server
.io_newjobs
);
7805 listDelNode(server
.io_newjobs
,ln
);
7806 /* Add the job in the processing queue */
7807 j
->thread
= pthread_self();
7808 listAddNodeTail(server
.io_processing
,j
);
7809 ln
= listLast(server
.io_processing
); /* We use ln later to remove it */
7811 redisLog(REDIS_DEBUG
,"Thread %lld got a new job (type %d): %p about key '%s'",
7812 (long long) pthread_self(), j
->type
, (void*)j
, (char*)j
->key
->ptr
);
7814 /* Process the Job */
7815 if (j
->type
== REDIS_IOJOB_LOAD
) {
7816 j
->val
= vmReadObjectFromSwap(j
->page
,j
->key
->vtype
);
7817 } else if (j
->type
== REDIS_IOJOB_PREPARE_SWAP
) {
7818 FILE *fp
= fopen("/dev/null","w+");
7819 j
->pages
= rdbSavedObjectPages(j
->val
,fp
);
7821 } else if (j
->type
== REDIS_IOJOB_DO_SWAP
) {
7822 if (vmWriteObjectOnSwap(j
->val
,j
->page
) == REDIS_ERR
)
7826 /* Done: insert the job into the processed queue */
7827 redisLog(REDIS_DEBUG
,"Thread %lld completed the job: %p (key %s)",
7828 (long long) pthread_self(), (void*)j
, (char*)j
->key
->ptr
);
7830 listDelNode(server
.io_processing
,ln
);
7831 listAddNodeTail(server
.io_processed
,j
);
7834 /* Signal the main thread there is new stuff to process */
7835 assert(write(server
.io_ready_pipe_write
,"x",1) == 1);
7837 return NULL
; /* never reached */
7840 static void spawnIOThread(void) {
7842 sigset_t mask
, omask
;
7845 sigaddset(&mask
,SIGCHLD
);
7846 sigaddset(&mask
,SIGHUP
);
7847 sigaddset(&mask
,SIGPIPE
);
7848 pthread_sigmask(SIG_SETMASK
, &mask
, &omask
);
7849 pthread_create(&thread
,&server
.io_threads_attr
,IOThreadEntryPoint
,NULL
);
7850 pthread_sigmask(SIG_SETMASK
, &omask
, NULL
);
7851 server
.io_active_threads
++;
7854 /* We need to wait for the last thread to exit before we are able to
7855 * fork() in order to BGSAVE or BGREWRITEAOF. */
7856 static void waitEmptyIOJobsQueue(void) {
7858 int io_processed_len
;
7861 if (listLength(server
.io_newjobs
) == 0 &&
7862 listLength(server
.io_processing
) == 0 &&
7863 server
.io_active_threads
== 0)
7868 /* While waiting for empty jobs queue condition we post-process some
7869 * finshed job, as I/O threads may be hanging trying to write against
7870 * the io_ready_pipe_write FD but there are so much pending jobs that
7872 io_processed_len
= listLength(server
.io_processed
);
7874 if (io_processed_len
) {
7875 vmThreadedIOCompletedJob(NULL
,server
.io_ready_pipe_read
,NULL
,0);
7876 usleep(1000); /* 1 millisecond */
7878 usleep(10000); /* 10 milliseconds */
7883 static void vmReopenSwapFile(void) {
7884 /* Note: we don't close the old one as we are in the child process
7885 * and don't want to mess at all with the original file object. */
7886 server
.vm_fp
= fopen(server
.vm_swap_file
,"r+b");
7887 if (server
.vm_fp
== NULL
) {
7888 redisLog(REDIS_WARNING
,"Can't re-open the VM swap file: %s. Exiting.",
7889 server
.vm_swap_file
);
7892 server
.vm_fd
= fileno(server
.vm_fp
);
7895 /* This function must be called while with threaded IO locked */
7896 static void queueIOJob(iojob
*j
) {
7897 redisLog(REDIS_DEBUG
,"Queued IO Job %p type %d about key '%s'\n",
7898 (void*)j
, j
->type
, (char*)j
->key
->ptr
);
7899 listAddNodeTail(server
.io_newjobs
,j
);
7900 if (server
.io_active_threads
< server
.vm_max_threads
)
7904 static int vmSwapObjectThreaded(robj
*key
, robj
*val
, redisDb
*db
) {
7907 assert(key
->storage
== REDIS_VM_MEMORY
);
7908 assert(key
->refcount
== 1);
7910 j
= zmalloc(sizeof(*j
));
7911 j
->type
= REDIS_IOJOB_PREPARE_SWAP
;
7913 j
->key
= dupStringObject(key
);
7917 j
->thread
= (pthread_t
) -1;
7918 key
->storage
= REDIS_VM_SWAPPING
;
7926 /* ============ Virtual Memory - Blocking clients on missing keys =========== */
7928 /* This function makes the clinet 'c' waiting for the key 'key' to be loaded.
7929 * If there is not already a job loading the key, it is craeted.
7930 * The key is added to the io_keys list in the client structure, and also
7931 * in the hash table mapping swapped keys to waiting clients, that is,
7932 * server.io_waited_keys. */
7933 static int waitForSwappedKey(redisClient
*c
, robj
*key
) {
7934 struct dictEntry
*de
;
7938 /* If the key does not exist or is already in RAM we don't need to
7939 * block the client at all. */
7940 de
= dictFind(c
->db
->dict
,key
);
7941 if (de
== NULL
) return 0;
7942 o
= dictGetEntryKey(de
);
7943 if (o
->storage
== REDIS_VM_MEMORY
) {
7945 } else if (o
->storage
== REDIS_VM_SWAPPING
) {
7946 /* We were swapping the key, undo it! */
7947 vmCancelThreadedIOJob(o
);
7951 /* OK: the key is either swapped, or being loaded just now. */
7953 /* Add the key to the list of keys this client is waiting for.
7954 * This maps clients to keys they are waiting for. */
7955 listAddNodeTail(c
->io_keys
,key
);
7958 /* Add the client to the swapped keys => clients waiting map. */
7959 de
= dictFind(c
->db
->io_keys
,key
);
7963 /* For every key we take a list of clients blocked for it */
7965 retval
= dictAdd(c
->db
->io_keys
,key
,l
);
7967 assert(retval
== DICT_OK
);
7969 l
= dictGetEntryVal(de
);
7971 listAddNodeTail(l
,c
);
7973 /* Are we already loading the key from disk? If not create a job */
7974 if (o
->storage
== REDIS_VM_SWAPPED
) {
7977 o
->storage
= REDIS_VM_LOADING
;
7978 j
= zmalloc(sizeof(*j
));
7979 j
->type
= REDIS_IOJOB_LOAD
;
7981 j
->key
= dupStringObject(key
);
7982 j
->key
->vtype
= o
->vtype
;
7983 j
->page
= o
->vm
.page
;
7986 j
->thread
= (pthread_t
) -1;
7994 /* Is this client attempting to run a command against swapped keys?
7995 * If so, block it ASAP, load the keys in background, then resume it.
7997 * The important idea about this function is that it can fail! If keys will
7998 * still be swapped when the client is resumed, this key lookups will
7999 * just block loading keys from disk. In practical terms this should only
8000 * happen with SORT BY command or if there is a bug in this function.
8002 * Return 1 if the client is marked as blocked, 0 if the client can
8003 * continue as the keys it is going to access appear to be in memory. */
8004 static int blockClientOnSwappedKeys(struct redisCommand
*cmd
, redisClient
*c
) {
8005 if (cmd
->proc
== getCommand
) {
8006 waitForSwappedKey(c
,c
->argv
[1]);
8008 /* If the client was blocked for at least one key, mark it as blocked. */
8009 if (listLength(c
->io_keys
)) {
8010 c
->flags
|= REDIS_IO_WAIT
;
8011 aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
);
8012 server
.vm_blocked_clients
++;
8019 /* Remove the 'key' from the list of blocked keys for a given client.
8021 * The function returns 1 when there are no longer blocking keys after
8022 * the current one was removed (and the client can be unblocked). */
8023 static int dontWaitForSwappedKey(redisClient
*c
, robj
*key
) {
8027 struct dictEntry
*de
;
8029 /* Remove the key from the list of keys this client is waiting for. */
8030 listRewind(c
->io_keys
,&li
);
8031 while ((ln
= listNext(&li
)) != NULL
) {
8032 if (compareStringObjects(ln
->value
,key
) == 0) {
8033 listDelNode(c
->io_keys
,ln
);
8039 /* Remove the client form the key => waiting clients map. */
8040 de
= dictFind(c
->db
->io_keys
,key
);
8042 l
= dictGetEntryVal(de
);
8043 ln
= listSearchKey(l
,c
);
8046 if (listLength(l
) == 0)
8047 dictDelete(c
->db
->io_keys
,key
);
8049 return listLength(c
->io_keys
) == 0;
8052 static void handleClientsBlockedOnSwappedKey(redisDb
*db
, robj
*key
) {
8053 struct dictEntry
*de
;
8058 de
= dictFind(db
->io_keys
,key
);
8061 l
= dictGetEntryVal(de
);
8062 len
= listLength(l
);
8063 /* Note: we can't use something like while(listLength(l)) as the list
8064 * can be freed by the calling function when we remove the last element. */
8067 redisClient
*c
= ln
->value
;
8069 if (dontWaitForSwappedKey(c
,key
)) {
8070 /* Put the client in the list of clients ready to go as we
8071 * loaded all the keys about it. */
8072 listAddNodeTail(server
.io_ready_clients
,c
);
8077 /* ================================= Debugging ============================== */
8079 static void debugCommand(redisClient
*c
) {
8080 if (!strcasecmp(c
->argv
[1]->ptr
,"segfault")) {
8082 } else if (!strcasecmp(c
->argv
[1]->ptr
,"reload")) {
8083 if (rdbSave(server
.dbfilename
) != REDIS_OK
) {
8084 addReply(c
,shared
.err
);
8088 if (rdbLoad(server
.dbfilename
) != REDIS_OK
) {
8089 addReply(c
,shared
.err
);
8092 redisLog(REDIS_WARNING
,"DB reloaded by DEBUG RELOAD");
8093 addReply(c
,shared
.ok
);
8094 } else if (!strcasecmp(c
->argv
[1]->ptr
,"loadaof")) {
8096 if (loadAppendOnlyFile(server
.appendfilename
) != REDIS_OK
) {
8097 addReply(c
,shared
.err
);
8100 redisLog(REDIS_WARNING
,"Append Only File loaded by DEBUG LOADAOF");
8101 addReply(c
,shared
.ok
);
8102 } else if (!strcasecmp(c
->argv
[1]->ptr
,"object") && c
->argc
== 3) {
8103 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
8107 addReply(c
,shared
.nokeyerr
);
8110 key
= dictGetEntryKey(de
);
8111 val
= dictGetEntryVal(de
);
8112 if (server
.vm_enabled
&& (key
->storage
== REDIS_VM_MEMORY
||
8113 key
->storage
== REDIS_VM_SWAPPING
)) {
8114 addReplySds(c
,sdscatprintf(sdsempty(),
8115 "+Key at:%p refcount:%d, value at:%p refcount:%d "
8116 "encoding:%d serializedlength:%lld\r\n",
8117 (void*)key
, key
->refcount
, (void*)val
, val
->refcount
,
8118 val
->encoding
, (long long) rdbSavedObjectLen(val
,NULL
)));
8120 addReplySds(c
,sdscatprintf(sdsempty(),
8121 "+Key at:%p refcount:%d, value swapped at: page %llu "
8122 "using %llu pages\r\n",
8123 (void*)key
, key
->refcount
, (unsigned long long) key
->vm
.page
,
8124 (unsigned long long) key
->vm
.usedpages
));
8126 } else if (!strcasecmp(c
->argv
[1]->ptr
,"swapout") && c
->argc
== 3) {
8127 dictEntry
*de
= dictFind(c
->db
->dict
,c
->argv
[2]);
8130 if (!server
.vm_enabled
) {
8131 addReplySds(c
,sdsnew("-ERR Virtual Memory is disabled\r\n"));
8135 addReply(c
,shared
.nokeyerr
);
8138 key
= dictGetEntryKey(de
);
8139 val
= dictGetEntryVal(de
);
8140 /* If the key is shared we want to create a copy */
8141 if (key
->refcount
> 1) {
8142 robj
*newkey
= dupStringObject(key
);
8144 key
= dictGetEntryKey(de
) = newkey
;
8147 if (key
->storage
!= REDIS_VM_MEMORY
) {
8148 addReplySds(c
,sdsnew("-ERR This key is not in memory\r\n"));
8149 } else if (vmSwapObjectBlocking(key
,val
) == REDIS_OK
) {
8150 dictGetEntryVal(de
) = NULL
;
8151 addReply(c
,shared
.ok
);
8153 addReply(c
,shared
.err
);
8156 addReplySds(c
,sdsnew(
8157 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPOUT <key>|RELOAD]\r\n"));
8161 static void _redisAssert(char *estr
, char *file
, int line
) {
8162 redisLog(REDIS_WARNING
,"=== ASSERTION FAILED ===");
8163 redisLog(REDIS_WARNING
,"==> %s:%d '%s' is not true\n",file
,line
,estr
);
8164 #ifdef HAVE_BACKTRACE
8165 redisLog(REDIS_WARNING
,"(forcing SIGSEGV in order to print the stack trace)");
8170 /* =================================== Main! ================================ */
8173 int linuxOvercommitMemoryValue(void) {
8174 FILE *fp
= fopen("/proc/sys/vm/overcommit_memory","r");
8178 if (fgets(buf
,64,fp
) == NULL
) {
8187 void linuxOvercommitMemoryWarning(void) {
8188 if (linuxOvercommitMemoryValue() == 0) {
8189 redisLog(REDIS_WARNING
,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
8192 #endif /* __linux__ */
8194 static void daemonize(void) {
8198 if (fork() != 0) exit(0); /* parent exits */
8199 setsid(); /* create a new session */
8201 /* Every output goes to /dev/null. If Redis is daemonized but
8202 * the 'logfile' is set to 'stdout' in the configuration file
8203 * it will not log at all. */
8204 if ((fd
= open("/dev/null", O_RDWR
, 0)) != -1) {
8205 dup2(fd
, STDIN_FILENO
);
8206 dup2(fd
, STDOUT_FILENO
);
8207 dup2(fd
, STDERR_FILENO
);
8208 if (fd
> STDERR_FILENO
) close(fd
);
8210 /* Try to write the pid file */
8211 fp
= fopen(server
.pidfile
,"w");
8213 fprintf(fp
,"%d\n",getpid());
8218 int main(int argc
, char **argv
) {
8223 resetServerSaveParams();
8224 loadServerConfig(argv
[1]);
8225 } else if (argc
> 2) {
8226 fprintf(stderr
,"Usage: ./redis-server [/path/to/redis.conf]\n");
8229 redisLog(REDIS_WARNING
,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
8231 if (server
.daemonize
) daemonize();
8233 redisLog(REDIS_NOTICE
,"Server started, Redis version " REDIS_VERSION
);
8235 linuxOvercommitMemoryWarning();
8238 if (server
.appendonly
) {
8239 if (loadAppendOnlyFile(server
.appendfilename
) == REDIS_OK
)
8240 redisLog(REDIS_NOTICE
,"DB loaded from append only file: %ld seconds",time(NULL
)-start
);
8242 if (rdbLoad(server
.dbfilename
) == REDIS_OK
)
8243 redisLog(REDIS_NOTICE
,"DB loaded from disk: %ld seconds",time(NULL
)-start
);
8245 redisLog(REDIS_NOTICE
,"The server is now ready to accept connections on port %d", server
.port
);
8246 aeSetBeforeSleepProc(server
.el
,beforeSleep
);
8248 aeDeleteEventLoop(server
.el
);
8252 /* ============================= Backtrace support ========================= */
8254 #ifdef HAVE_BACKTRACE
8255 static char *findFuncName(void *pointer
, unsigned long *offset
);
8257 static void *getMcontextEip(ucontext_t
*uc
) {
8258 #if defined(__FreeBSD__)
8259 return (void*) uc
->uc_mcontext
.mc_eip
;
8260 #elif defined(__dietlibc__)
8261 return (void*) uc
->uc_mcontext
.eip
;
8262 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
8264 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
8266 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
8268 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
8269 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
8270 return (void*) uc
->uc_mcontext
->__ss
.__rip
;
8272 return (void*) uc
->uc_mcontext
->__ss
.__eip
;
8274 #elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
8275 return (void*) uc
->uc_mcontext
.gregs
[REG_EIP
]; /* Linux 32/64 bit */
8276 #elif defined(__ia64__) /* Linux IA64 */
8277 return (void*) uc
->uc_mcontext
.sc_ip
;
8283 static void segvHandler(int sig
, siginfo_t
*info
, void *secret
) {
8285 char **messages
= NULL
;
8286 int i
, trace_size
= 0;
8287 unsigned long offset
=0;
8288 ucontext_t
*uc
= (ucontext_t
*) secret
;
8290 REDIS_NOTUSED(info
);
8292 redisLog(REDIS_WARNING
,
8293 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION
, sig
);
8294 infostring
= genRedisInfoString();
8295 redisLog(REDIS_WARNING
, "%s",infostring
);
8296 /* It's not safe to sdsfree() the returned string under memory
8297 * corruption conditions. Let it leak as we are going to abort */
8299 trace_size
= backtrace(trace
, 100);
8300 /* overwrite sigaction with caller's address */
8301 if (getMcontextEip(uc
) != NULL
) {
8302 trace
[1] = getMcontextEip(uc
);
8304 messages
= backtrace_symbols(trace
, trace_size
);
8306 for (i
=1; i
<trace_size
; ++i
) {
8307 char *fn
= findFuncName(trace
[i
], &offset
), *p
;
8309 p
= strchr(messages
[i
],'+');
8310 if (!fn
|| (p
&& ((unsigned long)strtol(p
+1,NULL
,10)) < offset
)) {
8311 redisLog(REDIS_WARNING
,"%s", messages
[i
]);
8313 redisLog(REDIS_WARNING
,"%d redis-server %p %s + %d", i
, trace
[i
], fn
, (unsigned int)offset
);
8316 /* free(messages); Don't call free() with possibly corrupted memory. */
8320 static void setupSigSegvAction(void) {
8321 struct sigaction act
;
8323 sigemptyset (&act
.sa_mask
);
8324 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
8325 * is used. Otherwise, sa_handler is used */
8326 act
.sa_flags
= SA_NODEFER
| SA_ONSTACK
| SA_RESETHAND
| SA_SIGINFO
;
8327 act
.sa_sigaction
= segvHandler
;
8328 sigaction (SIGSEGV
, &act
, NULL
);
8329 sigaction (SIGBUS
, &act
, NULL
);
8330 sigaction (SIGFPE
, &act
, NULL
);
8331 sigaction (SIGILL
, &act
, NULL
);
8332 sigaction (SIGBUS
, &act
, NULL
);
8336 #include "staticsymbols.h"
8337 /* This function try to convert a pointer into a function name. It's used in
8338 * oreder to provide a backtrace under segmentation fault that's able to
8339 * display functions declared as static (otherwise the backtrace is useless). */
8340 static char *findFuncName(void *pointer
, unsigned long *offset
){
8342 unsigned long off
, minoff
= 0;
8344 /* Try to match against the Symbol with the smallest offset */
8345 for (i
=0; symsTable
[i
].pointer
; i
++) {
8346 unsigned long lp
= (unsigned long) pointer
;
8348 if (lp
!= (unsigned long)-1 && lp
>= symsTable
[i
].pointer
) {
8349 off
=lp
-symsTable
[i
].pointer
;
8350 if (ret
< 0 || off
< minoff
) {
8356 if (ret
== -1) return NULL
;
8358 return symsTable
[ret
].name
;
8360 #else /* HAVE_BACKTRACE */
8361 static void setupSigSegvAction(void) {
8363 #endif /* HAVE_BACKTRACE */