]> git.saurik.com Git - redis.git/blame_incremental - redis.c
VM low level pages handling
[redis.git] / redis.c
... / ...
CommitLineData
1/*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30#define REDIS_VERSION "1.3.2"
31
32#include "fmacros.h"
33#include "config.h"
34
35#include <stdio.h>
36#include <stdlib.h>
37#include <string.h>
38#include <time.h>
39#include <unistd.h>
40#define __USE_POSIX199309
41#include <signal.h>
42
43#ifdef HAVE_BACKTRACE
44#include <execinfo.h>
45#include <ucontext.h>
46#endif /* HAVE_BACKTRACE */
47
48#include <sys/wait.h>
49#include <errno.h>
50#include <assert.h>
51#include <ctype.h>
52#include <stdarg.h>
53#include <inttypes.h>
54#include <arpa/inet.h>
55#include <sys/stat.h>
56#include <fcntl.h>
57#include <sys/time.h>
58#include <sys/resource.h>
59#include <sys/uio.h>
60#include <limits.h>
61#include <math.h>
62
63#if defined(__sun)
64#include "solarisfixes.h"
65#endif
66
67#include "redis.h"
68#include "ae.h" /* Event driven programming library */
69#include "sds.h" /* Dynamic safe strings */
70#include "anet.h" /* Networking the easy way */
71#include "dict.h" /* Hash tables */
72#include "adlist.h" /* Linked lists */
73#include "zmalloc.h" /* total memory usage aware version of malloc/free */
74#include "lzf.h" /* LZF compression library */
75#include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
77/* Error codes */
78#define REDIS_OK 0
79#define REDIS_ERR -1
80
81/* Static server configuration */
82#define REDIS_SERVERPORT 6379 /* TCP port */
83#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84#define REDIS_IOBUF_LEN 1024
85#define REDIS_LOADBUF_LEN 1024
86#define REDIS_STATIC_ARGS 4
87#define REDIS_DEFAULT_DBNUM 16
88#define REDIS_CONFIGLINE_MAX 1024
89#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
94
/* If more than REDIS_WRITEV_THRESHOLD write packets are pending, use writev() */
96#define REDIS_WRITEV_THRESHOLD 3
97/* Max number of iovecs used for each writev call */
98#define REDIS_WRITEV_IOVEC_COUNT 256
99
100/* Hash table parameters */
101#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
102
103/* Command flags */
104#define REDIS_CMD_BULK 1 /* Bulk write command */
105#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short, these commands are denied in low memory conditions. */
110#define REDIS_CMD_DENYOOM 4
111
112/* Object types */
113#define REDIS_STRING 0
114#define REDIS_LIST 1
115#define REDIS_SET 2
116#define REDIS_ZSET 3
117#define REDIS_HASH 4
118
119/* Objects encoding */
120#define REDIS_ENCODING_RAW 0 /* Raw representation */
121#define REDIS_ENCODING_INT 1 /* Encoded as integer */
122
123/* Object types only used for dumping to disk */
124#define REDIS_EXPIRETIME 253
125#define REDIS_SELECTDB 254
126#define REDIS_EOF 255
127
/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys requires a lot of space, so we check the most significant 2 bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte, so most DB keys, and many
 * values, will fit inside. */
141#define REDIS_RDB_6BITLEN 0
142#define REDIS_RDB_14BITLEN 1
143#define REDIS_RDB_32BITLEN 2
144#define REDIS_RDB_ENCVAL 3
145#define REDIS_RDB_LENERR UINT_MAX
146
/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
150#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
154
155/* Virtual memory object->where field. */
156#define REDIS_VM_MEMORY 0 /* The object is on memory */
157#define REDIS_VM_SWAPPED 1 /* The object is on disk */
158#define REDIS_VM_SWAPPING 2 /* Redis is swapping this object on disk */
159#define REDIS_VM_LOADING 3 /* Redis is loading this object from disk */
160
/* Virtual memory static configuration stuff.
 * Check vmFindContiguousPages() to know more about these magic numbers. */
163#define REDIS_VM_MAX_NEAR_PAGES 65536
164#define REDIS_VM_MAX_RANDOM_JUMP 4096
165
166/* Client flags */
167#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
168#define REDIS_SLAVE 2 /* This client is a slave server */
169#define REDIS_MASTER 4 /* This client is a master server */
170#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
171#define REDIS_MULTI 16 /* This client is in a MULTI context */
172#define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */
173
174/* Slave replication state - slave side */
175#define REDIS_REPL_NONE 0 /* No active replication */
176#define REDIS_REPL_CONNECT 1 /* Must connect to master */
177#define REDIS_REPL_CONNECTED 2 /* Connected to master */
178
179/* Slave replication state - from the point of view of master
180 * Note that in SEND_BULK and ONLINE state the slave receives new updates
181 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
182 * to start the next background saving in order to send updates to it. */
183#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
184#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
185#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
186#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
187
188/* List related stuff */
189#define REDIS_HEAD 0
190#define REDIS_TAIL 1
191
192/* Sort operations */
193#define REDIS_SORT_GET 0
194#define REDIS_SORT_ASC 1
195#define REDIS_SORT_DESC 2
196#define REDIS_SORTKEY_MAX 1024
197
198/* Log levels */
199#define REDIS_DEBUG 0
200#define REDIS_NOTICE 1
201#define REDIS_WARNING 2
202
203/* Anti-warning macro... */
204#define REDIS_NOTUSED(V) ((void) V)
205
206#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
207#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
208
209/* Append only defines */
210#define APPENDFSYNC_NO 0
211#define APPENDFSYNC_ALWAYS 1
212#define APPENDFSYNC_EVERYSEC 2
213
/* We can print the stacktrace, so our assert is defined this way:
 * when the expression _e evaluates to false, _redisAssert() is called with
 * the stringified expression and the process then exits with status 1.
 * Unlike assert(), this check is not compiled out by NDEBUG. */
#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
static void _redisAssert(char *estr);
217
218/*================================= Data types ============================== */
219
220/* A redis object, that is a type able to hold a string / list / set */
221
/* The VM object structure: where a swapped-out object lives in the swap
 * file. Embedded as the last member of robj. */
struct redisObjectVM {
    off_t offset;   /* the page at which the object is stored on disk */
    int pages;      /* number of pages used on disk */
} vm; /* NOTE(review): this declarator also defines a file-scope variable
       * named 'vm' of this struct type, which looks unintended (robj embeds
       * its own 'vm' member). Confirm nothing references it, then drop it. */
227
/* The actual Redis Object: a reference-counted, typed value. */
typedef struct redisObject {
    void *ptr;              /* payload; its meaning depends on type/encoding */
    unsigned char type;     /* REDIS_STRING, REDIS_LIST, REDIS_SET, ... */
    unsigned char encoding; /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
    unsigned char storage;  /* where? REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
    unsigned char notused;
    int refcount;
    /* VM fields. These are only allocated if VM is active; otherwise the
     * object allocation function will just allocate
     * sizeof(redisObject) minus sizeof(redisObjectVM), so using
     * Redis without VM active will not have any overhead. For that trick
     * to work this must remain the last member of the structure. */
    struct redisObjectVM vm;
} robj;
242
/* Macro used to initialize a Redis object allocated on the stack.
 * Note that this macro is taken near the structure definition to make sure
 * we'll update it when the structure is changed, to avoid bugs like
 * bug #85 introduced exactly in this way.
 *
 * Sets every header field of the object: 'storage' is included so that
 * VM-aware code inspecting a stack object never reads uninitialized memory
 * and mistakes it for a swapped-out value. The 'vm' sub-structure is left
 * untouched; presumably it is only consulted when storage is not
 * REDIS_VM_MEMORY.
 *
 * NOTE: the trailing ';' after while(0) is kept because existing call sites
 * may rely on it. It does mean the macro cannot be used as the body of an
 * unbraced if/else. */
#define initStaticStringObject(_var,_ptr) do { \
    _var.refcount = 1; \
    _var.type = REDIS_STRING; \
    _var.encoding = REDIS_ENCODING_RAW; \
    _var.storage = REDIS_VM_MEMORY; \
    _var.ptr = _ptr; \
} while(0);
253
/* One logical database (selected with SELECT). */
typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blockingkeys;         /* Keys with clients waiting for data (BLPOP) */
    int id;                     /* Database index */
} redisDb;
260
/* Client MULTI/EXEC state: one command queued between MULTI and EXEC. */
typedef struct multiCmd {
    robj **argv;                /* Queued command arguments */
    int argc;                   /* Number of entries in argv */
    struct redisCommand *cmd;   /* Command table entry to run at EXEC time */
} multiCmd;
267
/* The whole queue of commands a client accumulated inside MULTI. */
typedef struct multiState {
    multiCmd *commands;     /* Array of MULTI commands */
    int count;              /* Total number of MULTI commands */
} multiState;
272
/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list (server.clients). */
typedef struct redisClient {
    int fd;                 /* Client socket */
    redisDb *db;            /* Currently selected database */
    int dictid;             /* Id of the selected database */
    sds querybuf;           /* Accumulated, not yet processed input */
    robj **argv, **mbargv;  /* Arguments of the command being parsed
                             * (inline/bulk and multi-bulk respectively) */
    int argc, mbargc;
    int bulklen;            /* bulk read len. -1 if not in bulk read mode */
    int multibulk;          /* multi bulk command format active */
    list *reply;            /* Pending reply objects */
    int sentlen;            /* Bytes of the current reply object already sent */
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    int flags;              /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR |
                             * REDIS_MULTI | ... (see the client flag defines) */
    int slaveseldb;         /* slave selected db, if this client is a slave */
    int authenticated;      /* when requirepass is non-NULL */
    int replstate;          /* replication state if this is a slave */
    int repldbfd;           /* replication DB file descriptor */
    long repldboff;         /* replication DB file offset */
    off_t repldbsize;       /* replication DB file size */
    multiState mstate;      /* MULTI/EXEC state */
    robj **blockingkeys;    /* The keys we are waiting on to terminate a
                             * blocking operation such as BLPOP. Otherwise
                             * NULL. */
    int blockingkeysnum;    /* Number of blocking keys */
    time_t blockingto;      /* Blocking operation timeout. If UNIX current time
                             * is >= blockingto then the operation timed out. */
} redisClient;
302
/* One automatic-save rule: presumably "save if at least 'changes' writes
 * happened within 'seconds' seconds" (TODO confirm against serverCron). */
struct saveparam {
    time_t seconds;
    int changes;
};
307
/* Global server state structure: configuration, runtime state and stats. */
struct redisServer {
    int port;
    int fd;                         /* Listening socket */
    redisDb *db;                    /* Array of databases */
    dict *sharingpool;              /* Pool used for object sharing */
    unsigned int sharingpoolsize;
    long long dirty;                /* changes to DB from the last save */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];      /* Error buffer for anet calls */
    aeEventLoop *el;
    int cronloops;                  /* number of times the cron function ran */
    list *objfreelist;              /* A list of freed objects to avoid malloc() */
    time_t lastsave;                /* Unix time of last successful save */
    size_t usedmemory;              /* Used memory in megabytes
                                     * NOTE(review): unit per original comment,
                                     * confirm against the code updating it */
    /* Fields used only for stats */
    time_t stat_starttime;          /* server start time */
    long long stat_numcommands;     /* number of processed commands */
    long long stat_numconnections;  /* number of connections received */
    /* Configuration */
    int verbosity;                  /* Minimum level written by redisLog() */
    int glueoutputbuf;
    int maxidletime;                /* Client idle timeout in seconds (0 = off) */
    int dbnum;
    int daemonize;
    int appendonly;
    int appendfsync;                /* One of the APPENDFSYNC_* defines */
    time_t lastfsync;
    int appendfd;
    int appendseldb;
    char *pidfile;
    pid_t bgsavechildpid;
    pid_t bgrewritechildpid;
    sds bgrewritebuf;               /* buffer kept by parent during append only rewrite */
    struct saveparam *saveparams;
    int saveparamslen;
    char *logfile;                  /* NULL means log to stdout */
    char *bindaddr;
    char *dbfilename;
    char *appendfilename;
    char *requirepass;
    int shareobjects;
    int rdbcompression;
    /* Replication related */
    int isslave;
    char *masterauth;
    char *masterhost;
    int masterport;
    redisClient *master;            /* client that is master for this slave */
    int replstate;
    unsigned int maxclients;
    unsigned long maxmemory;
    unsigned int blockedclients;
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to keep this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
    /* Virtual memory configuration */
    int vm_enabled;
    off_t vm_page_size;
    off_t vm_pages;
    long vm_max_memory;
    /* Virtual memory state */
    FILE *vm_fp;
    int vm_fd;
    off_t vm_next_page;             /* Next probably empty page */
    off_t vm_near_pages;            /* Number of pages allocated sequentially */
    unsigned char *vm_bitmap;       /* Bitmap of free/used pages */
};
379
/* Signature of every command implementation. */
typedef void redisCommandProc(redisClient *c);

/* One entry of the command table (see cmdTable). */
struct redisCommand {
    char *name;             /* Command name as sent by the client */
    redisCommandProc *proc; /* Implementation */
    int arity;              /* Argument count; negative values appear to mean
                             * "at least -arity" (e.g. del, mget) */
    int flags;              /* REDIS_CMD_BULK / INLINE / DENYOOM */
};
387
/* A (name, address) pair. Presumably used to map addresses to function
 * names when printing stack traces; TODO confirm. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};
392
/* An element being sorted by SORT, together with its sort key: a numeric
 * score or, presumably for ALPHA/BY sorting, an object to compare. */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;
        robj *cmpobj;
    } u;
} redisSortObject;
400
/* A SORT sub-operation; 'type' is presumably one of the REDIS_SORT_*
 * defines (e.g. REDIS_SORT_GET) and 'pattern' its key pattern. */
typedef struct _redisSortOperation {
    int type;
    robj *pattern;
} redisSortOperation;
405
/* ZSETs use a specialized version of Skiplists */

/* One skiplist element. */
typedef struct zskiplistNode {
    struct zskiplistNode **forward;  /* Next pointers, presumably one per level */
    struct zskiplistNode *backward;  /* Previous node, for reverse traversal */
    double score;                    /* Ordering key */
    robj *obj;                       /* The member */
} zskiplistNode;
414
/* Skiplist header: first/last nodes, element count and current level
 * (bounded by ZSKIPLIST_MAXLEVEL). */
typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;
420
/* Sorted set: a dict plus a skiplist over the same members. The dict
 * presumably maps member -> score (zsetDictType stores malloc'd doubles as
 * values); the skiplist keeps members ordered by score. */
typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;
425
/* Our shared "common" objects: preallocated replies and protocol fragments
 * reused across clients. This also defines the global 'shared' instance. */

struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk, *queued,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;
436
/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

static double R_Zero, R_PosInf, R_NegInf, R_Nan;
442
443/*================================ Prototypes =============================== */
444
445static void freeStringObject(robj *o);
446static void freeListObject(robj *o);
447static void freeSetObject(robj *o);
448static void decrRefCount(void *o);
449static robj *createObject(int type, void *ptr);
450static void freeClient(redisClient *c);
451static int rdbLoad(char *filename);
452static void addReply(redisClient *c, robj *obj);
453static void addReplySds(redisClient *c, sds s);
454static void incrRefCount(robj *o);
455static int rdbSaveBackground(char *filename);
456static robj *createStringObject(char *ptr, size_t len);
457static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
458static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
459static int syncWithMaster(void);
460static robj *tryObjectSharing(robj *o);
461static int tryObjectEncoding(robj *o);
462static robj *getDecodedObject(robj *o);
463static int removeExpire(redisDb *db, robj *key);
464static int expireIfNeeded(redisDb *db, robj *key);
465static int deleteIfVolatile(redisDb *db, robj *key);
466static int deleteKey(redisDb *db, robj *key);
467static time_t getExpire(redisDb *db, robj *key);
468static int setExpire(redisDb *db, robj *key, time_t when);
469static void updateSlavesWaitingBgsave(int bgsaveerr);
470static void freeMemoryIfNeeded(void);
471static int processCommand(redisClient *c);
472static void setupSigSegvAction(void);
473static void rdbRemoveTempFile(pid_t childpid);
474static void aofRemoveTempFile(pid_t childpid);
475static size_t stringObjectLen(robj *o);
476static void processInputBuffer(redisClient *c);
477static zskiplist *zslCreate(void);
478static void zslFree(zskiplist *zsl);
479static void zslInsert(zskiplist *zsl, double score, robj *obj);
480static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
481static void initClientMultiState(redisClient *c);
482static void freeClientMultiState(redisClient *c);
483static void queueMultiCommand(redisClient *c, struct redisCommand *cmd);
484static void unblockClient(redisClient *c);
485static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele);
486static void vmInit(void);
487
488static void authCommand(redisClient *c);
489static void pingCommand(redisClient *c);
490static void echoCommand(redisClient *c);
491static void setCommand(redisClient *c);
492static void setnxCommand(redisClient *c);
493static void getCommand(redisClient *c);
494static void delCommand(redisClient *c);
495static void existsCommand(redisClient *c);
496static void incrCommand(redisClient *c);
497static void decrCommand(redisClient *c);
498static void incrbyCommand(redisClient *c);
499static void decrbyCommand(redisClient *c);
500static void selectCommand(redisClient *c);
501static void randomkeyCommand(redisClient *c);
502static void keysCommand(redisClient *c);
503static void dbsizeCommand(redisClient *c);
504static void lastsaveCommand(redisClient *c);
505static void saveCommand(redisClient *c);
506static void bgsaveCommand(redisClient *c);
507static void bgrewriteaofCommand(redisClient *c);
508static void shutdownCommand(redisClient *c);
509static void moveCommand(redisClient *c);
510static void renameCommand(redisClient *c);
511static void renamenxCommand(redisClient *c);
512static void lpushCommand(redisClient *c);
513static void rpushCommand(redisClient *c);
514static void lpopCommand(redisClient *c);
515static void rpopCommand(redisClient *c);
516static void llenCommand(redisClient *c);
517static void lindexCommand(redisClient *c);
518static void lrangeCommand(redisClient *c);
519static void ltrimCommand(redisClient *c);
520static void typeCommand(redisClient *c);
521static void lsetCommand(redisClient *c);
522static void saddCommand(redisClient *c);
523static void sremCommand(redisClient *c);
524static void smoveCommand(redisClient *c);
525static void sismemberCommand(redisClient *c);
526static void scardCommand(redisClient *c);
527static void spopCommand(redisClient *c);
528static void srandmemberCommand(redisClient *c);
529static void sinterCommand(redisClient *c);
530static void sinterstoreCommand(redisClient *c);
531static void sunionCommand(redisClient *c);
532static void sunionstoreCommand(redisClient *c);
533static void sdiffCommand(redisClient *c);
534static void sdiffstoreCommand(redisClient *c);
535static void syncCommand(redisClient *c);
536static void flushdbCommand(redisClient *c);
537static void flushallCommand(redisClient *c);
538static void sortCommand(redisClient *c);
539static void lremCommand(redisClient *c);
540static void rpoplpushcommand(redisClient *c);
541static void infoCommand(redisClient *c);
542static void mgetCommand(redisClient *c);
543static void monitorCommand(redisClient *c);
544static void expireCommand(redisClient *c);
545static void expireatCommand(redisClient *c);
546static void getsetCommand(redisClient *c);
547static void ttlCommand(redisClient *c);
548static void slaveofCommand(redisClient *c);
549static void debugCommand(redisClient *c);
550static void msetCommand(redisClient *c);
551static void msetnxCommand(redisClient *c);
552static void zaddCommand(redisClient *c);
553static void zincrbyCommand(redisClient *c);
554static void zrangeCommand(redisClient *c);
555static void zrangebyscoreCommand(redisClient *c);
556static void zrevrangeCommand(redisClient *c);
557static void zcardCommand(redisClient *c);
558static void zremCommand(redisClient *c);
559static void zscoreCommand(redisClient *c);
560static void zremrangebyscoreCommand(redisClient *c);
561static void multiCommand(redisClient *c);
562static void execCommand(redisClient *c);
563static void blpopCommand(redisClient *c);
564static void brpopCommand(redisClient *c);
565
566/*================================= Globals ================================= */
567
568/* Global vars */
569static struct redisServer server; /* server global state */
570static struct redisCommand cmdTable[] = {
571 {"get",getCommand,2,REDIS_CMD_INLINE},
572 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
573 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
574 {"del",delCommand,-2,REDIS_CMD_INLINE},
575 {"exists",existsCommand,2,REDIS_CMD_INLINE},
576 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
577 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
578 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
579 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
580 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
581 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
582 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
583 {"brpop",brpopCommand,-3,REDIS_CMD_INLINE},
584 {"blpop",blpopCommand,-3,REDIS_CMD_INLINE},
585 {"llen",llenCommand,2,REDIS_CMD_INLINE},
586 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
587 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
588 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
589 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
590 {"lrem",lremCommand,4,REDIS_CMD_BULK},
591 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
592 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
593 {"srem",sremCommand,3,REDIS_CMD_BULK},
594 {"smove",smoveCommand,4,REDIS_CMD_BULK},
595 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
596 {"scard",scardCommand,2,REDIS_CMD_INLINE},
597 {"spop",spopCommand,2,REDIS_CMD_INLINE},
598 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
599 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
600 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
601 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
602 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
603 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
604 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
605 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
606 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
607 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
608 {"zrem",zremCommand,3,REDIS_CMD_BULK},
609 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
610 {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE},
611 {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
612 {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE},
613 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
614 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
615 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
616 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
617 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
618 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
619 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
620 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
621 {"select",selectCommand,2,REDIS_CMD_INLINE},
622 {"move",moveCommand,3,REDIS_CMD_INLINE},
623 {"rename",renameCommand,3,REDIS_CMD_INLINE},
624 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
625 {"expire",expireCommand,3,REDIS_CMD_INLINE},
626 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
627 {"keys",keysCommand,2,REDIS_CMD_INLINE},
628 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
629 {"auth",authCommand,2,REDIS_CMD_INLINE},
630 {"ping",pingCommand,1,REDIS_CMD_INLINE},
631 {"echo",echoCommand,2,REDIS_CMD_BULK},
632 {"save",saveCommand,1,REDIS_CMD_INLINE},
633 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
634 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
635 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
636 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
637 {"type",typeCommand,2,REDIS_CMD_INLINE},
638 {"multi",multiCommand,1,REDIS_CMD_INLINE},
639 {"exec",execCommand,1,REDIS_CMD_INLINE},
640 {"sync",syncCommand,1,REDIS_CMD_INLINE},
641 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
642 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
643 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
644 {"info",infoCommand,1,REDIS_CMD_INLINE},
645 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
646 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
647 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
648 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
649 {NULL,NULL,0,0}
650};
651
652/*============================ Utility functions ============================ */
653
/* Glob-style pattern matching.
 *
 * Returns 1 when the first stringLen bytes of 'string' match the first
 * patternLen bytes of 'pattern', 0 otherwise. Supported syntax:
 *
 *   *       any sequence of bytes, including the empty one
 *   ?       exactly one byte
 *   [...]   one byte out of a set; [^...] negates the set, a-b is an
 *           inclusive range and \x inside the set is the literal byte x
 *   \x      the literal byte x
 *
 * When 'nocase' is non-zero, bytes are compared after tolower() (escaped
 * bytes inside a set are still compared exactly).
 *
 * Every read is bounded by patternLen/stringLen, so neither buffer needs to
 * be NUL terminated. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* Collapse a run of stars into a single one. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match: a trailing star eats the rest */
            /* Try to match the rest of the pattern at every suffix. */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A set always consumes exactly one byte: with nothing left to
             * consume it cannot match (previously "[^a]*" matched ""). */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back one byte so the increment
                     * after the switch ends exactly at the pattern end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    /* Work on unsigned byte values: ranges over bytes
                     * >= 0x80 then order naturally, and tolower() always
                     * receives a value it is defined for. */
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* String exhausted: only stars may remain, matching nothing. */
            while(patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    return patternLen == 0 && stringLen == 0;
}
776
777static void redisLog(int level, const char *fmt, ...) {
778 va_list ap;
779 FILE *fp;
780
781 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
782 if (!fp) return;
783
784 va_start(ap, fmt);
785 if (level >= server.verbosity) {
786 char *c = ".-*";
787 char buf[64];
788 time_t now;
789
790 now = time(NULL);
791 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
792 fprintf(fp,"%s %c ",buf,c[level]);
793 vfprintf(fp, fmt, ap);
794 fprintf(fp,"\n");
795 fflush(fp);
796 }
797 va_end(ap);
798
799 if (server.logfile) fclose(fp);
800}
801
802/*====================== Hash table type implementation ==================== */
803
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and redis objects as values (objects can hold SDS strings,
 * lists, sets). */
807
/* Dict destructor for plain heap blocks: releases 'val' with zfree().
 * The privdata argument is ignored. */
static void dictVanillaFree(void *privdata, void *val)
{
    (void) privdata;
    zfree(val);
}
813
814static void dictListDestructor(void *privdata, void *val)
815{
816 DICT_NOTUSED(privdata);
817 listRelease((list*)val);
818}
819
820static int sdsDictKeyCompare(void *privdata, const void *key1,
821 const void *key2)
822{
823 int l1,l2;
824 DICT_NOTUSED(privdata);
825
826 l1 = sdslen((sds)key1);
827 l2 = sdslen((sds)key2);
828 if (l1 != l2) return 0;
829 return memcmp(key1, key2, l1) == 0;
830}
831
/* Dict key/value destructor for Redis objects: drops one reference with
 * decrRefCount(). privdata is ignored. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    (void) privdata;
    decrRefCount(val);
}
838
839static int dictObjKeyCompare(void *privdata, const void *key1,
840 const void *key2)
841{
842 const robj *o1 = key1, *o2 = key2;
843 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
844}
845
846static unsigned int dictObjHash(const void *key) {
847 const robj *o = key;
848 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
849}
850
851static int dictEncObjKeyCompare(void *privdata, const void *key1,
852 const void *key2)
853{
854 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
855 int cmp;
856
857 o1 = getDecodedObject(o1);
858 o2 = getDecodedObject(o2);
859 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
860 decrRefCount(o1);
861 decrRefCount(o2);
862 return cmp;
863}
864
865static unsigned int dictEncObjHash(const void *key) {
866 robj *o = (robj*) key;
867
868 o = getDecodedObject(o);
869 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
870 decrRefCount(o);
871 return hash;
872}
873
/* Dict type for Redis sets: keys are Redis objects that may be encoded
 * (hashed/compared through their decoded string form) and are released
 * with decrRefCount(). There is no value destructor, so values are
 * presumably unused. */
static dictType setDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL                       /* val destructor */
};
882
/* Dict type for the dict half of a sorted set: keys are (possibly encoded)
 * Redis objects released with decrRefCount(); values are heap-allocated
 * doubles freed with zfree(). */
static dictType zsetDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictVanillaFree            /* val destructor of malloc(sizeof(double)) */
};
891
/* Dict type with Redis objects as both keys and values. Keys are hashed and
 * compared as raw sds strings (no decoding). Both keys and values are
 * released with decrRefCount(). */
static dictType hashDictType = {
    dictObjHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictObjKeyCompare,          /* key compare */
    dictRedisObjectDestructor,  /* key destructor */
    dictRedisObjectDestructor   /* val destructor */
};
900
/* Keylist hash table type has unencoded redis objects as keys and
 * lists as values. It's used for blocking operations (BLPOP): each DB's
 * 'blockingkeys' dict is created with it in initServer(). Keys are released
 * with decrRefCount(), values with dictListDestructor(). */
static dictType keylistDictType = {
    dictObjHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictObjKeyCompare,          /* key compare */
    dictRedisObjectDestructor,  /* key destructor */
    dictListDestructor          /* val destructor */
};
911
912/* ========================= Random utility functions ======================= */
913
/* Redis generally does not try to recover from out of memory conditions
 * when allocating objects or strings. It is not clear it would even be
 * possible to report the condition to the client, since the networking layer
 * itself relies on heap allocation for send buffers, so we simply log and
 * abort. At least the code will be simpler to read...
 *
 * msg: short description of the failing operation, prefixed to the log line.
 * Never returns. */
static void oom(const char *msg) {
    redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
    /* NOTE(review): presumably gives the log line time to reach its
     * destination before the process dies — confirm. */
    sleep(1);
    abort();
}
924
925/* ====================== Redis server networking stuff ===================== */
926static void closeTimedoutClients(void) {
927 redisClient *c;
928 listNode *ln;
929 time_t now = time(NULL);
930
931 listRewind(server.clients);
932 while ((ln = listYield(server.clients)) != NULL) {
933 c = listNodeValue(ln);
934 if (server.maxidletime &&
935 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
936 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
937 (now - c->lastinteraction > server.maxidletime))
938 {
939 redisLog(REDIS_DEBUG,"Closing idle client");
940 freeClient(c);
941 } else if (c->flags & REDIS_BLOCKED) {
942 if (c->blockingto != 0 && c->blockingto < now) {
943 addReply(c,shared.nullmultibulk);
944 unblockClient(c);
945 }
946 }
947 }
948}
949
950static int htNeedsResize(dict *dict) {
951 long long size, used;
952
953 size = dictSlots(dict);
954 used = dictSize(dict);
955 return (size && used && size > DICT_HT_INITIAL_SIZE &&
956 (used*100/size < REDIS_HT_MINFILL));
957}
958
959/* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
960 * we resize the hash table to save memory */
961static void tryResizeHashTables(void) {
962 int j;
963
964 for (j = 0; j < server.dbnum; j++) {
965 if (htNeedsResize(server.db[j].dict)) {
966 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
967 dictResize(server.db[j].dict);
968 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
969 }
970 if (htNeedsResize(server.db[j].expires))
971 dictResize(server.db[j].expires);
972 }
973}
974
975/* A background saving child (BGSAVE) terminated its work. Handle this. */
976void backgroundSaveDoneHandler(int statloc) {
977 int exitcode = WEXITSTATUS(statloc);
978 int bysignal = WIFSIGNALED(statloc);
979
980 if (!bysignal && exitcode == 0) {
981 redisLog(REDIS_NOTICE,
982 "Background saving terminated with success");
983 server.dirty = 0;
984 server.lastsave = time(NULL);
985 } else if (!bysignal && exitcode != 0) {
986 redisLog(REDIS_WARNING, "Background saving error");
987 } else {
988 redisLog(REDIS_WARNING,
989 "Background saving terminated by signal");
990 rdbRemoveTempFile(server.bgsavechildpid);
991 }
992 server.bgsavechildpid = -1;
993 /* Possibly there are slaves waiting for a BGSAVE in order to be served
994 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
995 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
996}
997
998/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
999 * Handle this. */
1000void backgroundRewriteDoneHandler(int statloc) {
1001 int exitcode = WEXITSTATUS(statloc);
1002 int bysignal = WIFSIGNALED(statloc);
1003
1004 if (!bysignal && exitcode == 0) {
1005 int fd;
1006 char tmpfile[256];
1007
1008 redisLog(REDIS_NOTICE,
1009 "Background append only file rewriting terminated with success");
1010 /* Now it's time to flush the differences accumulated by the parent */
1011 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
1012 fd = open(tmpfile,O_WRONLY|O_APPEND);
1013 if (fd == -1) {
1014 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
1015 goto cleanup;
1016 }
1017 /* Flush our data... */
1018 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
1019 (signed) sdslen(server.bgrewritebuf)) {
1020 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
1021 close(fd);
1022 goto cleanup;
1023 }
1024 redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
1025 /* Now our work is to rename the temp file into the stable file. And
1026 * switch the file descriptor used by the server for append only. */
1027 if (rename(tmpfile,server.appendfilename) == -1) {
1028 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
1029 close(fd);
1030 goto cleanup;
1031 }
1032 /* Mission completed... almost */
1033 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
1034 if (server.appendfd != -1) {
1035 /* If append only is actually enabled... */
1036 close(server.appendfd);
1037 server.appendfd = fd;
1038 fsync(fd);
1039 server.appendseldb = -1; /* Make sure it will issue SELECT */
1040 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
1041 } else {
1042 /* If append only is disabled we just generate a dump in this
1043 * format. Why not? */
1044 close(fd);
1045 }
1046 } else if (!bysignal && exitcode != 0) {
1047 redisLog(REDIS_WARNING, "Background append only file rewriting error");
1048 } else {
1049 redisLog(REDIS_WARNING,
1050 "Background append only file rewriting terminated by signal");
1051 }
1052cleanup:
1053 sdsfree(server.bgrewritebuf);
1054 server.bgrewritebuf = sdsempty();
1055 aofRemoveTempFile(server.bgrewritechildpid);
1056 server.bgrewritechildpid = -1;
1057}
1058
1059static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
1060 int j, loops = server.cronloops++;
1061 REDIS_NOTUSED(eventLoop);
1062 REDIS_NOTUSED(id);
1063 REDIS_NOTUSED(clientData);
1064
1065 /* Update the global state with the amount of used memory */
1066 server.usedmemory = zmalloc_used_memory();
1067
1068 /* Show some info about non-empty databases */
1069 for (j = 0; j < server.dbnum; j++) {
1070 long long size, used, vkeys;
1071
1072 size = dictSlots(server.db[j].dict);
1073 used = dictSize(server.db[j].dict);
1074 vkeys = dictSize(server.db[j].expires);
1075 if (!(loops % 5) && (used || vkeys)) {
1076 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
1077 /* dictPrintStats(server.dict); */
1078 }
1079 }
1080
1081 /* We don't want to resize the hash tables while a bacground saving
1082 * is in progress: the saving child is created using fork() that is
1083 * implemented with a copy-on-write semantic in most modern systems, so
1084 * if we resize the HT while there is the saving child at work actually
1085 * a lot of memory movements in the parent will cause a lot of pages
1086 * copied. */
1087 if (server.bgsavechildpid == -1) tryResizeHashTables();
1088
1089 /* Show information about connected clients */
1090 if (!(loops % 5)) {
1091 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1092 listLength(server.clients)-listLength(server.slaves),
1093 listLength(server.slaves),
1094 server.usedmemory,
1095 dictSize(server.sharingpool));
1096 }
1097
1098 /* Close connections of timedout clients */
1099 if ((server.maxidletime && !(loops % 10)) || server.blockedclients)
1100 closeTimedoutClients();
1101
1102 /* Check if a background saving or AOF rewrite in progress terminated */
1103 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
1104 int statloc;
1105 pid_t pid;
1106
1107 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1108 if (pid == server.bgsavechildpid) {
1109 backgroundSaveDoneHandler(statloc);
1110 } else {
1111 backgroundRewriteDoneHandler(statloc);
1112 }
1113 }
1114 } else {
1115 /* If there is not a background saving in progress check if
1116 * we have to save now */
1117 time_t now = time(NULL);
1118 for (j = 0; j < server.saveparamslen; j++) {
1119 struct saveparam *sp = server.saveparams+j;
1120
1121 if (server.dirty >= sp->changes &&
1122 now-server.lastsave > sp->seconds) {
1123 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1124 sp->changes, sp->seconds);
1125 rdbSaveBackground(server.dbfilename);
1126 break;
1127 }
1128 }
1129 }
1130
1131 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1132 * will use few CPU cycles if there are few expiring keys, otherwise
1133 * it will get more aggressive to avoid that too much memory is used by
1134 * keys that can be removed from the keyspace. */
1135 for (j = 0; j < server.dbnum; j++) {
1136 int expired;
1137 redisDb *db = server.db+j;
1138
1139 /* Continue to expire if at the end of the cycle more than 25%
1140 * of the keys were expired. */
1141 do {
1142 int num = dictSize(db->expires);
1143 time_t now = time(NULL);
1144
1145 expired = 0;
1146 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1147 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1148 while (num--) {
1149 dictEntry *de;
1150 time_t t;
1151
1152 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1153 t = (time_t) dictGetEntryVal(de);
1154 if (now > t) {
1155 deleteKey(db,dictGetEntryKey(de));
1156 expired++;
1157 }
1158 }
1159 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
1160 }
1161
1162 /* Check if we should connect to a MASTER */
1163 if (server.replstate == REDIS_REPL_CONNECT) {
1164 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1165 if (syncWithMaster() == REDIS_OK) {
1166 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1167 }
1168 }
1169 return 1000;
1170}
1171
1172static void createSharedObjects(void) {
1173 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1174 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1175 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
1176 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1177 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1178 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1179 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1180 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1181 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1182 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1183 shared.queued = createObject(REDIS_STRING,sdsnew("+QUEUED\r\n"));
1184 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1185 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1186 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1187 "-ERR no such key\r\n"));
1188 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1189 "-ERR syntax error\r\n"));
1190 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1191 "-ERR source and destination objects are the same\r\n"));
1192 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1193 "-ERR index out of range\r\n"));
1194 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1195 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1196 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1197 shared.select0 = createStringObject("select 0\r\n",10);
1198 shared.select1 = createStringObject("select 1\r\n",10);
1199 shared.select2 = createStringObject("select 2\r\n",10);
1200 shared.select3 = createStringObject("select 3\r\n",10);
1201 shared.select4 = createStringObject("select 4\r\n",10);
1202 shared.select5 = createStringObject("select 5\r\n",10);
1203 shared.select6 = createStringObject("select 6\r\n",10);
1204 shared.select7 = createStringObject("select 7\r\n",10);
1205 shared.select8 = createStringObject("select 8\r\n",10);
1206 shared.select9 = createStringObject("select 9\r\n",10);
1207}
1208
1209static void appendServerSaveParams(time_t seconds, int changes) {
1210 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1211 server.saveparams[server.saveparamslen].seconds = seconds;
1212 server.saveparams[server.saveparamslen].changes = changes;
1213 server.saveparamslen++;
1214}
1215
1216static void resetServerSaveParams() {
1217 zfree(server.saveparams);
1218 server.saveparams = NULL;
1219 server.saveparamslen = 0;
1220}
1221
/* Set every configurable field of the global 'server' to its built-in
 * default, install the default save points, and initialize the R_* double
 * constants.
 * NOTE(review): presumably called before loadServerConfig(), whose
 * directives overwrite these fields — confirm against main(). */
static void initServerConfig() {
    server.dbnum = REDIS_DEFAULT_DBNUM;
    server.port = REDIS_SERVERPORT;
    server.verbosity = REDIS_DEBUG;
    server.maxidletime = REDIS_MAXIDLETIME;
    server.saveparams = NULL;
    server.logfile = NULL; /* NULL = log on standard output */
    server.bindaddr = NULL;
    server.glueoutputbuf = 1;
    server.daemonize = 0;
    server.appendonly = 0;
    server.appendfsync = APPENDFSYNC_ALWAYS;
    server.lastfsync = time(NULL);
    server.appendfd = -1;
    server.appendseldb = -1; /* Make sure the first time will not match */
    server.pidfile = "/var/run/redis.pid";
    server.dbfilename = "dump.rdb";
    server.appendfilename = "appendonly.aof";
    server.requirepass = NULL;
    server.shareobjects = 0;
    server.rdbcompression = 1;
    server.sharingpoolsize = 1024;
    server.maxclients = 0;
    server.blockedclients = 0;
    server.maxmemory = 0;
    server.vm_enabled = 0;
    server.vm_page_size = 256; /* 256 bytes per page */
    server.vm_pages = 1024*1024*100; /* ~104.8 million pages */
    server.vm_max_memory = 1024LL*1024*1024*1; /* 1 GB of RAM */

    /* saveparams was set to NULL above, so this just starts from an empty
     * save point list before the defaults are appended. */
    resetServerSaveParams();

    appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */
    appendServerSaveParams(300,100);  /* save after 5 minutes and 100 changes */
    appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
    /* Replication related */
    server.isslave = 0;
    server.masterauth = NULL;
    server.masterhost = NULL;
    server.masterport = 6379;
    server.master = NULL;
    server.replstate = REDIS_REPL_NONE;

    /* Double constants initialization: +inf, -inf and NaN are computed at
     * run time by dividing by the R_Zero variable (presumably to avoid
     * compile-time division-by-zero diagnostics). */
    R_Zero = 0.0;
    R_PosInf = 1.0/R_Zero;
    R_NegInf = -1.0/R_Zero;
    R_Nan = R_Zero/R_Zero;
}
1271
/* Create the runtime state of the server:
 *  - signal dispositions and the SIGSEGV handler;
 *  - client/slave/monitor lists and shared objects;
 *  - the event loop and the listening TCP socket;
 *  - the per-DB dicts;
 *  - counters and the serverCron timer;
 *  - the append only file when enabled;
 *  - the VM subsystem when enabled.
 * Exits the process if the TCP port or the append only file can't be
 * opened. */
static void initServer() {
    int j;

    signal(SIGHUP, SIG_IGN);
    /* Ignore SIGPIPE so that writing to a closed socket reports an error
     * instead of terminating the process. */
    signal(SIGPIPE, SIG_IGN);
    setupSigSegvAction();

    server.clients = listCreate();
    server.slaves = listCreate();
    server.monitors = listCreate();
    server.objfreelist = listCreate();
    createSharedObjects();
    server.el = aeCreateEventLoop();
    server.db = zmalloc(sizeof(redisDb)*server.dbnum);
    server.sharingpool = dictCreate(&setDictType,NULL);
    server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
    if (server.fd == -1) {
        redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
        exit(1);
    }
    /* Per DB: keyspace, key -> expire time, and key -> blocked clients
     * (used by BLPOP). */
    for (j = 0; j < server.dbnum; j++) {
        server.db[j].dict = dictCreate(&hashDictType,NULL);
        server.db[j].expires = dictCreate(&setDictType,NULL);
        server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL);
        server.db[j].id = j;
    }
    server.cronloops = 0;
    server.bgsavechildpid = -1;
    server.bgrewritechildpid = -1;
    server.bgrewritebuf = sdsempty();
    server.lastsave = time(NULL);
    server.dirty = 0;
    server.usedmemory = 0;
    server.stat_numcommands = 0;
    server.stat_numconnections = 0;
    server.stat_starttime = time(NULL);
    /* First serverCron run is scheduled after 1 unit (presumably ms); later
     * runs use the value serverCron returns. */
    aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);

    if (server.appendonly) {
        server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
        if (server.appendfd == -1) {
            redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
                strerror(errno));
            exit(1);
        }
    }

    if (server.vm_enabled) vmInit();
}
1321
1322/* Empty the whole database */
1323static long long emptyDb() {
1324 int j;
1325 long long removed = 0;
1326
1327 for (j = 0; j < server.dbnum; j++) {
1328 removed += dictSize(server.db[j].dict);
1329 dictEmpty(server.db[j].dict);
1330 dictEmpty(server.db[j].expires);
1331 }
1332 return removed;
1333}
1334
/* Parse a boolean config value, case-insensitively.
 * Returns 1 for "yes", 0 for "no", and -1 for anything else. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1340
/* Load the server configuration from 'filename' ("-" reads from stdin).
 * This is a deliberately rudimentary, line-oriented parser; it can improve
 * later if the config gets more complex.
 *
 * Each non-blank line that is not a '#' comment is trimmed and split on
 * spaces. The first word selects the directive (case-insensitively) and
 * the number of arguments must match it exactly.
 *
 * On any invalid directive or value, the offending line and an error
 * message are printed to stderr and the process exits with status 1.
 *
 * NOTE(review): fgets() reads at most REDIS_CONFIGLINE_MAX bytes at a time,
 * so a longer line is processed as several lines.
 * NOTE(review): repeating a string directive (bind, logfile, requirepass,
 * ...) overwrites the previous value without freeing it. */
static void loadServerConfig(char *filename) {
    FILE *fp;
    char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
    int linenum = 0;
    sds line = NULL;

    if (filename[0] == '-' && filename[1] == '\0')
        fp = stdin;
    else {
        if ((fp = fopen(filename,"r")) == NULL) {
            redisLog(REDIS_WARNING,"Fatal error, can't open config file");
            exit(1);
        }
    }

    while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
        sds *argv;
        int argc, j;

        linenum++;
        line = sdsnew(buf);
        line = sdstrim(line," \t\r\n");

        /* Skip comments and blank lines */
        if (line[0] == '#' || line[0] == '\0') {
            sdsfree(line);
            continue;
        }

        /* Split into arguments */
        argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
        sdstolower(argv[0]);

        /* Execute config directives */
        if (!strcasecmp(argv[0],"timeout") && argc == 2) {
            server.maxidletime = atoi(argv[1]);
            if (server.maxidletime < 0) {
                err = "Invalid timeout value"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"port") && argc == 2) {
            server.port = atoi(argv[1]);
            if (server.port < 1 || server.port > 65535) {
                err = "Invalid port"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
            server.bindaddr = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"save") && argc == 3) {
            /* Appended to the save points already configured */
            int seconds = atoi(argv[1]);
            int changes = atoi(argv[2]);
            if (seconds < 1 || changes < 0) {
                err = "Invalid save parameters"; goto loaderr;
            }
            appendServerSaveParams(seconds,changes);
        } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
            /* chdir() happens immediately, so relative paths used by later
             * directives (e.g. the logfile test below) resolve against it. */
            if (chdir(argv[1]) == -1) {
                redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
                    argv[1], strerror(errno));
                exit(1);
            }
        } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
            if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
            else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
            else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
            else {
                err = "Invalid log level. Must be one of debug, notice, warning";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
            FILE *logfp;

            /* "stdout" maps to NULL, meaning log on standard output */
            server.logfile = zstrdup(argv[1]);
            if (!strcasecmp(server.logfile,"stdout")) {
                zfree(server.logfile);
                server.logfile = NULL;
            }
            if (server.logfile) {
                /* Test if we are able to open the file. The server will not
                 * be able to abort just for this problem later... */
                logfp = fopen(server.logfile,"a");
                if (logfp == NULL) {
                    err = sdscatprintf(sdsempty(),
                        "Can't open the log file: %s", strerror(errno));
                    goto loaderr;
                }
                fclose(logfp);
            }
        } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
            server.dbnum = atoi(argv[1]);
            if (server.dbnum < 1) {
                err = "Invalid number of databases"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
            server.maxclients = atoi(argv[1]);
        } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
            server.maxmemory = strtoll(argv[1], NULL, 10);
        } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
            /* serverCron connects to the master when replstate is
             * REDIS_REPL_CONNECT */
            server.masterhost = sdsnew(argv[1]);
            server.masterport = atoi(argv[2]);
            server.replstate = REDIS_REPL_CONNECT;
        } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
            server.masterauth = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
            if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
            if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) {
            if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
            server.sharingpoolsize = atoi(argv[1]);
            if (server.sharingpoolsize < 1) {
                err = "invalid object sharing pool size"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
            if ((server.daemonize = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
            if ((server.appendonly = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
            if (!strcasecmp(argv[1],"no")) {
                server.appendfsync = APPENDFSYNC_NO;
            } else if (!strcasecmp(argv[1],"always")) {
                server.appendfsync = APPENDFSYNC_ALWAYS;
            } else if (!strcasecmp(argv[1],"everysec")) {
                server.appendfsync = APPENDFSYNC_EVERYSEC;
            } else {
                err = "argument must be 'no', 'always' or 'everysec'";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
            server.requirepass = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
            server.pidfile = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
            server.dbfilename = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"vm-enabled") && argc == 2) {
            if ((server.vm_enabled = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else {
            err = "Bad directive or wrong number of arguments"; goto loaderr;
        }
        for (j = 0; j < argc; j++)
            sdsfree(argv[j]);
        zfree(argv);
        sdsfree(line);
    }
    if (fp != stdin) fclose(fp);
    return;

loaderr:
    /* Fatal: report the offending line and exit. 'line' still holds the
     * trimmed text of the line being processed. */
    fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
    fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
    fprintf(stderr, ">>> '%s'\n", line);
    fprintf(stderr, "%s\n", err);
    exit(1);
}
1508
1509static void freeClientArgv(redisClient *c) {
1510 int j;
1511
1512 for (j = 0; j < c->argc; j++)
1513 decrRefCount(c->argv[j]);
1514 for (j = 0; j < c->mbargc; j++)
1515 decrRefCount(c->mbargv[j]);
1516 c->argc = 0;
1517 c->mbargc = 0;
1518}
1519
/* Disconnect and free a client:
 *  - unblocks it if needed and removes its file events;
 *  - releases its reply list, arguments, socket and MULTI state;
 *  - unlinks it from server.clients and, for slaves/monitors, from
 *    server.slaves or server.monitors.
 * If the client is our master, server.master is cleared and replstate is
 * set back to REDIS_REPL_CONNECT so that serverCron reconnects.
 * 'c' is invalid after this call. */
static void freeClient(redisClient *c) {
    listNode *ln;

    /* Note that if the client we are freeing is blocked into a blocking
     * call, we have to set querybuf to NULL *before* calling unblockClient(),
     * so that processInputBuffer() does not get called. It is also important
     * to remove the file events after this, because this call adds
     * the READABLE event. */
    sdsfree(c->querybuf);
    c->querybuf = NULL;
    if (c->flags & REDIS_BLOCKED)
        unblockClient(c);

    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    listRelease(c->reply);
    freeClientArgv(c);
    close(c->fd);
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);
    if (c->flags & REDIS_SLAVE) {
        /* A slave still receiving the RDB bulk transfer owns an open dump fd */
        if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
            close(c->repldbfd);
        /* Monitors carry REDIS_SLAVE too but live in their own list */
        list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        redisAssert(ln != NULL);
        listDelNode(l,ln);
    }
    if (c->flags & REDIS_MASTER) {
        server.master = NULL;
        server.replstate = REDIS_REPL_CONNECT;
    }
    zfree(c->argv);
    zfree(c->mbargv);
    freeClientMultiState(c);
    zfree(c);
}
1558
1559#define GLUEREPLY_UP_TO (1024)
1560static void glueReplyBuffersIfNeeded(redisClient *c) {
1561 int copylen = 0;
1562 char buf[GLUEREPLY_UP_TO];
1563 listNode *ln;
1564 robj *o;
1565
1566 listRewind(c->reply);
1567 while((ln = listYield(c->reply))) {
1568 int objlen;
1569
1570 o = ln->value;
1571 objlen = sdslen(o->ptr);
1572 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1573 memcpy(buf+copylen,o->ptr,objlen);
1574 copylen += objlen;
1575 listDelNode(c->reply,ln);
1576 } else {
1577 if (copylen == 0) return;
1578 break;
1579 }
1580 }
1581 /* Now the output buffer is empty, add the new single element */
1582 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1583 listAddNodeHead(c->reply,o);
1584}
1585
/* Writable-event handler: send the queued reply objects (c->reply) to the
 * client socket.
 *
 * c->sentlen is the number of bytes of the head object already sent; fully
 * sent objects are removed from the list. When output glueing is disabled,
 * the client is not our master and more than REDIS_WRITEV_THRESHOLD objects
 * are queued, the work is delegated to sendReplyToClientWritev().
 *
 * On a write error other than EAGAIN the client is freed. When the list
 * becomes empty the writable event is removed. */
static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    /* Use writev() if we have enough buffers to send */
    if (!server.glueoutputbuf &&
        listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
        !(c->flags & REDIS_MASTER))
    {
        sendReplyToClientWritev(el, fd, privdata, mask);
        return;
    }

    while(listLength(c->reply)) {
        if (server.glueoutputbuf && listLength(c->reply) > 1)
            glueReplyBuffersIfNeeded(c);

        o = listNodeValue(listFirst(c->reply));
        objlen = sdslen(o->ptr);

        /* Drop empty objects without issuing a write */
        if (objlen == 0) {
            listDelNode(c->reply,listFirst(c->reply));
            continue;
        }

        if (c->flags & REDIS_MASTER) {
            /* Don't reply to a master: pretend the whole object was sent */
            nwritten = objlen - c->sentlen;
        } else {
            nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
        }
        c->sentlen += nwritten;
        totwritten += nwritten;
        /* If we fully sent the object on head go to the next one */
        if (c->sentlen == objlen) {
            listDelNode(c->reply,listFirst(c->reply));
            c->sentlen = 0;
        }
        /* Note that we avoid sending more than REDIS_MAX_WRITE_PER_EVENT
         * bytes: in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * a super fast link that is always able to accept data (in a real
         * world scenario think about 'KEYS *' against the loopback
         * interface) */
        if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
    }
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            /* Socket buffer full: retry on the next writable event */
            nwritten = 0;
        } else {
            redisLog(REDIS_DEBUG,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }
    if (totwritten > 0) c->lastinteraction = time(NULL);
    if (listLength(c->reply) == 0) {
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}
1651
1652static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1653{
1654 redisClient *c = privdata;
1655 int nwritten = 0, totwritten = 0, objlen, willwrite;
1656 robj *o;
1657 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1658 int offset, ion = 0;
1659 REDIS_NOTUSED(el);
1660 REDIS_NOTUSED(mask);
1661
1662 listNode *node;
1663 while (listLength(c->reply)) {
1664 offset = c->sentlen;
1665 ion = 0;
1666 willwrite = 0;
1667
1668 /* fill-in the iov[] array */
1669 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1670 o = listNodeValue(node);
1671 objlen = sdslen(o->ptr);
1672
1673 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1674 break;
1675
1676 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1677 break; /* no more iovecs */
1678
1679 iov[ion].iov_base = ((char*)o->ptr) + offset;
1680 iov[ion].iov_len = objlen - offset;
1681 willwrite += objlen - offset;
1682 offset = 0; /* just for the first item */
1683 ion++;
1684 }
1685
1686 if(willwrite == 0)
1687 break;
1688
1689 /* write all collected blocks at once */
1690 if((nwritten = writev(fd, iov, ion)) < 0) {
1691 if (errno != EAGAIN) {
1692 redisLog(REDIS_DEBUG,
1693 "Error writing to client: %s", strerror(errno));
1694 freeClient(c);
1695 return;
1696 }
1697 break;
1698 }
1699
1700 totwritten += nwritten;
1701 offset = c->sentlen;
1702
1703 /* remove written robjs from c->reply */
1704 while (nwritten && listLength(c->reply)) {
1705 o = listNodeValue(listFirst(c->reply));
1706 objlen = sdslen(o->ptr);
1707
1708 if(nwritten >= objlen - offset) {
1709 listDelNode(c->reply, listFirst(c->reply));
1710 nwritten -= objlen - offset;
1711 c->sentlen = 0;
1712 } else {
1713 /* partial write */
1714 c->sentlen += nwritten;
1715 break;
1716 }
1717 offset = 0;
1718 }
1719 }
1720
1721 if (totwritten > 0)
1722 c->lastinteraction = time(NULL);
1723
1724 if (listLength(c->reply) == 0) {
1725 c->sentlen = 0;
1726 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1727 }
1728}
1729
1730static struct redisCommand *lookupCommand(char *name) {
1731 int j = 0;
1732 while(cmdTable[j].name != NULL) {
1733 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1734 j++;
1735 }
1736 return NULL;
1737}
1738
1739/* resetClient prepare the client to process the next command */
1740static void resetClient(redisClient *c) {
1741 freeClientArgv(c);
1742 c->bulklen = -1;
1743 c->multibulk = 0;
1744}
1745
1746/* Call() is the core of Redis execution of a command */
1747static void call(redisClient *c, struct redisCommand *cmd) {
1748 long long dirty;
1749
1750 dirty = server.dirty;
1751 cmd->proc(c);
1752 if (server.appendonly && server.dirty-dirty)
1753 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1754 if (server.dirty-dirty && listLength(server.slaves))
1755 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1756 if (listLength(server.monitors))
1757 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1758 server.stat_numcommands++;
1759}
1760
/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() executes the command or prepares the
 * server for a bulk read from the client.
 *
 * If 1 is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if 0 is returned the client was destroyed (i.e. after QUIT). */
static int processCommand(redisClient *c) {
    struct redisCommand *cmd;

    /* Free some memory if needed (maxmemory setting) */
    if (server.maxmemory) freeMemoryIfNeeded();

    /* Handle the multi bulk command type. This is an alternative protocol
     * supported by Redis in order to receive commands that are composed of
     * multiple binary-safe "bulk" arguments. The latency of processing is
     * a bit higher but this allows things like multi-sets, so if this
     * protocol is used only for MSET and similar commands this is a big win.
     *
     * Multi-bulk states, one call per received unit:
     *   "*<count>" line -> c->multibulk = count
     *   "$<len>" line   -> c->bulklen = len+2 (payload + CRLF expected)
     *   payload         -> moved to c->mbargv, c->multibulk--
     * When c->multibulk reaches zero the collected arguments become
     * c->argv and execution continues below. */
    if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
        c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
        if (c->multibulk <= 0) {
            /* Zero, negative or unparsable count: discard the line
             * without replying. */
            resetClient(c);
            return 1;
        } else {
            /* Drop the "*<count>" line itself from argv. */
            decrRefCount(c->argv[c->argc-1]);
            c->argc--;
            return 1;
        }
    } else if (c->multibulk) {
        if (c->bulklen == -1) {
            /* Expecting a "$<len>" line announcing the next argument. */
            if (((char*)c->argv[0]->ptr)[0] != '$') {
                addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
                resetClient(c);
                return 1;
            } else {
                int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
                decrRefCount(c->argv[0]);
                /* Same 1 GB cap as the inline bulk path below. */
                if (bulklen < 0 || bulklen > 1024*1024*1024) {
                    c->argc--;
                    addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
                    resetClient(c);
                    return 1;
                }
                c->argc--;
                c->bulklen = bulklen+2; /* add two bytes for CR+LF */
                return 1;
            }
        } else {
            /* argv[0] holds the payload just read: move it to the
             * multi-bulk argument vector. */
            c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
            c->mbargv[c->mbargc] = c->argv[0];
            c->mbargc++;
            c->argc--;
            c->multibulk--;
            if (c->multibulk == 0) {
                robj **auxargv;
                int auxargc;

                /* Here we need to swap the multi-bulk argc/argv with the
                 * normal argc/argv of the client structure. */
                auxargv = c->argv;
                c->argv = c->mbargv;
                c->mbargv = auxargv;

                auxargc = c->argc;
                c->argc = c->mbargc;
                c->mbargc = auxargc;

                /* We need to set bulklen to something different than -1
                 * in order for the code below to process the command without
                 * trying to read the last argument of a bulk command as
                 * a special argument. */
                c->bulklen = 0;
                /* continue below and process the command */
            } else {
                c->bulklen = -1;
                return 1;
            }
        }
    }
    /* -- end of multi bulk commands processing -- */

    /* The QUIT command is handled as a special case. Normal command
     * procs are unable to close the client connection safely */
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        freeClient(c);
        return 0;
    }
    cmd = lookupCommand(c->argv[0]->ptr);
    if (!cmd) {
        addReplySds(c,
            sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
                (char*)c->argv[0]->ptr));
        resetClient(c);
        return 1;
    } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
               (c->argc < -cmd->arity)) {
        /* Positive arity: exactly that many arguments (command name
         * included). Negative arity: at least -arity arguments. */
        addReplySds(c,
            sdscatprintf(sdsempty(),
                "-ERR wrong number of arguments for '%s' command\r\n",
                cmd->name));
        resetClient(c);
        return 1;
    } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
        addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
        resetClient(c);
        return 1;
    } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
        /* Inline bulk command: the last token on the line is the length
         * of the payload that follows it. */
        int bulklen = atoi(c->argv[c->argc-1]->ptr);

        decrRefCount(c->argv[c->argc-1]);
        if (bulklen < 0 || bulklen > 1024*1024*1024) {
            c->argc--;
            addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
            resetClient(c);
            return 1;
        }
        c->argc--;
        c->bulklen = bulklen+2; /* add two bytes for CR+LF */
        /* It is possible that the bulk read is already in the
         * buffer. Check this condition and handle it accordingly.
         * This is just a fast path, alternative to call processInputBuffer().
         * It's a good idea since the code is small and this condition
         * happens most of the times. The payload reuses the argv slot
         * freed by the length token above. */
        if ((signed)sdslen(c->querybuf) >= c->bulklen) {
            c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
            c->argc++;
            c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
        } else {
            return 1;
        }
    }
    /* Let's try to share objects on the command arguments vector
     * (argv[0], the command name, is never shared). */
    if (server.shareobjects) {
        int j;
        for(j = 1; j < c->argc; j++)
            c->argv[j] = tryObjectSharing(c->argv[j]);
    }
    /* Let's try to encode the bulk object to save space. */
    if (cmd->flags & REDIS_CMD_BULK)
        tryObjectEncoding(c->argv[c->argc-1]);

    /* Check if the user is authenticated */
    if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
        addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
        resetClient(c);
        return 1;
    }

    /* Exec the command. Inside MULTI every command except EXEC is queued
     * and acknowledged with shared.queued instead of being run. */
    if (c->flags & REDIS_MULTI && cmd->proc != execCommand) {
        queueMultiCommand(c,cmd);
        addReply(c,shared.queued);
    } else {
        call(c,cmd);
    }

    /* Prepare the client for the next command */
    if (c->flags & REDIS_CLOSE) {
        freeClient(c);
        return 0;
    }
    resetClient(c);
    return 1;
}
1926
1927static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1928 listNode *ln;
1929 int outc = 0, j;
1930 robj **outv;
1931 /* (args*2)+1 is enough room for args, spaces, newlines */
1932 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1933
1934 if (argc <= REDIS_STATIC_ARGS) {
1935 outv = static_outv;
1936 } else {
1937 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1938 }
1939
1940 for (j = 0; j < argc; j++) {
1941 if (j != 0) outv[outc++] = shared.space;
1942 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1943 robj *lenobj;
1944
1945 lenobj = createObject(REDIS_STRING,
1946 sdscatprintf(sdsempty(),"%lu\r\n",
1947 (unsigned long) stringObjectLen(argv[j])));
1948 lenobj->refcount = 0;
1949 outv[outc++] = lenobj;
1950 }
1951 outv[outc++] = argv[j];
1952 }
1953 outv[outc++] = shared.crlf;
1954
1955 /* Increment all the refcounts at start and decrement at end in order to
1956 * be sure to free objects if there is no slave in a replication state
1957 * able to be feed with commands */
1958 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1959 listRewind(slaves);
1960 while((ln = listYield(slaves))) {
1961 redisClient *slave = ln->value;
1962
1963 /* Don't feed slaves that are still waiting for BGSAVE to start */
1964 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1965
1966 /* Feed all the other slaves, MONITORs and so on */
1967 if (slave->slaveseldb != dictid) {
1968 robj *selectcmd;
1969
1970 switch(dictid) {
1971 case 0: selectcmd = shared.select0; break;
1972 case 1: selectcmd = shared.select1; break;
1973 case 2: selectcmd = shared.select2; break;
1974 case 3: selectcmd = shared.select3; break;
1975 case 4: selectcmd = shared.select4; break;
1976 case 5: selectcmd = shared.select5; break;
1977 case 6: selectcmd = shared.select6; break;
1978 case 7: selectcmd = shared.select7; break;
1979 case 8: selectcmd = shared.select8; break;
1980 case 9: selectcmd = shared.select9; break;
1981 default:
1982 selectcmd = createObject(REDIS_STRING,
1983 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1984 selectcmd->refcount = 0;
1985 break;
1986 }
1987 addReply(slave,selectcmd);
1988 slave->slaveseldb = dictid;
1989 }
1990 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1991 }
1992 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1993 if (outv != static_outv) zfree(outv);
1994}
1995
1996static void processInputBuffer(redisClient *c) {
1997again:
1998 /* Before to process the input buffer, make sure the client is not
1999 * waitig for a blocking operation such as BLPOP. Note that the first
2000 * iteration the client is never blocked, otherwise the processInputBuffer
2001 * would not be called at all, but after the execution of the first commands
2002 * in the input buffer the client may be blocked, and the "goto again"
2003 * will try to reiterate. The following line will make it return asap. */
2004 if (c->flags & REDIS_BLOCKED) return;
2005 if (c->bulklen == -1) {
2006 /* Read the first line of the query */
2007 char *p = strchr(c->querybuf,'\n');
2008 size_t querylen;
2009
2010 if (p) {
2011 sds query, *argv;
2012 int argc, j;
2013
2014 query = c->querybuf;
2015 c->querybuf = sdsempty();
2016 querylen = 1+(p-(query));
2017 if (sdslen(query) > querylen) {
2018 /* leave data after the first line of the query in the buffer */
2019 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
2020 }
2021 *p = '\0'; /* remove "\n" */
2022 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
2023 sdsupdatelen(query);
2024
2025 /* Now we can split the query in arguments */
2026 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
2027 sdsfree(query);
2028
2029 if (c->argv) zfree(c->argv);
2030 c->argv = zmalloc(sizeof(robj*)*argc);
2031
2032 for (j = 0; j < argc; j++) {
2033 if (sdslen(argv[j])) {
2034 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
2035 c->argc++;
2036 } else {
2037 sdsfree(argv[j]);
2038 }
2039 }
2040 zfree(argv);
2041 if (c->argc) {
2042 /* Execute the command. If the client is still valid
2043 * after processCommand() return and there is something
2044 * on the query buffer try to process the next command. */
2045 if (processCommand(c) && sdslen(c->querybuf)) goto again;
2046 } else {
2047 /* Nothing to process, argc == 0. Just process the query
2048 * buffer if it's not empty or return to the caller */
2049 if (sdslen(c->querybuf)) goto again;
2050 }
2051 return;
2052 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
2053 redisLog(REDIS_DEBUG, "Client protocol error");
2054 freeClient(c);
2055 return;
2056 }
2057 } else {
2058 /* Bulk read handling. Note that if we are at this point
2059 the client already sent a command terminated with a newline,
2060 we are reading the bulk data that is actually the last
2061 argument of the command. */
2062 int qbl = sdslen(c->querybuf);
2063
2064 if (c->bulklen <= qbl) {
2065 /* Copy everything but the final CRLF as final argument */
2066 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
2067 c->argc++;
2068 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
2069 /* Process the command. If the client is still valid after
2070 * the processing and there is more data in the buffer
2071 * try to parse it. */
2072 if (processCommand(c) && sdslen(c->querybuf)) goto again;
2073 return;
2074 }
2075 }
2076}
2077
2078static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
2079 redisClient *c = (redisClient*) privdata;
2080 char buf[REDIS_IOBUF_LEN];
2081 int nread;
2082 REDIS_NOTUSED(el);
2083 REDIS_NOTUSED(mask);
2084
2085 nread = read(fd, buf, REDIS_IOBUF_LEN);
2086 if (nread == -1) {
2087 if (errno == EAGAIN) {
2088 nread = 0;
2089 } else {
2090 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
2091 freeClient(c);
2092 return;
2093 }
2094 } else if (nread == 0) {
2095 redisLog(REDIS_DEBUG, "Client closed connection");
2096 freeClient(c);
2097 return;
2098 }
2099 if (nread) {
2100 c->querybuf = sdscatlen(c->querybuf, buf, nread);
2101 c->lastinteraction = time(NULL);
2102 } else {
2103 return;
2104 }
2105 processInputBuffer(c);
2106}
2107
2108static int selectDb(redisClient *c, int id) {
2109 if (id < 0 || id >= server.dbnum)
2110 return REDIS_ERR;
2111 c->db = &server.db[id];
2112 return REDIS_OK;
2113}
2114
2115static void *dupClientReplyValue(void *o) {
2116 incrRefCount((robj*)o);
2117 return 0;
2118}
2119
2120static redisClient *createClient(int fd) {
2121 redisClient *c = zmalloc(sizeof(*c));
2122
2123 anetNonBlock(NULL,fd);
2124 anetTcpNoDelay(NULL,fd);
2125 if (!c) return NULL;
2126 selectDb(c,0);
2127 c->fd = fd;
2128 c->querybuf = sdsempty();
2129 c->argc = 0;
2130 c->argv = NULL;
2131 c->bulklen = -1;
2132 c->multibulk = 0;
2133 c->mbargc = 0;
2134 c->mbargv = NULL;
2135 c->sentlen = 0;
2136 c->flags = 0;
2137 c->lastinteraction = time(NULL);
2138 c->authenticated = 0;
2139 c->replstate = REDIS_REPL_NONE;
2140 c->reply = listCreate();
2141 c->blockingkeys = NULL;
2142 c->blockingkeysnum = 0;
2143 listSetFreeMethod(c->reply,decrRefCount);
2144 listSetDupMethod(c->reply,dupClientReplyValue);
2145 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
2146 readQueryFromClient, c) == AE_ERR) {
2147 freeClient(c);
2148 return NULL;
2149 }
2150 listAddNodeTail(server.clients,c);
2151 initClientMultiState(c);
2152 return c;
2153}
2154
2155static void addReply(redisClient *c, robj *obj) {
2156 if (listLength(c->reply) == 0 &&
2157 (c->replstate == REDIS_REPL_NONE ||
2158 c->replstate == REDIS_REPL_ONLINE) &&
2159 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
2160 sendReplyToClient, c) == AE_ERR) return;
2161 listAddNodeTail(c->reply,getDecodedObject(obj));
2162}
2163
2164static void addReplySds(redisClient *c, sds s) {
2165 robj *o = createObject(REDIS_STRING,s);
2166 addReply(c,o);
2167 decrRefCount(o);
2168}
2169
2170static void addReplyDouble(redisClient *c, double d) {
2171 char buf[128];
2172
2173 snprintf(buf,sizeof(buf),"%.17g",d);
2174 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2175 (unsigned long) strlen(buf),buf));
2176}
2177
2178static void addReplyBulkLen(redisClient *c, robj *obj) {
2179 size_t len;
2180
2181 if (obj->encoding == REDIS_ENCODING_RAW) {
2182 len = sdslen(obj->ptr);
2183 } else {
2184 long n = (long)obj->ptr;
2185
2186 /* Compute how many bytes will take this integer as a radix 10 string */
2187 len = 1;
2188 if (n < 0) {
2189 len++;
2190 n = -n;
2191 }
2192 while((n = n/10) != 0) {
2193 len++;
2194 }
2195 }
2196 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
2197}
2198
2199static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2200 int cport, cfd;
2201 char cip[128];
2202 redisClient *c;
2203 REDIS_NOTUSED(el);
2204 REDIS_NOTUSED(mask);
2205 REDIS_NOTUSED(privdata);
2206
2207 cfd = anetAccept(server.neterr, fd, cip, &cport);
2208 if (cfd == AE_ERR) {
2209 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2210 return;
2211 }
2212 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2213 if ((c = createClient(cfd)) == NULL) {
2214 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2215 close(cfd); /* May be already closed, just ingore errors */
2216 return;
2217 }
2218 /* If maxclient directive is set and this is one client more... close the
2219 * connection. Note that we create the client instead to check before
2220 * for this condition, since now the socket is already set in nonblocking
2221 * mode and we can send an error for free using the Kernel I/O */
2222 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2223 char *err = "-ERR max number of clients reached\r\n";
2224
2225 /* That's a best effort error message, don't check write errors */
2226 if (write(c->fd,err,strlen(err)) == -1) {
2227 /* Nothing to do, Just to avoid the warning... */
2228 }
2229 freeClient(c);
2230 return;
2231 }
2232 server.stat_numconnections++;
2233}
2234
2235/* ======================= Redis objects implementation ===================== */
2236
2237static robj *createObject(int type, void *ptr) {
2238 robj *o;
2239
2240 if (listLength(server.objfreelist)) {
2241 listNode *head = listFirst(server.objfreelist);
2242 o = listNodeValue(head);
2243 listDelNode(server.objfreelist,head);
2244 } else {
2245 if (server.vm_enabled) {
2246 o = zmalloc(sizeof(*o));
2247 } else {
2248 o = zmalloc(sizeof(*o)-sizeof(struct redisObjectVM));
2249 }
2250 }
2251 o->type = type;
2252 o->encoding = REDIS_ENCODING_RAW;
2253 o->ptr = ptr;
2254 o->refcount = 1;
2255 return o;
2256}
2257
2258static robj *createStringObject(char *ptr, size_t len) {
2259 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2260}
2261
2262static robj *createListObject(void) {
2263 list *l = listCreate();
2264
2265 listSetFreeMethod(l,decrRefCount);
2266 return createObject(REDIS_LIST,l);
2267}
2268
2269static robj *createSetObject(void) {
2270 dict *d = dictCreate(&setDictType,NULL);
2271 return createObject(REDIS_SET,d);
2272}
2273
2274static robj *createZsetObject(void) {
2275 zset *zs = zmalloc(sizeof(*zs));
2276
2277 zs->dict = dictCreate(&zsetDictType,NULL);
2278 zs->zsl = zslCreate();
2279 return createObject(REDIS_ZSET,zs);
2280}
2281
2282static void freeStringObject(robj *o) {
2283 if (o->encoding == REDIS_ENCODING_RAW) {
2284 sdsfree(o->ptr);
2285 }
2286}
2287
2288static void freeListObject(robj *o) {
2289 listRelease((list*) o->ptr);
2290}
2291
2292static void freeSetObject(robj *o) {
2293 dictRelease((dict*) o->ptr);
2294}
2295
2296static void freeZsetObject(robj *o) {
2297 zset *zs = o->ptr;
2298
2299 dictRelease(zs->dict);
2300 zslFree(zs->zsl);
2301 zfree(zs);
2302}
2303
2304static void freeHashObject(robj *o) {
2305 dictRelease((dict*) o->ptr);
2306}
2307
2308static void incrRefCount(robj *o) {
2309 o->refcount++;
2310#ifdef DEBUG_REFCOUNT
2311 if (o->type == REDIS_STRING)
2312 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2313#endif
2314}
2315
2316static void decrRefCount(void *obj) {
2317 robj *o = obj;
2318
2319#ifdef DEBUG_REFCOUNT
2320 if (o->type == REDIS_STRING)
2321 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2322#endif
2323 if (--(o->refcount) == 0) {
2324 switch(o->type) {
2325 case REDIS_STRING: freeStringObject(o); break;
2326 case REDIS_LIST: freeListObject(o); break;
2327 case REDIS_SET: freeSetObject(o); break;
2328 case REDIS_ZSET: freeZsetObject(o); break;
2329 case REDIS_HASH: freeHashObject(o); break;
2330 default: redisAssert(0 != 0); break;
2331 }
2332 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2333 !listAddNodeHead(server.objfreelist,o))
2334 zfree(o);
2335 }
2336}
2337
2338static robj *lookupKey(redisDb *db, robj *key) {
2339 dictEntry *de = dictFind(db->dict,key);
2340 return de ? dictGetEntryVal(de) : NULL;
2341}
2342
2343static robj *lookupKeyRead(redisDb *db, robj *key) {
2344 expireIfNeeded(db,key);
2345 return lookupKey(db,key);
2346}
2347
2348static robj *lookupKeyWrite(redisDb *db, robj *key) {
2349 deleteIfVolatile(db,key);
2350 return lookupKey(db,key);
2351}
2352
2353static int deleteKey(redisDb *db, robj *key) {
2354 int retval;
2355
2356 /* We need to protect key from destruction: after the first dictDelete()
2357 * it may happen that 'key' is no longer valid if we don't increment
2358 * it's count. This may happen when we get the object reference directly
2359 * from the hash table with dictRandomKey() or dict iterators */
2360 incrRefCount(key);
2361 if (dictSize(db->expires)) dictDelete(db->expires,key);
2362 retval = dictDelete(db->dict,key);
2363 decrRefCount(key);
2364
2365 return retval == DICT_OK;
2366}
2367
2368/* Try to share an object against the shared objects pool */
2369static robj *tryObjectSharing(robj *o) {
2370 struct dictEntry *de;
2371 unsigned long c;
2372
2373 if (o == NULL || server.shareobjects == 0) return o;
2374
2375 redisAssert(o->type == REDIS_STRING);
2376 de = dictFind(server.sharingpool,o);
2377 if (de) {
2378 robj *shared = dictGetEntryKey(de);
2379
2380 c = ((unsigned long) dictGetEntryVal(de))+1;
2381 dictGetEntryVal(de) = (void*) c;
2382 incrRefCount(shared);
2383 decrRefCount(o);
2384 return shared;
2385 } else {
2386 /* Here we are using a stream algorihtm: Every time an object is
2387 * shared we increment its count, everytime there is a miss we
2388 * recrement the counter of a random object. If this object reaches
2389 * zero we remove the object and put the current object instead. */
2390 if (dictSize(server.sharingpool) >=
2391 server.sharingpoolsize) {
2392 de = dictGetRandomKey(server.sharingpool);
2393 redisAssert(de != NULL);
2394 c = ((unsigned long) dictGetEntryVal(de))-1;
2395 dictGetEntryVal(de) = (void*) c;
2396 if (c == 0) {
2397 dictDelete(server.sharingpool,de->key);
2398 }
2399 } else {
2400 c = 0; /* If the pool is empty we want to add this object */
2401 }
2402 if (c == 0) {
2403 int retval;
2404
2405 retval = dictAdd(server.sharingpool,o,(void*)1);
2406 redisAssert(retval == DICT_OK);
2407 incrRefCount(o);
2408 }
2409 return o;
2410 }
2411}
2412
2413/* Check if the nul-terminated string 's' can be represented by a long
2414 * (that is, is a number that fits into long without any other space or
2415 * character before or after the digits).
2416 *
2417 * If so, the function returns REDIS_OK and *longval is set to the value
2418 * of the number. Otherwise REDIS_ERR is returned */
2419static int isStringRepresentableAsLong(sds s, long *longval) {
2420 char buf[32], *endptr;
2421 long value;
2422 int slen;
2423
2424 value = strtol(s, &endptr, 10);
2425 if (endptr[0] != '\0') return REDIS_ERR;
2426 slen = snprintf(buf,32,"%ld",value);
2427
2428 /* If the number converted back into a string is not identical
2429 * then it's not possible to encode the string as integer */
2430 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2431 if (longval) *longval = value;
2432 return REDIS_OK;
2433}
2434
2435/* Try to encode a string object in order to save space */
2436static int tryObjectEncoding(robj *o) {
2437 long value;
2438 sds s = o->ptr;
2439
2440 if (o->encoding != REDIS_ENCODING_RAW)
2441 return REDIS_ERR; /* Already encoded */
2442
2443 /* It's not save to encode shared objects: shared objects can be shared
2444 * everywhere in the "object space" of Redis. Encoded objects can only
2445 * appear as "values" (and not, for instance, as keys) */
2446 if (o->refcount > 1) return REDIS_ERR;
2447
2448 /* Currently we try to encode only strings */
2449 redisAssert(o->type == REDIS_STRING);
2450
2451 /* Check if we can represent this string as a long integer */
2452 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2453
2454 /* Ok, this object can be encoded */
2455 o->encoding = REDIS_ENCODING_INT;
2456 sdsfree(o->ptr);
2457 o->ptr = (void*) value;
2458 return REDIS_OK;
2459}
2460
2461/* Get a decoded version of an encoded object (returned as a new object).
2462 * If the object is already raw-encoded just increment the ref count. */
2463static robj *getDecodedObject(robj *o) {
2464 robj *dec;
2465
2466 if (o->encoding == REDIS_ENCODING_RAW) {
2467 incrRefCount(o);
2468 return o;
2469 }
2470 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2471 char buf[32];
2472
2473 snprintf(buf,32,"%ld",(long)o->ptr);
2474 dec = createStringObject(buf,strlen(buf));
2475 return dec;
2476 } else {
2477 redisAssert(1 != 1);
2478 }
2479}
2480
2481/* Compare two string objects via strcmp() or alike.
2482 * Note that the objects may be integer-encoded. In such a case we
2483 * use snprintf() to get a string representation of the numbers on the stack
2484 * and compare the strings, it's much faster than calling getDecodedObject().
2485 *
2486 * Important note: if objects are not integer encoded, but binary-safe strings,
2487 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2488 * binary safe. */
2489static int compareStringObjects(robj *a, robj *b) {
2490 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2491 char bufa[128], bufb[128], *astr, *bstr;
2492 int bothsds = 1;
2493
2494 if (a == b) return 0;
2495 if (a->encoding != REDIS_ENCODING_RAW) {
2496 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2497 astr = bufa;
2498 bothsds = 0;
2499 } else {
2500 astr = a->ptr;
2501 }
2502 if (b->encoding != REDIS_ENCODING_RAW) {
2503 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2504 bstr = bufb;
2505 bothsds = 0;
2506 } else {
2507 bstr = b->ptr;
2508 }
2509 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2510}
2511
2512static size_t stringObjectLen(robj *o) {
2513 redisAssert(o->type == REDIS_STRING);
2514 if (o->encoding == REDIS_ENCODING_RAW) {
2515 return sdslen(o->ptr);
2516 } else {
2517 char buf[32];
2518
2519 return snprintf(buf,32,"%ld",(long)o->ptr);
2520 }
2521}
2522
2523/*============================ RDB saving/loading =========================== */
2524
/* Write the single type/opcode byte 'type' to 'fp'.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2529
/* Write 't' truncated to a 32 bit signed integer, in host byte order.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;
    size_t written = fwrite(&t32,4,1,fp);

    return written ? 0 : -1;
}
2535
2536/* check rdbLoadLen() comments for more info */
2537static int rdbSaveLen(FILE *fp, uint32_t len) {
2538 unsigned char buf[2];
2539
2540 if (len < (1<<6)) {
2541 /* Save a 6 bit len */
2542 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2543 if (fwrite(buf,1,1,fp) == 0) return -1;
2544 } else if (len < (1<<14)) {
2545 /* Save a 14 bit len */
2546 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2547 buf[1] = len&0xFF;
2548 if (fwrite(buf,2,1,fp) == 0) return -1;
2549 } else {
2550 /* Save a 32 bit len */
2551 buf[0] = (REDIS_RDB_32BITLEN<<6);
2552 if (fwrite(buf,1,1,fp) == 0) return -1;
2553 len = htonl(len);
2554 if (fwrite(&len,4,1,fp) == 0) return -1;
2555 }
2556 return 0;
2557}
2558
2559/* String objects in the form "2391" "-100" without any space and with a
2560 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2561 * encoded as integers to save space */
2562static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2563 long long value;
2564 char *endptr, buf[32];
2565
2566 /* Check if it's possible to encode this value as a number */
2567 value = strtoll(s, &endptr, 10);
2568 if (endptr[0] != '\0') return 0;
2569 snprintf(buf,32,"%lld",value);
2570
2571 /* If the number converted back into a string is not identical
2572 * then it's not possible to encode the string as integer */
2573 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2574
2575 /* Finally check if it fits in our ranges */
2576 if (value >= -(1<<7) && value <= (1<<7)-1) {
2577 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2578 enc[1] = value&0xFF;
2579 return 2;
2580 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2581 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2582 enc[1] = value&0xFF;
2583 enc[2] = (value>>8)&0xFF;
2584 return 3;
2585 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2586 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2587 enc[1] = value&0xFF;
2588 enc[2] = (value>>8)&0xFF;
2589 enc[3] = (value>>16)&0xFF;
2590 enc[4] = (value>>24)&0xFF;
2591 return 5;
2592 } else {
2593 return 0;
2594 }
2595}
2596
2597static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2598 unsigned int comprlen, outlen;
2599 unsigned char byte;
2600 void *out;
2601
2602 /* We require at least four bytes compression for this to be worth it */
2603 outlen = sdslen(obj->ptr)-4;
2604 if (outlen <= 0) return 0;
2605 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2606 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2607 if (comprlen == 0) {
2608 zfree(out);
2609 return 0;
2610 }
2611 /* Data compressed! Let's save it on disk */
2612 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2613 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2614 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2615 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2616 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2617 zfree(out);
2618 return comprlen;
2619
2620writeerr:
2621 zfree(out);
2622 return -1;
2623}
2624
2625/* Save a string objet as [len][data] on disk. If the object is a string
2626 * representation of an integer value we try to safe it in a special form */
2627static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2628 size_t len;
2629 int enclen;
2630
2631 len = sdslen(obj->ptr);
2632
2633 /* Try integer encoding */
2634 if (len <= 11) {
2635 unsigned char buf[5];
2636 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2637 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2638 return 0;
2639 }
2640 }
2641
2642 /* Try LZF compression - under 20 bytes it's unable to compress even
2643 * aaaaaaaaaaaaaaaaaa so skip it */
2644 if (server.rdbcompression && len > 20) {
2645 int retval;
2646
2647 retval = rdbSaveLzfStringObject(fp,obj);
2648 if (retval == -1) return -1;
2649 if (retval > 0) return 0;
2650 /* retval == 0 means data can't be compressed, save the old way */
2651 }
2652
2653 /* Store verbatim */
2654 if (rdbSaveLen(fp,len) == -1) return -1;
2655 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2656 return 0;
2657}
2658
2659/* Like rdbSaveStringObjectRaw() but handle encoded objects */
2660static int rdbSaveStringObject(FILE *fp, robj *obj) {
2661 int retval;
2662
2663 obj = getDecodedObject(obj);
2664 retval = rdbSaveStringObjectRaw(fp,obj);
2665 decrRefCount(obj);
2666 return retval;
2667}
2668
/* Save a double value. Doubles are saved as "%.17g" strings prefixed by an
 * unsigned 8 bit integer specifying the length of the representation.
 * This 8 bit integer has special values for the following conditions
 * (no string follows them):
 * 253: not a number
 * 254: + inf
 * 255: - inf
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len = 1;

    if (isnan(val)) {
        buf[0] = 253;
    } else if (isinf(val)) {
        buf[0] = (val > 0) ? 254 : 255;
    } else {
        int n = snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);

        buf[0] = n;
        len = n+1;
    }
    return (fwrite(buf,len,1,fp) == 0) ? -1 : 0;
}
2695
2696/* Save a Redis object. */
2697static int rdbSaveObject(FILE *fp, robj *o) {
2698 if (o->type == REDIS_STRING) {
2699 /* Save a string value */
2700 if (rdbSaveStringObject(fp,o) == -1) return -1;
2701 } else if (o->type == REDIS_LIST) {
2702 /* Save a list value */
2703 list *list = o->ptr;
2704 listNode *ln;
2705
2706 listRewind(list);
2707 if (rdbSaveLen(fp,listLength(list)) == -1) return -1;
2708 while((ln = listYield(list))) {
2709 robj *eleobj = listNodeValue(ln);
2710
2711 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2712 }
2713 } else if (o->type == REDIS_SET) {
2714 /* Save a set value */
2715 dict *set = o->ptr;
2716 dictIterator *di = dictGetIterator(set);
2717 dictEntry *de;
2718
2719 if (rdbSaveLen(fp,dictSize(set)) == -1) return -1;
2720 while((de = dictNext(di)) != NULL) {
2721 robj *eleobj = dictGetEntryKey(de);
2722
2723 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2724 }
2725 dictReleaseIterator(di);
2726 } else if (o->type == REDIS_ZSET) {
2727 /* Save a set value */
2728 zset *zs = o->ptr;
2729 dictIterator *di = dictGetIterator(zs->dict);
2730 dictEntry *de;
2731
2732 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) return -1;
2733 while((de = dictNext(di)) != NULL) {
2734 robj *eleobj = dictGetEntryKey(de);
2735 double *score = dictGetEntryVal(de);
2736
2737 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2738 if (rdbSaveDoubleValue(fp,*score) == -1) return -1;
2739 }
2740 dictReleaseIterator(di);
2741 } else {
2742 redisAssert(0 != 0);
2743 }
2744 return 0;
2745}
2746
2747/* Return the length the object will have on disk if saved with
2748 * the rdbSaveObject() function. Currently we use a trick to get
2749 * this length with very little changes to the code. In the future
2750 * we could switch to a faster solution. */
2751static off_t rdbSavedObjectLen(robj *o) {
2752 static FILE *fp = NULL;
2753
2754 if (fp == NULL) fp = fopen("/dev/null","w");
2755 assert(fp != NULL);
2756
2757 rewind(fp);
2758 assert(rdbSaveObject(fp,o) != 1);
2759 return ftello(fp);
2760}
2761
2762/* Return the number of pages required to save this object in the swap file */
2763static off_t rdbSavedObjectPages(robj *o) {
2764 off_t bytes = rdbSavedObjectLen(o);
2765
2766 return (bytes+(server.vm_page_size-1))/server.vm_page_size;
2767}
2768
2769/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2770static int rdbSave(char *filename) {
2771 dictIterator *di = NULL;
2772 dictEntry *de;
2773 FILE *fp;
2774 char tmpfile[256];
2775 int j;
2776 time_t now = time(NULL);
2777
2778 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2779 fp = fopen(tmpfile,"w");
2780 if (!fp) {
2781 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2782 return REDIS_ERR;
2783 }
2784 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2785 for (j = 0; j < server.dbnum; j++) {
2786 redisDb *db = server.db+j;
2787 dict *d = db->dict;
2788 if (dictSize(d) == 0) continue;
2789 di = dictGetIterator(d);
2790 if (!di) {
2791 fclose(fp);
2792 return REDIS_ERR;
2793 }
2794
2795 /* Write the SELECT DB opcode */
2796 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2797 if (rdbSaveLen(fp,j) == -1) goto werr;
2798
2799 /* Iterate this DB writing every entry */
2800 while((de = dictNext(di)) != NULL) {
2801 robj *key = dictGetEntryKey(de);
2802 robj *o = dictGetEntryVal(de);
2803 time_t expiretime = getExpire(db,key);
2804
2805 /* Save the expire time */
2806 if (expiretime != -1) {
2807 /* If this key is already expired skip it */
2808 if (expiretime < now) continue;
2809 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2810 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2811 }
2812 /* Save the key and associated value */
2813 if (rdbSaveType(fp,o->type) == -1) goto werr;
2814 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2815 /* Save the actual value */
2816 if (rdbSaveObject(fp,o) == -1) goto werr;
2817 }
2818 dictReleaseIterator(di);
2819 }
2820 /* EOF opcode */
2821 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2822
2823 /* Make sure data will not remain on the OS's output buffers */
2824 fflush(fp);
2825 fsync(fileno(fp));
2826 fclose(fp);
2827
2828 /* Use RENAME to make sure the DB file is changed atomically only
2829 * if the generate DB file is ok. */
2830 if (rename(tmpfile,filename) == -1) {
2831 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2832 unlink(tmpfile);
2833 return REDIS_ERR;
2834 }
2835 redisLog(REDIS_NOTICE,"DB saved on disk");
2836 server.dirty = 0;
2837 server.lastsave = time(NULL);
2838 return REDIS_OK;
2839
2840werr:
2841 fclose(fp);
2842 unlink(tmpfile);
2843 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2844 if (di) dictReleaseIterator(di);
2845 return REDIS_ERR;
2846}
2847
2848static int rdbSaveBackground(char *filename) {
2849 pid_t childpid;
2850
2851 if (server.bgsavechildpid != -1) return REDIS_ERR;
2852 if ((childpid = fork()) == 0) {
2853 /* Child */
2854 close(server.fd);
2855 if (rdbSave(filename) == REDIS_OK) {
2856 exit(0);
2857 } else {
2858 exit(1);
2859 }
2860 } else {
2861 /* Parent */
2862 if (childpid == -1) {
2863 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2864 strerror(errno));
2865 return REDIS_ERR;
2866 }
2867 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2868 server.bgsavechildpid = childpid;
2869 return REDIS_OK;
2870 }
2871 return REDIS_OK; /* unreached */
2872}
2873
/* Delete the "temp-<pid>.rdb" file a saving child with the given pid
 * writes to. Errors from unlink() are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int) childpid);
    unlink(path);
}
2880
/* Read one opcode byte. Returns it as 0..255, or -1 if nothing could be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return fread(&byte,1,1,fp) ? byte : -1;
}
2886
/* Read a 32 bit timestamp stored in host byte order. Returns -1 on short read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t when;

    if (fread(&when,sizeof(when),1,fp) != 1) return -1;
    return (time_t) when;
}
2892
2893/* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2894 * of this file for a description of how this are stored on disk.
2895 *
2896 * isencoded is set to 1 if the readed length is not actually a length but
2897 * an "encoding type", check the above comments for more info */
2898static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2899 unsigned char buf[2];
2900 uint32_t len;
2901
2902 if (isencoded) *isencoded = 0;
2903 if (rdbver == 0) {
2904 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2905 return ntohl(len);
2906 } else {
2907 int type;
2908
2909 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2910 type = (buf[0]&0xC0)>>6;
2911 if (type == REDIS_RDB_6BITLEN) {
2912 /* Read a 6 bit len */
2913 return buf[0]&0x3F;
2914 } else if (type == REDIS_RDB_ENCVAL) {
2915 /* Read a 6 bit len encoding type */
2916 if (isencoded) *isencoded = 1;
2917 return buf[0]&0x3F;
2918 } else if (type == REDIS_RDB_14BITLEN) {
2919 /* Read a 14 bit len */
2920 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2921 return ((buf[0]&0x3F)<<8)|buf[1];
2922 } else {
2923 /* Read a 32 bit len */
2924 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2925 return ntohl(len);
2926 }
2927 }
2928}
2929
2930static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2931 unsigned char enc[4];
2932 long long val;
2933
2934 if (enctype == REDIS_RDB_ENC_INT8) {
2935 if (fread(enc,1,1,fp) == 0) return NULL;
2936 val = (signed char)enc[0];
2937 } else if (enctype == REDIS_RDB_ENC_INT16) {
2938 uint16_t v;
2939 if (fread(enc,2,1,fp) == 0) return NULL;
2940 v = enc[0]|(enc[1]<<8);
2941 val = (int16_t)v;
2942 } else if (enctype == REDIS_RDB_ENC_INT32) {
2943 uint32_t v;
2944 if (fread(enc,4,1,fp) == 0) return NULL;
2945 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2946 val = (int32_t)v;
2947 } else {
2948 val = 0; /* anti-warning */
2949 redisAssert(0!=0);
2950 }
2951 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2952}
2953
2954static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2955 unsigned int len, clen;
2956 unsigned char *c = NULL;
2957 sds val = NULL;
2958
2959 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2960 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2961 if ((c = zmalloc(clen)) == NULL) goto err;
2962 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2963 if (fread(c,clen,1,fp) == 0) goto err;
2964 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2965 zfree(c);
2966 return createObject(REDIS_STRING,val);
2967err:
2968 zfree(c);
2969 sdsfree(val);
2970 return NULL;
2971}
2972
2973static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2974 int isencoded;
2975 uint32_t len;
2976 sds val;
2977
2978 len = rdbLoadLen(fp,rdbver,&isencoded);
2979 if (isencoded) {
2980 switch(len) {
2981 case REDIS_RDB_ENC_INT8:
2982 case REDIS_RDB_ENC_INT16:
2983 case REDIS_RDB_ENC_INT32:
2984 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2985 case REDIS_RDB_ENC_LZF:
2986 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2987 default:
2988 redisAssert(0!=0);
2989 }
2990 }
2991
2992 if (len == REDIS_RDB_LENERR) return NULL;
2993 val = sdsnewlen(NULL,len);
2994 if (len && fread(val,len,1,fp) == 0) {
2995 sdsfree(val);
2996 return NULL;
2997 }
2998 return tryObjectSharing(createObject(REDIS_STRING,val));
2999}
3000
3001/* For information about double serialization check rdbSaveDoubleValue() */
3002static int rdbLoadDoubleValue(FILE *fp, double *val) {
3003 char buf[128];
3004 unsigned char len;
3005
3006 if (fread(&len,1,1,fp) == 0) return -1;
3007 switch(len) {
3008 case 255: *val = R_NegInf; return 0;
3009 case 254: *val = R_PosInf; return 0;
3010 case 253: *val = R_Nan; return 0;
3011 default:
3012 if (fread(buf,len,1,fp) == 0) return -1;
3013 buf[len] = '\0';
3014 sscanf(buf, "%lg", val);
3015 return 0;
3016 }
3017}
3018
3019static int rdbLoad(char *filename) {
3020 FILE *fp;
3021 robj *keyobj = NULL;
3022 uint32_t dbid;
3023 int type, retval, rdbver;
3024 dict *d = server.db[0].dict;
3025 redisDb *db = server.db+0;
3026 char buf[1024];
3027 time_t expiretime = -1, now = time(NULL);
3028
3029 fp = fopen(filename,"r");
3030 if (!fp) return REDIS_ERR;
3031 if (fread(buf,9,1,fp) == 0) goto eoferr;
3032 buf[9] = '\0';
3033 if (memcmp(buf,"REDIS",5) != 0) {
3034 fclose(fp);
3035 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
3036 return REDIS_ERR;
3037 }
3038 rdbver = atoi(buf+5);
3039 if (rdbver > 1) {
3040 fclose(fp);
3041 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
3042 return REDIS_ERR;
3043 }
3044 while(1) {
3045 robj *o;
3046
3047 /* Read type. */
3048 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
3049 if (type == REDIS_EXPIRETIME) {
3050 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
3051 /* We read the time so we need to read the object type again */
3052 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
3053 }
3054 if (type == REDIS_EOF) break;
3055 /* Handle SELECT DB opcode as a special case */
3056 if (type == REDIS_SELECTDB) {
3057 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
3058 goto eoferr;
3059 if (dbid >= (unsigned)server.dbnum) {
3060 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
3061 exit(1);
3062 }
3063 db = server.db+dbid;
3064 d = db->dict;
3065 continue;
3066 }
3067 /* Read key */
3068 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
3069
3070 if (type == REDIS_STRING) {
3071 /* Read string value */
3072 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
3073 tryObjectEncoding(o);
3074 } else if (type == REDIS_LIST || type == REDIS_SET) {
3075 /* Read list/set value */
3076 uint32_t listlen;
3077
3078 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
3079 goto eoferr;
3080 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
3081 /* Load every single element of the list/set */
3082 while(listlen--) {
3083 robj *ele;
3084
3085 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
3086 tryObjectEncoding(ele);
3087 if (type == REDIS_LIST) {
3088 listAddNodeTail((list*)o->ptr,ele);
3089 } else {
3090 dictAdd((dict*)o->ptr,ele,NULL);
3091 }
3092 }
3093 } else if (type == REDIS_ZSET) {
3094 /* Read list/set value */
3095 uint32_t zsetlen;
3096 zset *zs;
3097
3098 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
3099 goto eoferr;
3100 o = createZsetObject();
3101 zs = o->ptr;
3102 /* Load every single element of the list/set */
3103 while(zsetlen--) {
3104 robj *ele;
3105 double *score = zmalloc(sizeof(double));
3106
3107 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
3108 tryObjectEncoding(ele);
3109 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
3110 dictAdd(zs->dict,ele,score);
3111 zslInsert(zs->zsl,*score,ele);
3112 incrRefCount(ele); /* added to skiplist */
3113 }
3114 } else {
3115 redisAssert(0 != 0);
3116 }
3117 /* Add the new object in the hash table */
3118 retval = dictAdd(d,keyobj,o);
3119 if (retval == DICT_ERR) {
3120 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
3121 exit(1);
3122 }
3123 /* Set the expire time if needed */
3124 if (expiretime != -1) {
3125 setExpire(db,keyobj,expiretime);
3126 /* Delete this key if already expired */
3127 if (expiretime < now) deleteKey(db,keyobj);
3128 expiretime = -1;
3129 }
3130 keyobj = o = NULL;
3131 }
3132 fclose(fp);
3133 return REDIS_OK;
3134
3135eoferr: /* unexpected end of file is handled here with a fatal exit */
3136 if (keyobj) decrRefCount(keyobj);
3137 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
3138 exit(1);
3139 return REDIS_ERR; /* Just to avoid warning */
3140}
3141
3142/*================================== Commands =============================== */
3143
3144static void authCommand(redisClient *c) {
3145 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
3146 c->authenticated = 1;
3147 addReply(c,shared.ok);
3148 } else {
3149 c->authenticated = 0;
3150 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3151 }
3152}
3153
3154static void pingCommand(redisClient *c) {
3155 addReply(c,shared.pong);
3156}
3157
3158static void echoCommand(redisClient *c) {
3159 addReplyBulkLen(c,c->argv[1]);
3160 addReply(c,c->argv[1]);
3161 addReply(c,shared.crlf);
3162}
3163
3164/*=================================== Strings =============================== */
3165
3166static void setGenericCommand(redisClient *c, int nx) {
3167 int retval;
3168
3169 if (nx) deleteIfVolatile(c->db,c->argv[1]);
3170 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
3171 if (retval == DICT_ERR) {
3172 if (!nx) {
3173 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3174 incrRefCount(c->argv[2]);
3175 } else {
3176 addReply(c,shared.czero);
3177 return;
3178 }
3179 } else {
3180 incrRefCount(c->argv[1]);
3181 incrRefCount(c->argv[2]);
3182 }
3183 server.dirty++;
3184 removeExpire(c->db,c->argv[1]);
3185 addReply(c, nx ? shared.cone : shared.ok);
3186}
3187
3188static void setCommand(redisClient *c) {
3189 setGenericCommand(c,0);
3190}
3191
3192static void setnxCommand(redisClient *c) {
3193 setGenericCommand(c,1);
3194}
3195
3196static int getGenericCommand(redisClient *c) {
3197 robj *o = lookupKeyRead(c->db,c->argv[1]);
3198
3199 if (o == NULL) {
3200 addReply(c,shared.nullbulk);
3201 return REDIS_OK;
3202 } else {
3203 if (o->type != REDIS_STRING) {
3204 addReply(c,shared.wrongtypeerr);
3205 return REDIS_ERR;
3206 } else {
3207 addReplyBulkLen(c,o);
3208 addReply(c,o);
3209 addReply(c,shared.crlf);
3210 return REDIS_OK;
3211 }
3212 }
3213}
3214
3215static void getCommand(redisClient *c) {
3216 getGenericCommand(c);
3217}
3218
3219static void getsetCommand(redisClient *c) {
3220 if (getGenericCommand(c) == REDIS_ERR) return;
3221 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3222 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3223 } else {
3224 incrRefCount(c->argv[1]);
3225 }
3226 incrRefCount(c->argv[2]);
3227 server.dirty++;
3228 removeExpire(c->db,c->argv[1]);
3229}
3230
3231static void mgetCommand(redisClient *c) {
3232 int j;
3233
3234 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3235 for (j = 1; j < c->argc; j++) {
3236 robj *o = lookupKeyRead(c->db,c->argv[j]);
3237 if (o == NULL) {
3238 addReply(c,shared.nullbulk);
3239 } else {
3240 if (o->type != REDIS_STRING) {
3241 addReply(c,shared.nullbulk);
3242 } else {
3243 addReplyBulkLen(c,o);
3244 addReply(c,o);
3245 addReply(c,shared.crlf);
3246 }
3247 }
3248 }
3249}
3250
3251static void msetGenericCommand(redisClient *c, int nx) {
3252 int j, busykeys = 0;
3253
3254 if ((c->argc % 2) == 0) {
3255 addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3256 return;
3257 }
3258 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3259 * set nothing at all if at least one already key exists. */
3260 if (nx) {
3261 for (j = 1; j < c->argc; j += 2) {
3262 if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
3263 busykeys++;
3264 }
3265 }
3266 }
3267 if (busykeys) {
3268 addReply(c, shared.czero);
3269 return;
3270 }
3271
3272 for (j = 1; j < c->argc; j += 2) {
3273 int retval;
3274
3275 tryObjectEncoding(c->argv[j+1]);
3276 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3277 if (retval == DICT_ERR) {
3278 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3279 incrRefCount(c->argv[j+1]);
3280 } else {
3281 incrRefCount(c->argv[j]);
3282 incrRefCount(c->argv[j+1]);
3283 }
3284 removeExpire(c->db,c->argv[j]);
3285 }
3286 server.dirty += (c->argc-1)/2;
3287 addReply(c, nx ? shared.cone : shared.ok);
3288}
3289
3290static void msetCommand(redisClient *c) {
3291 msetGenericCommand(c,0);
3292}
3293
3294static void msetnxCommand(redisClient *c) {
3295 msetGenericCommand(c,1);
3296}
3297
3298static void incrDecrCommand(redisClient *c, long long incr) {
3299 long long value;
3300 int retval;
3301 robj *o;
3302
3303 o = lookupKeyWrite(c->db,c->argv[1]);
3304 if (o == NULL) {
3305 value = 0;
3306 } else {
3307 if (o->type != REDIS_STRING) {
3308 value = 0;
3309 } else {
3310 char *eptr;
3311
3312 if (o->encoding == REDIS_ENCODING_RAW)
3313 value = strtoll(o->ptr, &eptr, 10);
3314 else if (o->encoding == REDIS_ENCODING_INT)
3315 value = (long)o->ptr;
3316 else
3317 redisAssert(1 != 1);
3318 }
3319 }
3320
3321 value += incr;
3322 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3323 tryObjectEncoding(o);
3324 retval = dictAdd(c->db->dict,c->argv[1],o);
3325 if (retval == DICT_ERR) {
3326 dictReplace(c->db->dict,c->argv[1],o);
3327 removeExpire(c->db,c->argv[1]);
3328 } else {
3329 incrRefCount(c->argv[1]);
3330 }
3331 server.dirty++;
3332 addReply(c,shared.colon);
3333 addReply(c,o);
3334 addReply(c,shared.crlf);
3335}
3336
3337static void incrCommand(redisClient *c) {
3338 incrDecrCommand(c,1);
3339}
3340
3341static void decrCommand(redisClient *c) {
3342 incrDecrCommand(c,-1);
3343}
3344
3345static void incrbyCommand(redisClient *c) {
3346 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3347 incrDecrCommand(c,incr);
3348}
3349
3350static void decrbyCommand(redisClient *c) {
3351 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3352 incrDecrCommand(c,-incr);
3353}
3354
3355/* ========================= Type agnostic commands ========================= */
3356
3357static void delCommand(redisClient *c) {
3358 int deleted = 0, j;
3359
3360 for (j = 1; j < c->argc; j++) {
3361 if (deleteKey(c->db,c->argv[j])) {
3362 server.dirty++;
3363 deleted++;
3364 }
3365 }
3366 switch(deleted) {
3367 case 0:
3368 addReply(c,shared.czero);
3369 break;
3370 case 1:
3371 addReply(c,shared.cone);
3372 break;
3373 default:
3374 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3375 break;
3376 }
3377}
3378
3379static void existsCommand(redisClient *c) {
3380 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3381}
3382
3383static void selectCommand(redisClient *c) {
3384 int id = atoi(c->argv[1]->ptr);
3385
3386 if (selectDb(c,id) == REDIS_ERR) {
3387 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3388 } else {
3389 addReply(c,shared.ok);
3390 }
3391}
3392
3393static void randomkeyCommand(redisClient *c) {
3394 dictEntry *de;
3395
3396 while(1) {
3397 de = dictGetRandomKey(c->db->dict);
3398 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3399 }
3400 if (de == NULL) {
3401 addReply(c,shared.plus);
3402 addReply(c,shared.crlf);
3403 } else {
3404 addReply(c,shared.plus);
3405 addReply(c,dictGetEntryKey(de));
3406 addReply(c,shared.crlf);
3407 }
3408}
3409
3410static void keysCommand(redisClient *c) {
3411 dictIterator *di;
3412 dictEntry *de;
3413 sds pattern = c->argv[1]->ptr;
3414 int plen = sdslen(pattern);
3415 unsigned long numkeys = 0, keyslen = 0;
3416 robj *lenobj = createObject(REDIS_STRING,NULL);
3417
3418 di = dictGetIterator(c->db->dict);
3419 addReply(c,lenobj);
3420 decrRefCount(lenobj);
3421 while((de = dictNext(di)) != NULL) {
3422 robj *keyobj = dictGetEntryKey(de);
3423
3424 sds key = keyobj->ptr;
3425 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3426 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3427 if (expireIfNeeded(c->db,keyobj) == 0) {
3428 if (numkeys != 0)
3429 addReply(c,shared.space);
3430 addReply(c,keyobj);
3431 numkeys++;
3432 keyslen += sdslen(key);
3433 }
3434 }
3435 }
3436 dictReleaseIterator(di);
3437 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3438 addReply(c,shared.crlf);
3439}
3440
3441static void dbsizeCommand(redisClient *c) {
3442 addReplySds(c,
3443 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3444}
3445
3446static void lastsaveCommand(redisClient *c) {
3447 addReplySds(c,
3448 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3449}
3450
3451static void typeCommand(redisClient *c) {
3452 robj *o;
3453 char *type;
3454
3455 o = lookupKeyRead(c->db,c->argv[1]);
3456 if (o == NULL) {
3457 type = "+none";
3458 } else {
3459 switch(o->type) {
3460 case REDIS_STRING: type = "+string"; break;
3461 case REDIS_LIST: type = "+list"; break;
3462 case REDIS_SET: type = "+set"; break;
3463 case REDIS_ZSET: type = "+zset"; break;
3464 default: type = "unknown"; break;
3465 }
3466 }
3467 addReplySds(c,sdsnew(type));
3468 addReply(c,shared.crlf);
3469}
3470
3471static void saveCommand(redisClient *c) {
3472 if (server.bgsavechildpid != -1) {
3473 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3474 return;
3475 }
3476 if (rdbSave(server.dbfilename) == REDIS_OK) {
3477 addReply(c,shared.ok);
3478 } else {
3479 addReply(c,shared.err);
3480 }
3481}
3482
3483static void bgsaveCommand(redisClient *c) {
3484 if (server.bgsavechildpid != -1) {
3485 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3486 return;
3487 }
3488 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3489 char *status = "+Background saving started\r\n";
3490 addReplySds(c,sdsnew(status));
3491 } else {
3492 addReply(c,shared.err);
3493 }
3494}
3495
3496static void shutdownCommand(redisClient *c) {
3497 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3498 /* Kill the saving child if there is a background saving in progress.
3499 We want to avoid race conditions, for instance our saving child may
3500 overwrite the synchronous saving did by SHUTDOWN. */
3501 if (server.bgsavechildpid != -1) {
3502 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3503 kill(server.bgsavechildpid,SIGKILL);
3504 rdbRemoveTempFile(server.bgsavechildpid);
3505 }
3506 if (server.appendonly) {
3507 /* Append only file: fsync() the AOF and exit */
3508 fsync(server.appendfd);
3509 exit(0);
3510 } else {
3511 /* Snapshotting. Perform a SYNC SAVE and exit */
3512 if (rdbSave(server.dbfilename) == REDIS_OK) {
3513 if (server.daemonize)
3514 unlink(server.pidfile);
3515 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3516 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3517 exit(0);
3518 } else {
3519 /* Ooops.. error saving! The best we can do is to continue operating.
3520 * Note that if there was a background saving process, in the next
3521 * cron() Redis will be notified that the background saving aborted,
3522 * handling special stuff like slaves pending for synchronization... */
3523 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3524 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3525 }
3526 }
3527}
3528
3529static void renameGenericCommand(redisClient *c, int nx) {
3530 robj *o;
3531
3532 /* To use the same key as src and dst is probably an error */
3533 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3534 addReply(c,shared.sameobjecterr);
3535 return;
3536 }
3537
3538 o = lookupKeyWrite(c->db,c->argv[1]);
3539 if (o == NULL) {
3540 addReply(c,shared.nokeyerr);
3541 return;
3542 }
3543 incrRefCount(o);
3544 deleteIfVolatile(c->db,c->argv[2]);
3545 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3546 if (nx) {
3547 decrRefCount(o);
3548 addReply(c,shared.czero);
3549 return;
3550 }
3551 dictReplace(c->db->dict,c->argv[2],o);
3552 } else {
3553 incrRefCount(c->argv[2]);
3554 }
3555 deleteKey(c->db,c->argv[1]);
3556 server.dirty++;
3557 addReply(c,nx ? shared.cone : shared.ok);
3558}
3559
3560static void renameCommand(redisClient *c) {
3561 renameGenericCommand(c,0);
3562}
3563
3564static void renamenxCommand(redisClient *c) {
3565 renameGenericCommand(c,1);
3566}
3567
3568static void moveCommand(redisClient *c) {
3569 robj *o;
3570 redisDb *src, *dst;
3571 int srcid;
3572
3573 /* Obtain source and target DB pointers */
3574 src = c->db;
3575 srcid = c->db->id;
3576 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3577 addReply(c,shared.outofrangeerr);
3578 return;
3579 }
3580 dst = c->db;
3581 selectDb(c,srcid); /* Back to the source DB */
3582
3583 /* If the user is moving using as target the same
3584 * DB as the source DB it is probably an error. */
3585 if (src == dst) {
3586 addReply(c,shared.sameobjecterr);
3587 return;
3588 }
3589
3590 /* Check if the element exists and get a reference */
3591 o = lookupKeyWrite(c->db,c->argv[1]);
3592 if (!o) {
3593 addReply(c,shared.czero);
3594 return;
3595 }
3596
3597 /* Try to add the element to the target DB */
3598 deleteIfVolatile(dst,c->argv[1]);
3599 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3600 addReply(c,shared.czero);
3601 return;
3602 }
3603 incrRefCount(c->argv[1]);
3604 incrRefCount(o);
3605
3606 /* OK! key moved, free the entry in the source DB */
3607 deleteKey(src,c->argv[1]);
3608 server.dirty++;
3609 addReply(c,shared.cone);
3610}
3611
3612/* =================================== Lists ================================ */
3613static void pushGenericCommand(redisClient *c, int where) {
3614 robj *lobj;
3615 list *list;
3616
3617 lobj = lookupKeyWrite(c->db,c->argv[1]);
3618 if (lobj == NULL) {
3619 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3620 addReply(c,shared.ok);
3621 return;
3622 }
3623 lobj = createListObject();
3624 list = lobj->ptr;
3625 if (where == REDIS_HEAD) {
3626 listAddNodeHead(list,c->argv[2]);
3627 } else {
3628 listAddNodeTail(list,c->argv[2]);
3629 }
3630 dictAdd(c->db->dict,c->argv[1],lobj);
3631 incrRefCount(c->argv[1]);
3632 incrRefCount(c->argv[2]);
3633 } else {
3634 if (lobj->type != REDIS_LIST) {
3635 addReply(c,shared.wrongtypeerr);
3636 return;
3637 }
3638 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3639 addReply(c,shared.ok);
3640 return;
3641 }
3642 list = lobj->ptr;
3643 if (where == REDIS_HEAD) {
3644 listAddNodeHead(list,c->argv[2]);
3645 } else {
3646 listAddNodeTail(list,c->argv[2]);
3647 }
3648 incrRefCount(c->argv[2]);
3649 }
3650 server.dirty++;
3651 addReply(c,shared.ok);
3652}
3653
3654static void lpushCommand(redisClient *c) {
3655 pushGenericCommand(c,REDIS_HEAD);
3656}
3657
3658static void rpushCommand(redisClient *c) {
3659 pushGenericCommand(c,REDIS_TAIL);
3660}
3661
3662static void llenCommand(redisClient *c) {
3663 robj *o;
3664 list *l;
3665
3666 o = lookupKeyRead(c->db,c->argv[1]);
3667 if (o == NULL) {
3668 addReply(c,shared.czero);
3669 return;
3670 } else {
3671 if (o->type != REDIS_LIST) {
3672 addReply(c,shared.wrongtypeerr);
3673 } else {
3674 l = o->ptr;
3675 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3676 }
3677 }
3678}
3679
3680static void lindexCommand(redisClient *c) {
3681 robj *o;
3682 int index = atoi(c->argv[2]->ptr);
3683
3684 o = lookupKeyRead(c->db,c->argv[1]);
3685 if (o == NULL) {
3686 addReply(c,shared.nullbulk);
3687 } else {
3688 if (o->type != REDIS_LIST) {
3689 addReply(c,shared.wrongtypeerr);
3690 } else {
3691 list *list = o->ptr;
3692 listNode *ln;
3693
3694 ln = listIndex(list, index);
3695 if (ln == NULL) {
3696 addReply(c,shared.nullbulk);
3697 } else {
3698 robj *ele = listNodeValue(ln);
3699 addReplyBulkLen(c,ele);
3700 addReply(c,ele);
3701 addReply(c,shared.crlf);
3702 }
3703 }
3704 }
3705}
3706
3707static void lsetCommand(redisClient *c) {
3708 robj *o;
3709 int index = atoi(c->argv[2]->ptr);
3710
3711 o = lookupKeyWrite(c->db,c->argv[1]);
3712 if (o == NULL) {
3713 addReply(c,shared.nokeyerr);
3714 } else {
3715 if (o->type != REDIS_LIST) {
3716 addReply(c,shared.wrongtypeerr);
3717 } else {
3718 list *list = o->ptr;
3719 listNode *ln;
3720
3721 ln = listIndex(list, index);
3722 if (ln == NULL) {
3723 addReply(c,shared.outofrangeerr);
3724 } else {
3725 robj *ele = listNodeValue(ln);
3726
3727 decrRefCount(ele);
3728 listNodeValue(ln) = c->argv[3];
3729 incrRefCount(c->argv[3]);
3730 addReply(c,shared.ok);
3731 server.dirty++;
3732 }
3733 }
3734 }
3735}
3736
3737static void popGenericCommand(redisClient *c, int where) {
3738 robj *o;
3739
3740 o = lookupKeyWrite(c->db,c->argv[1]);
3741 if (o == NULL) {
3742 addReply(c,shared.nullbulk);
3743 } else {
3744 if (o->type != REDIS_LIST) {
3745 addReply(c,shared.wrongtypeerr);
3746 } else {
3747 list *list = o->ptr;
3748 listNode *ln;
3749
3750 if (where == REDIS_HEAD)
3751 ln = listFirst(list);
3752 else
3753 ln = listLast(list);
3754
3755 if (ln == NULL) {
3756 addReply(c,shared.nullbulk);
3757 } else {
3758 robj *ele = listNodeValue(ln);
3759 addReplyBulkLen(c,ele);
3760 addReply(c,ele);
3761 addReply(c,shared.crlf);
3762 listDelNode(list,ln);
3763 server.dirty++;
3764 }
3765 }
3766 }
3767}
3768
3769static void lpopCommand(redisClient *c) {
3770 popGenericCommand(c,REDIS_HEAD);
3771}
3772
3773static void rpopCommand(redisClient *c) {
3774 popGenericCommand(c,REDIS_TAIL);
3775}
3776
3777static void lrangeCommand(redisClient *c) {
3778 robj *o;
3779 int start = atoi(c->argv[2]->ptr);
3780 int end = atoi(c->argv[3]->ptr);
3781
3782 o = lookupKeyRead(c->db,c->argv[1]);
3783 if (o == NULL) {
3784 addReply(c,shared.nullmultibulk);
3785 } else {
3786 if (o->type != REDIS_LIST) {
3787 addReply(c,shared.wrongtypeerr);
3788 } else {
3789 list *list = o->ptr;
3790 listNode *ln;
3791 int llen = listLength(list);
3792 int rangelen, j;
3793 robj *ele;
3794
3795 /* convert negative indexes */
3796 if (start < 0) start = llen+start;
3797 if (end < 0) end = llen+end;
3798 if (start < 0) start = 0;
3799 if (end < 0) end = 0;
3800
3801 /* indexes sanity checks */
3802 if (start > end || start >= llen) {
3803 /* Out of range start or start > end result in empty list */
3804 addReply(c,shared.emptymultibulk);
3805 return;
3806 }
3807 if (end >= llen) end = llen-1;
3808 rangelen = (end-start)+1;
3809
3810 /* Return the result in form of a multi-bulk reply */
3811 ln = listIndex(list, start);
3812 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3813 for (j = 0; j < rangelen; j++) {
3814 ele = listNodeValue(ln);
3815 addReplyBulkLen(c,ele);
3816 addReply(c,ele);
3817 addReply(c,shared.crlf);
3818 ln = ln->next;
3819 }
3820 }
3821 }
3822}
3823
3824static void ltrimCommand(redisClient *c) {
3825 robj *o;
3826 int start = atoi(c->argv[2]->ptr);
3827 int end = atoi(c->argv[3]->ptr);
3828
3829 o = lookupKeyWrite(c->db,c->argv[1]);
3830 if (o == NULL) {
3831 addReply(c,shared.ok);
3832 } else {
3833 if (o->type != REDIS_LIST) {
3834 addReply(c,shared.wrongtypeerr);
3835 } else {
3836 list *list = o->ptr;
3837 listNode *ln;
3838 int llen = listLength(list);
3839 int j, ltrim, rtrim;
3840
3841 /* convert negative indexes */
3842 if (start < 0) start = llen+start;
3843 if (end < 0) end = llen+end;
3844 if (start < 0) start = 0;
3845 if (end < 0) end = 0;
3846
3847 /* indexes sanity checks */
3848 if (start > end || start >= llen) {
3849 /* Out of range start or start > end result in empty list */
3850 ltrim = llen;
3851 rtrim = 0;
3852 } else {
3853 if (end >= llen) end = llen-1;
3854 ltrim = start;
3855 rtrim = llen-end-1;
3856 }
3857
3858 /* Remove list elements to perform the trim */
3859 for (j = 0; j < ltrim; j++) {
3860 ln = listFirst(list);
3861 listDelNode(list,ln);
3862 }
3863 for (j = 0; j < rtrim; j++) {
3864 ln = listLast(list);
3865 listDelNode(list,ln);
3866 }
3867 server.dirty++;
3868 addReply(c,shared.ok);
3869 }
3870 }
3871}
3872
3873static void lremCommand(redisClient *c) {
3874 robj *o;
3875
3876 o = lookupKeyWrite(c->db,c->argv[1]);
3877 if (o == NULL) {
3878 addReply(c,shared.czero);
3879 } else {
3880 if (o->type != REDIS_LIST) {
3881 addReply(c,shared.wrongtypeerr);
3882 } else {
3883 list *list = o->ptr;
3884 listNode *ln, *next;
3885 int toremove = atoi(c->argv[2]->ptr);
3886 int removed = 0;
3887 int fromtail = 0;
3888
3889 if (toremove < 0) {
3890 toremove = -toremove;
3891 fromtail = 1;
3892 }
3893 ln = fromtail ? list->tail : list->head;
3894 while (ln) {
3895 robj *ele = listNodeValue(ln);
3896
3897 next = fromtail ? ln->prev : ln->next;
3898 if (compareStringObjects(ele,c->argv[3]) == 0) {
3899 listDelNode(list,ln);
3900 server.dirty++;
3901 removed++;
3902 if (toremove && removed == toremove) break;
3903 }
3904 ln = next;
3905 }
3906 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3907 }
3908 }
3909}
3910
3911/* This is the semantic of this command:
3912 * RPOPLPUSH srclist dstlist:
3913 * IF LLEN(srclist) > 0
3914 * element = RPOP srclist
3915 * LPUSH dstlist element
3916 * RETURN element
3917 * ELSE
3918 * RETURN nil
3919 * END
3920 * END
3921 *
3922 * The idea is to be able to get an element from a list in a reliable way
3923 * since the element is not just returned but pushed against another list
3924 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3925 */
3926static void rpoplpushcommand(redisClient *c) {
3927 robj *sobj;
3928
3929 sobj = lookupKeyWrite(c->db,c->argv[1]);
3930 if (sobj == NULL) {
3931 addReply(c,shared.nullbulk);
3932 } else {
3933 if (sobj->type != REDIS_LIST) {
3934 addReply(c,shared.wrongtypeerr);
3935 } else {
3936 list *srclist = sobj->ptr;
3937 listNode *ln = listLast(srclist);
3938
3939 if (ln == NULL) {
3940 addReply(c,shared.nullbulk);
3941 } else {
3942 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3943 robj *ele = listNodeValue(ln);
3944 list *dstlist;
3945
3946 if (dobj && dobj->type != REDIS_LIST) {
3947 addReply(c,shared.wrongtypeerr);
3948 return;
3949 }
3950
3951 /* Add the element to the target list (unless it's directly
3952 * passed to some BLPOP-ing client */
3953 if (!handleClientsWaitingListPush(c,c->argv[2],ele)) {
3954 if (dobj == NULL) {
3955 /* Create the list if the key does not exist */
3956 dobj = createListObject();
3957 dictAdd(c->db->dict,c->argv[2],dobj);
3958 incrRefCount(c->argv[2]);
3959 }
3960 dstlist = dobj->ptr;
3961 listAddNodeHead(dstlist,ele);
3962 incrRefCount(ele);
3963 }
3964
3965 /* Send the element to the client as reply as well */
3966 addReplyBulkLen(c,ele);
3967 addReply(c,ele);
3968 addReply(c,shared.crlf);
3969
3970 /* Finally remove the element from the source list */
3971 listDelNode(srclist,ln);
3972 server.dirty++;
3973 }
3974 }
3975 }
3976}
3977
3978
3979/* ==================================== Sets ================================ */
3980
3981static void saddCommand(redisClient *c) {
3982 robj *set;
3983
3984 set = lookupKeyWrite(c->db,c->argv[1]);
3985 if (set == NULL) {
3986 set = createSetObject();
3987 dictAdd(c->db->dict,c->argv[1],set);
3988 incrRefCount(c->argv[1]);
3989 } else {
3990 if (set->type != REDIS_SET) {
3991 addReply(c,shared.wrongtypeerr);
3992 return;
3993 }
3994 }
3995 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3996 incrRefCount(c->argv[2]);
3997 server.dirty++;
3998 addReply(c,shared.cone);
3999 } else {
4000 addReply(c,shared.czero);
4001 }
4002}
4003
4004static void sremCommand(redisClient *c) {
4005 robj *set;
4006
4007 set = lookupKeyWrite(c->db,c->argv[1]);
4008 if (set == NULL) {
4009 addReply(c,shared.czero);
4010 } else {
4011 if (set->type != REDIS_SET) {
4012 addReply(c,shared.wrongtypeerr);
4013 return;
4014 }
4015 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
4016 server.dirty++;
4017 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
4018 addReply(c,shared.cone);
4019 } else {
4020 addReply(c,shared.czero);
4021 }
4022 }
4023}
4024
4025static void smoveCommand(redisClient *c) {
4026 robj *srcset, *dstset;
4027
4028 srcset = lookupKeyWrite(c->db,c->argv[1]);
4029 dstset = lookupKeyWrite(c->db,c->argv[2]);
4030
4031 /* If the source key does not exist return 0, if it's of the wrong type
4032 * raise an error */
4033 if (srcset == NULL || srcset->type != REDIS_SET) {
4034 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
4035 return;
4036 }
4037 /* Error if the destination key is not a set as well */
4038 if (dstset && dstset->type != REDIS_SET) {
4039 addReply(c,shared.wrongtypeerr);
4040 return;
4041 }
4042 /* Remove the element from the source set */
4043 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
4044 /* Key not found in the src set! return zero */
4045 addReply(c,shared.czero);
4046 return;
4047 }
4048 server.dirty++;
4049 /* Add the element to the destination set */
4050 if (!dstset) {
4051 dstset = createSetObject();
4052 dictAdd(c->db->dict,c->argv[2],dstset);
4053 incrRefCount(c->argv[2]);
4054 }
4055 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
4056 incrRefCount(c->argv[3]);
4057 addReply(c,shared.cone);
4058}
4059
4060static void sismemberCommand(redisClient *c) {
4061 robj *set;
4062
4063 set = lookupKeyRead(c->db,c->argv[1]);
4064 if (set == NULL) {
4065 addReply(c,shared.czero);
4066 } else {
4067 if (set->type != REDIS_SET) {
4068 addReply(c,shared.wrongtypeerr);
4069 return;
4070 }
4071 if (dictFind(set->ptr,c->argv[2]))
4072 addReply(c,shared.cone);
4073 else
4074 addReply(c,shared.czero);
4075 }
4076}
4077
4078static void scardCommand(redisClient *c) {
4079 robj *o;
4080 dict *s;
4081
4082 o = lookupKeyRead(c->db,c->argv[1]);
4083 if (o == NULL) {
4084 addReply(c,shared.czero);
4085 return;
4086 } else {
4087 if (o->type != REDIS_SET) {
4088 addReply(c,shared.wrongtypeerr);
4089 } else {
4090 s = o->ptr;
4091 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4092 dictSize(s)));
4093 }
4094 }
4095}
4096
4097static void spopCommand(redisClient *c) {
4098 robj *set;
4099 dictEntry *de;
4100
4101 set = lookupKeyWrite(c->db,c->argv[1]);
4102 if (set == NULL) {
4103 addReply(c,shared.nullbulk);
4104 } else {
4105 if (set->type != REDIS_SET) {
4106 addReply(c,shared.wrongtypeerr);
4107 return;
4108 }
4109 de = dictGetRandomKey(set->ptr);
4110 if (de == NULL) {
4111 addReply(c,shared.nullbulk);
4112 } else {
4113 robj *ele = dictGetEntryKey(de);
4114
4115 addReplyBulkLen(c,ele);
4116 addReply(c,ele);
4117 addReply(c,shared.crlf);
4118 dictDelete(set->ptr,ele);
4119 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
4120 server.dirty++;
4121 }
4122 }
4123}
4124
4125static void srandmemberCommand(redisClient *c) {
4126 robj *set;
4127 dictEntry *de;
4128
4129 set = lookupKeyRead(c->db,c->argv[1]);
4130 if (set == NULL) {
4131 addReply(c,shared.nullbulk);
4132 } else {
4133 if (set->type != REDIS_SET) {
4134 addReply(c,shared.wrongtypeerr);
4135 return;
4136 }
4137 de = dictGetRandomKey(set->ptr);
4138 if (de == NULL) {
4139 addReply(c,shared.nullbulk);
4140 } else {
4141 robj *ele = dictGetEntryKey(de);
4142
4143 addReplyBulkLen(c,ele);
4144 addReply(c,ele);
4145 addReply(c,shared.crlf);
4146 }
4147 }
4148}
4149
4150static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
4151 dict **d1 = (void*) s1, **d2 = (void*) s2;
4152
4153 return dictSize(*d1)-dictSize(*d2);
4154}
4155
/* Shared implementation of SINTER (dstkey == NULL) and SINTERSTORE.
 *
 * setskeys/setsnum: keys of the sets to intersect.
 * dstkey:           NULL to reply with the intersection as a multi-bulk;
 *                   otherwise the key where the result set is stored, and
 *                   the reply is the result's cardinality as an integer.
 *
 * If any source key is missing, the intersection is empty: SINTER replies
 * with a null multi-bulk, while SINTERSTORE deletes dstkey and replies :0.
 * A source key that is not a set produces a type error. */
static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
    dict **dv = zmalloc(sizeof(dict*)*setsnum);
    dictIterator *di;
    dictEntry *de;
    robj *lenobj = NULL, *dstset = NULL;
    unsigned long j, cardinality = 0;

    /* Resolve every key to its set dict (lookupKeyWrite in STORE mode,
     * lookupKeyRead otherwise), bailing out on missing or wrong-typed
     * keys. */
    for (j = 0; j < setsnum; j++) {
        robj *setobj;

        setobj = dstkey ?
                    lookupKeyWrite(c->db,setskeys[j]) :
                    lookupKeyRead(c->db,setskeys[j]);
        if (!setobj) {
            zfree(dv);
            if (dstkey) {
                if (deleteKey(c->db,dstkey))
                    server.dirty++;
                addReply(c,shared.czero);
            } else {
                addReply(c,shared.nullmultibulk);
            }
            return;
        }
        if (setobj->type != REDIS_SET) {
            zfree(dv);
            addReply(c,shared.wrongtypeerr);
            return;
        }
        dv[j] = setobj->ptr;
    }
    /* Sort sets from the smallest to the largest. Iterating the smallest
     * set and probing the others keeps the number of lookups low. */
    qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);

    /* The first thing we should output is the total number of elements,
     * since this is a multi-bulk write. At this stage we don't know the
     * intersection set size, so we use a trick: append an empty object to
     * the output list and keep the pointer, so its content can later be
     * set to the right length header. */
    if (!dstkey) {
        lenobj = createObject(REDIS_STRING,NULL);
        addReply(c,lenobj);
        decrRefCount(lenobj);
    } else {
        /* In STORE mode build the result in a fresh set object; it is
         * attached to dstkey only after the intersection is complete. */
        dstset = createSetObject();
    }

    /* Iterate all the elements of the first (smallest) set and test each
     * element against all the other sets. If at least one set does not
     * include the element, it is discarded. */
    di = dictGetIterator(dv[0]);

    while((de = dictNext(di)) != NULL) {
        robj *ele;

        for (j = 1; j < setsnum; j++)
            if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
        if (j != setsnum)
            continue; /* at least one set does not contain the member */
        ele = dictGetEntryKey(de);
        if (!dstkey) {
            addReplyBulkLen(c,ele);
            addReply(c,ele);
            addReply(c,shared.crlf);
            cardinality++;
        } else {
            dictAdd(dstset->ptr,ele,NULL);
            incrRefCount(ele);
        }
    }
    dictReleaseIterator(di);

    if (dstkey) {
        /* Store the resulting set into the target, replacing any previous
         * value. All sources were fully read above, so dstkey may safely
         * be one of them. */
        deleteKey(c->db,dstkey);
        dictAdd(c->db->dict,dstkey,dstset);
        incrRefCount(dstkey);
    }

    if (!dstkey) {
        /* Now that the count is known, fill in the multi-bulk header */
        lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
    } else {
        addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
            dictSize((dict*)dstset->ptr)));
        server.dirty++;
    }
    zfree(dv);
}
4247
4248static void sinterCommand(redisClient *c) {
4249 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4250}
4251
4252static void sinterstoreCommand(redisClient *c) {
4253 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4254}
4255
/* Operation selectors for sunionDiffGenericCommand() */
#define REDIS_OP_UNION 0
#define REDIS_OP_DIFF 1
4258
/* Shared implementation of SUNION, SUNIONSTORE, SDIFF and SDIFFSTORE.
 *
 * op is REDIS_OP_UNION or REDIS_OP_DIFF. A diff is the first set minus all
 * the following ones. Missing keys behave like empty sets, so a diff whose
 * first key is missing is empty. A key that is not a set produces a type
 * error.
 *
 * dstkey == NULL: reply with the result as a multi-bulk.
 * dstkey != NULL: store the result at dstkey (replacing any previous value)
 *                 and reply with its cardinality as an integer. */
static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
    dict **dv = zmalloc(sizeof(dict*)*setsnum);
    dictIterator *di;
    dictEntry *de;
    robj *dstset = NULL;
    int j, cardinality = 0; /* cardinality tracks the size of dstset */

    /* Resolve every key to its set dict. A missing key leaves a NULL slot
     * (treated as an empty set); a wrong-typed key aborts with an error. */
    for (j = 0; j < setsnum; j++) {
        robj *setobj;

        setobj = dstkey ?
                    lookupKeyWrite(c->db,setskeys[j]) :
                    lookupKeyRead(c->db,setskeys[j]);
        if (!setobj) {
            dv[j] = NULL;
            continue;
        }
        if (setobj->type != REDIS_SET) {
            zfree(dv);
            addReply(c,shared.wrongtypeerr);
            return;
        }
        dv[j] = setobj->ptr;
    }

    /* We need a temp set object to store our union (or difference). If
     * dstkey is not NULL (that is, we are inside an SUNIONSTORE or
     * SDIFFSTORE operation), this set object becomes the value stored at
     * the target key. */
    dstset = createSetObject();

    /* Iterate all the elements of all the sets. For a union, add every
     * element a single time to the result set. For a diff, seed the result
     * with the first set and remove the members of every following set. */
    for (j = 0; j < setsnum; j++) {
        if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
        if (!dv[j]) continue; /* non existing keys are like empty sets */

        di = dictGetIterator(dv[j]);

        while((de = dictNext(di)) != NULL) {
            robj *ele;

            /* dictAdd will not add the same element multiple times */
            ele = dictGetEntryKey(de);
            if (op == REDIS_OP_UNION || j == 0) {
                if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
                    incrRefCount(ele);
                    cardinality++;
                }
            } else if (op == REDIS_OP_DIFF) {
                if (dictDelete(dstset->ptr,ele) == DICT_OK) {
                    cardinality--;
                }
            }
        }
        dictReleaseIterator(di);

        /* Once a diff is empty, later sets cannot change it: stop early */
        if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
    }

    /* Output the content of the resulting set, if not in STORE mode */
    if (!dstkey) {
        addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
        di = dictGetIterator(dstset->ptr);
        while((de = dictNext(di)) != NULL) {
            robj *ele;

            ele = dictGetEntryKey(de);
            addReplyBulkLen(c,ele);
            addReply(c,ele);
            addReply(c,shared.crlf);
        }
        dictReleaseIterator(di);
    } else {
        /* If we have a target key, store the result set there, replacing
         * whatever the key held before */
        deleteKey(c->db,dstkey);
        dictAdd(c->db->dict,dstkey,dstset);
        incrRefCount(dstkey);
    }

    /* Cleanup. In reply mode the temporary set is released. In STORE mode
     * it is now held by the database and its size is the reply. */
    if (!dstkey) {
        decrRefCount(dstset);
    } else {
        addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
            dictSize((dict*)dstset->ptr)));
        server.dirty++;
    }
    zfree(dv);
}
4349
4350static void sunionCommand(redisClient *c) {
4351 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4352}
4353
4354static void sunionstoreCommand(redisClient *c) {
4355 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4356}
4357
4358static void sdiffCommand(redisClient *c) {
4359 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4360}
4361
4362static void sdiffstoreCommand(redisClient *c) {
4363 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4364}
4365
4366/* ==================================== ZSets =============================== */
4367
4368/* ZSETs are ordered sets using two data structures to hold the same elements
4369 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4370 * data structure.
4371 *
 * The elements are added to a hash table mapping Redis objects to scores.
4373 * At the same time the elements are added to a skip list mapping scores
4374 * to Redis objects (so objects are sorted by scores in this "view"). */
4375
4376/* This skiplist implementation is almost a C translation of the original
4377 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4378 * Alternative to Balanced Trees", modified in three ways:
4379 * a) this implementation allows for repeated values.
4380 * b) the comparison is not just by key (our 'score') but by satellite data.
4381 * c) there is a back pointer, so it's a doubly linked list with the back
4382 * pointers being only at "level 1". This allows to traverse the list
4383 * from tail to head, useful for ZREVRANGE. */
4384
4385static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4386 zskiplistNode *zn = zmalloc(sizeof(*zn));
4387
4388 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4389 zn->score = score;
4390 zn->obj = obj;
4391 return zn;
4392}
4393
4394static zskiplist *zslCreate(void) {
4395 int j;
4396 zskiplist *zsl;
4397
4398 zsl = zmalloc(sizeof(*zsl));
4399 zsl->level = 1;
4400 zsl->length = 0;
4401 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4402 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4403 zsl->header->forward[j] = NULL;
4404 zsl->header->backward = NULL;
4405 zsl->tail = NULL;
4406 return zsl;
4407}
4408
4409static void zslFreeNode(zskiplistNode *node) {
4410 decrRefCount(node->obj);
4411 zfree(node->forward);
4412 zfree(node);
4413}
4414
4415static void zslFree(zskiplist *zsl) {
4416 zskiplistNode *node = zsl->header->forward[0], *next;
4417
4418 zfree(zsl->header->forward);
4419 zfree(zsl->header);
4420 while(node) {
4421 next = node->forward[0];
4422 zslFreeNode(node);
4423 node = next;
4424 }
4425 zfree(zsl);
4426}
4427
4428static int zslRandomLevel(void) {
4429 int level = 1;
4430 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4431 level += 1;
4432 return level;
4433}
4434
/* Insert a new node holding (score,obj) into the skiplist.
 * Nodes are ordered by score, and by compareStringObjects() among equal
 * scores. The node stores 'obj' without incrementing its reference count;
 * the caller is expected to take that reference.
 * NOTE: update[] has ZSKIPLIST_MAXLEVEL slots, so this relies on
 * zslRandomLevel() never returning a larger level. */
static void zslInsert(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i, level;

    /* For each level, find the rightmost node that sorts before the new
     * (score,obj) pair: its forward pointer is the one to change. */
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->forward[i] &&
            (x->forward[i]->score < score ||
                (x->forward[i]->score == score &&
                compareStringObjects(x->forward[i]->obj,obj) < 0)))
            x = x->forward[i];
        update[i] = x;
    }
    /* We assume the key is not already inside. Duplicated scores are
     * allowed, but re-inserting the same score and redis object should
     * never happen, since the caller of zslInsert() should check the hash
     * table for the element first. */
    level = zslRandomLevel();
    if (level > zsl->level) {
        /* On the new top levels the header is the only predecessor */
        for (i = zsl->level; i < level; i++)
            update[i] = zsl->header;
        zsl->level = level;
    }
    x = zslCreateNode(level,score,obj);
    /* Splice the node in after update[i] on every level it spans */
    for (i = 0; i < level; i++) {
        x->forward[i] = update[i]->forward[i];
        update[i]->forward[i] = x;
    }
    /* Level-1 back pointer: the first real node gets NULL instead of the
     * header, so backward traversals stop before the sentinel */
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->forward[0])
        x->forward[0]->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
}
4470
4471/* Delete an element with matching score/object from the skiplist. */
4472static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4473 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4474 int i;
4475
4476 x = zsl->header;
4477 for (i = zsl->level-1; i >= 0; i--) {
4478 while (x->forward[i] &&
4479 (x->forward[i]->score < score ||
4480 (x->forward[i]->score == score &&
4481 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4482 x = x->forward[i];
4483 update[i] = x;
4484 }
4485 /* We may have multiple elements with the same score, what we need
4486 * is to find the element with both the right score and object. */
4487 x = x->forward[0];
4488 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4489 for (i = 0; i < zsl->level; i++) {
4490 if (update[i]->forward[i] != x) break;
4491 update[i]->forward[i] = x->forward[i];
4492 }
4493 if (x->forward[0]) {
4494 x->forward[0]->backward = (x->backward == zsl->header) ?
4495 NULL : x->backward;
4496 } else {
4497 zsl->tail = x->backward;
4498 }
4499 zslFreeNode(x);
4500 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4501 zsl->level--;
4502 zsl->length--;
4503 return 1;
4504 } else {
4505 return 0; /* not found */
4506 }
4507 return 0; /* not found */
4508}
4509
/* Delete all the elements with a score between min and max from the
 * skiplist. Both bounds are inclusive, so every element with
 * min <= score <= max is deleted. This function also takes the hash table
 * view of the sorted set, so the elements are removed from the hash table
 * too. Returns the number of removed elements. */
static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned long removed = 0;
    int i;

    /* For each level, find the last node with a score lower than min */
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->forward[i] && x->forward[i]->score < min)
            x = x->forward[i];
        update[i] = x;
    }
    /* Starting from the first node with score >= min, unlink nodes until
     * a score exceeds max. update[] stays valid across iterations: after
     * each unlink the predecessors point past the removed node. */
    x = x->forward[0];
    while (x && x->score <= max) {
        zskiplistNode *next;

        for (i = 0; i < zsl->level; i++) {
            if (update[i]->forward[i] != x) break;
            update[i]->forward[i] = x->forward[i];
        }
        if (x->forward[0]) {
            x->forward[0]->backward = (x->backward == zsl->header) ?
                                        NULL : x->backward;
        } else {
            zsl->tail = x->backward;
        }
        next = x->forward[0];
        /* Remove from the hash table while x->obj is still referenced by
         * the node; zslFreeNode() then drops the node's reference */
        dictDelete(dict,x->obj);
        zslFreeNode(x);
        while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
            zsl->level--;
        zsl->length--;
        removed++;
        x = next;
    }
    return removed; /* number of deleted elements */
}
4552
4553/* Find the first node having a score equal or greater than the specified one.
4554 * Returns NULL if there is no match. */
4555static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4556 zskiplistNode *x;
4557 int i;
4558
4559 x = zsl->header;
4560 for (i = zsl->level-1; i >= 0; i--) {
4561 while (x->forward[i] && x->forward[i]->score < score)
4562 x = x->forward[i];
4563 }
4564 /* We may have multiple elements with the same score, what we need
4565 * is to find the element with both the right score and object. */
4566 return x->forward[0];
4567}
4568
4569/* The actual Z-commands implementations */
4570
4571/* This generic command implements both ZADD and ZINCRBY.
4572 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4573 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4574static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4575 robj *zsetobj;
4576 zset *zs;
4577 double *score;
4578
4579 zsetobj = lookupKeyWrite(c->db,key);
4580 if (zsetobj == NULL) {
4581 zsetobj = createZsetObject();
4582 dictAdd(c->db->dict,key,zsetobj);
4583 incrRefCount(key);
4584 } else {
4585 if (zsetobj->type != REDIS_ZSET) {
4586 addReply(c,shared.wrongtypeerr);
4587 return;
4588 }
4589 }
4590 zs = zsetobj->ptr;
4591
4592 /* Ok now since we implement both ZADD and ZINCRBY here the code
4593 * needs to handle the two different conditions. It's all about setting
4594 * '*score', that is, the new score to set, to the right value. */
4595 score = zmalloc(sizeof(double));
4596 if (doincrement) {
4597 dictEntry *de;
4598
4599 /* Read the old score. If the element was not present starts from 0 */
4600 de = dictFind(zs->dict,ele);
4601 if (de) {
4602 double *oldscore = dictGetEntryVal(de);
4603 *score = *oldscore + scoreval;
4604 } else {
4605 *score = scoreval;
4606 }
4607 } else {
4608 *score = scoreval;
4609 }
4610
4611 /* What follows is a simple remove and re-insert operation that is common
4612 * to both ZADD and ZINCRBY... */
4613 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4614 /* case 1: New element */
4615 incrRefCount(ele); /* added to hash */
4616 zslInsert(zs->zsl,*score,ele);
4617 incrRefCount(ele); /* added to skiplist */
4618 server.dirty++;
4619 if (doincrement)
4620 addReplyDouble(c,*score);
4621 else
4622 addReply(c,shared.cone);
4623 } else {
4624 dictEntry *de;
4625 double *oldscore;
4626
4627 /* case 2: Score update operation */
4628 de = dictFind(zs->dict,ele);
4629 redisAssert(de != NULL);
4630 oldscore = dictGetEntryVal(de);
4631 if (*score != *oldscore) {
4632 int deleted;
4633
4634 /* Remove and insert the element in the skip list with new score */
4635 deleted = zslDelete(zs->zsl,*oldscore,ele);
4636 redisAssert(deleted != 0);
4637 zslInsert(zs->zsl,*score,ele);
4638 incrRefCount(ele);
4639 /* Update the score in the hash table */
4640 dictReplace(zs->dict,ele,score);
4641 server.dirty++;
4642 } else {
4643 zfree(score);
4644 }
4645 if (doincrement)
4646 addReplyDouble(c,*score);
4647 else
4648 addReply(c,shared.czero);
4649 }
4650}
4651
4652static void zaddCommand(redisClient *c) {
4653 double scoreval;
4654
4655 scoreval = strtod(c->argv[2]->ptr,NULL);
4656 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4657}
4658
4659static void zincrbyCommand(redisClient *c) {
4660 double scoreval;
4661
4662 scoreval = strtod(c->argv[2]->ptr,NULL);
4663 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4664}
4665
4666static void zremCommand(redisClient *c) {
4667 robj *zsetobj;
4668 zset *zs;
4669
4670 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4671 if (zsetobj == NULL) {
4672 addReply(c,shared.czero);
4673 } else {
4674 dictEntry *de;
4675 double *oldscore;
4676 int deleted;
4677
4678 if (zsetobj->type != REDIS_ZSET) {
4679 addReply(c,shared.wrongtypeerr);
4680 return;
4681 }
4682 zs = zsetobj->ptr;
4683 de = dictFind(zs->dict,c->argv[2]);
4684 if (de == NULL) {
4685 addReply(c,shared.czero);
4686 return;
4687 }
4688 /* Delete from the skiplist */
4689 oldscore = dictGetEntryVal(de);
4690 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4691 redisAssert(deleted != 0);
4692
4693 /* Delete from the hash table */
4694 dictDelete(zs->dict,c->argv[2]);
4695 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4696 server.dirty++;
4697 addReply(c,shared.cone);
4698 }
4699}
4700
4701static void zremrangebyscoreCommand(redisClient *c) {
4702 double min = strtod(c->argv[2]->ptr,NULL);
4703 double max = strtod(c->argv[3]->ptr,NULL);
4704 robj *zsetobj;
4705 zset *zs;
4706
4707 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4708 if (zsetobj == NULL) {
4709 addReply(c,shared.czero);
4710 } else {
4711 long deleted;
4712
4713 if (zsetobj->type != REDIS_ZSET) {
4714 addReply(c,shared.wrongtypeerr);
4715 return;
4716 }
4717 zs = zsetobj->ptr;
4718 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4719 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4720 server.dirty += deleted;
4721 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4722 }
4723}
4724
/* Shared implementation of ZRANGE (reverse == 0) and ZREVRANGE
 * (reverse == 1). Syntax: Z[REV]RANGE key start end [WITHSCORES]
 *
 * start and end are 0-based inclusive ranks; negative values count back
 * from the last element (-1 is the last). ZRANGE ranks from the head of
 * the skiplist (lowest score), ZREVRANGE from the tail (highest score).
 * With WITHSCORES every member is followed by its score, which doubles
 * the multi-bulk length. A missing key replies with a null multi-bulk and
 * an empty range with an empty multi-bulk. */
static void zrangeGenericCommand(redisClient *c, int reverse) {
    robj *o;
    int start = atoi(c->argv[2]->ptr);
    int end = atoi(c->argv[3]->ptr);
    int withscores = 0;

    /* WITHSCORES is the only accepted optional argument */
    if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
        withscores = 1;
    } else if (c->argc >= 5) {
        addReply(c,shared.syntaxerr);
        return;
    }

    o = lookupKeyRead(c->db,c->argv[1]);
    if (o == NULL) {
        addReply(c,shared.nullmultibulk);
    } else {
        if (o->type != REDIS_ZSET) {
            addReply(c,shared.wrongtypeerr);
        } else {
            zset *zsetobj = o->ptr;
            zskiplist *zsl = zsetobj->zsl;
            zskiplistNode *ln;

            int llen = zsl->length;
            int rangelen, j;
            robj *ele;

            /* convert negative indexes */
            if (start < 0) start = llen+start;
            if (end < 0) end = llen+end;
            if (start < 0) start = 0;
            if (end < 0) end = 0;

            /* indexes sanity checks */
            if (start > end || start >= llen) {
                /* Out of range start or start > end result in empty list */
                addReply(c,shared.emptymultibulk);
                return;
            }
            if (end >= llen) end = llen-1;
            rangelen = (end-start)+1;

            /* Move ln to the start-th node: from the tail for ZREVRANGE,
             * from the first real node otherwise. start < llen here, so
             * the walk never runs off the list. */
            if (reverse) {
                ln = zsl->tail;
                while (start--)
                    ln = ln->backward;
            } else {
                ln = zsl->header->forward[0];
                while (start--)
                    ln = ln->forward[0];
            }

            /* Return the result in form of a multi-bulk reply */
            addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
                withscores ? (rangelen*2) : rangelen));
            for (j = 0; j < rangelen; j++) {
                ele = ln->obj;
                addReplyBulkLen(c,ele);
                addReply(c,ele);
                addReply(c,shared.crlf);
                if (withscores)
                    addReplyDouble(c,ln->score);
                ln = reverse ? ln->backward : ln->forward[0];
            }
        }
    }
}
4793
4794static void zrangeCommand(redisClient *c) {
4795 zrangeGenericCommand(c,0);
4796}
4797
4798static void zrevrangeCommand(redisClient *c) {
4799 zrangeGenericCommand(c,1);
4800}
4801
4802static void zrangebyscoreCommand(redisClient *c) {
4803 robj *o;
4804 double min = strtod(c->argv[2]->ptr,NULL);
4805 double max = strtod(c->argv[3]->ptr,NULL);
4806 int offset = 0, limit = -1;
4807
4808 if (c->argc != 4 && c->argc != 7) {
4809 addReplySds(c,
4810 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4811 return;
4812 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4813 addReply(c,shared.syntaxerr);
4814 return;
4815 } else if (c->argc == 7) {
4816 offset = atoi(c->argv[5]->ptr);
4817 limit = atoi(c->argv[6]->ptr);
4818 if (offset < 0) offset = 0;
4819 }
4820
4821 o = lookupKeyRead(c->db,c->argv[1]);
4822 if (o == NULL) {
4823 addReply(c,shared.nullmultibulk);
4824 } else {
4825 if (o->type != REDIS_ZSET) {
4826 addReply(c,shared.wrongtypeerr);
4827 } else {
4828 zset *zsetobj = o->ptr;
4829 zskiplist *zsl = zsetobj->zsl;
4830 zskiplistNode *ln;
4831 robj *ele, *lenobj;
4832 unsigned int rangelen = 0;
4833
4834 /* Get the first node with the score >= min */
4835 ln = zslFirstWithScore(zsl,min);
4836 if (ln == NULL) {
4837 /* No element matching the speciifed interval */
4838 addReply(c,shared.emptymultibulk);
4839 return;
4840 }
4841
4842 /* We don't know in advance how many matching elements there
4843 * are in the list, so we push this object that will represent
4844 * the multi-bulk length in the output buffer, and will "fix"
4845 * it later */
4846 lenobj = createObject(REDIS_STRING,NULL);
4847 addReply(c,lenobj);
4848 decrRefCount(lenobj);
4849
4850 while(ln && ln->score <= max) {
4851 if (offset) {
4852 offset--;
4853 ln = ln->forward[0];
4854 continue;
4855 }
4856 if (limit == 0) break;
4857 ele = ln->obj;
4858 addReplyBulkLen(c,ele);
4859 addReply(c,ele);
4860 addReply(c,shared.crlf);
4861 ln = ln->forward[0];
4862 rangelen++;
4863 if (limit > 0) limit--;
4864 }
4865 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4866 }
4867 }
4868}
4869
4870static void zcardCommand(redisClient *c) {
4871 robj *o;
4872 zset *zs;
4873
4874 o = lookupKeyRead(c->db,c->argv[1]);
4875 if (o == NULL) {
4876 addReply(c,shared.czero);
4877 return;
4878 } else {
4879 if (o->type != REDIS_ZSET) {
4880 addReply(c,shared.wrongtypeerr);
4881 } else {
4882 zs = o->ptr;
4883 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
4884 }
4885 }
4886}
4887
4888static void zscoreCommand(redisClient *c) {
4889 robj *o;
4890 zset *zs;
4891
4892 o = lookupKeyRead(c->db,c->argv[1]);
4893 if (o == NULL) {
4894 addReply(c,shared.nullbulk);
4895 return;
4896 } else {
4897 if (o->type != REDIS_ZSET) {
4898 addReply(c,shared.wrongtypeerr);
4899 } else {
4900 dictEntry *de;
4901
4902 zs = o->ptr;
4903 de = dictFind(zs->dict,c->argv[2]);
4904 if (!de) {
4905 addReply(c,shared.nullbulk);
4906 } else {
4907 double *score = dictGetEntryVal(de);
4908
4909 addReplyDouble(c,*score);
4910 }
4911 }
4912 }
4913}
4914
4915/* ========================= Non type-specific commands ==================== */
4916
4917static void flushdbCommand(redisClient *c) {
4918 server.dirty += dictSize(c->db->dict);
4919 dictEmpty(c->db->dict);
4920 dictEmpty(c->db->expires);
4921 addReply(c,shared.ok);
4922}
4923
4924static void flushallCommand(redisClient *c) {
4925 server.dirty += emptyDb();
4926 addReply(c,shared.ok);
4927 rdbSave(server.dbfilename);
4928 server.dirty++;
4929}
4930
4931static redisSortOperation *createSortOperation(int type, robj *pattern) {
4932 redisSortOperation *so = zmalloc(sizeof(*so));
4933 so->type = type;
4934 so->pattern = pattern;
4935 return so;
4936}
4937
4938/* Return the value associated to the key with a name obtained
4939 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4940static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4941 char *p;
4942 sds spat, ssub;
4943 robj keyobj;
4944 int prefixlen, sublen, postfixlen;
4945 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4946 struct {
4947 long len;
4948 long free;
4949 char buf[REDIS_SORTKEY_MAX+1];
4950 } keyname;
4951
4952 /* If the pattern is "#" return the substitution object itself in order
4953 * to implement the "SORT ... GET #" feature. */
4954 spat = pattern->ptr;
4955 if (spat[0] == '#' && spat[1] == '\0') {
4956 return subst;
4957 }
4958
4959 /* The substitution object may be specially encoded. If so we create
4960 * a decoded object on the fly. Otherwise getDecodedObject will just
4961 * increment the ref count, that we'll decrement later. */
4962 subst = getDecodedObject(subst);
4963
4964 ssub = subst->ptr;
4965 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4966 p = strchr(spat,'*');
4967 if (!p) {
4968 decrRefCount(subst);
4969 return NULL;
4970 }
4971
4972 prefixlen = p-spat;
4973 sublen = sdslen(ssub);
4974 postfixlen = sdslen(spat)-(prefixlen+1);
4975 memcpy(keyname.buf,spat,prefixlen);
4976 memcpy(keyname.buf+prefixlen,ssub,sublen);
4977 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4978 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4979 keyname.len = prefixlen+sublen+postfixlen;
4980
4981 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
4982 decrRefCount(subst);
4983
4984 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4985 return lookupKeyRead(db,&keyobj);
4986}
4987
4988/* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4989 * the additional parameter is not standard but a BSD-specific we have to
4990 * pass sorting parameters via the global 'server' structure */
4991static int sortCompare(const void *s1, const void *s2) {
4992 const redisSortObject *so1 = s1, *so2 = s2;
4993 int cmp;
4994
4995 if (!server.sort_alpha) {
4996 /* Numeric sorting. Here it's trivial as we precomputed scores */
4997 if (so1->u.score > so2->u.score) {
4998 cmp = 1;
4999 } else if (so1->u.score < so2->u.score) {
5000 cmp = -1;
5001 } else {
5002 cmp = 0;
5003 }
5004 } else {
5005 /* Alphanumeric sorting */
5006 if (server.sort_bypattern) {
5007 if (!so1->u.cmpobj || !so2->u.cmpobj) {
5008 /* At least one compare object is NULL */
5009 if (so1->u.cmpobj == so2->u.cmpobj)
5010 cmp = 0;
5011 else if (so1->u.cmpobj == NULL)
5012 cmp = -1;
5013 else
5014 cmp = 1;
5015 } else {
5016 /* We have both the objects, use strcoll */
5017 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
5018 }
5019 } else {
5020 /* Compare elements directly */
5021 robj *dec1, *dec2;
5022
5023 dec1 = getDecodedObject(so1->obj);
5024 dec2 = getDecodedObject(so2->obj);
5025 cmp = strcoll(dec1->ptr,dec2->ptr);
5026 decrRefCount(dec1);
5027 decrRefCount(dec2);
5028 }
5029 }
5030 return server.sort_desc ? -cmp : cmp;
5031}
5032
5033/* The SORT command is the most complex command in Redis. Warning: this code
5034 * is optimized for speed and a bit less for readability */
5035static void sortCommand(redisClient *c) {
5036 list *operations;
5037 int outputlen = 0;
5038 int desc = 0, alpha = 0;
5039 int limit_start = 0, limit_count = -1, start, end;
5040 int j, dontsort = 0, vectorlen;
5041 int getop = 0; /* GET operation counter */
5042 robj *sortval, *sortby = NULL, *storekey = NULL;
5043 redisSortObject *vector; /* Resulting vector to sort */
5044
5045 /* Lookup the key to sort. It must be of the right types */
5046 sortval = lookupKeyRead(c->db,c->argv[1]);
5047 if (sortval == NULL) {
5048 addReply(c,shared.nullmultibulk);
5049 return;
5050 }
5051 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
5052 sortval->type != REDIS_ZSET)
5053 {
5054 addReply(c,shared.wrongtypeerr);
5055 return;
5056 }
5057
5058 /* Create a list of operations to perform for every sorted element.
5059 * Operations can be GET/DEL/INCR/DECR */
5060 operations = listCreate();
5061 listSetFreeMethod(operations,zfree);
5062 j = 2;
5063
5064 /* Now we need to protect sortval incrementing its count, in the future
5065 * SORT may have options able to overwrite/delete keys during the sorting
5066 * and the sorted key itself may get destroied */
5067 incrRefCount(sortval);
5068
5069 /* The SORT command has an SQL-alike syntax, parse it */
5070 while(j < c->argc) {
5071 int leftargs = c->argc-j-1;
5072 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
5073 desc = 0;
5074 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
5075 desc = 1;
5076 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
5077 alpha = 1;
5078 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
5079 limit_start = atoi(c->argv[j+1]->ptr);
5080 limit_count = atoi(c->argv[j+2]->ptr);
5081 j+=2;
5082 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
5083 storekey = c->argv[j+1];
5084 j++;
5085 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
5086 sortby = c->argv[j+1];
5087 /* If the BY pattern does not contain '*', i.e. it is constant,
5088 * we don't need to sort nor to lookup the weight keys. */
5089 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
5090 j++;
5091 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
5092 listAddNodeTail(operations,createSortOperation(
5093 REDIS_SORT_GET,c->argv[j+1]));
5094 getop++;
5095 j++;
5096 } else {
5097 decrRefCount(sortval);
5098 listRelease(operations);
5099 addReply(c,shared.syntaxerr);
5100 return;
5101 }
5102 j++;
5103 }
5104
5105 /* Load the sorting vector with all the objects to sort */
5106 switch(sortval->type) {
5107 case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
5108 case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
5109 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
5110 default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
5111 }
5112 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
5113 j = 0;
5114
5115 if (sortval->type == REDIS_LIST) {
5116 list *list = sortval->ptr;
5117 listNode *ln;
5118
5119 listRewind(list);
5120 while((ln = listYield(list))) {
5121 robj *ele = ln->value;
5122 vector[j].obj = ele;
5123 vector[j].u.score = 0;
5124 vector[j].u.cmpobj = NULL;
5125 j++;
5126 }
5127 } else {
5128 dict *set;
5129 dictIterator *di;
5130 dictEntry *setele;
5131
5132 if (sortval->type == REDIS_SET) {
5133 set = sortval->ptr;
5134 } else {
5135 zset *zs = sortval->ptr;
5136 set = zs->dict;
5137 }
5138
5139 di = dictGetIterator(set);
5140 while((setele = dictNext(di)) != NULL) {
5141 vector[j].obj = dictGetEntryKey(setele);
5142 vector[j].u.score = 0;
5143 vector[j].u.cmpobj = NULL;
5144 j++;
5145 }
5146 dictReleaseIterator(di);
5147 }
5148 redisAssert(j == vectorlen);
5149
5150 /* Now it's time to load the right scores in the sorting vector */
5151 if (dontsort == 0) {
5152 for (j = 0; j < vectorlen; j++) {
5153 if (sortby) {
5154 robj *byval;
5155
5156 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
5157 if (!byval || byval->type != REDIS_STRING) continue;
5158 if (alpha) {
5159 vector[j].u.cmpobj = getDecodedObject(byval);
5160 } else {
5161 if (byval->encoding == REDIS_ENCODING_RAW) {
5162 vector[j].u.score = strtod(byval->ptr,NULL);
5163 } else {
5164 /* Don't need to decode the object if it's
5165 * integer-encoded (the only encoding supported) so
5166 * far. We can just cast it */
5167 if (byval->encoding == REDIS_ENCODING_INT) {
5168 vector[j].u.score = (long)byval->ptr;
5169 } else
5170 redisAssert(1 != 1);
5171 }
5172 }
5173 } else {
5174 if (!alpha) {
5175 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
5176 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
5177 else {
5178 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
5179 vector[j].u.score = (long) vector[j].obj->ptr;
5180 else
5181 redisAssert(1 != 1);
5182 }
5183 }
5184 }
5185 }
5186 }
5187
5188 /* We are ready to sort the vector... perform a bit of sanity check
5189 * on the LIMIT option too. We'll use a partial version of quicksort. */
5190 start = (limit_start < 0) ? 0 : limit_start;
5191 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
5192 if (start >= vectorlen) {
5193 start = vectorlen-1;
5194 end = vectorlen-2;
5195 }
5196 if (end >= vectorlen) end = vectorlen-1;
5197
5198 if (dontsort == 0) {
5199 server.sort_desc = desc;
5200 server.sort_alpha = alpha;
5201 server.sort_bypattern = sortby ? 1 : 0;
5202 if (sortby && (start != 0 || end != vectorlen-1))
5203 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
5204 else
5205 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
5206 }
5207
5208 /* Send command output to the output buffer, performing the specified
5209 * GET/DEL/INCR/DECR operations if any. */
5210 outputlen = getop ? getop*(end-start+1) : end-start+1;
5211 if (storekey == NULL) {
5212 /* STORE option not specified, sent the sorting result to client */
5213 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
5214 for (j = start; j <= end; j++) {
5215 listNode *ln;
5216 if (!getop) {
5217 addReplyBulkLen(c,vector[j].obj);
5218 addReply(c,vector[j].obj);
5219 addReply(c,shared.crlf);
5220 }
5221 listRewind(operations);
5222 while((ln = listYield(operations))) {
5223 redisSortOperation *sop = ln->value;
5224 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5225 vector[j].obj);
5226
5227 if (sop->type == REDIS_SORT_GET) {
5228 if (!val || val->type != REDIS_STRING) {
5229 addReply(c,shared.nullbulk);
5230 } else {
5231 addReplyBulkLen(c,val);
5232 addReply(c,val);
5233 addReply(c,shared.crlf);
5234 }
5235 } else {
5236 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5237 }
5238 }
5239 }
5240 } else {
5241 robj *listObject = createListObject();
5242 list *listPtr = (list*) listObject->ptr;
5243
5244 /* STORE option specified, set the sorting result as a List object */
5245 for (j = start; j <= end; j++) {
5246 listNode *ln;
5247 if (!getop) {
5248 listAddNodeTail(listPtr,vector[j].obj);
5249 incrRefCount(vector[j].obj);
5250 }
5251 listRewind(operations);
5252 while((ln = listYield(operations))) {
5253 redisSortOperation *sop = ln->value;
5254 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5255 vector[j].obj);
5256
5257 if (sop->type == REDIS_SORT_GET) {
5258 if (!val || val->type != REDIS_STRING) {
5259 listAddNodeTail(listPtr,createStringObject("",0));
5260 } else {
5261 listAddNodeTail(listPtr,val);
5262 incrRefCount(val);
5263 }
5264 } else {
5265 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5266 }
5267 }
5268 }
5269 if (dictReplace(c->db->dict,storekey,listObject)) {
5270 incrRefCount(storekey);
5271 }
5272 /* Note: we add 1 because the DB is dirty anyway since even if the
5273 * SORT result is empty a new key is set and maybe the old content
5274 * replaced. */
5275 server.dirty += 1+outputlen;
5276 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
5277 }
5278
5279 /* Cleanup */
5280 decrRefCount(sortval);
5281 listRelease(operations);
5282 for (j = 0; j < vectorlen; j++) {
5283 if (sortby && alpha && vector[j].u.cmpobj)
5284 decrRefCount(vector[j].u.cmpobj);
5285 }
5286 zfree(vector);
5287}
5288
5289/* Create the string returned by the INFO command. This is decoupled
5290 * by the INFO command itself as we need to report the same information
5291 * on memory corruption problems. */
5292static sds genRedisInfoString(void) {
5293 sds info;
5294 time_t uptime = time(NULL)-server.stat_starttime;
5295 int j;
5296
5297 info = sdscatprintf(sdsempty(),
5298 "redis_version:%s\r\n"
5299 "arch_bits:%s\r\n"
5300 "multiplexing_api:%s\r\n"
5301 "uptime_in_seconds:%ld\r\n"
5302 "uptime_in_days:%ld\r\n"
5303 "connected_clients:%d\r\n"
5304 "connected_slaves:%d\r\n"
5305 "blocked_clients:%d\r\n"
5306 "used_memory:%zu\r\n"
5307 "changes_since_last_save:%lld\r\n"
5308 "bgsave_in_progress:%d\r\n"
5309 "last_save_time:%ld\r\n"
5310 "bgrewriteaof_in_progress:%d\r\n"
5311 "total_connections_received:%lld\r\n"
5312 "total_commands_processed:%lld\r\n"
5313 "role:%s\r\n"
5314 ,REDIS_VERSION,
5315 (sizeof(long) == 8) ? "64" : "32",
5316 aeGetApiName(),
5317 uptime,
5318 uptime/(3600*24),
5319 listLength(server.clients)-listLength(server.slaves),
5320 listLength(server.slaves),
5321 server.blockedclients,
5322 server.usedmemory,
5323 server.dirty,
5324 server.bgsavechildpid != -1,
5325 server.lastsave,
5326 server.bgrewritechildpid != -1,
5327 server.stat_numconnections,
5328 server.stat_numcommands,
5329 server.masterhost == NULL ? "master" : "slave"
5330 );
5331 if (server.masterhost) {
5332 info = sdscatprintf(info,
5333 "master_host:%s\r\n"
5334 "master_port:%d\r\n"
5335 "master_link_status:%s\r\n"
5336 "master_last_io_seconds_ago:%d\r\n"
5337 ,server.masterhost,
5338 server.masterport,
5339 (server.replstate == REDIS_REPL_CONNECTED) ?
5340 "up" : "down",
5341 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5342 );
5343 }
5344 for (j = 0; j < server.dbnum; j++) {
5345 long long keys, vkeys;
5346
5347 keys = dictSize(server.db[j].dict);
5348 vkeys = dictSize(server.db[j].expires);
5349 if (keys || vkeys) {
5350 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5351 j, keys, vkeys);
5352 }
5353 }
5354 return info;
5355}
5356
5357static void infoCommand(redisClient *c) {
5358 sds info = genRedisInfoString();
5359 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
5360 (unsigned long)sdslen(info)));
5361 addReplySds(c,info);
5362 addReply(c,shared.crlf);
5363}
5364
5365static void monitorCommand(redisClient *c) {
5366 /* ignore MONITOR if aleady slave or in monitor mode */
5367 if (c->flags & REDIS_SLAVE) return;
5368
5369 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5370 c->slaveseldb = 0;
5371 listAddNodeTail(server.monitors,c);
5372 addReply(c,shared.ok);
5373}
5374
5375/* ================================= Expire ================================= */
5376static int removeExpire(redisDb *db, robj *key) {
5377 if (dictDelete(db->expires,key) == DICT_OK) {
5378 return 1;
5379 } else {
5380 return 0;
5381 }
5382}
5383
5384static int setExpire(redisDb *db, robj *key, time_t when) {
5385 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5386 return 0;
5387 } else {
5388 incrRefCount(key);
5389 return 1;
5390 }
5391}
5392
5393/* Return the expire time of the specified key, or -1 if no expire
5394 * is associated with this key (i.e. the key is non volatile) */
5395static time_t getExpire(redisDb *db, robj *key) {
5396 dictEntry *de;
5397
5398 /* No expire? return ASAP */
5399 if (dictSize(db->expires) == 0 ||
5400 (de = dictFind(db->expires,key)) == NULL) return -1;
5401
5402 return (time_t) dictGetEntryVal(de);
5403}
5404
5405static int expireIfNeeded(redisDb *db, robj *key) {
5406 time_t when;
5407 dictEntry *de;
5408
5409 /* No expire? return ASAP */
5410 if (dictSize(db->expires) == 0 ||
5411 (de = dictFind(db->expires,key)) == NULL) return 0;
5412
5413 /* Lookup the expire */
5414 when = (time_t) dictGetEntryVal(de);
5415 if (time(NULL) <= when) return 0;
5416
5417 /* Delete the key */
5418 dictDelete(db->expires,key);
5419 return dictDelete(db->dict,key) == DICT_OK;
5420}
5421
5422static int deleteIfVolatile(redisDb *db, robj *key) {
5423 dictEntry *de;
5424
5425 /* No expire? return ASAP */
5426 if (dictSize(db->expires) == 0 ||
5427 (de = dictFind(db->expires,key)) == NULL) return 0;
5428
5429 /* Delete the key */
5430 server.dirty++;
5431 dictDelete(db->expires,key);
5432 return dictDelete(db->dict,key) == DICT_OK;
5433}
5434
5435static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5436 dictEntry *de;
5437
5438 de = dictFind(c->db->dict,key);
5439 if (de == NULL) {
5440 addReply(c,shared.czero);
5441 return;
5442 }
5443 if (seconds < 0) {
5444 if (deleteKey(c->db,key)) server.dirty++;
5445 addReply(c, shared.cone);
5446 return;
5447 } else {
5448 time_t when = time(NULL)+seconds;
5449 if (setExpire(c->db,key,when)) {
5450 addReply(c,shared.cone);
5451 server.dirty++;
5452 } else {
5453 addReply(c,shared.czero);
5454 }
5455 return;
5456 }
5457}
5458
5459static void expireCommand(redisClient *c) {
5460 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5461}
5462
5463static void expireatCommand(redisClient *c) {
5464 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5465}
5466
5467static void ttlCommand(redisClient *c) {
5468 time_t expire;
5469 int ttl = -1;
5470
5471 expire = getExpire(c->db,c->argv[1]);
5472 if (expire != -1) {
5473 ttl = (int) (expire-time(NULL));
5474 if (ttl < 0) ttl = -1;
5475 }
5476 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5477}
5478
5479/* ================================ MULTI/EXEC ============================== */
5480
5481/* Client state initialization for MULTI/EXEC */
5482static void initClientMultiState(redisClient *c) {
5483 c->mstate.commands = NULL;
5484 c->mstate.count = 0;
5485}
5486
5487/* Release all the resources associated with MULTI/EXEC state */
5488static void freeClientMultiState(redisClient *c) {
5489 int j;
5490
5491 for (j = 0; j < c->mstate.count; j++) {
5492 int i;
5493 multiCmd *mc = c->mstate.commands+j;
5494
5495 for (i = 0; i < mc->argc; i++)
5496 decrRefCount(mc->argv[i]);
5497 zfree(mc->argv);
5498 }
5499 zfree(c->mstate.commands);
5500}
5501
5502/* Add a new command into the MULTI commands queue */
5503static void queueMultiCommand(redisClient *c, struct redisCommand *cmd) {
5504 multiCmd *mc;
5505 int j;
5506
5507 c->mstate.commands = zrealloc(c->mstate.commands,
5508 sizeof(multiCmd)*(c->mstate.count+1));
5509 mc = c->mstate.commands+c->mstate.count;
5510 mc->cmd = cmd;
5511 mc->argc = c->argc;
5512 mc->argv = zmalloc(sizeof(robj*)*c->argc);
5513 memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
5514 for (j = 0; j < c->argc; j++)
5515 incrRefCount(mc->argv[j]);
5516 c->mstate.count++;
5517}
5518
5519static void multiCommand(redisClient *c) {
5520 c->flags |= REDIS_MULTI;
5521 addReply(c,shared.ok);
5522}
5523
5524static void execCommand(redisClient *c) {
5525 int j;
5526 robj **orig_argv;
5527 int orig_argc;
5528
5529 if (!(c->flags & REDIS_MULTI)) {
5530 addReplySds(c,sdsnew("-ERR EXEC without MULTI\r\n"));
5531 return;
5532 }
5533
5534 orig_argv = c->argv;
5535 orig_argc = c->argc;
5536 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count));
5537 for (j = 0; j < c->mstate.count; j++) {
5538 c->argc = c->mstate.commands[j].argc;
5539 c->argv = c->mstate.commands[j].argv;
5540 call(c,c->mstate.commands[j].cmd);
5541 }
5542 c->argv = orig_argv;
5543 c->argc = orig_argc;
5544 freeClientMultiState(c);
5545 initClientMultiState(c);
5546 c->flags &= (~REDIS_MULTI);
5547}
5548
5549/* =========================== Blocking Operations ========================= */
5550
5551/* Currently Redis blocking operations support is limited to list POP ops,
5552 * so the current implementation is not fully generic, but it is also not
5553 * completely specific so it will not require a rewrite to support new
5554 * kind of blocking operations in the future.
5555 *
5556 * Still it's important to note that list blocking operations can be already
5557 * used as a notification mechanism in order to implement other blocking
5558 * operations at application level, so there must be a very strong evidence
5559 * of usefulness and generality before new blocking operations are implemented.
5560 *
5561 * This is how the current blocking POP works, we use BLPOP as example:
 * - If the user calls BLPOP and the key exists and contains a non empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   if there is no need to block.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blockingkeys) mapping keys to a list of clients
 *   blocking for these keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we serve the first client in the list: instead of pushing
 *   the new element into the list we return it to the (first / oldest)
 *   blocking client, unblock the client, and remove it from the list.
5575 *
5576 * The above comment and the source code should be enough in order to understand
5577 * the implementation and modify / fix it later.
5578 */
5579
5580/* Set a client in blocking mode for the specified key, with the specified
5581 * timeout */
5582static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
5583 dictEntry *de;
5584 list *l;
5585 int j;
5586
5587 c->blockingkeys = zmalloc(sizeof(robj*)*numkeys);
5588 c->blockingkeysnum = numkeys;
5589 c->blockingto = timeout;
5590 for (j = 0; j < numkeys; j++) {
5591 /* Add the key in the client structure, to map clients -> keys */
5592 c->blockingkeys[j] = keys[j];
5593 incrRefCount(keys[j]);
5594
5595 /* And in the other "side", to map keys -> clients */
5596 de = dictFind(c->db->blockingkeys,keys[j]);
5597 if (de == NULL) {
5598 int retval;
5599
5600 /* For every key we take a list of clients blocked for it */
5601 l = listCreate();
5602 retval = dictAdd(c->db->blockingkeys,keys[j],l);
5603 incrRefCount(keys[j]);
5604 assert(retval == DICT_OK);
5605 } else {
5606 l = dictGetEntryVal(de);
5607 }
5608 listAddNodeTail(l,c);
5609 }
5610 /* Mark the client as a blocked client */
5611 c->flags |= REDIS_BLOCKED;
5612 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
5613 server.blockedclients++;
5614}
5615
5616/* Unblock a client that's waiting in a blocking operation such as BLPOP */
5617static void unblockClient(redisClient *c) {
5618 dictEntry *de;
5619 list *l;
5620 int j;
5621
5622 assert(c->blockingkeys != NULL);
5623 /* The client may wait for multiple keys, so unblock it for every key. */
5624 for (j = 0; j < c->blockingkeysnum; j++) {
5625 /* Remove this client from the list of clients waiting for this key. */
5626 de = dictFind(c->db->blockingkeys,c->blockingkeys[j]);
5627 assert(de != NULL);
5628 l = dictGetEntryVal(de);
5629 listDelNode(l,listSearchKey(l,c));
5630 /* If the list is empty we need to remove it to avoid wasting memory */
5631 if (listLength(l) == 0)
5632 dictDelete(c->db->blockingkeys,c->blockingkeys[j]);
5633 decrRefCount(c->blockingkeys[j]);
5634 }
5635 /* Cleanup the client structure */
5636 zfree(c->blockingkeys);
5637 c->blockingkeys = NULL;
5638 c->flags &= (~REDIS_BLOCKED);
5639 server.blockedclients--;
5640 /* Ok now we are ready to get read events from socket, note that we
5641 * can't trap errors here as it's possible that unblockClients() is
5642 * called from freeClient() itself, and the only thing we can do
5643 * if we failed to register the READABLE event is to kill the client.
5644 * Still the following function should never fail in the real world as
5645 * we are sure the file descriptor is sane, and we exit on out of mem. */
5646 aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c);
5647 /* As a final step we want to process data if there is some command waiting
5648 * in the input buffer. Note that this is safe even if unblockClient()
5649 * gets called from freeClient() because freeClient() will be smart
5650 * enough to call this function *after* c->querybuf was set to NULL. */
5651 if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
5652}
5653
5654/* This should be called from any function PUSHing into lists.
5655 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5656 * 'ele' is the element pushed.
5657 *
5658 * If the function returns 0 there was no client waiting for a list push
5659 * against this key.
5660 *
5661 * If the function returns 1 there was a client waiting for a list push
5662 * against this key, the element was passed to this client thus it's not
5663 * needed to actually add it to the list and the caller should return asap. */
5664static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
5665 struct dictEntry *de;
5666 redisClient *receiver;
5667 list *l;
5668 listNode *ln;
5669
5670 de = dictFind(c->db->blockingkeys,key);
5671 if (de == NULL) return 0;
5672 l = dictGetEntryVal(de);
5673 ln = listFirst(l);
5674 assert(ln != NULL);
5675 receiver = ln->value;
5676
5677 addReplySds(receiver,sdsnew("*2\r\n"));
5678 addReplyBulkLen(receiver,key);
5679 addReply(receiver,key);
5680 addReply(receiver,shared.crlf);
5681 addReplyBulkLen(receiver,ele);
5682 addReply(receiver,ele);
5683 addReply(receiver,shared.crlf);
5684 unblockClient(receiver);
5685 return 1;
5686}
5687
5688/* Blocking RPOP/LPOP */
5689static void blockingPopGenericCommand(redisClient *c, int where) {
5690 robj *o;
5691 time_t timeout;
5692 int j;
5693
5694 for (j = 1; j < c->argc-1; j++) {
5695 o = lookupKeyWrite(c->db,c->argv[j]);
5696 if (o != NULL) {
5697 if (o->type != REDIS_LIST) {
5698 addReply(c,shared.wrongtypeerr);
5699 return;
5700 } else {
5701 list *list = o->ptr;
5702 if (listLength(list) != 0) {
5703 /* If the list contains elements fall back to the usual
5704 * non-blocking POP operation */
5705 robj *argv[2], **orig_argv;
5706 int orig_argc;
5707
5708 /* We need to alter the command arguments before to call
5709 * popGenericCommand() as the command takes a single key. */
5710 orig_argv = c->argv;
5711 orig_argc = c->argc;
5712 argv[1] = c->argv[j];
5713 c->argv = argv;
5714 c->argc = 2;
5715
5716 /* Also the return value is different, we need to output
5717 * the multi bulk reply header and the key name. The
5718 * "real" command will add the last element (the value)
5719 * for us. If this souds like an hack to you it's just
5720 * because it is... */
5721 addReplySds(c,sdsnew("*2\r\n"));
5722 addReplyBulkLen(c,argv[1]);
5723 addReply(c,argv[1]);
5724 addReply(c,shared.crlf);
5725 popGenericCommand(c,where);
5726
5727 /* Fix the client structure with the original stuff */
5728 c->argv = orig_argv;
5729 c->argc = orig_argc;
5730 return;
5731 }
5732 }
5733 }
5734 }
5735 /* If the list is empty or the key does not exists we must block */
5736 timeout = strtol(c->argv[c->argc-1]->ptr,NULL,10);
5737 if (timeout > 0) timeout += time(NULL);
5738 blockForKeys(c,c->argv+1,c->argc-2,timeout);
5739}
5740
5741static void blpopCommand(redisClient *c) {
5742 blockingPopGenericCommand(c,REDIS_HEAD);
5743}
5744
5745static void brpopCommand(redisClient *c) {
5746 blockingPopGenericCommand(c,REDIS_TAIL);
5747}
5748
5749/* =============================== Replication ============================= */
5750
5751static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5752 ssize_t nwritten, ret = size;
5753 time_t start = time(NULL);
5754
5755 timeout++;
5756 while(size) {
5757 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5758 nwritten = write(fd,ptr,size);
5759 if (nwritten == -1) return -1;
5760 ptr += nwritten;
5761 size -= nwritten;
5762 }
5763 if ((time(NULL)-start) > timeout) {
5764 errno = ETIMEDOUT;
5765 return -1;
5766 }
5767 }
5768 return ret;
5769}
5770
5771static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5772 ssize_t nread, totread = 0;
5773 time_t start = time(NULL);
5774
5775 timeout++;
5776 while(size) {
5777 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5778 nread = read(fd,ptr,size);
5779 if (nread == -1) return -1;
5780 ptr += nread;
5781 size -= nread;
5782 totread += nread;
5783 }
5784 if ((time(NULL)-start) > timeout) {
5785 errno = ETIMEDOUT;
5786 return -1;
5787 }
5788 }
5789 return totread;
5790}
5791
/* Read one '\n'-terminated line from 'fd' into 'ptr', one byte at a time via
 * syncRead() (each byte subject to 'timeout').
 *
 * 'size' is the full capacity of 'ptr' including the NUL terminator, so at
 * most size-1 bytes are stored. The '\n' is not stored and a trailing '\r'
 * is replaced by NUL (but still counted in the return value). If the line
 * is longer than size-1 bytes, reading stops when the buffer is full and
 * the rest of the line is left unread.
 *
 * Returns the number of bytes stored, 0 if size <= 0, or -1 on I/O error or
 * timeout. When size > 0 the buffer is always NUL-terminated. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    /* Without room for the terminator there is nothing sensible to do. */
    if (size <= 0) return 0;
    *ptr = '\0';
    size--; /* reserve one byte for the terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* Account for the byte just stored: without this the loop never
         * ends on a long line and writes past the end of 'ptr'. */
        size--;
    }
    return nread;
}
5812
5813static void syncCommand(redisClient *c) {
5814 /* ignore SYNC if aleady slave or in monitor mode */
5815 if (c->flags & REDIS_SLAVE) return;
5816
5817 /* SYNC can't be issued when the server has pending data to send to
5818 * the client about already issued commands. We need a fresh reply
5819 * buffer registering the differences between the BGSAVE and the current
5820 * dataset, so that we can copy to other slaves if needed. */
5821 if (listLength(c->reply) != 0) {
5822 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5823 return;
5824 }
5825
5826 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5827 /* Here we need to check if there is a background saving operation
5828 * in progress, or if it is required to start one */
5829 if (server.bgsavechildpid != -1) {
5830 /* Ok a background save is in progress. Let's check if it is a good
5831 * one for replication, i.e. if there is another slave that is
5832 * registering differences since the server forked to save */
5833 redisClient *slave;
5834 listNode *ln;
5835
5836 listRewind(server.slaves);
5837 while((ln = listYield(server.slaves))) {
5838 slave = ln->value;
5839 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5840 }
5841 if (ln) {
5842 /* Perfect, the server is already registering differences for
5843 * another slave. Set the right state, and copy the buffer. */
5844 listRelease(c->reply);
5845 c->reply = listDup(slave->reply);
5846 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5847 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5848 } else {
5849 /* No way, we need to wait for the next BGSAVE in order to
5850 * register differences */
5851 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5852 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5853 }
5854 } else {
5855 /* Ok we don't have a BGSAVE in progress, let's start one */
5856 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5857 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5858 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5859 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5860 return;
5861 }
5862 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5863 }
5864 c->repldbfd = -1;
5865 c->flags |= REDIS_SLAVE;
5866 c->slaveseldb = 0;
5867 listAddNodeTail(server.slaves,c);
5868 return;
5869}
5870
5871static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5872 redisClient *slave = privdata;
5873 REDIS_NOTUSED(el);
5874 REDIS_NOTUSED(mask);
5875 char buf[REDIS_IOBUF_LEN];
5876 ssize_t nwritten, buflen;
5877
5878 if (slave->repldboff == 0) {
5879 /* Write the bulk write count before to transfer the DB. In theory here
5880 * we don't know how much room there is in the output buffer of the
5881 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5882 * operations) will never be smaller than the few bytes we need. */
5883 sds bulkcount;
5884
5885 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5886 slave->repldbsize);
5887 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5888 {
5889 sdsfree(bulkcount);
5890 freeClient(slave);
5891 return;
5892 }
5893 sdsfree(bulkcount);
5894 }
5895 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5896 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5897 if (buflen <= 0) {
5898 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5899 (buflen == 0) ? "premature EOF" : strerror(errno));
5900 freeClient(slave);
5901 return;
5902 }
5903 if ((nwritten = write(fd,buf,buflen)) == -1) {
5904 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5905 strerror(errno));
5906 freeClient(slave);
5907 return;
5908 }
5909 slave->repldboff += nwritten;
5910 if (slave->repldboff == slave->repldbsize) {
5911 close(slave->repldbfd);
5912 slave->repldbfd = -1;
5913 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5914 slave->replstate = REDIS_REPL_ONLINE;
5915 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5916 sendReplyToClient, slave) == AE_ERR) {
5917 freeClient(slave);
5918 return;
5919 }
5920 addReplySds(slave,sdsempty());
5921 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5922 }
5923}
5924
5925/* This function is called at the end of every backgrond saving.
5926 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5927 * otherwise REDIS_ERR is passed to the function.
5928 *
5929 * The goal of this function is to handle slaves waiting for a successful
5930 * background saving in order to perform non-blocking synchronization. */
5931static void updateSlavesWaitingBgsave(int bgsaveerr) {
5932 listNode *ln;
5933 int startbgsave = 0;
5934
5935 listRewind(server.slaves);
5936 while((ln = listYield(server.slaves))) {
5937 redisClient *slave = ln->value;
5938
5939 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5940 startbgsave = 1;
5941 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5942 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5943 struct redis_stat buf;
5944
5945 if (bgsaveerr != REDIS_OK) {
5946 freeClient(slave);
5947 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5948 continue;
5949 }
5950 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5951 redis_fstat(slave->repldbfd,&buf) == -1) {
5952 freeClient(slave);
5953 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5954 continue;
5955 }
5956 slave->repldboff = 0;
5957 slave->repldbsize = buf.st_size;
5958 slave->replstate = REDIS_REPL_SEND_BULK;
5959 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5960 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
5961 freeClient(slave);
5962 continue;
5963 }
5964 }
5965 }
5966 if (startbgsave) {
5967 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5968 listRewind(server.slaves);
5969 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5970 while((ln = listYield(server.slaves))) {
5971 redisClient *slave = ln->value;
5972
5973 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5974 freeClient(slave);
5975 }
5976 }
5977 }
5978}
5979
5980static int syncWithMaster(void) {
5981 char buf[1024], tmpfile[256], authcmd[1024];
5982 int dumpsize;
5983 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5984 int dfd;
5985
5986 if (fd == -1) {
5987 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5988 strerror(errno));
5989 return REDIS_ERR;
5990 }
5991
5992 /* AUTH with the master if required. */
5993 if(server.masterauth) {
5994 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5995 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5996 close(fd);
5997 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5998 strerror(errno));
5999 return REDIS_ERR;
6000 }
6001 /* Read the AUTH result. */
6002 if (syncReadLine(fd,buf,1024,3600) == -1) {
6003 close(fd);
6004 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
6005 strerror(errno));
6006 return REDIS_ERR;
6007 }
6008 if (buf[0] != '+') {
6009 close(fd);
6010 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
6011 return REDIS_ERR;
6012 }
6013 }
6014
6015 /* Issue the SYNC command */
6016 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
6017 close(fd);
6018 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
6019 strerror(errno));
6020 return REDIS_ERR;
6021 }
6022 /* Read the bulk write count */
6023 if (syncReadLine(fd,buf,1024,3600) == -1) {
6024 close(fd);
6025 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
6026 strerror(errno));
6027 return REDIS_ERR;
6028 }
6029 if (buf[0] != '$') {
6030 close(fd);
6031 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
6032 return REDIS_ERR;
6033 }
6034 dumpsize = atoi(buf+1);
6035 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
6036 /* Read the bulk write data on a temp file */
6037 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
6038 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
6039 if (dfd == -1) {
6040 close(fd);
6041 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
6042 return REDIS_ERR;
6043 }
6044 while(dumpsize) {
6045 int nread, nwritten;
6046
6047 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
6048 if (nread == -1) {
6049 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
6050 strerror(errno));
6051 close(fd);
6052 close(dfd);
6053 return REDIS_ERR;
6054 }
6055 nwritten = write(dfd,buf,nread);
6056 if (nwritten == -1) {
6057 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
6058 close(fd);
6059 close(dfd);
6060 return REDIS_ERR;
6061 }
6062 dumpsize -= nread;
6063 }
6064 close(dfd);
6065 if (rename(tmpfile,server.dbfilename) == -1) {
6066 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
6067 unlink(tmpfile);
6068 close(fd);
6069 return REDIS_ERR;
6070 }
6071 emptyDb();
6072 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6073 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
6074 close(fd);
6075 return REDIS_ERR;
6076 }
6077 server.master = createClient(fd);
6078 server.master->flags |= REDIS_MASTER;
6079 server.master->authenticated = 1;
6080 server.replstate = REDIS_REPL_CONNECTED;
6081 return REDIS_OK;
6082}
6083
6084static void slaveofCommand(redisClient *c) {
6085 if (!strcasecmp(c->argv[1]->ptr,"no") &&
6086 !strcasecmp(c->argv[2]->ptr,"one")) {
6087 if (server.masterhost) {
6088 sdsfree(server.masterhost);
6089 server.masterhost = NULL;
6090 if (server.master) freeClient(server.master);
6091 server.replstate = REDIS_REPL_NONE;
6092 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
6093 }
6094 } else {
6095 sdsfree(server.masterhost);
6096 server.masterhost = sdsdup(c->argv[1]->ptr);
6097 server.masterport = atoi(c->argv[2]->ptr);
6098 if (server.master) freeClient(server.master);
6099 server.replstate = REDIS_REPL_CONNECT;
6100 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
6101 server.masterhost, server.masterport);
6102 }
6103 addReply(c,shared.ok);
6104}
6105
6106/* ============================ Maxmemory directive ======================== */
6107
6108/* This function gets called when 'maxmemory' is set on the config file to limit
6109 * the max memory used by the server, and we are out of memory.
6110 * This function will try to, in order:
6111 *
6112 * - Free objects from the free list
6113 * - Try to remove keys with an EXPIRE set
6114 *
6115 * It is not possible to free enough memory to reach used-memory < maxmemory
6116 * the server will start refusing commands that will enlarge even more the
6117 * memory usage.
6118 */
6119static void freeMemoryIfNeeded(void) {
6120 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
6121 if (listLength(server.objfreelist)) {
6122 robj *o;
6123
6124 listNode *head = listFirst(server.objfreelist);
6125 o = listNodeValue(head);
6126 listDelNode(server.objfreelist,head);
6127 zfree(o);
6128 } else {
6129 int j, k, freed = 0;
6130
6131 for (j = 0; j < server.dbnum; j++) {
6132 int minttl = -1;
6133 robj *minkey = NULL;
6134 struct dictEntry *de;
6135
6136 if (dictSize(server.db[j].expires)) {
6137 freed = 1;
6138 /* From a sample of three keys drop the one nearest to
6139 * the natural expire */
6140 for (k = 0; k < 3; k++) {
6141 time_t t;
6142
6143 de = dictGetRandomKey(server.db[j].expires);
6144 t = (time_t) dictGetEntryVal(de);
6145 if (minttl == -1 || t < minttl) {
6146 minkey = dictGetEntryKey(de);
6147 minttl = t;
6148 }
6149 }
6150 deleteKey(server.db+j,minkey);
6151 }
6152 }
6153 if (!freed) return; /* nothing to free... */
6154 }
6155 }
6156}
6157
6158/* ============================== Append Only file ========================== */
6159
6160static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
6161 sds buf = sdsempty();
6162 int j;
6163 ssize_t nwritten;
6164 time_t now;
6165 robj *tmpargv[3];
6166
6167 /* The DB this command was targetting is not the same as the last command
6168 * we appendend. To issue a SELECT command is needed. */
6169 if (dictid != server.appendseldb) {
6170 char seldb[64];
6171
6172 snprintf(seldb,sizeof(seldb),"%d",dictid);
6173 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
6174 (unsigned long)strlen(seldb),seldb);
6175 server.appendseldb = dictid;
6176 }
6177
6178 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6179 * EXPIREs into EXPIREATs calls */
6180 if (cmd->proc == expireCommand) {
6181 long when;
6182
6183 tmpargv[0] = createStringObject("EXPIREAT",8);
6184 tmpargv[1] = argv[1];
6185 incrRefCount(argv[1]);
6186 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
6187 tmpargv[2] = createObject(REDIS_STRING,
6188 sdscatprintf(sdsempty(),"%ld",when));
6189 argv = tmpargv;
6190 }
6191
6192 /* Append the actual command */
6193 buf = sdscatprintf(buf,"*%d\r\n",argc);
6194 for (j = 0; j < argc; j++) {
6195 robj *o = argv[j];
6196
6197 o = getDecodedObject(o);
6198 buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
6199 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
6200 buf = sdscatlen(buf,"\r\n",2);
6201 decrRefCount(o);
6202 }
6203
6204 /* Free the objects from the modified argv for EXPIREAT */
6205 if (cmd->proc == expireCommand) {
6206 for (j = 0; j < 3; j++)
6207 decrRefCount(argv[j]);
6208 }
6209
6210 /* We want to perform a single write. This should be guaranteed atomic
6211 * at least if the filesystem we are writing is a real physical one.
6212 * While this will save us against the server being killed I don't think
6213 * there is much to do about the whole server stopping for power problems
6214 * or alike */
6215 nwritten = write(server.appendfd,buf,sdslen(buf));
6216 if (nwritten != (signed)sdslen(buf)) {
6217 /* Ooops, we are in troubles. The best thing to do for now is
6218 * to simply exit instead to give the illusion that everything is
6219 * working as expected. */
6220 if (nwritten == -1) {
6221 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
6222 } else {
6223 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
6224 }
6225 exit(1);
6226 }
6227 /* If a background append only file rewriting is in progress we want to
6228 * accumulate the differences between the child DB and the current one
6229 * in a buffer, so that when the child process will do its work we
6230 * can append the differences to the new append only file. */
6231 if (server.bgrewritechildpid != -1)
6232 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
6233
6234 sdsfree(buf);
6235 now = time(NULL);
6236 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
6237 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
6238 now-server.lastfsync > 1))
6239 {
6240 fsync(server.appendfd); /* Let's try to get this data on the disk */
6241 server.lastfsync = now;
6242 }
6243}
6244
6245/* In Redis commands are always executed in the context of a client, so in
6246 * order to load the append only file we need to create a fake client. */
6247static struct redisClient *createFakeClient(void) {
6248 struct redisClient *c = zmalloc(sizeof(*c));
6249
6250 selectDb(c,0);
6251 c->fd = -1;
6252 c->querybuf = sdsempty();
6253 c->argc = 0;
6254 c->argv = NULL;
6255 c->flags = 0;
6256 /* We set the fake client as a slave waiting for the synchronization
6257 * so that Redis will not try to send replies to this client. */
6258 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
6259 c->reply = listCreate();
6260 listSetFreeMethod(c->reply,decrRefCount);
6261 listSetDupMethod(c->reply,dupClientReplyValue);
6262 return c;
6263}
6264
6265static void freeFakeClient(struct redisClient *c) {
6266 sdsfree(c->querybuf);
6267 listRelease(c->reply);
6268 zfree(c);
6269}
6270
6271/* Replay the append log file. On error REDIS_OK is returned. On non fatal
6272 * error (the append only file is zero-length) REDIS_ERR is returned. On
6273 * fatal error an error message is logged and the program exists. */
6274int loadAppendOnlyFile(char *filename) {
6275 struct redisClient *fakeClient;
6276 FILE *fp = fopen(filename,"r");
6277 struct redis_stat sb;
6278
6279 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
6280 return REDIS_ERR;
6281
6282 if (fp == NULL) {
6283 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
6284 exit(1);
6285 }
6286
6287 fakeClient = createFakeClient();
6288 while(1) {
6289 int argc, j;
6290 unsigned long len;
6291 robj **argv;
6292 char buf[128];
6293 sds argsds;
6294 struct redisCommand *cmd;
6295
6296 if (fgets(buf,sizeof(buf),fp) == NULL) {
6297 if (feof(fp))
6298 break;
6299 else
6300 goto readerr;
6301 }
6302 if (buf[0] != '*') goto fmterr;
6303 argc = atoi(buf+1);
6304 argv = zmalloc(sizeof(robj*)*argc);
6305 for (j = 0; j < argc; j++) {
6306 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
6307 if (buf[0] != '$') goto fmterr;
6308 len = strtol(buf+1,NULL,10);
6309 argsds = sdsnewlen(NULL,len);
6310 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
6311 argv[j] = createObject(REDIS_STRING,argsds);
6312 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
6313 }
6314
6315 /* Command lookup */
6316 cmd = lookupCommand(argv[0]->ptr);
6317 if (!cmd) {
6318 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
6319 exit(1);
6320 }
6321 /* Try object sharing and encoding */
6322 if (server.shareobjects) {
6323 int j;
6324 for(j = 1; j < argc; j++)
6325 argv[j] = tryObjectSharing(argv[j]);
6326 }
6327 if (cmd->flags & REDIS_CMD_BULK)
6328 tryObjectEncoding(argv[argc-1]);
6329 /* Run the command in the context of a fake client */
6330 fakeClient->argc = argc;
6331 fakeClient->argv = argv;
6332 cmd->proc(fakeClient);
6333 /* Discard the reply objects list from the fake client */
6334 while(listLength(fakeClient->reply))
6335 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
6336 /* Clean up, ready for the next command */
6337 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
6338 zfree(argv);
6339 }
6340 fclose(fp);
6341 freeFakeClient(fakeClient);
6342 return REDIS_OK;
6343
6344readerr:
6345 if (feof(fp)) {
6346 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
6347 } else {
6348 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
6349 }
6350 exit(1);
6351fmterr:
6352 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
6353 exit(1);
6354}
6355
6356/* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6357static int fwriteBulk(FILE *fp, robj *obj) {
6358 char buf[128];
6359 obj = getDecodedObject(obj);
6360 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
6361 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
6362 if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0)
6363 goto err;
6364 if (fwrite("\r\n",2,1,fp) == 0) goto err;
6365 decrRefCount(obj);
6366 return 1;
6367err:
6368 decrRefCount(obj);
6369 return 0;
6370}
6371
/* Emit 'd' to 'fp' as a bulk: $<count>\r\n<%.17g text>\r\n.
 * Returns 1 on success, 0 if either fwrite() fails. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char payload[128], header[128];
    unsigned long payloadlen;

    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    payloadlen = (unsigned long)strlen(payload) - 2; /* CRLF not counted */
    snprintf(header,sizeof(header),"$%lu\r\n",payloadlen);
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,strlen(payload),1,fp) != 0;
}
6382
/* Emit 'l' to 'fp' as a bulk: $<count>\r\n<decimal>\r\n.
 * Returns 1 on success, 0 if either fwrite() fails. */
static int fwriteBulkLong(FILE *fp, long l) {
    char numstr[128], hdr[128];
    size_t bodylen;

    snprintf(numstr,sizeof(numstr),"%ld\r\n",l);
    bodylen = strlen(numstr);
    snprintf(hdr,sizeof(hdr),"$%lu\r\n",(unsigned long)bodylen-2);
    if (fwrite(hdr,strlen(hdr),1,fp) == 0) return 0;
    return fwrite(numstr,bodylen,1,fp) != 0;
}
6393
6394/* Write a sequence of commands able to fully rebuild the dataset into
6395 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6396static int rewriteAppendOnlyFile(char *filename) {
6397 dictIterator *di = NULL;
6398 dictEntry *de;
6399 FILE *fp;
6400 char tmpfile[256];
6401 int j;
6402 time_t now = time(NULL);
6403
6404 /* Note that we have to use a different temp name here compared to the
6405 * one used by rewriteAppendOnlyFileBackground() function. */
6406 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
6407 fp = fopen(tmpfile,"w");
6408 if (!fp) {
6409 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
6410 return REDIS_ERR;
6411 }
6412 for (j = 0; j < server.dbnum; j++) {
6413 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
6414 redisDb *db = server.db+j;
6415 dict *d = db->dict;
6416 if (dictSize(d) == 0) continue;
6417 di = dictGetIterator(d);
6418 if (!di) {
6419 fclose(fp);
6420 return REDIS_ERR;
6421 }
6422
6423 /* SELECT the new DB */
6424 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
6425 if (fwriteBulkLong(fp,j) == 0) goto werr;
6426
6427 /* Iterate this DB writing every entry */
6428 while((de = dictNext(di)) != NULL) {
6429 robj *key = dictGetEntryKey(de);
6430 robj *o = dictGetEntryVal(de);
6431 time_t expiretime = getExpire(db,key);
6432
6433 /* Save the key and associated value */
6434 if (o->type == REDIS_STRING) {
6435 /* Emit a SET command */
6436 char cmd[]="*3\r\n$3\r\nSET\r\n";
6437 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6438 /* Key and value */
6439 if (fwriteBulk(fp,key) == 0) goto werr;
6440 if (fwriteBulk(fp,o) == 0) goto werr;
6441 } else if (o->type == REDIS_LIST) {
6442 /* Emit the RPUSHes needed to rebuild the list */
6443 list *list = o->ptr;
6444 listNode *ln;
6445
6446 listRewind(list);
6447 while((ln = listYield(list))) {
6448 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
6449 robj *eleobj = listNodeValue(ln);
6450
6451 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6452 if (fwriteBulk(fp,key) == 0) goto werr;
6453 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6454 }
6455 } else if (o->type == REDIS_SET) {
6456 /* Emit the SADDs needed to rebuild the set */
6457 dict *set = o->ptr;
6458 dictIterator *di = dictGetIterator(set);
6459 dictEntry *de;
6460
6461 while((de = dictNext(di)) != NULL) {
6462 char cmd[]="*3\r\n$4\r\nSADD\r\n";
6463 robj *eleobj = dictGetEntryKey(de);
6464
6465 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6466 if (fwriteBulk(fp,key) == 0) goto werr;
6467 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6468 }
6469 dictReleaseIterator(di);
6470 } else if (o->type == REDIS_ZSET) {
6471 /* Emit the ZADDs needed to rebuild the sorted set */
6472 zset *zs = o->ptr;
6473 dictIterator *di = dictGetIterator(zs->dict);
6474 dictEntry *de;
6475
6476 while((de = dictNext(di)) != NULL) {
6477 char cmd[]="*4\r\n$4\r\nZADD\r\n";
6478 robj *eleobj = dictGetEntryKey(de);
6479 double *score = dictGetEntryVal(de);
6480
6481 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6482 if (fwriteBulk(fp,key) == 0) goto werr;
6483 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
6484 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6485 }
6486 dictReleaseIterator(di);
6487 } else {
6488 redisAssert(0 != 0);
6489 }
6490 /* Save the expire time */
6491 if (expiretime != -1) {
6492 char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
6493 /* If this key is already expired skip it */
6494 if (expiretime < now) continue;
6495 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6496 if (fwriteBulk(fp,key) == 0) goto werr;
6497 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
6498 }
6499 }
6500 dictReleaseIterator(di);
6501 }
6502
6503 /* Make sure data will not remain on the OS's output buffers */
6504 fflush(fp);
6505 fsync(fileno(fp));
6506 fclose(fp);
6507
6508 /* Use RENAME to make sure the DB file is changed atomically only
6509 * if the generate DB file is ok. */
6510 if (rename(tmpfile,filename) == -1) {
6511 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6512 unlink(tmpfile);
6513 return REDIS_ERR;
6514 }
6515 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6516 return REDIS_OK;
6517
6518werr:
6519 fclose(fp);
6520 unlink(tmpfile);
6521 redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
6522 if (di) dictReleaseIterator(di);
6523 return REDIS_ERR;
6524}
6525
6526/* This is how rewriting of the append only file in background works:
6527 *
6528 * 1) The user calls BGREWRITEAOF
6529 * 2) Redis calls this function, that forks():
6530 * 2a) the child rewrite the append only file in a temp file.
6531 * 2b) the parent accumulates differences in server.bgrewritebuf.
6532 * 3) When the child finished '2a' exists.
6533 * 4) The parent will trap the exit code, if it's OK, will append the
6534 * data accumulated into server.bgrewritebuf into the temp file, and
6535 * finally will rename(2) the temp file in the actual file name.
6536 * The the new file is reopened as the new append only file. Profit!
6537 */
6538static int rewriteAppendOnlyFileBackground(void) {
6539 pid_t childpid;
6540
6541 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6542 if ((childpid = fork()) == 0) {
6543 /* Child */
6544 char tmpfile[256];
6545 close(server.fd);
6546
6547 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6548 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6549 exit(0);
6550 } else {
6551 exit(1);
6552 }
6553 } else {
6554 /* Parent */
6555 if (childpid == -1) {
6556 redisLog(REDIS_WARNING,
6557 "Can't rewrite append only file in background: fork: %s",
6558 strerror(errno));
6559 return REDIS_ERR;
6560 }
6561 redisLog(REDIS_NOTICE,
6562 "Background append only file rewriting started by pid %d",childpid);
6563 server.bgrewritechildpid = childpid;
6564 /* We set appendseldb to -1 in order to force the next call to the
6565 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6566 * accumulated by the parent into server.bgrewritebuf will start
6567 * with a SELECT statement and it will be safe to merge. */
6568 server.appendseldb = -1;
6569 return REDIS_OK;
6570 }
6571 return REDIS_OK; /* unreached */
6572}
6573
6574static void bgrewriteaofCommand(redisClient *c) {
6575 if (server.bgrewritechildpid != -1) {
6576 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6577 return;
6578 }
6579 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6580 char *status = "+Background append only file rewriting started\r\n";
6581 addReplySds(c,sdsnew(status));
6582 } else {
6583 addReply(c,shared.err);
6584 }
6585}
6586
/* Remove the temp file written by the background rewrite child 'childpid'
 * (named "temp-rewriteaof-bg-<pid>.aof", as in
 * rewriteAppendOnlyFileBackground()). unlink() errors are ignored. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",(int)childpid);
    unlink(path);
}
6593
6594/* =============================== Virtual Memory =========================== */
6595static void vmInit(void) {
6596 off_t totsize;
6597
6598 server.vm_fp = fopen("/tmp/redisvm","w+b");
6599 if (server.vm_fp == NULL) {
6600 redisLog(REDIS_WARNING,"Impossible to open the swap file. Exiting.");
6601 exit(1);
6602 }
6603 server.vm_fd = fileno(server.vm_fp);
6604 server.vm_next_page = 0;
6605 server.vm_near_pages = 0;
6606 totsize = server.vm_pages*server.vm_page_size;
6607 redisLog(REDIS_NOTICE,"Allocating %lld bytes of swap file",totsize);
6608 if (ftruncate(server.vm_fd,totsize) == -1) {
6609 redisLog(REDIS_WARNING,"Can't ftruncate swap file: %s. Exiting.",
6610 strerror(errno));
6611 exit(1);
6612 } else {
6613 redisLog(REDIS_NOTICE,"Swap file allocated with success");
6614 }
6615 server.vm_bitmap = zmalloc((server.vm_near_pages+7)/8);
6616 memset(server.vm_bitmap,0,(server.vm_near_pages+7)/8);
6617 /* Try to remove the swap file, so the OS will really delete it from the
6618 * file system when Redis exists. */
6619 unlink("/tmp/redisvm");
6620}
6621
6622/* Mark the page as used */
6623static void vmMarkPageUsed(off_t page) {
6624 off_t byte = page/8;
6625 int bit = page&7;
6626 server.vm_bitmap[byte] |= 1<<bit;
6627}
6628
/* Mark 'count' contiguous pages as used, with 'page' being the first.
 * Each page of the range is visited once; the old code marked the single
 * page 'page+count' 'count' times instead of the range. */
static void vmMarkPagesUsed(off_t page, off_t count) {
    off_t j;

    for (j = 0; j < count; j++)
        vmMarkPageUsed(page+j);
}
6636
6637/* Mark the page as free */
6638static void vmMarkPageFree(off_t page) {
6639 off_t byte = page/8;
6640 int bit = page&7;
6641 server.vm_bitmap[byte] &= ~(1<<bit);
6642}
6643
/* Mark 'count' contiguous pages as free, with 'page' being the first.
 * Each page of the range is visited once; the old code freed the single
 * page 'page+count' 'count' times instead of the range. */
static void vmMarkPagesFree(off_t page, off_t count) {
    off_t j;

    for (j = 0; j < count; j++)
        vmMarkPageFree(page+j);
}
6651
6652/* Test if the page is free */
6653static int vmFreePage(off_t page) {
6654 off_t byte = page/8;
6655 int bit = page&7;
6656 return server.vm_bitmap[byte] & bit;
6657}
6658
6659/* Find N contiguous free pages storing the first page of the cluster in *first.
6660 * Returns 1 if it was able to find N contiguous pages, otherwise 0 is
6661 * returned.
6662 *
6663 * This function uses a simple algorithm: we try to allocate
6664 * REDIS_VM_MAX_NEAR_PAGES sequentially, when we reach this limit we start
6665 * again from the start of the swap file searching for free spaces.
6666 *
6667 * If it looks pretty clear that there are no free pages near our offset
6668 * we try to find less populated places doing a forward jump of
6669 * REDIS_VM_MAX_RANDOM_JUMP, then we start scanning again a few pages
6670 * without hurry, and then we jump again and so forth...
6671 *
6672 * This function can be improved using a free list to avoid to guess
6673 * too much, since we could collect data about freed pages.
6674 *
6675 * note: I implemented this function just after watching an episode of
6676 * Battlestar Galactica, where the hybrid was continuing to say "JUMP!"
6677 */
6678static int vmFindContiguousPages(off_t *first, int n) {
6679 off_t base, offset = 0, since_jump = 0, numfree = 0;
6680
6681 if (server.vm_near_pages == REDIS_VM_MAX_NEAR_PAGES) {
6682 server.vm_near_pages = 0;
6683 server.vm_next_page = 0;
6684 }
6685 server.vm_near_pages++; /* Yet another try for pages near to the old ones */
6686 base = server.vm_next_page;
6687
6688 while(offset < server.vm_pages) {
6689 off_t this = base+offset;
6690
6691 /* If we overflow, restart from page zero */
6692 if (this >= server.vm_pages) {
6693 this -= server.vm_pages;
6694 if (this == 0) {
6695 /* Just overflowed, what we found on tail is no longer
6696 * interesting, as it's no longer contiguous. */
6697 numfree = 0;
6698 }
6699 }
6700 if (vmFreePage(this)) {
6701 /* This is a free page */
6702 numfree++;
6703 /* Already got N free pages? Return to the caller, with success */
6704 if (numfree == n) {
6705 *first = this;
6706 return 1;
6707 }
6708 } else {
6709 /* The current one is not a free page */
6710 numfree = 0;
6711 }
6712
6713 /* Fast-forward if the current page is not free and we already
6714 * searched enough near this place. */
6715 since_jump++;
6716 if (!numfree && since_jump >= REDIS_VM_MAX_RANDOM_JUMP/4) {
6717 offset += random() % REDIS_VM_MAX_RANDOM_JUMP;
6718 since_jump = 0;
6719 /* Note that even if we rewind after the jump, we are don't need
6720 * to make sure numfree is set to zero as we only jump *if* it
6721 * is set to zero. */
6722 } else {
6723 /* Otherwise just check the next page */
6724 offset++;
6725 }
6726 }
6727 return 0;
6728}
6729
6730/* ================================= Debugging ============================== */
6731
6732static void debugCommand(redisClient *c) {
6733 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6734 *((char*)-1) = 'x';
6735 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6736 if (rdbSave(server.dbfilename) != REDIS_OK) {
6737 addReply(c,shared.err);
6738 return;
6739 }
6740 emptyDb();
6741 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6742 addReply(c,shared.err);
6743 return;
6744 }
6745 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6746 addReply(c,shared.ok);
6747 } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
6748 emptyDb();
6749 if (loadAppendOnlyFile(server.appendfilename) != REDIS_OK) {
6750 addReply(c,shared.err);
6751 return;
6752 }
6753 redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF");
6754 addReply(c,shared.ok);
6755 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6756 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6757 robj *key, *val;
6758
6759 if (!de) {
6760 addReply(c,shared.nokeyerr);
6761 return;
6762 }
6763 key = dictGetEntryKey(de);
6764 val = dictGetEntryVal(de);
6765 addReplySds(c,sdscatprintf(sdsempty(),
6766 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d serializedlength:%lld\r\n",
6767 (void*)key, key->refcount, (void*)val, val->refcount,
6768 val->encoding, rdbSavedObjectLen(val)));
6769 } else {
6770 addReplySds(c,sdsnew(
6771 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6772 }
6773}
6774
/* Reporting hook for failed assertions: logs the text of the failed
 * expression 'estr'.
 * NOTE(review): presumably invoked by the redisAssert() macro, whose
 * definition is outside this section. Without HAVE_BACKTRACE this function
 * only logs and returns, so confirm the macro terminates the process
 * afterwards. */
static void _redisAssert(char *estr) {
    redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
    redisLog(REDIS_WARNING,"==> %s\n",estr);
#ifdef HAVE_BACKTRACE
    /* Deliberate write to an invalid address. The resulting SIGSEGV is meant
     * to reach segvHandler() (registered for SIGSEGV by
     * setupSigSegvAction()), which logs the stack trace. */
    redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
    *((char*)-1) = 'x';
#endif
}
6783
6784/* =================================== Main! ================================ */
6785
6786#ifdef __linux__
/* Return the integer stored in /proc/sys/vm/overcommit_memory, or -1 when
 * the file can't be opened or its first line can't be read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    char *got;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp == NULL) return -1;
    got = fgets(line,sizeof(line),fp);
    fclose(fp);
    return (got == NULL) ? -1 : atoi(line);
}
6800
6801void linuxOvercommitMemoryWarning(void) {
6802 if (linuxOvercommitMemoryValue() == 0) {
6803 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6804 }
6805}
6806#endif /* __linux__ */
6807
6808static void daemonize(void) {
6809 int fd;
6810 FILE *fp;
6811
6812 if (fork() != 0) exit(0); /* parent exits */
6813 printf("New pid: %d\n", getpid());
6814 setsid(); /* create a new session */
6815
6816 /* Every output goes to /dev/null. If Redis is daemonized but
6817 * the 'logfile' is set to 'stdout' in the configuration file
6818 * it will not log at all. */
6819 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6820 dup2(fd, STDIN_FILENO);
6821 dup2(fd, STDOUT_FILENO);
6822 dup2(fd, STDERR_FILENO);
6823 if (fd > STDERR_FILENO) close(fd);
6824 }
6825 /* Try to write the pid file */
6826 fp = fopen(server.pidfile,"w");
6827 if (fp) {
6828 fprintf(fp,"%d\n",getpid());
6829 fclose(fp);
6830 }
6831}
6832
/* Program entry point.
 *
 * Accepts at most one argument: the path of the configuration file.
 * Startup order: default config, optional config file, daemonize (when
 * configured), initServer(), dataset loading, listening-socket handler,
 * then the event loop. Returns 0 once aeMain() returns. */
int main(int argc, char **argv) {
    initServerConfig();
    if (argc == 2) {
        /* NOTE(review): presumably clears the built-in 'save' points so only
         * those in the config file apply — confirm in resetServerSaveParams(). */
        resetServerSaveParams();
        loadServerConfig(argv[1]);
    } else if (argc > 2) {
        fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
        exit(1);
    } else {
        redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
    }
    /* daemonize() forks and the parent exits, so it must run before
     * initServer() for the server state to live in the daemon process. */
    if (server.daemonize) daemonize();
    initServer();
    redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
#ifdef __linux__
    linuxOvercommitMemoryWarning();
#endif
    /* Load the dataset: from the append only file when appendonly is on,
     * otherwise from the RDB dump. A non-REDIS_OK result is not fatal here
     * (loadAppendOnlyFile() returns REDIS_ERR for a zero-length file and
     * exits by itself on fatal errors). */
    if (server.appendonly) {
        if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
            redisLog(REDIS_NOTICE,"DB loaded from append only file");
    } else {
        if (rdbLoad(server.dbfilename) == REDIS_OK)
            redisLog(REDIS_NOTICE,"DB loaded from disk");
    }
    /* Accept clients on the listening socket; failing to register the
     * handler is reported through oom(). */
    if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
        acceptHandler, NULL) == AE_ERR) oom("creating file event");
    redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
    aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}
6864
6865/* ============================= Backtrace support ========================= */
6866
6867#ifdef HAVE_BACKTRACE
6868static char *findFuncName(void *pointer, unsigned long *offset);
6869
6870static void *getMcontextEip(ucontext_t *uc) {
6871#if defined(__FreeBSD__)
6872 return (void*) uc->uc_mcontext.mc_eip;
6873#elif defined(__dietlibc__)
6874 return (void*) uc->uc_mcontext.eip;
6875#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6876 #if __x86_64__
6877 return (void*) uc->uc_mcontext->__ss.__rip;
6878 #else
6879 return (void*) uc->uc_mcontext->__ss.__eip;
6880 #endif
6881#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6882 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6883 return (void*) uc->uc_mcontext->__ss.__rip;
6884 #else
6885 return (void*) uc->uc_mcontext->__ss.__eip;
6886 #endif
6887#elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
6888 return (void*) uc->uc_mcontext.gregs[REG_EIP]; /* Linux 32/64 bit */
6889#elif defined(__ia64__) /* Linux IA64 */
6890 return (void*) uc->uc_mcontext.sc_ip;
6891#else
6892 return NULL;
6893#endif
6894}
6895
/* Handler for fatal signals, installed by setupSigSegvAction() with
 * SA_SIGINFO (so 'secret' is the ucontext_t of the interrupted code).
 * Logs the signal number, the INFO string and a stack trace in which static
 * functions are resolved through findFuncName(), then exits.
 * NOTE(review): it exits with status 0 even though the process crashed. */
static void segvHandler(int sig, siginfo_t *info, void *secret) {
    void *trace[100];
    char **messages = NULL;
    int i, trace_size = 0;
    unsigned long offset=0;
    ucontext_t *uc = (ucontext_t*) secret;
    sds infostring;
    REDIS_NOTUSED(info);

    redisLog(REDIS_WARNING,
        "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
    infostring = genRedisInfoString();
    redisLog(REDIS_WARNING, "%s",infostring);
    /* It's not safe to sdsfree() the returned string under memory
     * corruption conditions. Let it leak as we are going to abort */

    trace_size = backtrace(trace, 100);
    /* overwrite sigaction with caller's address */
    /* i.e. frame 1 is replaced with the instruction pointer saved in the
     * signal context, the code that actually faulted. */
    if (getMcontextEip(uc) != NULL) {
        trace[1] = getMcontextEip(uc);
    }
    messages = backtrace_symbols(trace, trace_size);

    /* Frame 0 is this handler and is skipped. For each frame, print
     * backtrace_symbols()' line when findFuncName() found nothing or when
     * the offset after '+' in that line is smaller (a closer symbol);
     * otherwise print the static symbol name and its offset.
     * NOTE(review): the offset is parsed in base 10, but glibc formats it
     * as "+0x..", which parses as 0 — confirm the intended format. */
    for (i=1; i<trace_size; ++i) {
        char *fn = findFuncName(trace[i], &offset), *p;

        p = strchr(messages[i],'+');
        if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
            redisLog(REDIS_WARNING,"%s", messages[i]);
        } else {
            redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
        }
    }
    /* free(messages); Don't call free() with possibly corrupted memory. */
    exit(0);
}
6932
6933static void setupSigSegvAction(void) {
6934 struct sigaction act;
6935
6936 sigemptyset (&act.sa_mask);
6937 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6938 * is used. Otherwise, sa_handler is used */
6939 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6940 act.sa_sigaction = segvHandler;
6941 sigaction (SIGSEGV, &act, NULL);
6942 sigaction (SIGBUS, &act, NULL);
6943 sigaction (SIGFPE, &act, NULL);
6944 sigaction (SIGILL, &act, NULL);
6945 sigaction (SIGBUS, &act, NULL);
6946 return;
6947}
6948
6949#include "staticsymbols.h"
6950/* This function try to convert a pointer into a function name. It's used in
6951 * oreder to provide a backtrace under segmentation fault that's able to
6952 * display functions declared as static (otherwise the backtrace is useless). */
6953static char *findFuncName(void *pointer, unsigned long *offset){
6954 int i, ret = -1;
6955 unsigned long off, minoff = 0;
6956
6957 /* Try to match against the Symbol with the smallest offset */
6958 for (i=0; symsTable[i].pointer; i++) {
6959 unsigned long lp = (unsigned long) pointer;
6960
6961 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6962 off=lp-symsTable[i].pointer;
6963 if (ret < 0 || off < minoff) {
6964 minoff=off;
6965 ret=i;
6966 }
6967 }
6968 }
6969 if (ret == -1) return NULL;
6970 *offset = minoff;
6971 return symsTable[ret].name;
6972}
6973#else /* HAVE_BACKTRACE */
static void setupSigSegvAction(void) {
    /* Built without HAVE_BACKTRACE: no crash handler to install. */
}
6976#endif /* HAVE_BACKTRACE */
6977
6978
6979
6980/* The End */
6981
6982
6983