]> git.saurik.com Git - redis.git/blame - redis.c
append only file fixes
[redis.git] / redis.c
CommitLineData
ed9b544e 1/*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string of this server build. */
#define REDIS_VERSION "1.07"
23d4709d 31
32#include "fmacros.h"
fbf9bcdb 33#include "config.h"
ed9b544e 34
35#include <stdio.h>
36#include <stdlib.h>
37#include <string.h>
38#include <time.h>
39#include <unistd.h>
c9468bcf 40#define __USE_POSIX199309
ed9b544e 41#include <signal.h>
fbf9bcdb 42
43#ifdef HAVE_BACKTRACE
c9468bcf 44#include <execinfo.h>
45#include <ucontext.h>
fbf9bcdb 46#endif /* HAVE_BACKTRACE */
47
ed9b544e 48#include <sys/wait.h>
49#include <errno.h>
50#include <assert.h>
51#include <ctype.h>
52#include <stdarg.h>
53#include <inttypes.h>
54#include <arpa/inet.h>
55#include <sys/stat.h>
56#include <fcntl.h>
57#include <sys/time.h>
58#include <sys/resource.h>
2895e862 59#include <sys/uio.h>
f78fd11b 60#include <limits.h>
a7866db6 61#include <math.h>
0bc1b2f6 62
63#if defined(__sun)
5043dff3 64#include "solarisfixes.h"
65#endif
ed9b544e 66
c9468bcf 67#include "redis.h"
ed9b544e 68#include "ae.h" /* Event driven programming library */
69#include "sds.h" /* Dynamic safe strings */
70#include "anet.h" /* Networking the easy way */
71#include "dict.h" /* Hash tables */
72#include "adlist.h" /* Linked lists */
73#include "zmalloc.h" /* total memory usage aware version of malloc/free */
5f5b9840 74#include "lzf.h" /* LZF compression library */
75#include "pqsort.h" /* Partial qsort for SORT+LIMIT */
ed9b544e 76
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR (-1) /* parenthesized: safe to use inside any expression */

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD 3
/* Max number of iovecs used for each writev call */
#define REDIS_WRITEV_IOVEC_COUNT 256
ed9b544e 99
/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */

/* Command flags (bit values, combined with |) */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short, these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4

/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */
122
/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys requires a lot of space, so we check the most significant 2 bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
a4d1ba9a 154
/* Client flags */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_ASC 1
#define REDIS_SORT_DESC 2
#define REDIS_SORTKEY_MAX 1024

/* Log levels */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro... The argument is parenthesized so that any
 * expression, not only a plain identifier, can be passed. */
#define REDIS_NOTUSED(V) ((void) (V))

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */

/* Append only defines */
#define APPENDFSYNC_NO 0
#define APPENDFSYNC_ALWAYS 1
#define APPENDFSYNC_EVERYSEC 2
200
ed9b544e 201/*================================= Data types ============================== */
202
/* A redis object: a reference-counted value handled by the server.
 * 'type' is one of the REDIS_STRING / REDIS_LIST / REDIS_SET /
 * REDIS_ZSET / REDIS_HASH object types, and 'encoding' tells how 'ptr'
 * is represented (REDIS_ENCODING_RAW or REDIS_ENCODING_INT). */
typedef struct redisObject {
    void *ptr;
    unsigned char type;
    unsigned char encoding;
    unsigned char notused[2]; /* unused bytes; presumably padding — TODO confirm */
    int refcount;             /* see incrRefCount() / decrRefCount() */
} robj;
211
3305306f 212typedef struct redisDb {
213 dict *dict;
214 dict *expires;
215 int id;
216} redisDb;
217
ed9b544e 218/* With multiplexing we need to take per-clinet state.
219 * Clients are taken in a liked list. */
220typedef struct redisClient {
221 int fd;
3305306f 222 redisDb *db;
ed9b544e 223 int dictid;
224 sds querybuf;
e8a74421 225 robj **argv, **mbargv;
226 int argc, mbargc;
40d224a9 227 int bulklen; /* bulk read len. -1 if not in bulk read mode */
e8a74421 228 int multibulk; /* multi bulk command format active */
ed9b544e 229 list *reply;
230 int sentlen;
231 time_t lastinteraction; /* time of the last interaction, used for timeout */
40d224a9 232 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
233 int slaveseldb; /* slave selected db, if this client is a slave */
234 int authenticated; /* when requirepass is non-NULL */
235 int replstate; /* replication state if this is a slave */
236 int repldbfd; /* replication DB file descriptor */
6208b3a7 237 long repldboff; /* replication DB file offset */
40d224a9 238 off_t repldbsize; /* replication DB file size */
ed9b544e 239} redisClient;
240
241struct saveparam {
242 time_t seconds;
243 int changes;
244};
245
246/* Global server state structure */
247struct redisServer {
248 int port;
249 int fd;
3305306f 250 redisDb *db;
10c43610 251 dict *sharingpool;
252 unsigned int sharingpoolsize;
ed9b544e 253 long long dirty; /* changes to DB from the last save */
254 list *clients;
87eca727 255 list *slaves, *monitors;
ed9b544e 256 char neterr[ANET_ERR_LEN];
257 aeEventLoop *el;
258 int cronloops; /* number of times the cron function run */
259 list *objfreelist; /* A list of freed objects to avoid malloc() */
260 time_t lastsave; /* Unix time of last save succeeede */
5fba9f71 261 size_t usedmemory; /* Used memory in megabytes */
ed9b544e 262 /* Fields used only for stats */
263 time_t stat_starttime; /* server start time */
264 long long stat_numcommands; /* number of processed commands */
265 long long stat_numconnections; /* number of connections received */
266 /* Configuration */
267 int verbosity;
268 int glueoutputbuf;
269 int maxidletime;
270 int dbnum;
271 int daemonize;
44b38ef4 272 int appendonly;
48f0308a 273 int appendfsync;
274 time_t lastfsync;
44b38ef4 275 int appendfd;
276 int appendseldb;
ed329fcf 277 char *pidfile;
9f3c422c 278 pid_t bgsavechildpid;
9d65a1bb 279 pid_t bgrewritechildpid;
280 sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
ed9b544e 281 struct saveparam *saveparams;
282 int saveparamslen;
283 char *logfile;
284 char *bindaddr;
285 char *dbfilename;
44b38ef4 286 char *appendfilename;
abcb223e 287 char *requirepass;
10c43610 288 int shareobjects;
ed9b544e 289 /* Replication related */
290 int isslave;
d0ccebcf 291 char *masterauth;
ed9b544e 292 char *masterhost;
293 int masterport;
40d224a9 294 redisClient *master; /* client that is master for this slave */
ed9b544e 295 int replstate;
285add55 296 unsigned int maxclients;
d4465900 297 unsigned long maxmemory;
ed9b544e 298 /* Sort parameters - qsort_r() is only available under BSD so we
299 * have to take this state global, in order to pass it to sortCompare() */
300 int sort_desc;
301 int sort_alpha;
302 int sort_bypattern;
303};
304
305typedef void redisCommandProc(redisClient *c);
306struct redisCommand {
307 char *name;
308 redisCommandProc *proc;
309 int arity;
310 int flags;
311};
312
de96dbfe 313struct redisFunctionSym {
314 char *name;
56906eef 315 unsigned long pointer;
de96dbfe 316};
317
ed9b544e 318typedef struct _redisSortObject {
319 robj *obj;
320 union {
321 double score;
322 robj *cmpobj;
323 } u;
324} redisSortObject;
325
326typedef struct _redisSortOperation {
327 int type;
328 robj *pattern;
329} redisSortOperation;
330
6b47e12e 331/* ZSETs use a specialized version of Skiplists */
332
333typedef struct zskiplistNode {
334 struct zskiplistNode **forward;
e3870fab 335 struct zskiplistNode *backward;
6b47e12e 336 double score;
337 robj *obj;
338} zskiplistNode;
339
340typedef struct zskiplist {
e3870fab 341 struct zskiplistNode *header, *tail;
d13f767c 342 unsigned long length;
6b47e12e 343 int level;
344} zskiplist;
345
1812e024 346typedef struct zset {
347 dict *dict;
6b47e12e 348 zskiplist *zsl;
1812e024 349} zset;
350
6b47e12e 351/* Our shared "common" objects */
352
ed9b544e 353struct sharedObjectsStruct {
c937aa89 354 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
7b45bfb2 355 *colon, *nullbulk, *nullmultibulk,
c937aa89 356 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
357 *outofrangeerr, *plus,
ed9b544e 358 *select0, *select1, *select2, *select3, *select4,
359 *select5, *select6, *select7, *select8, *select9;
360} shared;
361
/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

static double R_Zero, R_PosInf, R_NegInf, R_Nan;
367
ed9b544e 368/*================================ Prototypes =============================== */
369
370static void freeStringObject(robj *o);
371static void freeListObject(robj *o);
372static void freeSetObject(robj *o);
373static void decrRefCount(void *o);
374static robj *createObject(int type, void *ptr);
375static void freeClient(redisClient *c);
f78fd11b 376static int rdbLoad(char *filename);
ed9b544e 377static void addReply(redisClient *c, robj *obj);
378static void addReplySds(redisClient *c, sds s);
379static void incrRefCount(robj *o);
f78fd11b 380static int rdbSaveBackground(char *filename);
ed9b544e 381static robj *createStringObject(char *ptr, size_t len);
87eca727 382static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
44b38ef4 383static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
ed9b544e 384static int syncWithMaster(void);
10c43610 385static robj *tryObjectSharing(robj *o);
942a3961 386static int tryObjectEncoding(robj *o);
9d65a1bb 387static robj *getDecodedObject(robj *o);
3305306f 388static int removeExpire(redisDb *db, robj *key);
389static int expireIfNeeded(redisDb *db, robj *key);
390static int deleteIfVolatile(redisDb *db, robj *key);
94754ccc 391static int deleteKey(redisDb *db, robj *key);
bb32ede5 392static time_t getExpire(redisDb *db, robj *key);
393static int setExpire(redisDb *db, robj *key, time_t when);
a3b21203 394static void updateSlavesWaitingBgsave(int bgsaveerr);
3fd78bcd 395static void freeMemoryIfNeeded(void);
de96dbfe 396static int processCommand(redisClient *c);
56906eef 397static void setupSigSegvAction(void);
a3b21203 398static void rdbRemoveTempFile(pid_t childpid);
9d65a1bb 399static void aofRemoveTempFile(pid_t childpid);
0ea663ea 400static size_t stringObjectLen(robj *o);
638e42ac 401static void processInputBuffer(redisClient *c);
6b47e12e 402static zskiplist *zslCreate(void);
fd8ccf44 403static void zslFree(zskiplist *zsl);
2b59cfdf 404static void zslInsert(zskiplist *zsl, double score, robj *obj);
2895e862 405static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
ed9b544e 406
abcb223e 407static void authCommand(redisClient *c);
ed9b544e 408static void pingCommand(redisClient *c);
409static void echoCommand(redisClient *c);
410static void setCommand(redisClient *c);
411static void setnxCommand(redisClient *c);
412static void getCommand(redisClient *c);
413static void delCommand(redisClient *c);
414static void existsCommand(redisClient *c);
415static void incrCommand(redisClient *c);
416static void decrCommand(redisClient *c);
417static void incrbyCommand(redisClient *c);
418static void decrbyCommand(redisClient *c);
419static void selectCommand(redisClient *c);
420static void randomkeyCommand(redisClient *c);
421static void keysCommand(redisClient *c);
422static void dbsizeCommand(redisClient *c);
423static void lastsaveCommand(redisClient *c);
424static void saveCommand(redisClient *c);
425static void bgsaveCommand(redisClient *c);
9d65a1bb 426static void bgrewriteaofCommand(redisClient *c);
ed9b544e 427static void shutdownCommand(redisClient *c);
428static void moveCommand(redisClient *c);
429static void renameCommand(redisClient *c);
430static void renamenxCommand(redisClient *c);
431static void lpushCommand(redisClient *c);
432static void rpushCommand(redisClient *c);
433static void lpopCommand(redisClient *c);
434static void rpopCommand(redisClient *c);
435static void llenCommand(redisClient *c);
436static void lindexCommand(redisClient *c);
437static void lrangeCommand(redisClient *c);
438static void ltrimCommand(redisClient *c);
439static void typeCommand(redisClient *c);
440static void lsetCommand(redisClient *c);
441static void saddCommand(redisClient *c);
442static void sremCommand(redisClient *c);
a4460ef4 443static void smoveCommand(redisClient *c);
ed9b544e 444static void sismemberCommand(redisClient *c);
445static void scardCommand(redisClient *c);
12fea928 446static void spopCommand(redisClient *c);
2abb95a9 447static void srandmemberCommand(redisClient *c);
ed9b544e 448static void sinterCommand(redisClient *c);
449static void sinterstoreCommand(redisClient *c);
40d224a9 450static void sunionCommand(redisClient *c);
451static void sunionstoreCommand(redisClient *c);
f4f56e1d 452static void sdiffCommand(redisClient *c);
453static void sdiffstoreCommand(redisClient *c);
ed9b544e 454static void syncCommand(redisClient *c);
455static void flushdbCommand(redisClient *c);
456static void flushallCommand(redisClient *c);
457static void sortCommand(redisClient *c);
458static void lremCommand(redisClient *c);
0f5f7e9a 459static void rpoplpushcommand(redisClient *c);
ed9b544e 460static void infoCommand(redisClient *c);
70003d28 461static void mgetCommand(redisClient *c);
87eca727 462static void monitorCommand(redisClient *c);
3305306f 463static void expireCommand(redisClient *c);
802e8373 464static void expireatCommand(redisClient *c);
f6b141c5 465static void getsetCommand(redisClient *c);
fd88489a 466static void ttlCommand(redisClient *c);
321b0e13 467static void slaveofCommand(redisClient *c);
7f957c92 468static void debugCommand(redisClient *c);
f6b141c5 469static void msetCommand(redisClient *c);
470static void msetnxCommand(redisClient *c);
fd8ccf44 471static void zaddCommand(redisClient *c);
7db723ad 472static void zincrbyCommand(redisClient *c);
cc812361 473static void zrangeCommand(redisClient *c);
50c55df5 474static void zrangebyscoreCommand(redisClient *c);
e3870fab 475static void zrevrangeCommand(redisClient *c);
3c41331e 476static void zcardCommand(redisClient *c);
1b7106e7 477static void zremCommand(redisClient *c);
6e333bbe 478static void zscoreCommand(redisClient *c);
1807985b 479static void zremrangebyscoreCommand(redisClient *c);
f6b141c5 480
ed9b544e 481/*================================= Globals ================================= */
482
483/* Global vars */
484static struct redisServer server; /* server global state */
485static struct redisCommand cmdTable[] = {
486 {"get",getCommand,2,REDIS_CMD_INLINE},
3fd78bcd 487 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
488 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
5109cdff 489 {"del",delCommand,-2,REDIS_CMD_INLINE},
ed9b544e 490 {"exists",existsCommand,2,REDIS_CMD_INLINE},
3fd78bcd 491 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
492 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
70003d28 493 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
3fd78bcd 494 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
495 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 496 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
497 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
498 {"llen",llenCommand,2,REDIS_CMD_INLINE},
499 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
3fd78bcd 500 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 501 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
502 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
503 {"lrem",lremCommand,4,REDIS_CMD_BULK},
0f5f7e9a 504 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK},
3fd78bcd 505 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 506 {"srem",sremCommand,3,REDIS_CMD_BULK},
a4460ef4 507 {"smove",smoveCommand,4,REDIS_CMD_BULK},
ed9b544e 508 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
509 {"scard",scardCommand,2,REDIS_CMD_INLINE},
12fea928 510 {"spop",spopCommand,2,REDIS_CMD_INLINE},
2abb95a9 511 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
3fd78bcd 512 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
513 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
514 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
515 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
516 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
517 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
ed9b544e 518 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
fd8ccf44 519 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
7db723ad 520 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
1b7106e7 521 {"zrem",zremCommand,3,REDIS_CMD_BULK},
1807985b 522 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
cc812361 523 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
50c55df5 524 {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
e3870fab 525 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
3c41331e 526 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
6e333bbe 527 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
3fd78bcd 528 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
529 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
f6b141c5 530 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
531 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
532 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 533 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
534 {"select",selectCommand,2,REDIS_CMD_INLINE},
535 {"move",moveCommand,3,REDIS_CMD_INLINE},
536 {"rename",renameCommand,3,REDIS_CMD_INLINE},
537 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
321b0e13 538 {"expire",expireCommand,3,REDIS_CMD_INLINE},
802e8373 539 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
ed9b544e 540 {"keys",keysCommand,2,REDIS_CMD_INLINE},
541 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
abcb223e 542 {"auth",authCommand,2,REDIS_CMD_INLINE},
ed9b544e 543 {"ping",pingCommand,1,REDIS_CMD_INLINE},
544 {"echo",echoCommand,2,REDIS_CMD_BULK},
545 {"save",saveCommand,1,REDIS_CMD_INLINE},
546 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
9d65a1bb 547 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
ed9b544e 548 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
549 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
550 {"type",typeCommand,2,REDIS_CMD_INLINE},
551 {"sync",syncCommand,1,REDIS_CMD_INLINE},
552 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
553 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
3fd78bcd 554 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
ed9b544e 555 {"info",infoCommand,1,REDIS_CMD_INLINE},
87eca727 556 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
fd88489a 557 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
321b0e13 558 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
7f957c92 559 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
ed9b544e 560 {NULL,NULL,0,0}
561};
bcfc686d 562
ed9b544e 563/*============================ Utility functions ============================ */
564
/* Glob-style pattern matching.
 *
 * Returns 1 if the first 'stringLen' bytes of 'string' match the first
 * 'patternLen' bytes of 'pattern', 0 otherwise. Supported syntax:
 *
 *   *       any sequence of bytes, including the empty one
 *   ?       exactly one byte
 *   [abc]   one byte out of the set; [^abc] negates it, [a-z] is a range
 *   \x      the literal byte x (also inside brackets)
 *
 * If 'nocase' is non-zero, letters are compared case-insensitively.
 *
 * Every read is bounded by the given lengths. Neither buffer needs to be
 * NUL-terminated, and a byte is never consumed from an exhausted string.
 * Bytes are compared as unsigned char, which also keeps tolower() defined. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen && stringLen) {
        switch (pattern[0]) {
        case '*':
            /* A run of '*' is equivalent to a single one. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* a trailing '*' matches whatever is left */
            /* Try the rest of the pattern against every non-empty suffix.
             * The empty suffix never needs trying: after collapsing, the
             * rest of the pattern begins with an element that consumes a
             * byte. */
            while (stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                                   string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match = 0;
            int c = (unsigned char) string[0];

            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so that the common
                     * advance below leaves patternLen at exactly zero. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = (unsigned char) pattern[0];
                    int end = (unsigned char) pattern[2];
                    int ch = c;

                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        ch = tolower(ch);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (ch >= start && ch <= end)
                        match = 1;
                } else if (!nocase) {
                    if (pattern[0] == string[0])
                        match = 1;
                } else {
                    if (tolower((unsigned char) pattern[0]) == tolower(c))
                        match = 1;
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char) pattern[0]) !=
                    tolower((unsigned char) string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
    }
    /* The string is exhausted: trailing '*' can still match the empty tail. */
    if (stringLen == 0) {
        while (patternLen && pattern[0] == '*') {
            pattern++;
            patternLen--;
        }
    }
    return patternLen == 0 && stringLen == 0;
}
687
56906eef 688static void redisLog(int level, const char *fmt, ...) {
ed9b544e 689 va_list ap;
690 FILE *fp;
691
692 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
693 if (!fp) return;
694
695 va_start(ap, fmt);
696 if (level >= server.verbosity) {
697 char *c = ".-*";
1904ecc1 698 char buf[64];
699 time_t now;
700
701 now = time(NULL);
6c9385e0 702 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
1904ecc1 703 fprintf(fp,"%s %c ",buf,c[level]);
ed9b544e 704 vfprintf(fp, fmt, ap);
705 fprintf(fp,"\n");
706 fflush(fp);
707 }
708 va_end(ap);
709
710 if (server.logfile) fclose(fp);
711}
712
713/*====================== Hash table type implementation ==================== */
714
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and Redis objects as values (objects can hold SDS strings,
 * lists, sets). */
718
/* Dict value destructor for plain zmalloc()ed buffers: just releases the
 * memory. The dict private data is not needed. */
static void dictVanillaFree(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);

    zfree(val);
}
724
ed9b544e 725static int sdsDictKeyCompare(void *privdata, const void *key1,
726 const void *key2)
727{
728 int l1,l2;
729 DICT_NOTUSED(privdata);
730
731 l1 = sdslen((sds)key1);
732 l2 = sdslen((sds)key2);
733 if (l1 != l2) return 0;
734 return memcmp(key1, key2, l1) == 0;
735}
736
/* Dict destructor for redis objects: drops the reference held by the
 * dict. The dict private data is not needed. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    decrRefCount(val);
}
743
942a3961 744static int dictObjKeyCompare(void *privdata, const void *key1,
ed9b544e 745 const void *key2)
746{
747 const robj *o1 = key1, *o2 = key2;
748 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
749}
750
942a3961 751static unsigned int dictObjHash(const void *key) {
ed9b544e 752 const robj *o = key;
753 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
754}
755
942a3961 756static int dictEncObjKeyCompare(void *privdata, const void *key1,
757 const void *key2)
758{
9d65a1bb 759 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
760 int cmp;
942a3961 761
9d65a1bb 762 o1 = getDecodedObject(o1);
763 o2 = getDecodedObject(o2);
764 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
765 decrRefCount(o1);
766 decrRefCount(o2);
767 return cmp;
942a3961 768}
769
770static unsigned int dictEncObjHash(const void *key) {
9d65a1bb 771 robj *o = (robj*) key;
942a3961 772
9d65a1bb 773 o = getDecodedObject(o);
774 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
775 decrRefCount(o);
776 return hash;
942a3961 777}
778
ed9b544e 779static dictType setDictType = {
942a3961 780 dictEncObjHash, /* hash function */
ed9b544e 781 NULL, /* key dup */
782 NULL, /* val dup */
942a3961 783 dictEncObjKeyCompare, /* key compare */
ed9b544e 784 dictRedisObjectDestructor, /* key destructor */
785 NULL /* val destructor */
786};
787
1812e024 788static dictType zsetDictType = {
789 dictEncObjHash, /* hash function */
790 NULL, /* key dup */
791 NULL, /* val dup */
792 dictEncObjKeyCompare, /* key compare */
793 dictRedisObjectDestructor, /* key destructor */
794 dictVanillaFree /* val destructor */
795};
796
ed9b544e 797static dictType hashDictType = {
942a3961 798 dictObjHash, /* hash function */
ed9b544e 799 NULL, /* key dup */
800 NULL, /* val dup */
942a3961 801 dictObjKeyCompare, /* key compare */
ed9b544e 802 dictRedisObjectDestructor, /* key destructor */
803 dictRedisObjectDestructor /* val destructor */
804};
805
806/* ========================= Random utility functions ======================= */
807
/* Out of memory handler.
 *
 * Redis does not try to recover from allocation failures: the networking
 * layer itself allocates its send buffers on the heap, so it is not clear
 * the condition could even be reported to clients. We print 'msg' on
 * stderr, wait a second, and abort. The simpler code is worth it. */
static void oom(const char *msg) {
    FILE *out = stderr;

    fprintf(out, "%s: Out of memory\n",msg);
    fflush(out);
    sleep(1); /* presumably gives the message time to get out — TODO confirm */
    abort();
}
819
820/* ====================== Redis server networking stuff ===================== */
56906eef 821static void closeTimedoutClients(void) {
ed9b544e 822 redisClient *c;
ed9b544e 823 listNode *ln;
824 time_t now = time(NULL);
825
6208b3a7 826 listRewind(server.clients);
827 while ((ln = listYield(server.clients)) != NULL) {
ed9b544e 828 c = listNodeValue(ln);
829 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
c7cf2ec9 830 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
ed9b544e 831 (now - c->lastinteraction > server.maxidletime)) {
832 redisLog(REDIS_DEBUG,"Closing idle client");
833 freeClient(c);
834 }
835 }
ed9b544e 836}
837
12fea928 838static int htNeedsResize(dict *dict) {
839 long long size, used;
840
841 size = dictSlots(dict);
842 used = dictSize(dict);
843 return (size && used && size > DICT_HT_INITIAL_SIZE &&
844 (used*100/size < REDIS_HT_MINFILL));
845}
846
0bc03378 847/* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
848 * we resize the hash table to save memory */
56906eef 849static void tryResizeHashTables(void) {
0bc03378 850 int j;
851
852 for (j = 0; j < server.dbnum; j++) {
12fea928 853 if (htNeedsResize(server.db[j].dict)) {
854 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
0bc03378 855 dictResize(server.db[j].dict);
12fea928 856 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
0bc03378 857 }
12fea928 858 if (htNeedsResize(server.db[j].expires))
859 dictResize(server.db[j].expires);
0bc03378 860 }
861}
862
9d65a1bb 863/* A background saving child (BGSAVE) terminated its work. Handle this. */
864void backgroundSaveDoneHandler(int statloc) {
865 int exitcode = WEXITSTATUS(statloc);
866 int bysignal = WIFSIGNALED(statloc);
867
868 if (!bysignal && exitcode == 0) {
869 redisLog(REDIS_NOTICE,
870 "Background saving terminated with success");
871 server.dirty = 0;
872 server.lastsave = time(NULL);
873 } else if (!bysignal && exitcode != 0) {
874 redisLog(REDIS_WARNING, "Background saving error");
875 } else {
876 redisLog(REDIS_WARNING,
877 "Background saving terminated by signal");
878 rdbRemoveTempFile(server.bgsavechildpid);
879 }
880 server.bgsavechildpid = -1;
881 /* Possibly there are slaves waiting for a BGSAVE in order to be served
882 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
883 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
884}
885
886/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
887 * Handle this. */
888void backgroundRewriteDoneHandler(int statloc) {
889 int exitcode = WEXITSTATUS(statloc);
890 int bysignal = WIFSIGNALED(statloc);
891
892 if (!bysignal && exitcode == 0) {
893 int fd;
894 char tmpfile[256];
895
896 redisLog(REDIS_NOTICE,
897 "Background append only file rewriting terminated with success");
898 /* Now it's time to flush the differences accumulated by the parent */
899 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
900 fd = open(tmpfile,O_WRONLY|O_APPEND);
901 if (fd == -1) {
902 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
903 goto cleanup;
904 }
905 /* Flush our data... */
906 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
907 (signed) sdslen(server.bgrewritebuf)) {
908 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
909 close(fd);
910 goto cleanup;
911 }
912 redisLog(REDIS_WARNING,"Parent diff flushed into the new append log file with success");
913 /* Now our work is to rename the temp file into the stable file. And
914 * switch the file descriptor used by the server for append only. */
915 if (rename(tmpfile,server.appendfilename) == -1) {
916 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
917 close(fd);
918 goto cleanup;
919 }
920 /* Mission completed... almost */
921 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
922 if (server.appendfd != -1) {
923 /* If append only is actually enabled... */
924 close(server.appendfd);
925 server.appendfd = fd;
926 fsync(fd);
85a83172 927 server.appendseldb = -1; /* Make sure it will issue SELECT */
9d65a1bb 928 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
929 } else {
930 /* If append only is disabled we just generate a dump in this
931 * format. Why not? */
932 close(fd);
933 }
934 } else if (!bysignal && exitcode != 0) {
935 redisLog(REDIS_WARNING, "Background append only file rewriting error");
936 } else {
937 redisLog(REDIS_WARNING,
938 "Background append only file rewriting terminated by signal");
939 }
940cleanup:
941 sdsfree(server.bgrewritebuf);
942 server.bgrewritebuf = sdsempty();
943 aofRemoveTempFile(server.bgrewritechildpid);
944 server.bgrewritechildpid = -1;
945}
946
56906eef 947static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
94754ccc 948 int j, loops = server.cronloops++;
ed9b544e 949 REDIS_NOTUSED(eventLoop);
950 REDIS_NOTUSED(id);
951 REDIS_NOTUSED(clientData);
952
953 /* Update the global state with the amount of used memory */
954 server.usedmemory = zmalloc_used_memory();
955
0bc03378 956 /* Show some info about non-empty databases */
ed9b544e 957 for (j = 0; j < server.dbnum; j++) {
dec423d9 958 long long size, used, vkeys;
94754ccc 959
3305306f 960 size = dictSlots(server.db[j].dict);
961 used = dictSize(server.db[j].dict);
94754ccc 962 vkeys = dictSize(server.db[j].expires);
c3cb078d 963 if (!(loops % 5) && (used || vkeys)) {
964 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
a4d1ba9a 965 /* dictPrintStats(server.dict); */
ed9b544e 966 }
ed9b544e 967 }
968
0bc03378 969 /* We don't want to resize the hash tables while a bacground saving
970 * is in progress: the saving child is created using fork() that is
971 * implemented with a copy-on-write semantic in most modern systems, so
972 * if we resize the HT while there is the saving child at work actually
973 * a lot of memory movements in the parent will cause a lot of pages
974 * copied. */
9d65a1bb 975 if (server.bgsavechildpid == -1) tryResizeHashTables();
0bc03378 976
ed9b544e 977 /* Show information about connected clients */
978 if (!(loops % 5)) {
21aecf4b 979 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
ed9b544e 980 listLength(server.clients)-listLength(server.slaves),
981 listLength(server.slaves),
10c43610 982 server.usedmemory,
3305306f 983 dictSize(server.sharingpool));
ed9b544e 984 }
985
986 /* Close connections of timedout clients */
0150db36 987 if (server.maxidletime && !(loops % 10))
ed9b544e 988 closeTimedoutClients();
989
9d65a1bb 990 /* Check if a background saving or AOF rewrite in progress terminated */
991 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
ed9b544e 992 int statloc;
9d65a1bb 993 pid_t pid;
994
995 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
996 if (pid == server.bgsavechildpid) {
997 backgroundSaveDoneHandler(statloc);
ed9b544e 998 } else {
9d65a1bb 999 backgroundRewriteDoneHandler(statloc);
ed9b544e 1000 }
ed9b544e 1001 }
1002 } else {
1003 /* If there is not a background saving in progress check if
1004 * we have to save now */
1005 time_t now = time(NULL);
1006 for (j = 0; j < server.saveparamslen; j++) {
1007 struct saveparam *sp = server.saveparams+j;
1008
1009 if (server.dirty >= sp->changes &&
1010 now-server.lastsave > sp->seconds) {
1011 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1012 sp->changes, sp->seconds);
f78fd11b 1013 rdbSaveBackground(server.dbfilename);
ed9b544e 1014 break;
1015 }
1016 }
1017 }
94754ccc 1018
f2324293 1019 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1020 * will use few CPU cycles if there are few expiring keys, otherwise
1021 * it will get more aggressive to avoid that too much memory is used by
1022 * keys that can be removed from the keyspace. */
94754ccc 1023 for (j = 0; j < server.dbnum; j++) {
f2324293 1024 int expired;
94754ccc 1025 redisDb *db = server.db+j;
94754ccc 1026
f2324293 1027 /* Continue to expire if at the end of the cycle more than 25%
1028 * of the keys were expired. */
1029 do {
1030 int num = dictSize(db->expires);
94754ccc 1031 time_t now = time(NULL);
1032
f2324293 1033 expired = 0;
94754ccc 1034 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1035 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1036 while (num--) {
1037 dictEntry *de;
1038 time_t t;
1039
1040 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1041 t = (time_t) dictGetEntryVal(de);
1042 if (now > t) {
1043 deleteKey(db,dictGetEntryKey(de));
f2324293 1044 expired++;
94754ccc 1045 }
1046 }
f2324293 1047 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
94754ccc 1048 }
1049
ed9b544e 1050 /* Check if we should connect to a MASTER */
1051 if (server.replstate == REDIS_REPL_CONNECT) {
1052 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1053 if (syncWithMaster() == REDIS_OK) {
1054 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1055 }
1056 }
1057 return 1000;
1058}
1059
1060static void createSharedObjects(void) {
1061 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1062 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1063 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
c937aa89 1064 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1065 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1066 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1067 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1068 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1069 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
ed9b544e 1070 /* no such key */
ed9b544e 1071 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1072 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1073 "-ERR Operation against a key holding the wrong kind of value\r\n"));
ed9b544e 1074 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1075 "-ERR no such key\r\n"));
ed9b544e 1076 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1077 "-ERR syntax error\r\n"));
c937aa89 1078 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1079 "-ERR source and destination objects are the same\r\n"));
1080 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1081 "-ERR index out of range\r\n"));
ed9b544e 1082 shared.space = createObject(REDIS_STRING,sdsnew(" "));
c937aa89 1083 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1084 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
ed9b544e 1085 shared.select0 = createStringObject("select 0\r\n",10);
1086 shared.select1 = createStringObject("select 1\r\n",10);
1087 shared.select2 = createStringObject("select 2\r\n",10);
1088 shared.select3 = createStringObject("select 3\r\n",10);
1089 shared.select4 = createStringObject("select 4\r\n",10);
1090 shared.select5 = createStringObject("select 5\r\n",10);
1091 shared.select6 = createStringObject("select 6\r\n",10);
1092 shared.select7 = createStringObject("select 7\r\n",10);
1093 shared.select8 = createStringObject("select 8\r\n",10);
1094 shared.select9 = createStringObject("select 9\r\n",10);
1095}
1096
1097static void appendServerSaveParams(time_t seconds, int changes) {
1098 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
ed9b544e 1099 server.saveparams[server.saveparamslen].seconds = seconds;
1100 server.saveparams[server.saveparamslen].changes = changes;
1101 server.saveparamslen++;
1102}
1103
bcfc686d 1104static void resetServerSaveParams() {
ed9b544e 1105 zfree(server.saveparams);
1106 server.saveparams = NULL;
1107 server.saveparamslen = 0;
1108}
1109
1110static void initServerConfig() {
1111 server.dbnum = REDIS_DEFAULT_DBNUM;
1112 server.port = REDIS_SERVERPORT;
1113 server.verbosity = REDIS_DEBUG;
1114 server.maxidletime = REDIS_MAXIDLETIME;
1115 server.saveparams = NULL;
1116 server.logfile = NULL; /* NULL = log on standard output */
1117 server.bindaddr = NULL;
1118 server.glueoutputbuf = 1;
1119 server.daemonize = 0;
44b38ef4 1120 server.appendonly = 0;
4e141d5a 1121 server.appendfsync = APPENDFSYNC_ALWAYS;
48f0308a 1122 server.lastfsync = time(NULL);
44b38ef4 1123 server.appendfd = -1;
1124 server.appendseldb = -1; /* Make sure the first time will not match */
ed329fcf 1125 server.pidfile = "/var/run/redis.pid";
ed9b544e 1126 server.dbfilename = "dump.rdb";
9d65a1bb 1127 server.appendfilename = "appendonly.aof";
abcb223e 1128 server.requirepass = NULL;
10c43610 1129 server.shareobjects = 0;
21aecf4b 1130 server.sharingpoolsize = 1024;
285add55 1131 server.maxclients = 0;
3fd78bcd 1132 server.maxmemory = 0;
bcfc686d 1133 resetServerSaveParams();
ed9b544e 1134
1135 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1136 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1137 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1138 /* Replication related */
1139 server.isslave = 0;
d0ccebcf 1140 server.masterauth = NULL;
ed9b544e 1141 server.masterhost = NULL;
1142 server.masterport = 6379;
1143 server.master = NULL;
1144 server.replstate = REDIS_REPL_NONE;
a7866db6 1145
1146 /* Double constants initialization */
1147 R_Zero = 0.0;
1148 R_PosInf = 1.0/R_Zero;
1149 R_NegInf = -1.0/R_Zero;
1150 R_Nan = R_Zero/R_Zero;
ed9b544e 1151}
1152
1153static void initServer() {
1154 int j;
1155
1156 signal(SIGHUP, SIG_IGN);
1157 signal(SIGPIPE, SIG_IGN);
fe3bbfbe 1158 setupSigSegvAction();
ed9b544e 1159
1160 server.clients = listCreate();
1161 server.slaves = listCreate();
87eca727 1162 server.monitors = listCreate();
ed9b544e 1163 server.objfreelist = listCreate();
1164 createSharedObjects();
1165 server.el = aeCreateEventLoop();
3305306f 1166 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
10c43610 1167 server.sharingpool = dictCreate(&setDictType,NULL);
ed9b544e 1168 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1169 if (server.fd == -1) {
1170 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1171 exit(1);
1172 }
3305306f 1173 for (j = 0; j < server.dbnum; j++) {
1174 server.db[j].dict = dictCreate(&hashDictType,NULL);
1175 server.db[j].expires = dictCreate(&setDictType,NULL);
1176 server.db[j].id = j;
1177 }
ed9b544e 1178 server.cronloops = 0;
9f3c422c 1179 server.bgsavechildpid = -1;
9d65a1bb 1180 server.bgrewritechildpid = -1;
1181 server.bgrewritebuf = sdsempty();
ed9b544e 1182 server.lastsave = time(NULL);
1183 server.dirty = 0;
1184 server.usedmemory = 0;
1185 server.stat_numcommands = 0;
1186 server.stat_numconnections = 0;
1187 server.stat_starttime = time(NULL);
d8f8b666 1188 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
44b38ef4 1189
1190 if (server.appendonly) {
71eba477 1191 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
44b38ef4 1192 if (server.appendfd == -1) {
1193 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1194 strerror(errno));
1195 exit(1);
1196 }
1197 }
ed9b544e 1198}
1199
1200/* Empty the whole database */
ca37e9cd 1201static long long emptyDb() {
ed9b544e 1202 int j;
ca37e9cd 1203 long long removed = 0;
ed9b544e 1204
3305306f 1205 for (j = 0; j < server.dbnum; j++) {
ca37e9cd 1206 removed += dictSize(server.db[j].dict);
3305306f 1207 dictEmpty(server.db[j].dict);
1208 dictEmpty(server.db[j].expires);
1209 }
ca37e9cd 1210 return removed;
ed9b544e 1211}
1212
/* Parse a boolean config argument: returns 1 for "yes", 0 for "no"
 * (both case insensitive) and -1 for anything else. */
static int yesnotoi(char *s) {
    int value = -1;

    if (strcasecmp(s,"yes") == 0)
        value = 1;
    else if (strcasecmp(s,"no") == 0)
        value = 0;
    return value;
}
1218
ed9b544e 1219/* I agree, this is a very rudimental way to load a configuration...
1220 will improve later if the config gets more complex */
1221static void loadServerConfig(char *filename) {
c9a111ac 1222 FILE *fp;
ed9b544e 1223 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1224 int linenum = 0;
1225 sds line = NULL;
c9a111ac 1226
1227 if (filename[0] == '-' && filename[1] == '\0')
1228 fp = stdin;
1229 else {
1230 if ((fp = fopen(filename,"r")) == NULL) {
1231 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1232 exit(1);
1233 }
ed9b544e 1234 }
c9a111ac 1235
ed9b544e 1236 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1237 sds *argv;
1238 int argc, j;
1239
1240 linenum++;
1241 line = sdsnew(buf);
1242 line = sdstrim(line," \t\r\n");
1243
1244 /* Skip comments and blank lines*/
1245 if (line[0] == '#' || line[0] == '\0') {
1246 sdsfree(line);
1247 continue;
1248 }
1249
1250 /* Split into arguments */
1251 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1252 sdstolower(argv[0]);
1253
1254 /* Execute config directives */
bb0b03a3 1255 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
ed9b544e 1256 server.maxidletime = atoi(argv[1]);
0150db36 1257 if (server.maxidletime < 0) {
ed9b544e 1258 err = "Invalid timeout value"; goto loaderr;
1259 }
bb0b03a3 1260 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
ed9b544e 1261 server.port = atoi(argv[1]);
1262 if (server.port < 1 || server.port > 65535) {
1263 err = "Invalid port"; goto loaderr;
1264 }
bb0b03a3 1265 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
ed9b544e 1266 server.bindaddr = zstrdup(argv[1]);
bb0b03a3 1267 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
ed9b544e 1268 int seconds = atoi(argv[1]);
1269 int changes = atoi(argv[2]);
1270 if (seconds < 1 || changes < 0) {
1271 err = "Invalid save parameters"; goto loaderr;
1272 }
1273 appendServerSaveParams(seconds,changes);
bb0b03a3 1274 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
ed9b544e 1275 if (chdir(argv[1]) == -1) {
1276 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1277 argv[1], strerror(errno));
1278 exit(1);
1279 }
bb0b03a3 1280 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1281 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1282 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1283 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
ed9b544e 1284 else {
1285 err = "Invalid log level. Must be one of debug, notice, warning";
1286 goto loaderr;
1287 }
bb0b03a3 1288 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
c9a111ac 1289 FILE *logfp;
ed9b544e 1290
1291 server.logfile = zstrdup(argv[1]);
bb0b03a3 1292 if (!strcasecmp(server.logfile,"stdout")) {
ed9b544e 1293 zfree(server.logfile);
1294 server.logfile = NULL;
1295 }
1296 if (server.logfile) {
1297 /* Test if we are able to open the file. The server will not
1298 * be able to abort just for this problem later... */
c9a111ac 1299 logfp = fopen(server.logfile,"a");
1300 if (logfp == NULL) {
ed9b544e 1301 err = sdscatprintf(sdsempty(),
1302 "Can't open the log file: %s", strerror(errno));
1303 goto loaderr;
1304 }
c9a111ac 1305 fclose(logfp);
ed9b544e 1306 }
bb0b03a3 1307 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
ed9b544e 1308 server.dbnum = atoi(argv[1]);
1309 if (server.dbnum < 1) {
1310 err = "Invalid number of databases"; goto loaderr;
1311 }
285add55 1312 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1313 server.maxclients = atoi(argv[1]);
3fd78bcd 1314 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
d4465900 1315 server.maxmemory = strtoll(argv[1], NULL, 10);
bb0b03a3 1316 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
ed9b544e 1317 server.masterhost = sdsnew(argv[1]);
1318 server.masterport = atoi(argv[2]);
1319 server.replstate = REDIS_REPL_CONNECT;
d0ccebcf 1320 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1321 server.masterauth = zstrdup(argv[1]);
bb0b03a3 1322 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
85dd2f3a 1323 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
ed9b544e 1324 err = "argument must be 'yes' or 'no'"; goto loaderr;
1325 }
bb0b03a3 1326 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
85dd2f3a 1327 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
10c43610 1328 err = "argument must be 'yes' or 'no'"; goto loaderr;
1329 }
e52c65b9 1330 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1331 server.sharingpoolsize = atoi(argv[1]);
1332 if (server.sharingpoolsize < 1) {
1333 err = "invalid object sharing pool size"; goto loaderr;
1334 }
bb0b03a3 1335 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
85dd2f3a 1336 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
ed9b544e 1337 err = "argument must be 'yes' or 'no'"; goto loaderr;
1338 }
44b38ef4 1339 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1340 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1341 err = "argument must be 'yes' or 'no'"; goto loaderr;
1342 }
48f0308a 1343 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1766c6da 1344 if (!strcasecmp(argv[1],"no")) {
48f0308a 1345 server.appendfsync = APPENDFSYNC_NO;
1766c6da 1346 } else if (!strcasecmp(argv[1],"always")) {
48f0308a 1347 server.appendfsync = APPENDFSYNC_ALWAYS;
1766c6da 1348 } else if (!strcasecmp(argv[1],"everysec")) {
48f0308a 1349 server.appendfsync = APPENDFSYNC_EVERYSEC;
1350 } else {
1351 err = "argument must be 'no', 'always' or 'everysec'";
1352 goto loaderr;
1353 }
bb0b03a3 1354 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
abcb223e 1355 server.requirepass = zstrdup(argv[1]);
bb0b03a3 1356 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
ed329fcf 1357 server.pidfile = zstrdup(argv[1]);
bb0b03a3 1358 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
b8b553c8 1359 server.dbfilename = zstrdup(argv[1]);
ed9b544e 1360 } else {
1361 err = "Bad directive or wrong number of arguments"; goto loaderr;
1362 }
1363 for (j = 0; j < argc; j++)
1364 sdsfree(argv[j]);
1365 zfree(argv);
1366 sdsfree(line);
1367 }
c9a111ac 1368 if (fp != stdin) fclose(fp);
ed9b544e 1369 return;
1370
1371loaderr:
1372 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1373 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1374 fprintf(stderr, ">>> '%s'\n", line);
1375 fprintf(stderr, "%s\n", err);
1376 exit(1);
1377}
1378
1379static void freeClientArgv(redisClient *c) {
1380 int j;
1381
1382 for (j = 0; j < c->argc; j++)
1383 decrRefCount(c->argv[j]);
e8a74421 1384 for (j = 0; j < c->mbargc; j++)
1385 decrRefCount(c->mbargv[j]);
ed9b544e 1386 c->argc = 0;
e8a74421 1387 c->mbargc = 0;
ed9b544e 1388}
1389
1390static void freeClient(redisClient *c) {
1391 listNode *ln;
1392
1393 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1394 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1395 sdsfree(c->querybuf);
1396 listRelease(c->reply);
1397 freeClientArgv(c);
1398 close(c->fd);
1399 ln = listSearchKey(server.clients,c);
1400 assert(ln != NULL);
1401 listDelNode(server.clients,ln);
1402 if (c->flags & REDIS_SLAVE) {
6208b3a7 1403 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1404 close(c->repldbfd);
87eca727 1405 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1406 ln = listSearchKey(l,c);
ed9b544e 1407 assert(ln != NULL);
87eca727 1408 listDelNode(l,ln);
ed9b544e 1409 }
1410 if (c->flags & REDIS_MASTER) {
1411 server.master = NULL;
1412 server.replstate = REDIS_REPL_CONNECT;
1413 }
93ea3759 1414 zfree(c->argv);
e8a74421 1415 zfree(c->mbargv);
ed9b544e 1416 zfree(c);
1417}
1418
cc30e368 1419#define GLUEREPLY_UP_TO (1024)
ed9b544e 1420static void glueReplyBuffersIfNeeded(redisClient *c) {
c28b42ac 1421 int copylen = 0;
1422 char buf[GLUEREPLY_UP_TO];
6208b3a7 1423 listNode *ln;
ed9b544e 1424 robj *o;
1425
6208b3a7 1426 listRewind(c->reply);
1427 while((ln = listYield(c->reply))) {
c28b42ac 1428 int objlen;
1429
ed9b544e 1430 o = ln->value;
c28b42ac 1431 objlen = sdslen(o->ptr);
1432 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1433 memcpy(buf+copylen,o->ptr,objlen);
1434 copylen += objlen;
ed9b544e 1435 listDelNode(c->reply,ln);
c28b42ac 1436 } else {
1437 if (copylen == 0) return;
1438 break;
ed9b544e 1439 }
ed9b544e 1440 }
c28b42ac 1441 /* Now the output buffer is empty, add the new single element */
1442 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1443 listAddNodeHead(c->reply,o);
ed9b544e 1444}
1445
1446static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1447 redisClient *c = privdata;
1448 int nwritten = 0, totwritten = 0, objlen;
1449 robj *o;
1450 REDIS_NOTUSED(el);
1451 REDIS_NOTUSED(mask);
1452
2895e862 1453 /* Use writev() if we have enough buffers to send */
7ea870c0 1454 if (!server.glueoutputbuf &&
1455 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1456 !(c->flags & REDIS_MASTER))
2895e862 1457 {
1458 sendReplyToClientWritev(el, fd, privdata, mask);
1459 return;
1460 }
2895e862 1461
ed9b544e 1462 while(listLength(c->reply)) {
c28b42ac 1463 if (server.glueoutputbuf && listLength(c->reply) > 1)
1464 glueReplyBuffersIfNeeded(c);
1465
ed9b544e 1466 o = listNodeValue(listFirst(c->reply));
1467 objlen = sdslen(o->ptr);
1468
1469 if (objlen == 0) {
1470 listDelNode(c->reply,listFirst(c->reply));
1471 continue;
1472 }
1473
1474 if (c->flags & REDIS_MASTER) {
6f376729 1475 /* Don't reply to a master */
ed9b544e 1476 nwritten = objlen - c->sentlen;
1477 } else {
a4d1ba9a 1478 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
ed9b544e 1479 if (nwritten <= 0) break;
1480 }
1481 c->sentlen += nwritten;
1482 totwritten += nwritten;
1483 /* If we fully sent the object on head go to the next one */
1484 if (c->sentlen == objlen) {
1485 listDelNode(c->reply,listFirst(c->reply));
1486 c->sentlen = 0;
1487 }
6f376729 1488 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
12f9d551 1489 * bytes, in a single threaded server it's a good idea to serve
6f376729 1490 * other clients as well, even if a very large request comes from
1491 * super fast link that is always able to accept data (in real world
12f9d551 1492 * scenario think about 'KEYS *' against the loopback interfae) */
6f376729 1493 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
ed9b544e 1494 }
1495 if (nwritten == -1) {
1496 if (errno == EAGAIN) {
1497 nwritten = 0;
1498 } else {
1499 redisLog(REDIS_DEBUG,
1500 "Error writing to client: %s", strerror(errno));
1501 freeClient(c);
1502 return;
1503 }
1504 }
1505 if (totwritten > 0) c->lastinteraction = time(NULL);
1506 if (listLength(c->reply) == 0) {
1507 c->sentlen = 0;
1508 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1509 }
1510}
1511
2895e862 1512static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1513{
1514 redisClient *c = privdata;
1515 int nwritten = 0, totwritten = 0, objlen, willwrite;
1516 robj *o;
1517 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1518 int offset, ion = 0;
1519 REDIS_NOTUSED(el);
1520 REDIS_NOTUSED(mask);
1521
1522 listNode *node;
1523 while (listLength(c->reply)) {
1524 offset = c->sentlen;
1525 ion = 0;
1526 willwrite = 0;
1527
1528 /* fill-in the iov[] array */
1529 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1530 o = listNodeValue(node);
1531 objlen = sdslen(o->ptr);
1532
1533 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1534 break;
1535
1536 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1537 break; /* no more iovecs */
1538
1539 iov[ion].iov_base = ((char*)o->ptr) + offset;
1540 iov[ion].iov_len = objlen - offset;
1541 willwrite += objlen - offset;
1542 offset = 0; /* just for the first item */
1543 ion++;
1544 }
1545
1546 if(willwrite == 0)
1547 break;
1548
1549 /* write all collected blocks at once */
1550 if((nwritten = writev(fd, iov, ion)) < 0) {
1551 if (errno != EAGAIN) {
1552 redisLog(REDIS_DEBUG,
1553 "Error writing to client: %s", strerror(errno));
1554 freeClient(c);
1555 return;
1556 }
1557 break;
1558 }
1559
1560 totwritten += nwritten;
1561 offset = c->sentlen;
1562
1563 /* remove written robjs from c->reply */
1564 while (nwritten && listLength(c->reply)) {
1565 o = listNodeValue(listFirst(c->reply));
1566 objlen = sdslen(o->ptr);
1567
1568 if(nwritten >= objlen - offset) {
1569 listDelNode(c->reply, listFirst(c->reply));
1570 nwritten -= objlen - offset;
1571 c->sentlen = 0;
1572 } else {
1573 /* partial write */
1574 c->sentlen += nwritten;
1575 break;
1576 }
1577 offset = 0;
1578 }
1579 }
1580
1581 if (totwritten > 0)
1582 c->lastinteraction = time(NULL);
1583
1584 if (listLength(c->reply) == 0) {
1585 c->sentlen = 0;
1586 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1587 }
1588}
1589
ed9b544e 1590static struct redisCommand *lookupCommand(char *name) {
1591 int j = 0;
1592 while(cmdTable[j].name != NULL) {
bb0b03a3 1593 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
ed9b544e 1594 j++;
1595 }
1596 return NULL;
1597}
1598
1599/* resetClient prepare the client to process the next command */
1600static void resetClient(redisClient *c) {
1601 freeClientArgv(c);
1602 c->bulklen = -1;
e8a74421 1603 c->multibulk = 0;
ed9b544e 1604}
1605
/* If this function gets called we have already read a whole
 * command, and its arguments are in the client argv/argc fields.
 * processCommand() executes the command or prepares the
 * server for a bulk read from the client.
 *
 * If 1 is returned the client is still alive and valid, and
 * other operations can be performed by the caller. Otherwise,
 * if 0 is returned, the client was destroyed (i.e. after QUIT). */
1614static int processCommand(redisClient *c) {
1615 struct redisCommand *cmd;
1616 long long dirty;
1617
3fd78bcd 1618 /* Free some memory if needed (maxmemory setting) */
1619 if (server.maxmemory) freeMemoryIfNeeded();
1620
e8a74421 1621 /* Handle the multi bulk command type. This is an alternative protocol
1622 * supported by Redis in order to receive commands that are composed of
1623 * multiple binary-safe "bulk" arguments. The latency of processing is
1624 * a bit higher but this allows things like multi-sets, so if this
1625 * protocol is used only for MSET and similar commands this is a big win. */
1626 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1627 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1628 if (c->multibulk <= 0) {
1629 resetClient(c);
1630 return 1;
1631 } else {
1632 decrRefCount(c->argv[c->argc-1]);
1633 c->argc--;
1634 return 1;
1635 }
1636 } else if (c->multibulk) {
1637 if (c->bulklen == -1) {
1638 if (((char*)c->argv[0]->ptr)[0] != '$') {
1639 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1640 resetClient(c);
1641 return 1;
1642 } else {
1643 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1644 decrRefCount(c->argv[0]);
1645 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1646 c->argc--;
1647 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1648 resetClient(c);
1649 return 1;
1650 }
1651 c->argc--;
1652 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1653 return 1;
1654 }
1655 } else {
1656 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1657 c->mbargv[c->mbargc] = c->argv[0];
1658 c->mbargc++;
1659 c->argc--;
1660 c->multibulk--;
1661 if (c->multibulk == 0) {
1662 robj **auxargv;
1663 int auxargc;
1664
1665 /* Here we need to swap the multi-bulk argc/argv with the
1666 * normal argc/argv of the client structure. */
1667 auxargv = c->argv;
1668 c->argv = c->mbargv;
1669 c->mbargv = auxargv;
1670
1671 auxargc = c->argc;
1672 c->argc = c->mbargc;
1673 c->mbargc = auxargc;
1674
1675 /* We need to set bulklen to something different than -1
1676 * in order for the code below to process the command without
1677 * to try to read the last argument of a bulk command as
1678 * a special argument. */
1679 c->bulklen = 0;
1680 /* continue below and process the command */
1681 } else {
1682 c->bulklen = -1;
1683 return 1;
1684 }
1685 }
1686 }
1687 /* -- end of multi bulk commands processing -- */
1688
ed9b544e 1689 /* The QUIT command is handled as a special case. Normal command
1690 * procs are unable to close the client connection safely */
bb0b03a3 1691 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
ed9b544e 1692 freeClient(c);
1693 return 0;
1694 }
1695 cmd = lookupCommand(c->argv[0]->ptr);
1696 if (!cmd) {
1697 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1698 resetClient(c);
1699 return 1;
1700 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1701 (c->argc < -cmd->arity)) {
1702 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1703 resetClient(c);
1704 return 1;
3fd78bcd 1705 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1706 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1707 resetClient(c);
1708 return 1;
ed9b544e 1709 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1710 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1711
1712 decrRefCount(c->argv[c->argc-1]);
1713 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1714 c->argc--;
1715 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1716 resetClient(c);
1717 return 1;
1718 }
1719 c->argc--;
1720 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1721 /* It is possible that the bulk read is already in the
8d0490e7 1722 * buffer. Check this condition and handle it accordingly.
1723 * This is just a fast path, alternative to call processInputBuffer().
1724 * It's a good idea since the code is small and this condition
1725 * happens most of the times. */
ed9b544e 1726 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1727 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1728 c->argc++;
1729 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1730 } else {
1731 return 1;
1732 }
1733 }
10c43610 1734 /* Let's try to share objects on the command arguments vector */
1735 if (server.shareobjects) {
1736 int j;
1737 for(j = 1; j < c->argc; j++)
1738 c->argv[j] = tryObjectSharing(c->argv[j]);
1739 }
942a3961 1740 /* Let's try to encode the bulk object to save space. */
1741 if (cmd->flags & REDIS_CMD_BULK)
1742 tryObjectEncoding(c->argv[c->argc-1]);
1743
e63943a4 1744 /* Check if the user is authenticated */
1745 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1746 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1747 resetClient(c);
1748 return 1;
1749 }
1750
ed9b544e 1751 /* Exec the command */
1752 dirty = server.dirty;
1753 cmd->proc(c);
33ed1a42 1754 if (server.appendonly && server.dirty-dirty)
44b38ef4 1755 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
33ed1a42 1756 if (server.dirty-dirty && listLength(server.slaves))
3305306f 1757 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
87eca727 1758 if (listLength(server.monitors))
3305306f 1759 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
ed9b544e 1760 server.stat_numcommands++;
1761
1762 /* Prepare the client for the next command */
1763 if (c->flags & REDIS_CLOSE) {
1764 freeClient(c);
1765 return 0;
1766 }
1767 resetClient(c);
1768 return 1;
1769}
1770
87eca727 1771static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
6208b3a7 1772 listNode *ln;
ed9b544e 1773 int outc = 0, j;
93ea3759 1774 robj **outv;
1775 /* (args*2)+1 is enough room for args, spaces, newlines */
1776 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1777
1778 if (argc <= REDIS_STATIC_ARGS) {
1779 outv = static_outv;
1780 } else {
1781 outv = zmalloc(sizeof(robj*)*(argc*2+1));
93ea3759 1782 }
ed9b544e 1783
1784 for (j = 0; j < argc; j++) {
1785 if (j != 0) outv[outc++] = shared.space;
1786 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1787 robj *lenobj;
1788
1789 lenobj = createObject(REDIS_STRING,
0ea663ea 1790 sdscatprintf(sdsempty(),"%d\r\n",
1791 stringObjectLen(argv[j])));
ed9b544e 1792 lenobj->refcount = 0;
1793 outv[outc++] = lenobj;
1794 }
1795 outv[outc++] = argv[j];
1796 }
1797 outv[outc++] = shared.crlf;
1798
40d224a9 1799 /* Increment all the refcounts at start and decrement at end in order to
1800 * be sure to free objects if there is no slave in a replication state
1801 * able to be feed with commands */
1802 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
6208b3a7 1803 listRewind(slaves);
1804 while((ln = listYield(slaves))) {
ed9b544e 1805 redisClient *slave = ln->value;
40d224a9 1806
1807 /* Don't feed slaves that are still waiting for BGSAVE to start */
6208b3a7 1808 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
40d224a9 1809
1810 /* Feed all the other slaves, MONITORs and so on */
ed9b544e 1811 if (slave->slaveseldb != dictid) {
1812 robj *selectcmd;
1813
1814 switch(dictid) {
1815 case 0: selectcmd = shared.select0; break;
1816 case 1: selectcmd = shared.select1; break;
1817 case 2: selectcmd = shared.select2; break;
1818 case 3: selectcmd = shared.select3; break;
1819 case 4: selectcmd = shared.select4; break;
1820 case 5: selectcmd = shared.select5; break;
1821 case 6: selectcmd = shared.select6; break;
1822 case 7: selectcmd = shared.select7; break;
1823 case 8: selectcmd = shared.select8; break;
1824 case 9: selectcmd = shared.select9; break;
1825 default:
1826 selectcmd = createObject(REDIS_STRING,
1827 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1828 selectcmd->refcount = 0;
1829 break;
1830 }
1831 addReply(slave,selectcmd);
1832 slave->slaveseldb = dictid;
1833 }
1834 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
ed9b544e 1835 }
40d224a9 1836 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
93ea3759 1837 if (outv != static_outv) zfree(outv);
ed9b544e 1838}
1839
638e42ac 1840static void processInputBuffer(redisClient *c) {
ed9b544e 1841again:
1842 if (c->bulklen == -1) {
1843 /* Read the first line of the query */
1844 char *p = strchr(c->querybuf,'\n');
1845 size_t querylen;
644fafa3 1846
ed9b544e 1847 if (p) {
1848 sds query, *argv;
1849 int argc, j;
1850
1851 query = c->querybuf;
1852 c->querybuf = sdsempty();
1853 querylen = 1+(p-(query));
1854 if (sdslen(query) > querylen) {
1855 /* leave data after the first line of the query in the buffer */
1856 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1857 }
1858 *p = '\0'; /* remove "\n" */
1859 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1860 sdsupdatelen(query);
1861
1862 /* Now we can split the query in arguments */
1863 if (sdslen(query) == 0) {
1864 /* Ignore empty query */
1865 sdsfree(query);
1866 return;
1867 }
1868 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
93ea3759 1869 sdsfree(query);
1870
1871 if (c->argv) zfree(c->argv);
1872 c->argv = zmalloc(sizeof(robj*)*argc);
93ea3759 1873
1874 for (j = 0; j < argc; j++) {
ed9b544e 1875 if (sdslen(argv[j])) {
1876 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1877 c->argc++;
1878 } else {
1879 sdsfree(argv[j]);
1880 }
1881 }
1882 zfree(argv);
1883 /* Execute the command. If the client is still valid
1884 * after processCommand() return and there is something
1885 * on the query buffer try to process the next command. */
af807d87 1886 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
ed9b544e 1887 return;
644fafa3 1888 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
ed9b544e 1889 redisLog(REDIS_DEBUG, "Client protocol error");
1890 freeClient(c);
1891 return;
1892 }
1893 } else {
1894 /* Bulk read handling. Note that if we are at this point
1895 the client already sent a command terminated with a newline,
1896 we are reading the bulk data that is actually the last
1897 argument of the command. */
1898 int qbl = sdslen(c->querybuf);
1899
1900 if (c->bulklen <= qbl) {
1901 /* Copy everything but the final CRLF as final argument */
1902 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1903 c->argc++;
1904 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
638e42ac 1905 /* Process the command. If the client is still valid after
1906 * the processing and there is more data in the buffer
1907 * try to parse it. */
1908 if (processCommand(c) && sdslen(c->querybuf)) goto again;
ed9b544e 1909 return;
1910 }
1911 }
1912}
1913
638e42ac 1914static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1915 redisClient *c = (redisClient*) privdata;
1916 char buf[REDIS_IOBUF_LEN];
1917 int nread;
1918 REDIS_NOTUSED(el);
1919 REDIS_NOTUSED(mask);
1920
1921 nread = read(fd, buf, REDIS_IOBUF_LEN);
1922 if (nread == -1) {
1923 if (errno == EAGAIN) {
1924 nread = 0;
1925 } else {
1926 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1927 freeClient(c);
1928 return;
1929 }
1930 } else if (nread == 0) {
1931 redisLog(REDIS_DEBUG, "Client closed connection");
1932 freeClient(c);
1933 return;
1934 }
1935 if (nread) {
1936 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1937 c->lastinteraction = time(NULL);
1938 } else {
1939 return;
1940 }
1941 processInputBuffer(c);
1942}
1943
ed9b544e 1944static int selectDb(redisClient *c, int id) {
1945 if (id < 0 || id >= server.dbnum)
1946 return REDIS_ERR;
3305306f 1947 c->db = &server.db[id];
ed9b544e 1948 return REDIS_OK;
1949}
1950
40d224a9 1951static void *dupClientReplyValue(void *o) {
1952 incrRefCount((robj*)o);
1953 return 0;
1954}
1955
ed9b544e 1956static redisClient *createClient(int fd) {
1957 redisClient *c = zmalloc(sizeof(*c));
1958
1959 anetNonBlock(NULL,fd);
1960 anetTcpNoDelay(NULL,fd);
1961 if (!c) return NULL;
1962 selectDb(c,0);
1963 c->fd = fd;
1964 c->querybuf = sdsempty();
1965 c->argc = 0;
93ea3759 1966 c->argv = NULL;
ed9b544e 1967 c->bulklen = -1;
e8a74421 1968 c->multibulk = 0;
1969 c->mbargc = 0;
1970 c->mbargv = NULL;
ed9b544e 1971 c->sentlen = 0;
1972 c->flags = 0;
1973 c->lastinteraction = time(NULL);
abcb223e 1974 c->authenticated = 0;
40d224a9 1975 c->replstate = REDIS_REPL_NONE;
6b47e12e 1976 c->reply = listCreate();
ed9b544e 1977 listSetFreeMethod(c->reply,decrRefCount);
40d224a9 1978 listSetDupMethod(c->reply,dupClientReplyValue);
ed9b544e 1979 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
266373b2 1980 readQueryFromClient, c) == AE_ERR) {
ed9b544e 1981 freeClient(c);
1982 return NULL;
1983 }
6b47e12e 1984 listAddNodeTail(server.clients,c);
ed9b544e 1985 return c;
1986}
1987
1988static void addReply(redisClient *c, robj *obj) {
1989 if (listLength(c->reply) == 0 &&
6208b3a7 1990 (c->replstate == REDIS_REPL_NONE ||
1991 c->replstate == REDIS_REPL_ONLINE) &&
ed9b544e 1992 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
266373b2 1993 sendReplyToClient, c) == AE_ERR) return;
9d65a1bb 1994 listAddNodeTail(c->reply,getDecodedObject(obj));
ed9b544e 1995}
1996
1997static void addReplySds(redisClient *c, sds s) {
1998 robj *o = createObject(REDIS_STRING,s);
1999 addReply(c,o);
2000 decrRefCount(o);
2001}
2002
e2665397 2003static void addReplyDouble(redisClient *c, double d) {
2004 char buf[128];
2005
2006 snprintf(buf,sizeof(buf),"%.17g",d);
2007 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
2008 strlen(buf),buf));
2009}
2010
942a3961 2011static void addReplyBulkLen(redisClient *c, robj *obj) {
2012 size_t len;
2013
2014 if (obj->encoding == REDIS_ENCODING_RAW) {
2015 len = sdslen(obj->ptr);
2016 } else {
2017 long n = (long)obj->ptr;
2018
2019 len = 1;
2020 if (n < 0) {
2021 len++;
2022 n = -n;
2023 }
2024 while((n = n/10) != 0) {
2025 len++;
2026 }
2027 }
2028 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
2029}
2030
ed9b544e 2031static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2032 int cport, cfd;
2033 char cip[128];
285add55 2034 redisClient *c;
ed9b544e 2035 REDIS_NOTUSED(el);
2036 REDIS_NOTUSED(mask);
2037 REDIS_NOTUSED(privdata);
2038
2039 cfd = anetAccept(server.neterr, fd, cip, &cport);
2040 if (cfd == AE_ERR) {
2041 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2042 return;
2043 }
2044 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
285add55 2045 if ((c = createClient(cfd)) == NULL) {
ed9b544e 2046 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2047 close(cfd); /* May be already closed, just ingore errors */
2048 return;
2049 }
285add55 2050 /* If maxclient directive is set and this is one client more... close the
2051 * connection. Note that we create the client instead to check before
2052 * for this condition, since now the socket is already set in nonblocking
2053 * mode and we can send an error for free using the Kernel I/O */
2054 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2055 char *err = "-ERR max number of clients reached\r\n";
2056
2057 /* That's a best effort error message, don't check write errors */
fee803ba 2058 if (write(c->fd,err,strlen(err)) == -1) {
2059 /* Nothing to do, Just to avoid the warning... */
2060 }
285add55 2061 freeClient(c);
2062 return;
2063 }
ed9b544e 2064 server.stat_numconnections++;
2065}
2066
2067/* ======================= Redis objects implementation ===================== */
2068
2069static robj *createObject(int type, void *ptr) {
2070 robj *o;
2071
2072 if (listLength(server.objfreelist)) {
2073 listNode *head = listFirst(server.objfreelist);
2074 o = listNodeValue(head);
2075 listDelNode(server.objfreelist,head);
2076 } else {
2077 o = zmalloc(sizeof(*o));
2078 }
ed9b544e 2079 o->type = type;
942a3961 2080 o->encoding = REDIS_ENCODING_RAW;
ed9b544e 2081 o->ptr = ptr;
2082 o->refcount = 1;
2083 return o;
2084}
2085
2086static robj *createStringObject(char *ptr, size_t len) {
2087 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2088}
2089
2090static robj *createListObject(void) {
2091 list *l = listCreate();
2092
ed9b544e 2093 listSetFreeMethod(l,decrRefCount);
2094 return createObject(REDIS_LIST,l);
2095}
2096
2097static robj *createSetObject(void) {
2098 dict *d = dictCreate(&setDictType,NULL);
ed9b544e 2099 return createObject(REDIS_SET,d);
2100}
2101
1812e024 2102static robj *createZsetObject(void) {
6b47e12e 2103 zset *zs = zmalloc(sizeof(*zs));
2104
2105 zs->dict = dictCreate(&zsetDictType,NULL);
2106 zs->zsl = zslCreate();
2107 return createObject(REDIS_ZSET,zs);
1812e024 2108}
2109
ed9b544e 2110static void freeStringObject(robj *o) {
942a3961 2111 if (o->encoding == REDIS_ENCODING_RAW) {
2112 sdsfree(o->ptr);
2113 }
ed9b544e 2114}
2115
2116static void freeListObject(robj *o) {
2117 listRelease((list*) o->ptr);
2118}
2119
2120static void freeSetObject(robj *o) {
2121 dictRelease((dict*) o->ptr);
2122}
2123
fd8ccf44 2124static void freeZsetObject(robj *o) {
2125 zset *zs = o->ptr;
2126
2127 dictRelease(zs->dict);
2128 zslFree(zs->zsl);
2129 zfree(zs);
2130}
2131
ed9b544e 2132static void freeHashObject(robj *o) {
2133 dictRelease((dict*) o->ptr);
2134}
2135
2136static void incrRefCount(robj *o) {
2137 o->refcount++;
94754ccc 2138#ifdef DEBUG_REFCOUNT
2139 if (o->type == REDIS_STRING)
2140 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2141#endif
ed9b544e 2142}
2143
2144static void decrRefCount(void *obj) {
2145 robj *o = obj;
94754ccc 2146
2147#ifdef DEBUG_REFCOUNT
2148 if (o->type == REDIS_STRING)
2149 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2150#endif
ed9b544e 2151 if (--(o->refcount) == 0) {
2152 switch(o->type) {
2153 case REDIS_STRING: freeStringObject(o); break;
2154 case REDIS_LIST: freeListObject(o); break;
2155 case REDIS_SET: freeSetObject(o); break;
fd8ccf44 2156 case REDIS_ZSET: freeZsetObject(o); break;
ed9b544e 2157 case REDIS_HASH: freeHashObject(o); break;
2158 default: assert(0 != 0); break;
2159 }
2160 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2161 !listAddNodeHead(server.objfreelist,o))
2162 zfree(o);
2163 }
2164}
2165
942a3961 2166static robj *lookupKey(redisDb *db, robj *key) {
2167 dictEntry *de = dictFind(db->dict,key);
2168 return de ? dictGetEntryVal(de) : NULL;
2169}
2170
2171static robj *lookupKeyRead(redisDb *db, robj *key) {
2172 expireIfNeeded(db,key);
2173 return lookupKey(db,key);
2174}
2175
2176static robj *lookupKeyWrite(redisDb *db, robj *key) {
2177 deleteIfVolatile(db,key);
2178 return lookupKey(db,key);
2179}
2180
2181static int deleteKey(redisDb *db, robj *key) {
2182 int retval;
2183
2184 /* We need to protect key from destruction: after the first dictDelete()
2185 * it may happen that 'key' is no longer valid if we don't increment
2186 * it's count. This may happen when we get the object reference directly
2187 * from the hash table with dictRandomKey() or dict iterators */
2188 incrRefCount(key);
2189 if (dictSize(db->expires)) dictDelete(db->expires,key);
2190 retval = dictDelete(db->dict,key);
2191 decrRefCount(key);
2192
2193 return retval == DICT_OK;
2194}
2195
10c43610 2196/* Try to share an object against the shared objects pool */
2197static robj *tryObjectSharing(robj *o) {
2198 struct dictEntry *de;
2199 unsigned long c;
2200
3305306f 2201 if (o == NULL || server.shareobjects == 0) return o;
10c43610 2202
2203 assert(o->type == REDIS_STRING);
2204 de = dictFind(server.sharingpool,o);
2205 if (de) {
2206 robj *shared = dictGetEntryKey(de);
2207
2208 c = ((unsigned long) dictGetEntryVal(de))+1;
2209 dictGetEntryVal(de) = (void*) c;
2210 incrRefCount(shared);
2211 decrRefCount(o);
2212 return shared;
2213 } else {
2214 /* Here we are using a stream algorihtm: Every time an object is
2215 * shared we increment its count, everytime there is a miss we
2216 * recrement the counter of a random object. If this object reaches
2217 * zero we remove the object and put the current object instead. */
3305306f 2218 if (dictSize(server.sharingpool) >=
10c43610 2219 server.sharingpoolsize) {
2220 de = dictGetRandomKey(server.sharingpool);
2221 assert(de != NULL);
2222 c = ((unsigned long) dictGetEntryVal(de))-1;
2223 dictGetEntryVal(de) = (void*) c;
2224 if (c == 0) {
2225 dictDelete(server.sharingpool,de->key);
2226 }
2227 } else {
2228 c = 0; /* If the pool is empty we want to add this object */
2229 }
2230 if (c == 0) {
2231 int retval;
2232
2233 retval = dictAdd(server.sharingpool,o,(void*)1);
2234 assert(retval == DICT_OK);
2235 incrRefCount(o);
2236 }
2237 return o;
2238 }
2239}
2240
724a51b1 2241/* Check if the nul-terminated string 's' can be represented by a long
2242 * (that is, is a number that fits into long without any other space or
2243 * character before or after the digits).
2244 *
2245 * If so, the function returns REDIS_OK and *longval is set to the value
2246 * of the number. Otherwise REDIS_ERR is returned */
f69f2cba 2247static int isStringRepresentableAsLong(sds s, long *longval) {
724a51b1 2248 char buf[32], *endptr;
2249 long value;
2250 int slen;
2251
2252 value = strtol(s, &endptr, 10);
2253 if (endptr[0] != '\0') return REDIS_ERR;
2254 slen = snprintf(buf,32,"%ld",value);
2255
2256 /* If the number converted back into a string is not identical
2257 * then it's not possible to encode the string as integer */
f69f2cba 2258 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
724a51b1 2259 if (longval) *longval = value;
2260 return REDIS_OK;
2261}
2262
942a3961 2263/* Try to encode a string object in order to save space */
2264static int tryObjectEncoding(robj *o) {
2265 long value;
942a3961 2266 sds s = o->ptr;
3305306f 2267
942a3961 2268 if (o->encoding != REDIS_ENCODING_RAW)
2269 return REDIS_ERR; /* Already encoded */
3305306f 2270
942a3961 2271 /* It's not save to encode shared objects: shared objects can be shared
2272 * everywhere in the "object space" of Redis. Encoded objects can only
2273 * appear as "values" (and not, for instance, as keys) */
2274 if (o->refcount > 1) return REDIS_ERR;
3305306f 2275
942a3961 2276 /* Currently we try to encode only strings */
2277 assert(o->type == REDIS_STRING);
94754ccc 2278
724a51b1 2279 /* Check if we can represent this string as a long integer */
2280 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
942a3961 2281
2282 /* Ok, this object can be encoded */
2283 o->encoding = REDIS_ENCODING_INT;
2284 sdsfree(o->ptr);
2285 o->ptr = (void*) value;
2286 return REDIS_OK;
2287}
2288
9d65a1bb 2289/* Get a decoded version of an encoded object (returned as a new object).
2290 * If the object is already raw-encoded just increment the ref count. */
2291static robj *getDecodedObject(robj *o) {
942a3961 2292 robj *dec;
2293
9d65a1bb 2294 if (o->encoding == REDIS_ENCODING_RAW) {
2295 incrRefCount(o);
2296 return o;
2297 }
942a3961 2298 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2299 char buf[32];
2300
2301 snprintf(buf,32,"%ld",(long)o->ptr);
2302 dec = createStringObject(buf,strlen(buf));
2303 return dec;
2304 } else {
2305 assert(1 != 1);
2306 }
3305306f 2307}
2308
d7f43c08 2309/* Compare two string objects via strcmp() or alike.
2310 * Note that the objects may be integer-encoded. In such a case we
2311 * use snprintf() to get a string representation of the numbers on the stack
2312 * and compare the strings, it's much faster than calling getDecodedObject(). */
724a51b1 2313static int compareStringObjects(robj *a, robj *b) {
2314 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
d7f43c08 2315 char bufa[128], bufb[128], *astr, *bstr;
2316 int bothsds = 1;
724a51b1 2317
e197b441 2318 if (a == b) return 0;
d7f43c08 2319 if (a->encoding != REDIS_ENCODING_RAW) {
2320 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2321 astr = bufa;
2322 bothsds = 0;
724a51b1 2323 } else {
d7f43c08 2324 astr = a->ptr;
724a51b1 2325 }
d7f43c08 2326 if (b->encoding != REDIS_ENCODING_RAW) {
2327 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2328 bstr = bufb;
2329 bothsds = 0;
2330 } else {
2331 bstr = b->ptr;
2332 }
2333 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
724a51b1 2334}
2335
0ea663ea 2336static size_t stringObjectLen(robj *o) {
2337 assert(o->type == REDIS_STRING);
2338 if (o->encoding == REDIS_ENCODING_RAW) {
2339 return sdslen(o->ptr);
2340 } else {
2341 char buf[32];
2342
2343 return snprintf(buf,32,"%ld",(long)o->ptr);
2344 }
2345}
2346
ed9b544e 2347/*============================ DB saving/loading ============================ */
2348
/* Write the one-byte type/opcode 'type'. Returns 0 on success, -1 on error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2353
/* Write 't' truncated to a 32 bit signed integer, as 4 bytes in host byte
 * order. Returns 0 on success, -1 on error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    return fwrite(&t32,4,1,fp) ? 0 : -1;
}
2359
e3566d4b 2360/* check rdbLoadLen() comments for more info */
f78fd11b 2361static int rdbSaveLen(FILE *fp, uint32_t len) {
2362 unsigned char buf[2];
2363
2364 if (len < (1<<6)) {
2365 /* Save a 6 bit len */
10c43610 2366 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
f78fd11b 2367 if (fwrite(buf,1,1,fp) == 0) return -1;
2368 } else if (len < (1<<14)) {
2369 /* Save a 14 bit len */
10c43610 2370 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
f78fd11b 2371 buf[1] = len&0xFF;
17be1a4a 2372 if (fwrite(buf,2,1,fp) == 0) return -1;
f78fd11b 2373 } else {
2374 /* Save a 32 bit len */
10c43610 2375 buf[0] = (REDIS_RDB_32BITLEN<<6);
f78fd11b 2376 if (fwrite(buf,1,1,fp) == 0) return -1;
2377 len = htonl(len);
2378 if (fwrite(&len,4,1,fp) == 0) return -1;
2379 }
2380 return 0;
2381}
2382
e3566d4b 2383/* String objects in the form "2391" "-100" without any space and with a
2384 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2385 * encoded as integers to save space */
56906eef 2386static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
e3566d4b 2387 long long value;
2388 char *endptr, buf[32];
2389
2390 /* Check if it's possible to encode this value as a number */
2391 value = strtoll(s, &endptr, 10);
2392 if (endptr[0] != '\0') return 0;
2393 snprintf(buf,32,"%lld",value);
2394
2395 /* If the number converted back into a string is not identical
2396 * then it's not possible to encode the string as integer */
2397 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2398
2399 /* Finally check if it fits in our ranges */
2400 if (value >= -(1<<7) && value <= (1<<7)-1) {
2401 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2402 enc[1] = value&0xFF;
2403 return 2;
2404 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2405 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2406 enc[1] = value&0xFF;
2407 enc[2] = (value>>8)&0xFF;
2408 return 3;
2409 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2410 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2411 enc[1] = value&0xFF;
2412 enc[2] = (value>>8)&0xFF;
2413 enc[3] = (value>>16)&0xFF;
2414 enc[4] = (value>>24)&0xFF;
2415 return 5;
2416 } else {
2417 return 0;
2418 }
2419}
2420
774e3047 2421static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2422 unsigned int comprlen, outlen;
2423 unsigned char byte;
2424 void *out;
2425
2426 /* We require at least four bytes compression for this to be worth it */
2427 outlen = sdslen(obj->ptr)-4;
2428 if (outlen <= 0) return 0;
3a2694c4 2429 if ((out = zmalloc(outlen+1)) == NULL) return 0;
774e3047 2430 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2431 if (comprlen == 0) {
88e85998 2432 zfree(out);
774e3047 2433 return 0;
2434 }
2435 /* Data compressed! Let's save it on disk */
2436 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2437 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2438 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2439 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2440 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
88e85998 2441 zfree(out);
774e3047 2442 return comprlen;
2443
2444writeerr:
88e85998 2445 zfree(out);
774e3047 2446 return -1;
2447}
2448
e3566d4b 2449/* Save a string objet as [len][data] on disk. If the object is a string
2450 * representation of an integer value we try to safe it in a special form */
942a3961 2451static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2452 size_t len;
e3566d4b 2453 int enclen;
10c43610 2454
942a3961 2455 len = sdslen(obj->ptr);
2456
774e3047 2457 /* Try integer encoding */
e3566d4b 2458 if (len <= 11) {
2459 unsigned char buf[5];
2460 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2461 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2462 return 0;
2463 }
2464 }
774e3047 2465
2466 /* Try LZF compression - under 20 bytes it's unable to compress even
88e85998 2467 * aaaaaaaaaaaaaaaaaa so skip it */
942a3961 2468 if (len > 20) {
774e3047 2469 int retval;
2470
2471 retval = rdbSaveLzfStringObject(fp,obj);
2472 if (retval == -1) return -1;
2473 if (retval > 0) return 0;
2474 /* retval == 0 means data can't be compressed, save the old way */
2475 }
2476
2477 /* Store verbatim */
10c43610 2478 if (rdbSaveLen(fp,len) == -1) return -1;
2479 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2480 return 0;
2481}
2482
942a3961 2483/* Like rdbSaveStringObjectRaw() but handle encoded objects */
2484static int rdbSaveStringObject(FILE *fp, robj *obj) {
2485 int retval;
942a3961 2486
9d65a1bb 2487 obj = getDecodedObject(obj);
2488 retval = rdbSaveStringObjectRaw(fp,obj);
2489 decrRefCount(obj);
2490 return retval;
942a3961 2491}
2492
/* Save a double value as a string prefixed by an unsigned 8 bit length.
 * Three length values are reserved for non-finite numbers and are written
 * alone, with no string after them:
 *   253: not a number
 *   254: + inf
 *   255: - inf
 * Finite values are printed with %.17g.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len = 1;

    if (isnan(val)) {
        buf[0] = 253;
    } else if (!isfinite(val)) {
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen((char*)buf+1);
        len += buf[0];
    }
    return (fwrite(buf,len,1,fp) == 0) ? -1 : 0;
}
2519
ed9b544e 2520/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
f78fd11b 2521static int rdbSave(char *filename) {
ed9b544e 2522 dictIterator *di = NULL;
2523 dictEntry *de;
ed9b544e 2524 FILE *fp;
2525 char tmpfile[256];
2526 int j;
bb32ede5 2527 time_t now = time(NULL);
ed9b544e 2528
a3b21203 2529 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
ed9b544e 2530 fp = fopen(tmpfile,"w");
2531 if (!fp) {
2532 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2533 return REDIS_ERR;
2534 }
f78fd11b 2535 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
ed9b544e 2536 for (j = 0; j < server.dbnum; j++) {
bb32ede5 2537 redisDb *db = server.db+j;
2538 dict *d = db->dict;
3305306f 2539 if (dictSize(d) == 0) continue;
ed9b544e 2540 di = dictGetIterator(d);
2541 if (!di) {
2542 fclose(fp);
2543 return REDIS_ERR;
2544 }
2545
2546 /* Write the SELECT DB opcode */
f78fd11b 2547 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2548 if (rdbSaveLen(fp,j) == -1) goto werr;
ed9b544e 2549
2550 /* Iterate this DB writing every entry */
2551 while((de = dictNext(di)) != NULL) {
2552 robj *key = dictGetEntryKey(de);
2553 robj *o = dictGetEntryVal(de);
bb32ede5 2554 time_t expiretime = getExpire(db,key);
2555
2556 /* Save the expire time */
2557 if (expiretime != -1) {
2558 /* If this key is already expired skip it */
2559 if (expiretime < now) continue;
2560 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2561 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2562 }
2563 /* Save the key and associated value */
f78fd11b 2564 if (rdbSaveType(fp,o->type) == -1) goto werr;
10c43610 2565 if (rdbSaveStringObject(fp,key) == -1) goto werr;
f78fd11b 2566 if (o->type == REDIS_STRING) {
ed9b544e 2567 /* Save a string value */
10c43610 2568 if (rdbSaveStringObject(fp,o) == -1) goto werr;
f78fd11b 2569 } else if (o->type == REDIS_LIST) {
ed9b544e 2570 /* Save a list value */
2571 list *list = o->ptr;
6208b3a7 2572 listNode *ln;
ed9b544e 2573
6208b3a7 2574 listRewind(list);
f78fd11b 2575 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
6208b3a7 2576 while((ln = listYield(list))) {
ed9b544e 2577 robj *eleobj = listNodeValue(ln);
f78fd11b 2578
10c43610 2579 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
ed9b544e 2580 }
f78fd11b 2581 } else if (o->type == REDIS_SET) {
ed9b544e 2582 /* Save a set value */
2583 dict *set = o->ptr;
2584 dictIterator *di = dictGetIterator(set);
2585 dictEntry *de;
2586
3305306f 2587 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
ed9b544e 2588 while((de = dictNext(di)) != NULL) {
10c43610 2589 robj *eleobj = dictGetEntryKey(de);
ed9b544e 2590
10c43610 2591 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
ed9b544e 2592 }
2593 dictReleaseIterator(di);
2b59cfdf 2594 } else if (o->type == REDIS_ZSET) {
2595 /* Save a set value */
2596 zset *zs = o->ptr;
2597 dictIterator *di = dictGetIterator(zs->dict);
2598 dictEntry *de;
2599
2600 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2601 while((de = dictNext(di)) != NULL) {
2602 robj *eleobj = dictGetEntryKey(de);
2603 double *score = dictGetEntryVal(de);
2604
2605 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2606 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2607 }
2608 dictReleaseIterator(di);
ed9b544e 2609 } else {
2610 assert(0 != 0);
2611 }
2612 }
2613 dictReleaseIterator(di);
2614 }
2615 /* EOF opcode */
f78fd11b 2616 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2617
2618 /* Make sure data will not remain on the OS's output buffers */
ed9b544e 2619 fflush(fp);
2620 fsync(fileno(fp));
2621 fclose(fp);
2622
2623 /* Use RENAME to make sure the DB file is changed atomically only
2624 * if the generate DB file is ok. */
2625 if (rename(tmpfile,filename) == -1) {
325d1eb4 2626 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
ed9b544e 2627 unlink(tmpfile);
2628 return REDIS_ERR;
2629 }
2630 redisLog(REDIS_NOTICE,"DB saved on disk");
2631 server.dirty = 0;
2632 server.lastsave = time(NULL);
2633 return REDIS_OK;
2634
2635werr:
2636 fclose(fp);
2637 unlink(tmpfile);
2638 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2639 if (di) dictReleaseIterator(di);
2640 return REDIS_ERR;
2641}
2642
f78fd11b 2643static int rdbSaveBackground(char *filename) {
ed9b544e 2644 pid_t childpid;
2645
9d65a1bb 2646 if (server.bgsavechildpid != -1) return REDIS_ERR;
ed9b544e 2647 if ((childpid = fork()) == 0) {
2648 /* Child */
2649 close(server.fd);
f78fd11b 2650 if (rdbSave(filename) == REDIS_OK) {
ed9b544e 2651 exit(0);
2652 } else {
2653 exit(1);
2654 }
2655 } else {
2656 /* Parent */
5a7c647e 2657 if (childpid == -1) {
2658 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2659 strerror(errno));
2660 return REDIS_ERR;
2661 }
ed9b544e 2662 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
9f3c422c 2663 server.bgsavechildpid = childpid;
ed9b544e 2664 return REDIS_OK;
2665 }
2666 return REDIS_OK; /* unreached */
2667}
2668
/* Delete the temporary dump a (background) save in process 'childpid'
 * would have created. The "temp-<pid>.rdb" name matches the one rdbSave()
 * builds from getpid(). A missing file is silently ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char tmpfile[256];

    snprintf(tmpfile,sizeof(tmpfile),"temp-%d.rdb",(int) childpid);
    unlink(tmpfile);
}
2675
/* Read one type/opcode byte from the RDB stream.
 * Returns the byte as a value in 0..255, or -1 if nothing could be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return fread(&byte,1,1,fp) == 1 ? (int) byte : -1;
}
2681
/* Read a 4 byte signed time value, stored in host byte order (read raw, no
 * byte swapping), and widen it to time_t.
 * Returns -1 on short read; note that this is indistinguishable from a
 * stored value of -1. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stored;

    if (fread(&stored,4,1,fp) != 1) return -1;
    return (time_t) stored;
}
2687
e3566d4b 2688/* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2689 * of this file for a description of how this are stored on disk.
2690 *
2691 * isencoded is set to 1 if the readed length is not actually a length but
2692 * an "encoding type", check the above comments for more info */
2693static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
f78fd11b 2694 unsigned char buf[2];
2695 uint32_t len;
2696
e3566d4b 2697 if (isencoded) *isencoded = 0;
f78fd11b 2698 if (rdbver == 0) {
2699 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2700 return ntohl(len);
2701 } else {
17be1a4a 2702 int type;
2703
f78fd11b 2704 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
17be1a4a 2705 type = (buf[0]&0xC0)>>6;
2706 if (type == REDIS_RDB_6BITLEN) {
f78fd11b 2707 /* Read a 6 bit len */
e3566d4b 2708 return buf[0]&0x3F;
2709 } else if (type == REDIS_RDB_ENCVAL) {
2710 /* Read a 6 bit len encoding type */
2711 if (isencoded) *isencoded = 1;
2712 return buf[0]&0x3F;
17be1a4a 2713 } else if (type == REDIS_RDB_14BITLEN) {
f78fd11b 2714 /* Read a 14 bit len */
2715 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2716 return ((buf[0]&0x3F)<<8)|buf[1];
2717 } else {
2718 /* Read a 32 bit len */
2719 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2720 return ntohl(len);
2721 }
2722 }
f78fd11b 2723}
2724
e3566d4b 2725static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2726 unsigned char enc[4];
2727 long long val;
2728
2729 if (enctype == REDIS_RDB_ENC_INT8) {
2730 if (fread(enc,1,1,fp) == 0) return NULL;
2731 val = (signed char)enc[0];
2732 } else if (enctype == REDIS_RDB_ENC_INT16) {
2733 uint16_t v;
2734 if (fread(enc,2,1,fp) == 0) return NULL;
2735 v = enc[0]|(enc[1]<<8);
2736 val = (int16_t)v;
2737 } else if (enctype == REDIS_RDB_ENC_INT32) {
2738 uint32_t v;
2739 if (fread(enc,4,1,fp) == 0) return NULL;
2740 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2741 val = (int32_t)v;
2742 } else {
2743 val = 0; /* anti-warning */
2744 assert(0!=0);
2745 }
2746 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2747}
2748
88e85998 2749static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2750 unsigned int len, clen;
2751 unsigned char *c = NULL;
2752 sds val = NULL;
2753
2754 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2755 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2756 if ((c = zmalloc(clen)) == NULL) goto err;
2757 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2758 if (fread(c,clen,1,fp) == 0) goto err;
2759 if (lzf_decompress(c,clen,val,len) == 0) goto err;
5109cdff 2760 zfree(c);
88e85998 2761 return createObject(REDIS_STRING,val);
2762err:
2763 zfree(c);
2764 sdsfree(val);
2765 return NULL;
2766}
2767
e3566d4b 2768static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2769 int isencoded;
2770 uint32_t len;
f78fd11b 2771 sds val;
2772
e3566d4b 2773 len = rdbLoadLen(fp,rdbver,&isencoded);
2774 if (isencoded) {
2775 switch(len) {
2776 case REDIS_RDB_ENC_INT8:
2777 case REDIS_RDB_ENC_INT16:
2778 case REDIS_RDB_ENC_INT32:
3305306f 2779 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
88e85998 2780 case REDIS_RDB_ENC_LZF:
2781 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
e3566d4b 2782 default:
2783 assert(0!=0);
2784 }
2785 }
2786
f78fd11b 2787 if (len == REDIS_RDB_LENERR) return NULL;
2788 val = sdsnewlen(NULL,len);
2789 if (len && fread(val,len,1,fp) == 0) {
2790 sdsfree(val);
2791 return NULL;
2792 }
10c43610 2793 return tryObjectSharing(createObject(REDIS_STRING,val));
f78fd11b 2794}
2795
a7866db6 2796/* For information about double serialization check rdbSaveDoubleValue() */
2797static int rdbLoadDoubleValue(FILE *fp, double *val) {
2798 char buf[128];
2799 unsigned char len;
2800
2801 if (fread(&len,1,1,fp) == 0) return -1;
2802 switch(len) {
2803 case 255: *val = R_NegInf; return 0;
2804 case 254: *val = R_PosInf; return 0;
2805 case 253: *val = R_Nan; return 0;
2806 default:
2807 if (fread(buf,len,1,fp) == 0) return -1;
2808 sscanf(buf, "%lg", val);
2809 return 0;
2810 }
2811}
2812
/* Load the whole dataset from the RDB file 'filename' into server.db.
 *
 * Returns REDIS_ERR (after closing the file) when the file can't be
 * opened, doesn't start with "REDIS", or declares an RDB version greater
 * than 1. Returns REDIS_OK after reading the REDIS_EOF opcode.
 *
 * Every other failure is fatal: a short read (including a file shorter than
 * the 9 byte header), a SELECTDB index >= server.dbnum, or a duplicated key
 * is logged and the process exits with status 1. */
static int rdbLoad(char *filename) {
    FILE *fp;
    robj *keyobj = NULL;
    uint32_t dbid;
    int type, retval, rdbver;
    dict *d = server.db[0].dict;
    redisDb *db = server.db+0;
    char buf[1024];
    /* expiretime is -1 when the next key has no expire attached */
    time_t expiretime = -1, now = time(NULL);

    fp = fopen(filename,"r");
    if (!fp) return REDIS_ERR;
    /* Header: the 5 byte signature "REDIS" followed by 4 version characters
     * that are parsed with atoi() */
    if (fread(buf,9,1,fp) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);
    if (rdbver > 1) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
        return REDIS_ERR;
    }
    while(1) {
        robj *o;

        /* Read type. */
        if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        if (type == REDIS_EXPIRETIME) {
            if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again */
            if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        }
        if (type == REDIS_EOF) break;
        /* Handle SELECT DB opcode as a special case */
        if (type == REDIS_SELECTDB) {
            if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            /* Following keys go to this DB until the next SELECTDB */
            db = server.db+dbid;
            d = db->dict;
            continue;
        }
        /* Read key */
        if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;

        if (type == REDIS_STRING) {
            /* Read string value */
            if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
            tryObjectEncoding(o);
        } else if (type == REDIS_LIST || type == REDIS_SET) {
            /* Read list/set value: element count, then each element */
            uint32_t listlen;

            if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = (type == REDIS_LIST) ? createListObject() : createSetObject();
            /* Load every single element of the list/set */
            while(listlen--) {
                robj *ele;

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                tryObjectEncoding(ele);
                if (type == REDIS_LIST) {
                    listAddNodeTail((list*)o->ptr,ele);
                } else {
                    dictAdd((dict*)o->ptr,ele,NULL);
                }
            }
        } else if (type == REDIS_ZSET) {
            /* Read sorted set value: element count, then (element, score)
             * pairs */
            uint32_t zsetlen;
            zset *zs;

            if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = createZsetObject();
            zs = o->ptr;
            /* Load every single element of the sorted set. Each element is
             * stored both in zs->dict (with its heap-allocated score) and in
             * the skiplist zs->zsl, hence the extra reference below. */
            while(zsetlen--) {
                robj *ele;
                double *score = zmalloc(sizeof(double));

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                tryObjectEncoding(ele);
                if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
                dictAdd(zs->dict,ele,score);
                zslInsert(zs->zsl,*score,ele);
                incrRefCount(ele); /* added to skiplist */
            }
        } else {
            assert(0 != 0);
        }
        /* Add the new object in the hash table */
        retval = dictAdd(d,keyobj,o);
        if (retval == DICT_ERR) {
            redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
            exit(1);
        }
        /* Set the expire time if needed. An expire read above applies only
         * to this key, so it is reset afterwards. */
        if (expiretime != -1) {
            setExpire(db,keyobj,expiretime);
            /* Delete this key if already expired */
            if (expiretime < now) deleteKey(db,keyobj);
            expiretime = -1;
        }
        keyobj = o = NULL;
    }
    fclose(fp);
    return REDIS_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
    /* NOTE: a partially built 'o' (and a pending zset 'score') is not freed
     * here; this is harmless only because the process exits right after. */
    if (keyobj) decrRefCount(keyobj);
    redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
    exit(1);
    return REDIS_ERR; /* Just to avoid warning */
}
2935
2936/*================================== Commands =============================== */
2937
abcb223e 2938static void authCommand(redisClient *c) {
2e77c2ee 2939 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
abcb223e
BH
2940 c->authenticated = 1;
2941 addReply(c,shared.ok);
2942 } else {
2943 c->authenticated = 0;
fa4c0aba 2944 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
abcb223e
BH
2945 }
2946}
2947
ed9b544e 2948static void pingCommand(redisClient *c) {
2949 addReply(c,shared.pong);
2950}
2951
2952static void echoCommand(redisClient *c) {
942a3961 2953 addReplyBulkLen(c,c->argv[1]);
ed9b544e 2954 addReply(c,c->argv[1]);
2955 addReply(c,shared.crlf);
2956}
2957
2958/*=================================== Strings =============================== */
2959
2960static void setGenericCommand(redisClient *c, int nx) {
2961 int retval;
2962
3305306f 2963 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
ed9b544e 2964 if (retval == DICT_ERR) {
2965 if (!nx) {
3305306f 2966 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
ed9b544e 2967 incrRefCount(c->argv[2]);
2968 } else {
c937aa89 2969 addReply(c,shared.czero);
ed9b544e 2970 return;
2971 }
2972 } else {
2973 incrRefCount(c->argv[1]);
2974 incrRefCount(c->argv[2]);
2975 }
2976 server.dirty++;
3305306f 2977 removeExpire(c->db,c->argv[1]);
c937aa89 2978 addReply(c, nx ? shared.cone : shared.ok);
ed9b544e 2979}
2980
2981static void setCommand(redisClient *c) {
a4d1ba9a 2982 setGenericCommand(c,0);
ed9b544e 2983}
2984
2985static void setnxCommand(redisClient *c) {
a4d1ba9a 2986 setGenericCommand(c,1);
ed9b544e 2987}
2988
2989static void getCommand(redisClient *c) {
3305306f 2990 robj *o = lookupKeyRead(c->db,c->argv[1]);
2991
2992 if (o == NULL) {
c937aa89 2993 addReply(c,shared.nullbulk);
ed9b544e 2994 } else {
ed9b544e 2995 if (o->type != REDIS_STRING) {
c937aa89 2996 addReply(c,shared.wrongtypeerr);
ed9b544e 2997 } else {
942a3961 2998 addReplyBulkLen(c,o);
ed9b544e 2999 addReply(c,o);
3000 addReply(c,shared.crlf);
3001 }
3002 }
3003}
3004
f6b141c5 3005static void getsetCommand(redisClient *c) {
a431eb74 3006 getCommand(c);
3007 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3008 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3009 } else {
3010 incrRefCount(c->argv[1]);
3011 }
3012 incrRefCount(c->argv[2]);
3013 server.dirty++;
3014 removeExpire(c->db,c->argv[1]);
3015}
3016
70003d28 3017static void mgetCommand(redisClient *c) {
70003d28 3018 int j;
3019
c937aa89 3020 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
70003d28 3021 for (j = 1; j < c->argc; j++) {
3305306f 3022 robj *o = lookupKeyRead(c->db,c->argv[j]);
3023 if (o == NULL) {
c937aa89 3024 addReply(c,shared.nullbulk);
70003d28 3025 } else {
70003d28 3026 if (o->type != REDIS_STRING) {
c937aa89 3027 addReply(c,shared.nullbulk);
70003d28 3028 } else {
942a3961 3029 addReplyBulkLen(c,o);
70003d28 3030 addReply(c,o);
3031 addReply(c,shared.crlf);
3032 }
3033 }
3034 }
3035}
3036
6c446631 3037static void msetGenericCommand(redisClient *c, int nx) {
3038 int j;
3039
3040 if ((c->argc % 2) == 0) {
3041 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
3042 return;
3043 }
3044 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3045 * set nothing at all if at least one already key exists. */
3046 if (nx) {
3047 for (j = 1; j < c->argc; j += 2) {
3048 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
3049 addReply(c, shared.czero);
3050 return;
3051 }
3052 }
3053 }
3054
3055 for (j = 1; j < c->argc; j += 2) {
3056 int retval;
3057
3058 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3059 if (retval == DICT_ERR) {
3060 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3061 incrRefCount(c->argv[j+1]);
3062 } else {
3063 incrRefCount(c->argv[j]);
3064 incrRefCount(c->argv[j+1]);
3065 }
3066 removeExpire(c->db,c->argv[j]);
3067 }
3068 server.dirty += (c->argc-1)/2;
3069 addReply(c, nx ? shared.cone : shared.ok);
3070}
3071
3072static void msetCommand(redisClient *c) {
3073 msetGenericCommand(c,0);
3074}
3075
3076static void msetnxCommand(redisClient *c) {
3077 msetGenericCommand(c,1);
3078}
3079
d68ed120 3080static void incrDecrCommand(redisClient *c, long long incr) {
ed9b544e 3081 long long value;
3082 int retval;
3083 robj *o;
3084
3305306f 3085 o = lookupKeyWrite(c->db,c->argv[1]);
3086 if (o == NULL) {
ed9b544e 3087 value = 0;
3088 } else {
ed9b544e 3089 if (o->type != REDIS_STRING) {
3090 value = 0;
3091 } else {
3092 char *eptr;
3093
942a3961 3094 if (o->encoding == REDIS_ENCODING_RAW)
3095 value = strtoll(o->ptr, &eptr, 10);
3096 else if (o->encoding == REDIS_ENCODING_INT)
3097 value = (long)o->ptr;
3098 else
3099 assert(1 != 1);
ed9b544e 3100 }
3101 }
3102
3103 value += incr;
3104 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
942a3961 3105 tryObjectEncoding(o);
3305306f 3106 retval = dictAdd(c->db->dict,c->argv[1],o);
ed9b544e 3107 if (retval == DICT_ERR) {
3305306f 3108 dictReplace(c->db->dict,c->argv[1],o);
3109 removeExpire(c->db,c->argv[1]);
ed9b544e 3110 } else {
3111 incrRefCount(c->argv[1]);
3112 }
3113 server.dirty++;
c937aa89 3114 addReply(c,shared.colon);
ed9b544e 3115 addReply(c,o);
3116 addReply(c,shared.crlf);
3117}
3118
3119static void incrCommand(redisClient *c) {
a4d1ba9a 3120 incrDecrCommand(c,1);
ed9b544e 3121}
3122
3123static void decrCommand(redisClient *c) {
a4d1ba9a 3124 incrDecrCommand(c,-1);
ed9b544e 3125}
3126
3127static void incrbyCommand(redisClient *c) {
d68ed120 3128 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
a4d1ba9a 3129 incrDecrCommand(c,incr);
ed9b544e 3130}
3131
3132static void decrbyCommand(redisClient *c) {
d68ed120 3133 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
a4d1ba9a 3134 incrDecrCommand(c,-incr);
ed9b544e 3135}
3136
3137/* ========================= Type agnostic commands ========================= */
3138
3139static void delCommand(redisClient *c) {
5109cdff 3140 int deleted = 0, j;
3141
3142 for (j = 1; j < c->argc; j++) {
3143 if (deleteKey(c->db,c->argv[j])) {
3144 server.dirty++;
3145 deleted++;
3146 }
3147 }
3148 switch(deleted) {
3149 case 0:
c937aa89 3150 addReply(c,shared.czero);
5109cdff 3151 break;
3152 case 1:
3153 addReply(c,shared.cone);
3154 break;
3155 default:
3156 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3157 break;
ed9b544e 3158 }
3159}
3160
3161static void existsCommand(redisClient *c) {
3305306f 3162 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
ed9b544e 3163}
3164
3165static void selectCommand(redisClient *c) {
3166 int id = atoi(c->argv[1]->ptr);
3167
3168 if (selectDb(c,id) == REDIS_ERR) {
774e3047 3169 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
ed9b544e 3170 } else {
3171 addReply(c,shared.ok);
3172 }
3173}
3174
3175static void randomkeyCommand(redisClient *c) {
3176 dictEntry *de;
3305306f 3177
3178 while(1) {
3179 de = dictGetRandomKey(c->db->dict);
ce7bef07 3180 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3305306f 3181 }
ed9b544e 3182 if (de == NULL) {
ce7bef07 3183 addReply(c,shared.plus);
ed9b544e 3184 addReply(c,shared.crlf);
3185 } else {
c937aa89 3186 addReply(c,shared.plus);
ed9b544e 3187 addReply(c,dictGetEntryKey(de));
3188 addReply(c,shared.crlf);
3189 }
3190}
3191
3192static void keysCommand(redisClient *c) {
3193 dictIterator *di;
3194 dictEntry *de;
3195 sds pattern = c->argv[1]->ptr;
3196 int plen = sdslen(pattern);
3197 int numkeys = 0, keyslen = 0;
3198 robj *lenobj = createObject(REDIS_STRING,NULL);
3199
3305306f 3200 di = dictGetIterator(c->db->dict);
ed9b544e 3201 addReply(c,lenobj);
3202 decrRefCount(lenobj);
3203 while((de = dictNext(di)) != NULL) {
3204 robj *keyobj = dictGetEntryKey(de);
3305306f 3205
ed9b544e 3206 sds key = keyobj->ptr;
3207 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3208 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3305306f 3209 if (expireIfNeeded(c->db,keyobj) == 0) {
3210 if (numkeys != 0)
3211 addReply(c,shared.space);
3212 addReply(c,keyobj);
3213 numkeys++;
3214 keyslen += sdslen(key);
3215 }
ed9b544e 3216 }
3217 }
3218 dictReleaseIterator(di);
c937aa89 3219 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
ed9b544e 3220 addReply(c,shared.crlf);
3221}
3222
3223static void dbsizeCommand(redisClient *c) {
3224 addReplySds(c,
3305306f 3225 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
ed9b544e 3226}
3227
3228static void lastsaveCommand(redisClient *c) {
3229 addReplySds(c,
c937aa89 3230 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
ed9b544e 3231}
3232
3233static void typeCommand(redisClient *c) {
3305306f 3234 robj *o;
ed9b544e 3235 char *type;
3305306f 3236
3237 o = lookupKeyRead(c->db,c->argv[1]);
3238 if (o == NULL) {
c937aa89 3239 type = "+none";
ed9b544e 3240 } else {
ed9b544e 3241 switch(o->type) {
c937aa89 3242 case REDIS_STRING: type = "+string"; break;
3243 case REDIS_LIST: type = "+list"; break;
3244 case REDIS_SET: type = "+set"; break;
412a8bce 3245 case REDIS_ZSET: type = "+zset"; break;
ed9b544e 3246 default: type = "unknown"; break;
3247 }
3248 }
3249 addReplySds(c,sdsnew(type));
3250 addReply(c,shared.crlf);
3251}
3252
3253static void saveCommand(redisClient *c) {
9d65a1bb 3254 if (server.bgsavechildpid != -1) {
05557f6d 3255 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3256 return;
3257 }
f78fd11b 3258 if (rdbSave(server.dbfilename) == REDIS_OK) {
ed9b544e 3259 addReply(c,shared.ok);
3260 } else {
3261 addReply(c,shared.err);
3262 }
3263}
3264
3265static void bgsaveCommand(redisClient *c) {
9d65a1bb 3266 if (server.bgsavechildpid != -1) {
ed9b544e 3267 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3268 return;
3269 }
f78fd11b 3270 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
ed9b544e 3271 addReply(c,shared.ok);
3272 } else {
3273 addReply(c,shared.err);
3274 }
3275}
3276
3277static void shutdownCommand(redisClient *c) {
3278 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
a3b21203 3279 /* Kill the saving child if there is a background saving in progress.
3280 We want to avoid race conditions, for instance our saving child may
3281 overwrite the synchronous saving did by SHUTDOWN. */
9d65a1bb 3282 if (server.bgsavechildpid != -1) {
9f3c422c 3283 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3284 kill(server.bgsavechildpid,SIGKILL);
a3b21203 3285 rdbRemoveTempFile(server.bgsavechildpid);
9f3c422c 3286 }
a3b21203 3287 /* SYNC SAVE */
f78fd11b 3288 if (rdbSave(server.dbfilename) == REDIS_OK) {
9f3c422c 3289 if (server.daemonize)
b284af55 3290 unlink(server.pidfile);
b284af55 3291 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
ed9b544e 3292 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3293 exit(1);
3294 } else {
a3b21203 3295 /* Ooops.. error saving! The best we can do is to continue operating.
3296 * Note that if there was a background saving process, in the next
3297 * cron() Redis will be notified that the background saving aborted,
3298 * handling special stuff like slaves pending for synchronization... */
ed9b544e 3299 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3300 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3301 }
3302}
3303
3304static void renameGenericCommand(redisClient *c, int nx) {
ed9b544e 3305 robj *o;
3306
3307 /* To use the same key as src and dst is probably an error */
3308 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
c937aa89 3309 addReply(c,shared.sameobjecterr);
ed9b544e 3310 return;
3311 }
3312
3305306f 3313 o = lookupKeyWrite(c->db,c->argv[1]);
3314 if (o == NULL) {
c937aa89 3315 addReply(c,shared.nokeyerr);
ed9b544e 3316 return;
3317 }
ed9b544e 3318 incrRefCount(o);
3305306f 3319 deleteIfVolatile(c->db,c->argv[2]);
3320 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
ed9b544e 3321 if (nx) {
3322 decrRefCount(o);
c937aa89 3323 addReply(c,shared.czero);
ed9b544e 3324 return;
3325 }
3305306f 3326 dictReplace(c->db->dict,c->argv[2],o);
ed9b544e 3327 } else {
3328 incrRefCount(c->argv[2]);
3329 }
3305306f 3330 deleteKey(c->db,c->argv[1]);
ed9b544e 3331 server.dirty++;
c937aa89 3332 addReply(c,nx ? shared.cone : shared.ok);
ed9b544e 3333}
3334
3335static void renameCommand(redisClient *c) {
3336 renameGenericCommand(c,0);
3337}
3338
3339static void renamenxCommand(redisClient *c) {
3340 renameGenericCommand(c,1);
3341}
3342
3343static void moveCommand(redisClient *c) {
3305306f 3344 robj *o;
3345 redisDb *src, *dst;
ed9b544e 3346 int srcid;
3347
3348 /* Obtain source and target DB pointers */
3305306f 3349 src = c->db;
3350 srcid = c->db->id;
ed9b544e 3351 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
c937aa89 3352 addReply(c,shared.outofrangeerr);
ed9b544e 3353 return;
3354 }
3305306f 3355 dst = c->db;
3356 selectDb(c,srcid); /* Back to the source DB */
ed9b544e 3357
3358 /* If the user is moving using as target the same
3359 * DB as the source DB it is probably an error. */
3360 if (src == dst) {
c937aa89 3361 addReply(c,shared.sameobjecterr);
ed9b544e 3362 return;
3363 }
3364
3365 /* Check if the element exists and get a reference */
3305306f 3366 o = lookupKeyWrite(c->db,c->argv[1]);
3367 if (!o) {
c937aa89 3368 addReply(c,shared.czero);
ed9b544e 3369 return;
3370 }
3371
3372 /* Try to add the element to the target DB */
3305306f 3373 deleteIfVolatile(dst,c->argv[1]);
3374 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
c937aa89 3375 addReply(c,shared.czero);
ed9b544e 3376 return;
3377 }
3305306f 3378 incrRefCount(c->argv[1]);
ed9b544e 3379 incrRefCount(o);
3380
3381 /* OK! key moved, free the entry in the source DB */
3305306f 3382 deleteKey(src,c->argv[1]);
ed9b544e 3383 server.dirty++;
c937aa89 3384 addReply(c,shared.cone);
ed9b544e 3385}
3386
3387/* =================================== Lists ================================ */
3388static void pushGenericCommand(redisClient *c, int where) {
3389 robj *lobj;
ed9b544e 3390 list *list;
3305306f 3391
3392 lobj = lookupKeyWrite(c->db,c->argv[1]);
3393 if (lobj == NULL) {
ed9b544e 3394 lobj = createListObject();
3395 list = lobj->ptr;
3396 if (where == REDIS_HEAD) {
6b47e12e 3397 listAddNodeHead(list,c->argv[2]);
ed9b544e 3398 } else {
6b47e12e 3399 listAddNodeTail(list,c->argv[2]);
ed9b544e 3400 }
3305306f 3401 dictAdd(c->db->dict,c->argv[1],lobj);
ed9b544e 3402 incrRefCount(c->argv[1]);
3403 incrRefCount(c->argv[2]);
3404 } else {
ed9b544e 3405 if (lobj->type != REDIS_LIST) {
3406 addReply(c,shared.wrongtypeerr);
3407 return;
3408 }
3409 list = lobj->ptr;
3410 if (where == REDIS_HEAD) {
6b47e12e 3411 listAddNodeHead(list,c->argv[2]);
ed9b544e 3412 } else {
6b47e12e 3413 listAddNodeTail(list,c->argv[2]);
ed9b544e 3414 }
3415 incrRefCount(c->argv[2]);
3416 }
3417 server.dirty++;
3418 addReply(c,shared.ok);
3419}
3420
3421static void lpushCommand(redisClient *c) {
3422 pushGenericCommand(c,REDIS_HEAD);
3423}
3424
3425static void rpushCommand(redisClient *c) {
3426 pushGenericCommand(c,REDIS_TAIL);
3427}
3428
3429static void llenCommand(redisClient *c) {
3305306f 3430 robj *o;
ed9b544e 3431 list *l;
3432
3305306f 3433 o = lookupKeyRead(c->db,c->argv[1]);
3434 if (o == NULL) {
c937aa89 3435 addReply(c,shared.czero);
ed9b544e 3436 return;
3437 } else {
ed9b544e 3438 if (o->type != REDIS_LIST) {
c937aa89 3439 addReply(c,shared.wrongtypeerr);
ed9b544e 3440 } else {
3441 l = o->ptr;
c937aa89 3442 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
ed9b544e 3443 }
3444 }
3445}
3446
3447static void lindexCommand(redisClient *c) {
3305306f 3448 robj *o;
ed9b544e 3449 int index = atoi(c->argv[2]->ptr);
3450
3305306f 3451 o = lookupKeyRead(c->db,c->argv[1]);
3452 if (o == NULL) {
c937aa89 3453 addReply(c,shared.nullbulk);
ed9b544e 3454 } else {
ed9b544e 3455 if (o->type != REDIS_LIST) {
c937aa89 3456 addReply(c,shared.wrongtypeerr);
ed9b544e 3457 } else {
3458 list *list = o->ptr;
3459 listNode *ln;
3460
3461 ln = listIndex(list, index);
3462 if (ln == NULL) {
c937aa89 3463 addReply(c,shared.nullbulk);
ed9b544e 3464 } else {
3465 robj *ele = listNodeValue(ln);
942a3961 3466 addReplyBulkLen(c,ele);
ed9b544e 3467 addReply(c,ele);
3468 addReply(c,shared.crlf);
3469 }
3470 }
3471 }
3472}
3473
3474static void lsetCommand(redisClient *c) {
3305306f 3475 robj *o;
ed9b544e 3476 int index = atoi(c->argv[2]->ptr);
3477
3305306f 3478 o = lookupKeyWrite(c->db,c->argv[1]);
3479 if (o == NULL) {
ed9b544e 3480 addReply(c,shared.nokeyerr);
3481 } else {
ed9b544e 3482 if (o->type != REDIS_LIST) {
3483 addReply(c,shared.wrongtypeerr);
3484 } else {
3485 list *list = o->ptr;
3486 listNode *ln;
3487
3488 ln = listIndex(list, index);
3489 if (ln == NULL) {
c937aa89 3490 addReply(c,shared.outofrangeerr);
ed9b544e 3491 } else {
3492 robj *ele = listNodeValue(ln);
3493
3494 decrRefCount(ele);
3495 listNodeValue(ln) = c->argv[3];
3496 incrRefCount(c->argv[3]);
3497 addReply(c,shared.ok);
3498 server.dirty++;
3499 }
3500 }
3501 }
3502}
3503
3504static void popGenericCommand(redisClient *c, int where) {
3305306f 3505 robj *o;
3506
3507 o = lookupKeyWrite(c->db,c->argv[1]);
3508 if (o == NULL) {
c937aa89 3509 addReply(c,shared.nullbulk);
ed9b544e 3510 } else {
ed9b544e 3511 if (o->type != REDIS_LIST) {
c937aa89 3512 addReply(c,shared.wrongtypeerr);
ed9b544e 3513 } else {
3514 list *list = o->ptr;
3515 listNode *ln;
3516
3517 if (where == REDIS_HEAD)
3518 ln = listFirst(list);
3519 else
3520 ln = listLast(list);
3521
3522 if (ln == NULL) {
c937aa89 3523 addReply(c,shared.nullbulk);
ed9b544e 3524 } else {
3525 robj *ele = listNodeValue(ln);
942a3961 3526 addReplyBulkLen(c,ele);
ed9b544e 3527 addReply(c,ele);
3528 addReply(c,shared.crlf);
3529 listDelNode(list,ln);
3530 server.dirty++;
3531 }
3532 }
3533 }
3534}
3535
3536static void lpopCommand(redisClient *c) {
3537 popGenericCommand(c,REDIS_HEAD);
3538}
3539
3540static void rpopCommand(redisClient *c) {
3541 popGenericCommand(c,REDIS_TAIL);
3542}
3543
3544static void lrangeCommand(redisClient *c) {
3305306f 3545 robj *o;
ed9b544e 3546 int start = atoi(c->argv[2]->ptr);
3547 int end = atoi(c->argv[3]->ptr);
3305306f 3548
3549 o = lookupKeyRead(c->db,c->argv[1]);
3550 if (o == NULL) {
c937aa89 3551 addReply(c,shared.nullmultibulk);
ed9b544e 3552 } else {
ed9b544e 3553 if (o->type != REDIS_LIST) {
c937aa89 3554 addReply(c,shared.wrongtypeerr);
ed9b544e 3555 } else {
3556 list *list = o->ptr;
3557 listNode *ln;
3558 int llen = listLength(list);
3559 int rangelen, j;
3560 robj *ele;
3561
3562 /* convert negative indexes */
3563 if (start < 0) start = llen+start;
3564 if (end < 0) end = llen+end;
3565 if (start < 0) start = 0;
3566 if (end < 0) end = 0;
3567
3568 /* indexes sanity checks */
3569 if (start > end || start >= llen) {
3570 /* Out of range start or start > end result in empty list */
c937aa89 3571 addReply(c,shared.emptymultibulk);
ed9b544e 3572 return;
3573 }
3574 if (end >= llen) end = llen-1;
3575 rangelen = (end-start)+1;
3576
3577 /* Return the result in form of a multi-bulk reply */
3578 ln = listIndex(list, start);
c937aa89 3579 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
ed9b544e 3580 for (j = 0; j < rangelen; j++) {
3581 ele = listNodeValue(ln);
942a3961 3582 addReplyBulkLen(c,ele);
ed9b544e 3583 addReply(c,ele);
3584 addReply(c,shared.crlf);
3585 ln = ln->next;
3586 }
3587 }
3588 }
3589}
3590
3591static void ltrimCommand(redisClient *c) {
3305306f 3592 robj *o;
ed9b544e 3593 int start = atoi(c->argv[2]->ptr);
3594 int end = atoi(c->argv[3]->ptr);
3595
3305306f 3596 o = lookupKeyWrite(c->db,c->argv[1]);
3597 if (o == NULL) {
ed9b544e 3598 addReply(c,shared.nokeyerr);
3599 } else {
ed9b544e 3600 if (o->type != REDIS_LIST) {
3601 addReply(c,shared.wrongtypeerr);
3602 } else {
3603 list *list = o->ptr;
3604 listNode *ln;
3605 int llen = listLength(list);
3606 int j, ltrim, rtrim;
3607
3608 /* convert negative indexes */
3609 if (start < 0) start = llen+start;
3610 if (end < 0) end = llen+end;
3611 if (start < 0) start = 0;
3612 if (end < 0) end = 0;
3613
3614 /* indexes sanity checks */
3615 if (start > end || start >= llen) {
3616 /* Out of range start or start > end result in empty list */
3617 ltrim = llen;
3618 rtrim = 0;
3619 } else {
3620 if (end >= llen) end = llen-1;
3621 ltrim = start;
3622 rtrim = llen-end-1;
3623 }
3624
3625 /* Remove list elements to perform the trim */
3626 for (j = 0; j < ltrim; j++) {
3627 ln = listFirst(list);
3628 listDelNode(list,ln);
3629 }
3630 for (j = 0; j < rtrim; j++) {
3631 ln = listLast(list);
3632 listDelNode(list,ln);
3633 }
ed9b544e 3634 server.dirty++;
e59229a2 3635 addReply(c,shared.ok);
ed9b544e 3636 }
3637 }
3638}
3639
3640static void lremCommand(redisClient *c) {
3305306f 3641 robj *o;
ed9b544e 3642
3305306f 3643 o = lookupKeyWrite(c->db,c->argv[1]);
3644 if (o == NULL) {
33c08b39 3645 addReply(c,shared.czero);
ed9b544e 3646 } else {
ed9b544e 3647 if (o->type != REDIS_LIST) {
c937aa89 3648 addReply(c,shared.wrongtypeerr);
ed9b544e 3649 } else {
3650 list *list = o->ptr;
3651 listNode *ln, *next;
3652 int toremove = atoi(c->argv[2]->ptr);
3653 int removed = 0;
3654 int fromtail = 0;
3655
3656 if (toremove < 0) {
3657 toremove = -toremove;
3658 fromtail = 1;
3659 }
3660 ln = fromtail ? list->tail : list->head;
3661 while (ln) {
ed9b544e 3662 robj *ele = listNodeValue(ln);
a4d1ba9a 3663
3664 next = fromtail ? ln->prev : ln->next;
724a51b1 3665 if (compareStringObjects(ele,c->argv[3]) == 0) {
ed9b544e 3666 listDelNode(list,ln);
3667 server.dirty++;
3668 removed++;
3669 if (toremove && removed == toremove) break;
3670 }
3671 ln = next;
3672 }
c937aa89 3673 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
ed9b544e 3674 }
3675 }
3676}
3677
12f9d551 3678/* This is the semantic of this command:
0f5f7e9a 3679 * RPOPLPUSH srclist dstlist:
12f9d551 3680 * IF LLEN(srclist) > 0
3681 * element = RPOP srclist
3682 * LPUSH dstlist element
3683 * RETURN element
3684 * ELSE
3685 * RETURN nil
3686 * END
3687 * END
3688 *
3689 * The idea is to be able to get an element from a list in a reliable way
3690 * since the element is not just returned but pushed against another list
3691 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3692 */
0f5f7e9a 3693static void rpoplpushcommand(redisClient *c) {
12f9d551 3694 robj *sobj;
3695
3696 sobj = lookupKeyWrite(c->db,c->argv[1]);
3697 if (sobj == NULL) {
3698 addReply(c,shared.nullbulk);
3699 } else {
3700 if (sobj->type != REDIS_LIST) {
3701 addReply(c,shared.wrongtypeerr);
3702 } else {
3703 list *srclist = sobj->ptr;
3704 listNode *ln = listLast(srclist);
3705
3706 if (ln == NULL) {
3707 addReply(c,shared.nullbulk);
3708 } else {
3709 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3710 robj *ele = listNodeValue(ln);
3711 list *dstlist;
3712
3713 if (dobj == NULL) {
3714
3715 /* Create the list if the key does not exist */
3716 dobj = createListObject();
3717 dictAdd(c->db->dict,c->argv[2],dobj);
3718 incrRefCount(c->argv[2]);
3719 } else if (dobj->type != REDIS_LIST) {
3720 addReply(c,shared.wrongtypeerr);
3721 return;
3722 }
3723 /* Add the element to the target list */
3724 dstlist = dobj->ptr;
3725 listAddNodeHead(dstlist,ele);
3726 incrRefCount(ele);
3727
3728 /* Send the element to the client as reply as well */
3729 addReplyBulkLen(c,ele);
3730 addReply(c,ele);
3731 addReply(c,shared.crlf);
3732
3733 /* Finally remove the element from the source list */
3734 listDelNode(srclist,ln);
3735 server.dirty++;
3736 }
3737 }
3738 }
3739}
3740
3741
ed9b544e 3742/* ==================================== Sets ================================ */
3743
3744static void saddCommand(redisClient *c) {
ed9b544e 3745 robj *set;
3746
3305306f 3747 set = lookupKeyWrite(c->db,c->argv[1]);
3748 if (set == NULL) {
ed9b544e 3749 set = createSetObject();
3305306f 3750 dictAdd(c->db->dict,c->argv[1],set);
ed9b544e 3751 incrRefCount(c->argv[1]);
3752 } else {
ed9b544e 3753 if (set->type != REDIS_SET) {
c937aa89 3754 addReply(c,shared.wrongtypeerr);
ed9b544e 3755 return;
3756 }
3757 }
3758 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3759 incrRefCount(c->argv[2]);
3760 server.dirty++;
c937aa89 3761 addReply(c,shared.cone);
ed9b544e 3762 } else {
c937aa89 3763 addReply(c,shared.czero);
ed9b544e 3764 }
3765}
3766
3767static void sremCommand(redisClient *c) {
3305306f 3768 robj *set;
ed9b544e 3769
3305306f 3770 set = lookupKeyWrite(c->db,c->argv[1]);
3771 if (set == NULL) {
c937aa89 3772 addReply(c,shared.czero);
ed9b544e 3773 } else {
ed9b544e 3774 if (set->type != REDIS_SET) {
c937aa89 3775 addReply(c,shared.wrongtypeerr);
ed9b544e 3776 return;
3777 }
3778 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3779 server.dirty++;
12fea928 3780 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
c937aa89 3781 addReply(c,shared.cone);
ed9b544e 3782 } else {
c937aa89 3783 addReply(c,shared.czero);
ed9b544e 3784 }
3785 }
3786}
3787
a4460ef4 3788static void smoveCommand(redisClient *c) {
3789 robj *srcset, *dstset;
3790
3791 srcset = lookupKeyWrite(c->db,c->argv[1]);
3792 dstset = lookupKeyWrite(c->db,c->argv[2]);
3793
3794 /* If the source key does not exist return 0, if it's of the wrong type
3795 * raise an error */
3796 if (srcset == NULL || srcset->type != REDIS_SET) {
3797 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3798 return;
3799 }
3800 /* Error if the destination key is not a set as well */
3801 if (dstset && dstset->type != REDIS_SET) {
3802 addReply(c,shared.wrongtypeerr);
3803 return;
3804 }
3805 /* Remove the element from the source set */
3806 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3807 /* Key not found in the src set! return zero */
3808 addReply(c,shared.czero);
3809 return;
3810 }
3811 server.dirty++;
3812 /* Add the element to the destination set */
3813 if (!dstset) {
3814 dstset = createSetObject();
3815 dictAdd(c->db->dict,c->argv[2],dstset);
3816 incrRefCount(c->argv[2]);
3817 }
3818 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3819 incrRefCount(c->argv[3]);
3820 addReply(c,shared.cone);
3821}
3822
ed9b544e 3823static void sismemberCommand(redisClient *c) {
3305306f 3824 robj *set;
ed9b544e 3825
3305306f 3826 set = lookupKeyRead(c->db,c->argv[1]);
3827 if (set == NULL) {
c937aa89 3828 addReply(c,shared.czero);
ed9b544e 3829 } else {
ed9b544e 3830 if (set->type != REDIS_SET) {
c937aa89 3831 addReply(c,shared.wrongtypeerr);
ed9b544e 3832 return;
3833 }
3834 if (dictFind(set->ptr,c->argv[2]))
c937aa89 3835 addReply(c,shared.cone);
ed9b544e 3836 else
c937aa89 3837 addReply(c,shared.czero);
ed9b544e 3838 }
3839}
3840
3841static void scardCommand(redisClient *c) {
3305306f 3842 robj *o;
ed9b544e 3843 dict *s;
3844
3305306f 3845 o = lookupKeyRead(c->db,c->argv[1]);
3846 if (o == NULL) {
c937aa89 3847 addReply(c,shared.czero);
ed9b544e 3848 return;
3849 } else {
ed9b544e 3850 if (o->type != REDIS_SET) {
c937aa89 3851 addReply(c,shared.wrongtypeerr);
ed9b544e 3852 } else {
3853 s = o->ptr;
c937aa89 3854 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3305306f 3855 dictSize(s)));
ed9b544e 3856 }
3857 }
3858}
3859
12fea928 3860static void spopCommand(redisClient *c) {
3861 robj *set;
3862 dictEntry *de;
3863
3864 set = lookupKeyWrite(c->db,c->argv[1]);
3865 if (set == NULL) {
3866 addReply(c,shared.nullbulk);
3867 } else {
3868 if (set->type != REDIS_SET) {
3869 addReply(c,shared.wrongtypeerr);
3870 return;
3871 }
3872 de = dictGetRandomKey(set->ptr);
3873 if (de == NULL) {
3874 addReply(c,shared.nullbulk);
3875 } else {
3876 robj *ele = dictGetEntryKey(de);
3877
942a3961 3878 addReplyBulkLen(c,ele);
12fea928 3879 addReply(c,ele);
3880 addReply(c,shared.crlf);
3881 dictDelete(set->ptr,ele);
3882 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3883 server.dirty++;
3884 }
3885 }
3886}
3887
2abb95a9 3888static void srandmemberCommand(redisClient *c) {
3889 robj *set;
3890 dictEntry *de;
3891
3892 set = lookupKeyRead(c->db,c->argv[1]);
3893 if (set == NULL) {
3894 addReply(c,shared.nullbulk);
3895 } else {
3896 if (set->type != REDIS_SET) {
3897 addReply(c,shared.wrongtypeerr);
3898 return;
3899 }
3900 de = dictGetRandomKey(set->ptr);
3901 if (de == NULL) {
3902 addReply(c,shared.nullbulk);
3903 } else {
3904 robj *ele = dictGetEntryKey(de);
3905
3906 addReplyBulkLen(c,ele);
3907 addReply(c,ele);
3908 addReply(c,shared.crlf);
3909 }
3910 }
3911}
3912
ed9b544e 3913static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3914 dict **d1 = (void*) s1, **d2 = (void*) s2;
3915
3305306f 3916 return dictSize(*d1)-dictSize(*d2);
ed9b544e 3917}
3918
3919static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3920 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3921 dictIterator *di;
3922 dictEntry *de;
3923 robj *lenobj = NULL, *dstset = NULL;
3924 int j, cardinality = 0;
3925
ed9b544e 3926 for (j = 0; j < setsnum; j++) {
3927 robj *setobj;
3305306f 3928
3929 setobj = dstkey ?
3930 lookupKeyWrite(c->db,setskeys[j]) :
3931 lookupKeyRead(c->db,setskeys[j]);
3932 if (!setobj) {
ed9b544e 3933 zfree(dv);
5faa6025 3934 if (dstkey) {
3935 deleteKey(c->db,dstkey);
3936 addReply(c,shared.ok);
3937 } else {
3938 addReply(c,shared.nullmultibulk);
3939 }
ed9b544e 3940 return;
3941 }
ed9b544e 3942 if (setobj->type != REDIS_SET) {
3943 zfree(dv);
c937aa89 3944 addReply(c,shared.wrongtypeerr);
ed9b544e 3945 return;
3946 }
3947 dv[j] = setobj->ptr;
3948 }
3949 /* Sort sets from the smallest to largest, this will improve our
3950 * algorithm's performace */
3951 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3952
3953 /* The first thing we should output is the total number of elements...
3954 * since this is a multi-bulk write, but at this stage we don't know
3955 * the intersection set size, so we use a trick, append an empty object
3956 * to the output list and save the pointer to later modify it with the
3957 * right length */
3958 if (!dstkey) {
3959 lenobj = createObject(REDIS_STRING,NULL);
3960 addReply(c,lenobj);
3961 decrRefCount(lenobj);
3962 } else {
3963 /* If we have a target key where to store the resulting set
3964 * create this key with an empty set inside */
3965 dstset = createSetObject();
ed9b544e 3966 }
3967
3968 /* Iterate all the elements of the first (smallest) set, and test
3969 * the element against all the other sets, if at least one set does
3970 * not include the element it is discarded */
3971 di = dictGetIterator(dv[0]);
ed9b544e 3972
3973 while((de = dictNext(di)) != NULL) {
3974 robj *ele;
3975
3976 for (j = 1; j < setsnum; j++)
3977 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3978 if (j != setsnum)
3979 continue; /* at least one set does not contain the member */
3980 ele = dictGetEntryKey(de);
3981 if (!dstkey) {
942a3961 3982 addReplyBulkLen(c,ele);
ed9b544e 3983 addReply(c,ele);
3984 addReply(c,shared.crlf);
3985 cardinality++;
3986 } else {
3987 dictAdd(dstset->ptr,ele,NULL);
3988 incrRefCount(ele);
3989 }
3990 }
3991 dictReleaseIterator(di);
3992
83cdfe18
AG
3993 if (dstkey) {
3994 /* Store the resulting set into the target */
3995 deleteKey(c->db,dstkey);
3996 dictAdd(c->db->dict,dstkey,dstset);
3997 incrRefCount(dstkey);
3998 }
3999
40d224a9 4000 if (!dstkey) {
c937aa89 4001 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
40d224a9 4002 } else {
03fd01c7 4003 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
4004 dictSize((dict*)dstset->ptr)));
40d224a9 4005 server.dirty++;
4006 }
ed9b544e 4007 zfree(dv);
4008}
4009
4010static void sinterCommand(redisClient *c) {
4011 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4012}
4013
4014static void sinterstoreCommand(redisClient *c) {
4015 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4016}
4017
/* Operation selectors for sunionDiffGenericCommand(): REDIS_OP_UNION adds
 * the members of every input set to the result; REDIS_OP_DIFF keeps the
 * members of the first set that do not appear in any of the other sets. */
#define REDIS_OP_UNION 0
#define REDIS_OP_DIFF 1
4020
4021static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
40d224a9 4022 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4023 dictIterator *di;
4024 dictEntry *de;
f4f56e1d 4025 robj *dstset = NULL;
40d224a9 4026 int j, cardinality = 0;
4027
40d224a9 4028 for (j = 0; j < setsnum; j++) {
4029 robj *setobj;
4030
4031 setobj = dstkey ?
4032 lookupKeyWrite(c->db,setskeys[j]) :
4033 lookupKeyRead(c->db,setskeys[j]);
4034 if (!setobj) {
4035 dv[j] = NULL;
4036 continue;
4037 }
4038 if (setobj->type != REDIS_SET) {
4039 zfree(dv);
4040 addReply(c,shared.wrongtypeerr);
4041 return;
4042 }
4043 dv[j] = setobj->ptr;
4044 }
4045
4046 /* We need a temp set object to store our union. If the dstkey
4047 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4048 * this set object will be the resulting object to set into the target key*/
4049 dstset = createSetObject();
4050
40d224a9 4051 /* Iterate all the elements of all the sets, add every element a single
4052 * time to the result set */
4053 for (j = 0; j < setsnum; j++) {
51829ed3 4054 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
40d224a9 4055 if (!dv[j]) continue; /* non existing keys are like empty sets */
4056
4057 di = dictGetIterator(dv[j]);
40d224a9 4058
4059 while((de = dictNext(di)) != NULL) {
4060 robj *ele;
4061
4062 /* dictAdd will not add the same element multiple times */
4063 ele = dictGetEntryKey(de);
f4f56e1d 4064 if (op == REDIS_OP_UNION || j == 0) {
4065 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4066 incrRefCount(ele);
40d224a9 4067 cardinality++;
4068 }
f4f56e1d 4069 } else if (op == REDIS_OP_DIFF) {
4070 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4071 cardinality--;
4072 }
40d224a9 4073 }
4074 }
4075 dictReleaseIterator(di);
51829ed3
AG
4076
4077 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
40d224a9 4078 }
4079
f4f56e1d 4080 /* Output the content of the resulting set, if not in STORE mode */
4081 if (!dstkey) {
4082 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4083 di = dictGetIterator(dstset->ptr);
f4f56e1d 4084 while((de = dictNext(di)) != NULL) {
4085 robj *ele;
4086
4087 ele = dictGetEntryKey(de);
942a3961 4088 addReplyBulkLen(c,ele);
f4f56e1d 4089 addReply(c,ele);
4090 addReply(c,shared.crlf);
4091 }
4092 dictReleaseIterator(di);
83cdfe18
AG
4093 } else {
4094 /* If we have a target key where to store the resulting set
4095 * create this key with the result set inside */
4096 deleteKey(c->db,dstkey);
4097 dictAdd(c->db->dict,dstkey,dstset);
4098 incrRefCount(dstkey);
f4f56e1d 4099 }
4100
4101 /* Cleanup */
40d224a9 4102 if (!dstkey) {
40d224a9 4103 decrRefCount(dstset);
4104 } else {
03fd01c7 4105 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
4106 dictSize((dict*)dstset->ptr)));
40d224a9 4107 server.dirty++;
4108 }
4109 zfree(dv);
4110}
4111
4112static void sunionCommand(redisClient *c) {
f4f56e1d 4113 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
40d224a9 4114}
4115
4116static void sunionstoreCommand(redisClient *c) {
f4f56e1d 4117 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4118}
4119
4120static void sdiffCommand(redisClient *c) {
4121 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4122}
4123
4124static void sdiffstoreCommand(redisClient *c) {
4125 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
40d224a9 4126}
4127
6b47e12e 4128/* ==================================== ZSets =============================== */
4129
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
4137
4138/* This skiplist implementation is almost a C translation of the original
4139 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4140 * Alternative to Balanced Trees", modified in three ways:
4141 * a) this implementation allows for repeated values.
4142 * b) the comparison is not just by key (our 'score') but by satellite data.
4143 * c) there is a back pointer, so it's a doubly linked list with the back
4144 * pointers being only at "level 1". This allows to traverse the list
4145 * from tail to head, useful for ZREVRANGE. */
4146
4147static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4148 zskiplistNode *zn = zmalloc(sizeof(*zn));
4149
4150 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4151 zn->score = score;
4152 zn->obj = obj;
4153 return zn;
4154}
4155
4156static zskiplist *zslCreate(void) {
4157 int j;
4158 zskiplist *zsl;
4159
4160 zsl = zmalloc(sizeof(*zsl));
4161 zsl->level = 1;
cc812361 4162 zsl->length = 0;
6b47e12e 4163 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4164 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4165 zsl->header->forward[j] = NULL;
e3870fab 4166 zsl->header->backward = NULL;
4167 zsl->tail = NULL;
6b47e12e 4168 return zsl;
4169}
4170
fd8ccf44 4171static void zslFreeNode(zskiplistNode *node) {
4172 decrRefCount(node->obj);
ad807e6f 4173 zfree(node->forward);
fd8ccf44 4174 zfree(node);
4175}
4176
4177static void zslFree(zskiplist *zsl) {
ad807e6f 4178 zskiplistNode *node = zsl->header->forward[0], *next;
fd8ccf44 4179
ad807e6f 4180 zfree(zsl->header->forward);
4181 zfree(zsl->header);
fd8ccf44 4182 while(node) {
599379dd 4183 next = node->forward[0];
fd8ccf44 4184 zslFreeNode(node);
4185 node = next;
4186 }
ad807e6f 4187 zfree(zsl);
fd8ccf44 4188}
4189
6b47e12e 4190static int zslRandomLevel(void) {
4191 int level = 1;
4192 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4193 level += 1;
4194 return level;
4195}
4196
4197static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4198 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4199 int i, level;
4200
4201 x = zsl->header;
4202 for (i = zsl->level-1; i >= 0; i--) {
9d60e6e4 4203 while (x->forward[i] &&
4204 (x->forward[i]->score < score ||
4205 (x->forward[i]->score == score &&
4206 compareStringObjects(x->forward[i]->obj,obj) < 0)))
6b47e12e 4207 x = x->forward[i];
4208 update[i] = x;
4209 }
6b47e12e 4210 /* we assume the key is not already inside, since we allow duplicated
4211 * scores, and the re-insertion of score and redis object should never
4212 * happpen since the caller of zslInsert() should test in the hash table
4213 * if the element is already inside or not. */
4214 level = zslRandomLevel();
4215 if (level > zsl->level) {
4216 for (i = zsl->level; i < level; i++)
4217 update[i] = zsl->header;
4218 zsl->level = level;
4219 }
4220 x = zslCreateNode(level,score,obj);
4221 for (i = 0; i < level; i++) {
4222 x->forward[i] = update[i]->forward[i];
4223 update[i]->forward[i] = x;
4224 }
bb975144 4225 x->backward = (update[0] == zsl->header) ? NULL : update[0];
e3870fab 4226 if (x->forward[0])
4227 x->forward[0]->backward = x;
4228 else
4229 zsl->tail = x;
cc812361 4230 zsl->length++;
6b47e12e 4231}
4232
50c55df5 4233/* Delete an element with matching score/object from the skiplist. */
fd8ccf44 4234static int zslDelete(zskiplist *zsl, double score, robj *obj) {
e197b441 4235 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4236 int i;
4237
4238 x = zsl->header;
4239 for (i = zsl->level-1; i >= 0; i--) {
9d60e6e4 4240 while (x->forward[i] &&
4241 (x->forward[i]->score < score ||
4242 (x->forward[i]->score == score &&
4243 compareStringObjects(x->forward[i]->obj,obj) < 0)))
e197b441 4244 x = x->forward[i];
4245 update[i] = x;
4246 }
4247 /* We may have multiple elements with the same score, what we need
4248 * is to find the element with both the right score and object. */
4249 x = x->forward[0];
50c55df5 4250 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
9d60e6e4 4251 for (i = 0; i < zsl->level; i++) {
4252 if (update[i]->forward[i] != x) break;
4253 update[i]->forward[i] = x->forward[i];
4254 }
4255 if (x->forward[0]) {
4256 x->forward[0]->backward = (x->backward == zsl->header) ?
4257 NULL : x->backward;
e197b441 4258 } else {
9d60e6e4 4259 zsl->tail = x->backward;
e197b441 4260 }
9d60e6e4 4261 zslFreeNode(x);
4262 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4263 zsl->level--;
4264 zsl->length--;
4265 return 1;
4266 } else {
4267 return 0; /* not found */
e197b441 4268 }
4269 return 0; /* not found */
fd8ccf44 4270}
4271
1807985b 4272/* Delete all the elements with score between min and max from the skiplist.
4273 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4274 * Note that this function takes the reference to the hash table view of the
4275 * sorted set, in order to remove the elements from the hash table too. */
4276static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4277 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4278 unsigned long removed = 0;
4279 int i;
4280
4281 x = zsl->header;
4282 for (i = zsl->level-1; i >= 0; i--) {
4283 while (x->forward[i] && x->forward[i]->score < min)
4284 x = x->forward[i];
4285 update[i] = x;
4286 }
4287 /* We may have multiple elements with the same score, what we need
4288 * is to find the element with both the right score and object. */
4289 x = x->forward[0];
4290 while (x && x->score <= max) {
4291 zskiplistNode *next;
4292
4293 for (i = 0; i < zsl->level; i++) {
4294 if (update[i]->forward[i] != x) break;
4295 update[i]->forward[i] = x->forward[i];
4296 }
4297 if (x->forward[0]) {
4298 x->forward[0]->backward = (x->backward == zsl->header) ?
4299 NULL : x->backward;
4300 } else {
4301 zsl->tail = x->backward;
4302 }
4303 next = x->forward[0];
4304 dictDelete(dict,x->obj);
4305 zslFreeNode(x);
4306 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4307 zsl->level--;
4308 zsl->length--;
4309 removed++;
4310 x = next;
4311 }
4312 return removed; /* not found */
4313}
4314
50c55df5 4315/* Find the first node having a score equal or greater than the specified one.
4316 * Returns NULL if there is no match. */
4317static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4318 zskiplistNode *x;
4319 int i;
4320
4321 x = zsl->header;
4322 for (i = zsl->level-1; i >= 0; i--) {
4323 while (x->forward[i] && x->forward[i]->score < score)
4324 x = x->forward[i];
4325 }
4326 /* We may have multiple elements with the same score, what we need
4327 * is to find the element with both the right score and object. */
4328 return x->forward[0];
4329}
4330
fd8ccf44 4331/* The actual Z-commands implementations */
4332
7db723ad 4333/* This generic command implements both ZADD and ZINCRBY.
e2665397 4334 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
7db723ad 4335 * the increment if the operation is a ZINCRBY (doincrement == 1). */
e2665397 4336static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
fd8ccf44 4337 robj *zsetobj;
4338 zset *zs;
4339 double *score;
4340
e2665397 4341 zsetobj = lookupKeyWrite(c->db,key);
fd8ccf44 4342 if (zsetobj == NULL) {
4343 zsetobj = createZsetObject();
e2665397 4344 dictAdd(c->db->dict,key,zsetobj);
4345 incrRefCount(key);
fd8ccf44 4346 } else {
4347 if (zsetobj->type != REDIS_ZSET) {
4348 addReply(c,shared.wrongtypeerr);
4349 return;
4350 }
4351 }
fd8ccf44 4352 zs = zsetobj->ptr;
e2665397 4353
7db723ad 4354 /* Ok now since we implement both ZADD and ZINCRBY here the code
e2665397 4355 * needs to handle the two different conditions. It's all about setting
4356 * '*score', that is, the new score to set, to the right value. */
4357 score = zmalloc(sizeof(double));
4358 if (doincrement) {
4359 dictEntry *de;
4360
4361 /* Read the old score. If the element was not present starts from 0 */
4362 de = dictFind(zs->dict,ele);
4363 if (de) {
4364 double *oldscore = dictGetEntryVal(de);
4365 *score = *oldscore + scoreval;
4366 } else {
4367 *score = scoreval;
4368 }
4369 } else {
4370 *score = scoreval;
4371 }
4372
4373 /* What follows is a simple remove and re-insert operation that is common
7db723ad 4374 * to both ZADD and ZINCRBY... */
e2665397 4375 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
fd8ccf44 4376 /* case 1: New element */
e2665397 4377 incrRefCount(ele); /* added to hash */
4378 zslInsert(zs->zsl,*score,ele);
4379 incrRefCount(ele); /* added to skiplist */
fd8ccf44 4380 server.dirty++;
e2665397 4381 if (doincrement)
e2665397 4382 addReplyDouble(c,*score);
91d71bfc 4383 else
4384 addReply(c,shared.cone);
fd8ccf44 4385 } else {
4386 dictEntry *de;
4387 double *oldscore;
4388
4389 /* case 2: Score update operation */
e2665397 4390 de = dictFind(zs->dict,ele);
fd8ccf44 4391 assert(de != NULL);
4392 oldscore = dictGetEntryVal(de);
4393 if (*score != *oldscore) {
4394 int deleted;
4395
e2665397 4396 /* Remove and insert the element in the skip list with new score */
4397 deleted = zslDelete(zs->zsl,*oldscore,ele);
fd8ccf44 4398 assert(deleted != 0);
e2665397 4399 zslInsert(zs->zsl,*score,ele);
4400 incrRefCount(ele);
4401 /* Update the score in the hash table */
4402 dictReplace(zs->dict,ele,score);
fd8ccf44 4403 server.dirty++;
2161a965 4404 } else {
4405 zfree(score);
fd8ccf44 4406 }
e2665397 4407 if (doincrement)
4408 addReplyDouble(c,*score);
4409 else
4410 addReply(c,shared.czero);
fd8ccf44 4411 }
4412}
4413
e2665397 4414static void zaddCommand(redisClient *c) {
4415 double scoreval;
4416
4417 scoreval = strtod(c->argv[2]->ptr,NULL);
4418 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4419}
4420
7db723ad 4421static void zincrbyCommand(redisClient *c) {
e2665397 4422 double scoreval;
4423
4424 scoreval = strtod(c->argv[2]->ptr,NULL);
4425 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4426}
4427
1b7106e7 4428static void zremCommand(redisClient *c) {
4429 robj *zsetobj;
4430 zset *zs;
4431
4432 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4433 if (zsetobj == NULL) {
4434 addReply(c,shared.czero);
4435 } else {
4436 dictEntry *de;
4437 double *oldscore;
4438 int deleted;
4439
4440 if (zsetobj->type != REDIS_ZSET) {
4441 addReply(c,shared.wrongtypeerr);
4442 return;
4443 }
4444 zs = zsetobj->ptr;
4445 de = dictFind(zs->dict,c->argv[2]);
4446 if (de == NULL) {
4447 addReply(c,shared.czero);
4448 return;
4449 }
4450 /* Delete from the skiplist */
4451 oldscore = dictGetEntryVal(de);
4452 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4453 assert(deleted != 0);
4454
4455 /* Delete from the hash table */
4456 dictDelete(zs->dict,c->argv[2]);
4457 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4458 server.dirty++;
4459 addReply(c,shared.cone);
4460 }
4461}
4462
1807985b 4463static void zremrangebyscoreCommand(redisClient *c) {
4464 double min = strtod(c->argv[2]->ptr,NULL);
4465 double max = strtod(c->argv[3]->ptr,NULL);
4466 robj *zsetobj;
4467 zset *zs;
4468
4469 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4470 if (zsetobj == NULL) {
4471 addReply(c,shared.czero);
4472 } else {
4473 long deleted;
4474
4475 if (zsetobj->type != REDIS_ZSET) {
4476 addReply(c,shared.wrongtypeerr);
4477 return;
4478 }
4479 zs = zsetobj->ptr;
4480 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4481 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4482 server.dirty += deleted;
4483 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4484 }
4485}
4486
e3870fab 4487static void zrangeGenericCommand(redisClient *c, int reverse) {
cc812361 4488 robj *o;
4489 int start = atoi(c->argv[2]->ptr);
4490 int end = atoi(c->argv[3]->ptr);
4491
4492 o = lookupKeyRead(c->db,c->argv[1]);
4493 if (o == NULL) {
4494 addReply(c,shared.nullmultibulk);
4495 } else {
4496 if (o->type != REDIS_ZSET) {
4497 addReply(c,shared.wrongtypeerr);
4498 } else {
4499 zset *zsetobj = o->ptr;
4500 zskiplist *zsl = zsetobj->zsl;
4501 zskiplistNode *ln;
4502
4503 int llen = zsl->length;
4504 int rangelen, j;
4505 robj *ele;
4506
4507 /* convert negative indexes */
4508 if (start < 0) start = llen+start;
4509 if (end < 0) end = llen+end;
4510 if (start < 0) start = 0;
4511 if (end < 0) end = 0;
4512
4513 /* indexes sanity checks */
4514 if (start > end || start >= llen) {
4515 /* Out of range start or start > end result in empty list */
4516 addReply(c,shared.emptymultibulk);
4517 return;
4518 }
4519 if (end >= llen) end = llen-1;
4520 rangelen = (end-start)+1;
4521
4522 /* Return the result in form of a multi-bulk reply */
e3870fab 4523 if (reverse) {
4524 ln = zsl->tail;
4525 while (start--)
4526 ln = ln->backward;
4527 } else {
4528 ln = zsl->header->forward[0];
4529 while (start--)
4530 ln = ln->forward[0];
4531 }
cc812361 4532
4533 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4534 for (j = 0; j < rangelen; j++) {
0aad7a19 4535 ele = ln->obj;
cc812361 4536 addReplyBulkLen(c,ele);
4537 addReply(c,ele);
4538 addReply(c,shared.crlf);
e3870fab 4539 ln = reverse ? ln->backward : ln->forward[0];
cc812361 4540 }
4541 }
4542 }
4543}
4544
e3870fab 4545static void zrangeCommand(redisClient *c) {
4546 zrangeGenericCommand(c,0);
4547}
4548
4549static void zrevrangeCommand(redisClient *c) {
4550 zrangeGenericCommand(c,1);
4551}
4552
50c55df5 4553static void zrangebyscoreCommand(redisClient *c) {
4554 robj *o;
4555 double min = strtod(c->argv[2]->ptr,NULL);
4556 double max = strtod(c->argv[3]->ptr,NULL);
4557
4558 o = lookupKeyRead(c->db,c->argv[1]);
4559 if (o == NULL) {
4560 addReply(c,shared.nullmultibulk);
4561 } else {
4562 if (o->type != REDIS_ZSET) {
4563 addReply(c,shared.wrongtypeerr);
4564 } else {
4565 zset *zsetobj = o->ptr;
4566 zskiplist *zsl = zsetobj->zsl;
4567 zskiplistNode *ln;
4568 robj *ele, *lenobj;
4569 unsigned int rangelen = 0;
4570
4571 /* Get the first node with the score >= min */
4572 ln = zslFirstWithScore(zsl,min);
4573 if (ln == NULL) {
4574 /* No element matching the speciifed interval */
4575 addReply(c,shared.emptymultibulk);
4576 return;
4577 }
4578
4579 /* We don't know in advance how many matching elements there
4580 * are in the list, so we push this object that will represent
4581 * the multi-bulk length in the output buffer, and will "fix"
4582 * it later */
4583 lenobj = createObject(REDIS_STRING,NULL);
4584 addReply(c,lenobj);
4585
dbbc7285 4586 while(ln && ln->score <= max) {
50c55df5 4587 ele = ln->obj;
4588 addReplyBulkLen(c,ele);
4589 addReply(c,ele);
4590 addReply(c,shared.crlf);
4591 ln = ln->forward[0];
4592 rangelen++;
4593 }
4594 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4595 }
4596 }
4597}
4598
3c41331e 4599static void zcardCommand(redisClient *c) {
e197b441 4600 robj *o;
4601 zset *zs;
4602
4603 o = lookupKeyRead(c->db,c->argv[1]);
4604 if (o == NULL) {
4605 addReply(c,shared.czero);
4606 return;
4607 } else {
4608 if (o->type != REDIS_ZSET) {
4609 addReply(c,shared.wrongtypeerr);
4610 } else {
4611 zs = o->ptr;
4612 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4613 }
4614 }
4615}
4616
6e333bbe 4617static void zscoreCommand(redisClient *c) {
4618 robj *o;
4619 zset *zs;
4620
4621 o = lookupKeyRead(c->db,c->argv[1]);
4622 if (o == NULL) {
96d8b4ee 4623 addReply(c,shared.nullbulk);
6e333bbe 4624 return;
4625 } else {
4626 if (o->type != REDIS_ZSET) {
4627 addReply(c,shared.wrongtypeerr);
4628 } else {
4629 dictEntry *de;
4630
4631 zs = o->ptr;
4632 de = dictFind(zs->dict,c->argv[2]);
4633 if (!de) {
4634 addReply(c,shared.nullbulk);
4635 } else {
6e333bbe 4636 double *score = dictGetEntryVal(de);
4637
e2665397 4638 addReplyDouble(c,*score);
6e333bbe 4639 }
4640 }
4641 }
4642}
4643
6b47e12e 4644/* ========================= Non type-specific commands ==================== */
4645
ed9b544e 4646static void flushdbCommand(redisClient *c) {
ca37e9cd 4647 server.dirty += dictSize(c->db->dict);
3305306f 4648 dictEmpty(c->db->dict);
4649 dictEmpty(c->db->expires);
ed9b544e 4650 addReply(c,shared.ok);
ed9b544e 4651}
4652
4653static void flushallCommand(redisClient *c) {
ca37e9cd 4654 server.dirty += emptyDb();
ed9b544e 4655 addReply(c,shared.ok);
f78fd11b 4656 rdbSave(server.dbfilename);
ca37e9cd 4657 server.dirty++;
ed9b544e 4658}
4659
56906eef 4660static redisSortOperation *createSortOperation(int type, robj *pattern) {
ed9b544e 4661 redisSortOperation *so = zmalloc(sizeof(*so));
ed9b544e 4662 so->type = type;
4663 so->pattern = pattern;
4664 return so;
4665}
4666
4667/* Return the value associated to the key with a name obtained
4668 * substituting the first occurence of '*' in 'pattern' with 'subst' */
56906eef 4669static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
ed9b544e 4670 char *p;
4671 sds spat, ssub;
4672 robj keyobj;
4673 int prefixlen, sublen, postfixlen;
ed9b544e 4674 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4675 struct {
f1017b3f 4676 long len;
4677 long free;
ed9b544e 4678 char buf[REDIS_SORTKEY_MAX+1];
4679 } keyname;
4680
28173a49 4681 /* If the pattern is "#" return the substitution object itself in order
4682 * to implement the "SORT ... GET #" feature. */
4683 spat = pattern->ptr;
4684 if (spat[0] == '#' && spat[1] == '\0') {
4685 return subst;
4686 }
4687
4688 /* The substitution object may be specially encoded. If so we create
9d65a1bb 4689 * a decoded object on the fly. Otherwise getDecodedObject will just
4690 * increment the ref count, that we'll decrement later. */
4691 subst = getDecodedObject(subst);
942a3961 4692
ed9b544e 4693 ssub = subst->ptr;
4694 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4695 p = strchr(spat,'*');
ed5a857a 4696 if (!p) {
4697 decrRefCount(subst);
4698 return NULL;
4699 }
ed9b544e 4700
4701 prefixlen = p-spat;
4702 sublen = sdslen(ssub);
4703 postfixlen = sdslen(spat)-(prefixlen+1);
4704 memcpy(keyname.buf,spat,prefixlen);
4705 memcpy(keyname.buf+prefixlen,ssub,sublen);
4706 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4707 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4708 keyname.len = prefixlen+sublen+postfixlen;
4709
4710 keyobj.refcount = 1;
4711 keyobj.type = REDIS_STRING;
4712 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4713
942a3961 4714 decrRefCount(subst);
4715
a4d1ba9a 4716 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3305306f 4717 return lookupKeyRead(db,&keyobj);
ed9b544e 4718}
4719
4720/* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4721 * the additional parameter is not standard but a BSD-specific we have to
4722 * pass sorting parameters via the global 'server' structure */
4723static int sortCompare(const void *s1, const void *s2) {
4724 const redisSortObject *so1 = s1, *so2 = s2;
4725 int cmp;
4726
4727 if (!server.sort_alpha) {
4728 /* Numeric sorting. Here it's trivial as we precomputed scores */
4729 if (so1->u.score > so2->u.score) {
4730 cmp = 1;
4731 } else if (so1->u.score < so2->u.score) {
4732 cmp = -1;
4733 } else {
4734 cmp = 0;
4735 }
4736 } else {
4737 /* Alphanumeric sorting */
4738 if (server.sort_bypattern) {
4739 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4740 /* At least one compare object is NULL */
4741 if (so1->u.cmpobj == so2->u.cmpobj)
4742 cmp = 0;
4743 else if (so1->u.cmpobj == NULL)
4744 cmp = -1;
4745 else
4746 cmp = 1;
4747 } else {
4748 /* We have both the objects, use strcoll */
4749 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4750 }
4751 } else {
4752 /* Compare elements directly */
9d65a1bb 4753 robj *dec1, *dec2;
4754
4755 dec1 = getDecodedObject(so1->obj);
4756 dec2 = getDecodedObject(so2->obj);
4757 cmp = strcoll(dec1->ptr,dec2->ptr);
4758 decrRefCount(dec1);
4759 decrRefCount(dec2);
ed9b544e 4760 }
4761 }
4762 return server.sort_desc ? -cmp : cmp;
4763}
4764
4765/* The SORT command is the most complex command in Redis. Warning: this code
4766 * is optimized for speed and a bit less for readability */
4767static void sortCommand(redisClient *c) {
ed9b544e 4768 list *operations;
4769 int outputlen = 0;
4770 int desc = 0, alpha = 0;
4771 int limit_start = 0, limit_count = -1, start, end;
4772 int j, dontsort = 0, vectorlen;
4773 int getop = 0; /* GET operation counter */
443c6409 4774 robj *sortval, *sortby = NULL, *storekey = NULL;
ed9b544e 4775 redisSortObject *vector; /* Resulting vector to sort */
4776
4777 /* Lookup the key to sort. It must be of the right types */
3305306f 4778 sortval = lookupKeyRead(c->db,c->argv[1]);
4779 if (sortval == NULL) {
c937aa89 4780 addReply(c,shared.nokeyerr);
ed9b544e 4781 return;
4782 }
ed9b544e 4783 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
c937aa89 4784 addReply(c,shared.wrongtypeerr);
ed9b544e 4785 return;
4786 }
4787
4788 /* Create a list of operations to perform for every sorted element.
4789 * Operations can be GET/DEL/INCR/DECR */
4790 operations = listCreate();
092dac2a 4791 listSetFreeMethod(operations,zfree);
ed9b544e 4792 j = 2;
4793
4794 /* Now we need to protect sortval incrementing its count, in the future
4795 * SORT may have options able to overwrite/delete keys during the sorting
4796 * and the sorted key itself may get destroied */
4797 incrRefCount(sortval);
4798
4799 /* The SORT command has an SQL-alike syntax, parse it */
4800 while(j < c->argc) {
4801 int leftargs = c->argc-j-1;
4802 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4803 desc = 0;
4804 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4805 desc = 1;
4806 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4807 alpha = 1;
4808 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4809 limit_start = atoi(c->argv[j+1]->ptr);
4810 limit_count = atoi(c->argv[j+2]->ptr);
4811 j+=2;
443c6409 4812 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4813 storekey = c->argv[j+1];
4814 j++;
ed9b544e 4815 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4816 sortby = c->argv[j+1];
4817 /* If the BY pattern does not contain '*', i.e. it is constant,
4818 * we don't need to sort nor to lookup the weight keys. */
4819 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4820 j++;
4821 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4822 listAddNodeTail(operations,createSortOperation(
4823 REDIS_SORT_GET,c->argv[j+1]));
4824 getop++;
4825 j++;
ed9b544e 4826 } else {
4827 decrRefCount(sortval);
4828 listRelease(operations);
c937aa89 4829 addReply(c,shared.syntaxerr);
ed9b544e 4830 return;
4831 }
4832 j++;
4833 }
4834
4835 /* Load the sorting vector with all the objects to sort */
4836 vectorlen = (sortval->type == REDIS_LIST) ?
4837 listLength((list*)sortval->ptr) :
3305306f 4838 dictSize((dict*)sortval->ptr);
ed9b544e 4839 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
ed9b544e 4840 j = 0;
4841 if (sortval->type == REDIS_LIST) {
4842 list *list = sortval->ptr;
6208b3a7 4843 listNode *ln;
4844
4845 listRewind(list);
4846 while((ln = listYield(list))) {
ed9b544e 4847 robj *ele = ln->value;
4848 vector[j].obj = ele;
4849 vector[j].u.score = 0;
4850 vector[j].u.cmpobj = NULL;
ed9b544e 4851 j++;
4852 }
4853 } else {
4854 dict *set = sortval->ptr;
4855 dictIterator *di;
4856 dictEntry *setele;
4857
4858 di = dictGetIterator(set);
ed9b544e 4859 while((setele = dictNext(di)) != NULL) {
4860 vector[j].obj = dictGetEntryKey(setele);
4861 vector[j].u.score = 0;
4862 vector[j].u.cmpobj = NULL;
4863 j++;
4864 }
4865 dictReleaseIterator(di);
4866 }
4867 assert(j == vectorlen);
4868
4869 /* Now it's time to load the right scores in the sorting vector */
4870 if (dontsort == 0) {
4871 for (j = 0; j < vectorlen; j++) {
4872 if (sortby) {
4873 robj *byval;
4874
3305306f 4875 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
ed9b544e 4876 if (!byval || byval->type != REDIS_STRING) continue;
4877 if (alpha) {
9d65a1bb 4878 vector[j].u.cmpobj = getDecodedObject(byval);
ed9b544e 4879 } else {
942a3961 4880 if (byval->encoding == REDIS_ENCODING_RAW) {
4881 vector[j].u.score = strtod(byval->ptr,NULL);
4882 } else {
9d65a1bb 4883 /* Don't need to decode the object if it's
4884 * integer-encoded (the only encoding supported) so
4885 * far. We can just cast it */
f1017b3f 4886 if (byval->encoding == REDIS_ENCODING_INT) {
942a3961 4887 vector[j].u.score = (long)byval->ptr;
f1017b3f 4888 } else
942a3961 4889 assert(1 != 1);
4890 }
ed9b544e 4891 }
4892 } else {
942a3961 4893 if (!alpha) {
4894 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4895 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4896 else {
4897 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4898 vector[j].u.score = (long) vector[j].obj->ptr;
4899 else
4900 assert(1 != 1);
4901 }
4902 }
ed9b544e 4903 }
4904 }
4905 }
4906
4907 /* We are ready to sort the vector... perform a bit of sanity check
4908 * on the LIMIT option too. We'll use a partial version of quicksort. */
4909 start = (limit_start < 0) ? 0 : limit_start;
4910 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4911 if (start >= vectorlen) {
4912 start = vectorlen-1;
4913 end = vectorlen-2;
4914 }
4915 if (end >= vectorlen) end = vectorlen-1;
4916
4917 if (dontsort == 0) {
4918 server.sort_desc = desc;
4919 server.sort_alpha = alpha;
4920 server.sort_bypattern = sortby ? 1 : 0;
5f5b9840 4921 if (sortby && (start != 0 || end != vectorlen-1))
4922 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4923 else
4924 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
ed9b544e 4925 }
4926
4927 /* Send command output to the output buffer, performing the specified
4928 * GET/DEL/INCR/DECR operations if any. */
4929 outputlen = getop ? getop*(end-start+1) : end-start+1;
443c6409 4930 if (storekey == NULL) {
4931 /* STORE option not specified, sent the sorting result to client */
4932 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4933 for (j = start; j <= end; j++) {
4934 listNode *ln;
4935 if (!getop) {
4936 addReplyBulkLen(c,vector[j].obj);
4937 addReply(c,vector[j].obj);
4938 addReply(c,shared.crlf);
4939 }
4940 listRewind(operations);
4941 while((ln = listYield(operations))) {
4942 redisSortOperation *sop = ln->value;
4943 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4944 vector[j].obj);
4945
4946 if (sop->type == REDIS_SORT_GET) {
4947 if (!val || val->type != REDIS_STRING) {
4948 addReply(c,shared.nullbulk);
4949 } else {
4950 addReplyBulkLen(c,val);
4951 addReply(c,val);
4952 addReply(c,shared.crlf);
4953 }
4954 } else {
4955 assert(sop->type == REDIS_SORT_GET); /* always fails */
4956 }
4957 }
ed9b544e 4958 }
443c6409 4959 } else {
4960 robj *listObject = createListObject();
4961 list *listPtr = (list*) listObject->ptr;
4962
4963 /* STORE option specified, set the sorting result as a List object */
4964 for (j = start; j <= end; j++) {
4965 listNode *ln;
4966 if (!getop) {
4967 listAddNodeTail(listPtr,vector[j].obj);
4968 incrRefCount(vector[j].obj);
4969 }
4970 listRewind(operations);
4971 while((ln = listYield(operations))) {
4972 redisSortOperation *sop = ln->value;
4973 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4974 vector[j].obj);
4975
4976 if (sop->type == REDIS_SORT_GET) {
4977 if (!val || val->type != REDIS_STRING) {
4978 listAddNodeTail(listPtr,createStringObject("",0));
4979 } else {
4980 listAddNodeTail(listPtr,val);
4981 incrRefCount(val);
4982 }
ed9b544e 4983 } else {
443c6409 4984 assert(sop->type == REDIS_SORT_GET); /* always fails */
ed9b544e 4985 }
ed9b544e 4986 }
ed9b544e 4987 }
121796f7 4988 if (dictReplace(c->db->dict,storekey,listObject)) {
4989 incrRefCount(storekey);
4990 }
443c6409 4991 /* Note: we add 1 because the DB is dirty anyway since even if the
4992 * SORT result is empty a new key is set and maybe the old content
4993 * replaced. */
4994 server.dirty += 1+outputlen;
4995 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
ed9b544e 4996 }
4997
4998 /* Cleanup */
4999 decrRefCount(sortval);
5000 listRelease(operations);
5001 for (j = 0; j < vectorlen; j++) {
5002 if (sortby && alpha && vector[j].u.cmpobj)
5003 decrRefCount(vector[j].u.cmpobj);
5004 }
5005 zfree(vector);
5006}
5007
5008static void infoCommand(redisClient *c) {
5009 sds info;
5010 time_t uptime = time(NULL)-server.stat_starttime;
c3cb078d 5011 int j;
ed9b544e 5012
5013 info = sdscatprintf(sdsempty(),
5014 "redis_version:%s\r\n"
f1017b3f 5015 "arch_bits:%s\r\n"
a0f643ea 5016 "uptime_in_seconds:%d\r\n"
5017 "uptime_in_days:%d\r\n"
ed9b544e 5018 "connected_clients:%d\r\n"
5019 "connected_slaves:%d\r\n"
5fba9f71 5020 "used_memory:%zu\r\n"
ed9b544e 5021 "changes_since_last_save:%lld\r\n"
be2bb6b0 5022 "bgsave_in_progress:%d\r\n"
ed9b544e 5023 "last_save_time:%d\r\n"
5024 "total_connections_received:%lld\r\n"
5025 "total_commands_processed:%lld\r\n"
a0f643ea 5026 "role:%s\r\n"
ed9b544e 5027 ,REDIS_VERSION,
f1017b3f 5028 (sizeof(long) == 8) ? "64" : "32",
a0f643ea 5029 uptime,
5030 uptime/(3600*24),
ed9b544e 5031 listLength(server.clients)-listLength(server.slaves),
5032 listLength(server.slaves),
5033 server.usedmemory,
5034 server.dirty,
9d65a1bb 5035 server.bgsavechildpid != -1,
ed9b544e 5036 server.lastsave,
5037 server.stat_numconnections,
5038 server.stat_numcommands,
a0f643ea 5039 server.masterhost == NULL ? "master" : "slave"
ed9b544e 5040 );
a0f643ea 5041 if (server.masterhost) {
5042 info = sdscatprintf(info,
5043 "master_host:%s\r\n"
5044 "master_port:%d\r\n"
5045 "master_link_status:%s\r\n"
5046 "master_last_io_seconds_ago:%d\r\n"
5047 ,server.masterhost,
5048 server.masterport,
5049 (server.replstate == REDIS_REPL_CONNECTED) ?
5050 "up" : "down",
f72b934d 5051 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
a0f643ea 5052 );
5053 }
c3cb078d 5054 for (j = 0; j < server.dbnum; j++) {
5055 long long keys, vkeys;
5056
5057 keys = dictSize(server.db[j].dict);
5058 vkeys = dictSize(server.db[j].expires);
5059 if (keys || vkeys) {
9d65a1bb 5060 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
c3cb078d 5061 j, keys, vkeys);
5062 }
5063 }
c937aa89 5064 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
ed9b544e 5065 addReplySds(c,info);
70003d28 5066 addReply(c,shared.crlf);
ed9b544e 5067}
5068
3305306f 5069static void monitorCommand(redisClient *c) {
5070 /* ignore MONITOR if aleady slave or in monitor mode */
5071 if (c->flags & REDIS_SLAVE) return;
5072
5073 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5074 c->slaveseldb = 0;
6b47e12e 5075 listAddNodeTail(server.monitors,c);
3305306f 5076 addReply(c,shared.ok);
5077}
5078
5079/* ================================= Expire ================================= */
5080static int removeExpire(redisDb *db, robj *key) {
5081 if (dictDelete(db->expires,key) == DICT_OK) {
5082 return 1;
5083 } else {
5084 return 0;
5085 }
5086}
5087
5088static int setExpire(redisDb *db, robj *key, time_t when) {
5089 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5090 return 0;
5091 } else {
5092 incrRefCount(key);
5093 return 1;
5094 }
5095}
5096
bb32ede5 5097/* Return the expire time of the specified key, or -1 if no expire
5098 * is associated with this key (i.e. the key is non volatile) */
5099static time_t getExpire(redisDb *db, robj *key) {
5100 dictEntry *de;
5101
5102 /* No expire? return ASAP */
5103 if (dictSize(db->expires) == 0 ||
5104 (de = dictFind(db->expires,key)) == NULL) return -1;
5105
5106 return (time_t) dictGetEntryVal(de);
5107}
5108
3305306f 5109static int expireIfNeeded(redisDb *db, robj *key) {
5110 time_t when;
5111 dictEntry *de;
5112
5113 /* No expire? return ASAP */
5114 if (dictSize(db->expires) == 0 ||
5115 (de = dictFind(db->expires,key)) == NULL) return 0;
5116
5117 /* Lookup the expire */
5118 when = (time_t) dictGetEntryVal(de);
5119 if (time(NULL) <= when) return 0;
5120
5121 /* Delete the key */
5122 dictDelete(db->expires,key);
5123 return dictDelete(db->dict,key) == DICT_OK;
5124}
5125
5126static int deleteIfVolatile(redisDb *db, robj *key) {
5127 dictEntry *de;
5128
5129 /* No expire? return ASAP */
5130 if (dictSize(db->expires) == 0 ||
5131 (de = dictFind(db->expires,key)) == NULL) return 0;
5132
5133 /* Delete the key */
0c66a471 5134 server.dirty++;
3305306f 5135 dictDelete(db->expires,key);
5136 return dictDelete(db->dict,key) == DICT_OK;
5137}
5138
802e8373 5139static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
3305306f 5140 dictEntry *de;
3305306f 5141
802e8373 5142 de = dictFind(c->db->dict,key);
3305306f 5143 if (de == NULL) {
5144 addReply(c,shared.czero);
5145 return;
5146 }
43e5ccdf 5147 if (seconds < 0) {
5148 if (deleteKey(c->db,key)) server.dirty++;
5149 addReply(c, shared.cone);
3305306f 5150 return;
5151 } else {
5152 time_t when = time(NULL)+seconds;
802e8373 5153 if (setExpire(c->db,key,when)) {
3305306f 5154 addReply(c,shared.cone);
77423026 5155 server.dirty++;
5156 } else {
3305306f 5157 addReply(c,shared.czero);
77423026 5158 }
3305306f 5159 return;
5160 }
5161}
5162
802e8373 5163static void expireCommand(redisClient *c) {
5164 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5165}
5166
5167static void expireatCommand(redisClient *c) {
5168 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5169}
5170
fd88489a 5171static void ttlCommand(redisClient *c) {
5172 time_t expire;
5173 int ttl = -1;
5174
5175 expire = getExpire(c->db,c->argv[1]);
5176 if (expire != -1) {
5177 ttl = (int) (expire-time(NULL));
5178 if (ttl < 0) ttl = -1;
5179 }
5180 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5181}
5182
ed9b544e 5183/* =============================== Replication ============================= */
5184
a4d1ba9a 5185static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
ed9b544e 5186 ssize_t nwritten, ret = size;
5187 time_t start = time(NULL);
5188
5189 timeout++;
5190 while(size) {
5191 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5192 nwritten = write(fd,ptr,size);
5193 if (nwritten == -1) return -1;
5194 ptr += nwritten;
5195 size -= nwritten;
5196 }
5197 if ((time(NULL)-start) > timeout) {
5198 errno = ETIMEDOUT;
5199 return -1;
5200 }
5201 }
5202 return ret;
5203}
5204
a4d1ba9a 5205static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
ed9b544e 5206 ssize_t nread, totread = 0;
5207 time_t start = time(NULL);
5208
5209 timeout++;
5210 while(size) {
5211 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5212 nread = read(fd,ptr,size);
5213 if (nread == -1) return -1;
5214 ptr += nread;
5215 size -= nread;
5216 totread += nread;
5217 }
5218 if ((time(NULL)-start) > timeout) {
5219 errno = ETIMEDOUT;
5220 return -1;
5221 }
5222 }
5223 return totread;
5224}
5225
/* Read one '\n' terminated line from 'fd' into the buffer 'ptr' of 'size'
 * bytes, one byte at a time through syncRead().
 *
 * The newline, and a '\r' right before it, are replaced by '\0', and the
 * buffer is always null terminated. Returns the number of bytes stored
 * before the newline (a stripped '\r' is still counted), or -1 on I/O
 * error, timeout, or a buffer with no room for the terminator.
 *
 * If the buffer fills up before a newline arrives, the truncated line is
 * returned and the rest of it stays unread on the descriptor. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return -1; /* not even room for the terminator */
    *ptr = '\0';
    size--; /* reserve one byte for the null terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* One less byte of room. Without this, a line longer than the
         * buffer (e.g. sent by a misbehaving master) was written past its
         * end. */
        size--;
    }
    return nread;
}
5246
5247static void syncCommand(redisClient *c) {
40d224a9 5248 /* ignore SYNC if aleady slave or in monitor mode */
5249 if (c->flags & REDIS_SLAVE) return;
5250
5251 /* SYNC can't be issued when the server has pending data to send to
5252 * the client about already issued commands. We need a fresh reply
5253 * buffer registering the differences between the BGSAVE and the current
5254 * dataset, so that we can copy to other slaves if needed. */
5255 if (listLength(c->reply) != 0) {
5256 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5257 return;
5258 }
5259
5260 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5261 /* Here we need to check if there is a background saving operation
5262 * in progress, or if it is required to start one */
9d65a1bb 5263 if (server.bgsavechildpid != -1) {
40d224a9 5264 /* Ok a background save is in progress. Let's check if it is a good
5265 * one for replication, i.e. if there is another slave that is
5266 * registering differences since the server forked to save */
5267 redisClient *slave;
5268 listNode *ln;
5269
6208b3a7 5270 listRewind(server.slaves);
5271 while((ln = listYield(server.slaves))) {
40d224a9 5272 slave = ln->value;
5273 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
40d224a9 5274 }
5275 if (ln) {
5276 /* Perfect, the server is already registering differences for
5277 * another slave. Set the right state, and copy the buffer. */
5278 listRelease(c->reply);
5279 c->reply = listDup(slave->reply);
40d224a9 5280 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5281 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5282 } else {
5283 /* No way, we need to wait for the next BGSAVE in order to
5284 * register differences */
5285 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5286 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5287 }
5288 } else {
5289 /* Ok we don't have a BGSAVE in progress, let's start one */
5290 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5291 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5292 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5293 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5294 return;
5295 }
5296 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5297 }
6208b3a7 5298 c->repldbfd = -1;
40d224a9 5299 c->flags |= REDIS_SLAVE;
5300 c->slaveseldb = 0;
6b47e12e 5301 listAddNodeTail(server.slaves,c);
40d224a9 5302 return;
5303}
5304
6208b3a7 5305static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5306 redisClient *slave = privdata;
5307 REDIS_NOTUSED(el);
5308 REDIS_NOTUSED(mask);
5309 char buf[REDIS_IOBUF_LEN];
5310 ssize_t nwritten, buflen;
5311
5312 if (slave->repldboff == 0) {
5313 /* Write the bulk write count before to transfer the DB. In theory here
5314 * we don't know how much room there is in the output buffer of the
5315 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5316 * operations) will never be smaller than the few bytes we need. */
5317 sds bulkcount;
5318
5319 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5320 slave->repldbsize);
5321 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5322 {
5323 sdsfree(bulkcount);
5324 freeClient(slave);
5325 return;
5326 }
5327 sdsfree(bulkcount);
5328 }
5329 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5330 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5331 if (buflen <= 0) {
5332 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5333 (buflen == 0) ? "premature EOF" : strerror(errno));
5334 freeClient(slave);
5335 return;
5336 }
5337 if ((nwritten = write(fd,buf,buflen)) == -1) {
5338 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5339 strerror(errno));
5340 freeClient(slave);
5341 return;
5342 }
5343 slave->repldboff += nwritten;
5344 if (slave->repldboff == slave->repldbsize) {
5345 close(slave->repldbfd);
5346 slave->repldbfd = -1;
5347 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5348 slave->replstate = REDIS_REPL_ONLINE;
5349 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
266373b2 5350 sendReplyToClient, slave) == AE_ERR) {
6208b3a7 5351 freeClient(slave);
5352 return;
5353 }
5354 addReplySds(slave,sdsempty());
5355 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5356 }
5357}
ed9b544e 5358
a3b21203 5359/* This function is called at the end of every backgrond saving.
5360 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5361 * otherwise REDIS_ERR is passed to the function.
5362 *
5363 * The goal of this function is to handle slaves waiting for a successful
5364 * background saving in order to perform non-blocking synchronization. */
5365static void updateSlavesWaitingBgsave(int bgsaveerr) {
6208b3a7 5366 listNode *ln;
5367 int startbgsave = 0;
ed9b544e 5368
6208b3a7 5369 listRewind(server.slaves);
5370 while((ln = listYield(server.slaves))) {
5371 redisClient *slave = ln->value;
ed9b544e 5372
6208b3a7 5373 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5374 startbgsave = 1;
5375 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5376 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
dde65f3f 5377 struct redis_stat buf;
6208b3a7 5378
5379 if (bgsaveerr != REDIS_OK) {
5380 freeClient(slave);
5381 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5382 continue;
5383 }
5384 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
dde65f3f 5385 redis_fstat(slave->repldbfd,&buf) == -1) {
6208b3a7 5386 freeClient(slave);
5387 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5388 continue;
5389 }
5390 slave->repldboff = 0;
5391 slave->repldbsize = buf.st_size;
5392 slave->replstate = REDIS_REPL_SEND_BULK;
5393 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
266373b2 5394 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
6208b3a7 5395 freeClient(slave);
5396 continue;
5397 }
5398 }
ed9b544e 5399 }
6208b3a7 5400 if (startbgsave) {
5401 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5402 listRewind(server.slaves);
5403 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5404 while((ln = listYield(server.slaves))) {
5405 redisClient *slave = ln->value;
ed9b544e 5406
6208b3a7 5407 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5408 freeClient(slave);
5409 }
5410 }
5411 }
ed9b544e 5412}
5413
5414static int syncWithMaster(void) {
d0ccebcf 5415 char buf[1024], tmpfile[256], authcmd[1024];
ed9b544e 5416 int dumpsize;
5417 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5418 int dfd;
5419
5420 if (fd == -1) {
5421 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5422 strerror(errno));
5423 return REDIS_ERR;
5424 }
d0ccebcf 5425
5426 /* AUTH with the master if required. */
5427 if(server.masterauth) {
5428 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5429 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5430 close(fd);
5431 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5432 strerror(errno));
5433 return REDIS_ERR;
5434 }
5435 /* Read the AUTH result. */
5436 if (syncReadLine(fd,buf,1024,3600) == -1) {
5437 close(fd);
5438 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5439 strerror(errno));
5440 return REDIS_ERR;
5441 }
5442 if (buf[0] != '+') {
5443 close(fd);
5444 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5445 return REDIS_ERR;
5446 }
5447 }
5448
ed9b544e 5449 /* Issue the SYNC command */
5450 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5451 close(fd);
5452 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5453 strerror(errno));
5454 return REDIS_ERR;
5455 }
5456 /* Read the bulk write count */
8c4d91fc 5457 if (syncReadLine(fd,buf,1024,3600) == -1) {
ed9b544e 5458 close(fd);
5459 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5460 strerror(errno));
5461 return REDIS_ERR;
5462 }
4aa701c1 5463 if (buf[0] != '$') {
5464 close(fd);
5465 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5466 return REDIS_ERR;
5467 }
c937aa89 5468 dumpsize = atoi(buf+1);
ed9b544e 5469 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5470 /* Read the bulk write data on a temp file */
5471 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5472 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5473 if (dfd == -1) {
5474 close(fd);
5475 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5476 return REDIS_ERR;
5477 }
5478 while(dumpsize) {
5479 int nread, nwritten;
5480
5481 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5482 if (nread == -1) {
5483 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5484 strerror(errno));
5485 close(fd);
5486 close(dfd);
5487 return REDIS_ERR;
5488 }
5489 nwritten = write(dfd,buf,nread);
5490 if (nwritten == -1) {
5491 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5492 close(fd);
5493 close(dfd);
5494 return REDIS_ERR;
5495 }
5496 dumpsize -= nread;
5497 }
5498 close(dfd);
5499 if (rename(tmpfile,server.dbfilename) == -1) {
5500 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5501 unlink(tmpfile);
5502 close(fd);
5503 return REDIS_ERR;
5504 }
5505 emptyDb();
f78fd11b 5506 if (rdbLoad(server.dbfilename) != REDIS_OK) {
ed9b544e 5507 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5508 close(fd);
5509 return REDIS_ERR;
5510 }
5511 server.master = createClient(fd);
5512 server.master->flags |= REDIS_MASTER;
5513 server.replstate = REDIS_REPL_CONNECTED;
5514 return REDIS_OK;
5515}
5516
321b0e13 5517static void slaveofCommand(redisClient *c) {
5518 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5519 !strcasecmp(c->argv[2]->ptr,"one")) {
5520 if (server.masterhost) {
5521 sdsfree(server.masterhost);
5522 server.masterhost = NULL;
5523 if (server.master) freeClient(server.master);
5524 server.replstate = REDIS_REPL_NONE;
5525 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5526 }
5527 } else {
5528 sdsfree(server.masterhost);
5529 server.masterhost = sdsdup(c->argv[1]->ptr);
5530 server.masterport = atoi(c->argv[2]->ptr);
5531 if (server.master) freeClient(server.master);
5532 server.replstate = REDIS_REPL_CONNECT;
5533 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5534 server.masterhost, server.masterport);
5535 }
5536 addReply(c,shared.ok);
5537}
5538
3fd78bcd 5539/* ============================ Maxmemory directive ======================== */
5540
5541/* This function gets called when 'maxmemory' is set on the config file to limit
5542 * the max memory used by the server, and we are out of memory.
5543 * This function will try to, in order:
5544 *
5545 * - Free objects from the free list
5546 * - Try to remove keys with an EXPIRE set
5547 *
5548 * It is not possible to free enough memory to reach used-memory < maxmemory
5549 * the server will start refusing commands that will enlarge even more the
5550 * memory usage.
5551 */
5552static void freeMemoryIfNeeded(void) {
5553 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5554 if (listLength(server.objfreelist)) {
5555 robj *o;
5556
5557 listNode *head = listFirst(server.objfreelist);
5558 o = listNodeValue(head);
5559 listDelNode(server.objfreelist,head);
5560 zfree(o);
5561 } else {
5562 int j, k, freed = 0;
5563
5564 for (j = 0; j < server.dbnum; j++) {
5565 int minttl = -1;
5566 robj *minkey = NULL;
5567 struct dictEntry *de;
5568
5569 if (dictSize(server.db[j].expires)) {
5570 freed = 1;
5571 /* From a sample of three keys drop the one nearest to
5572 * the natural expire */
5573 for (k = 0; k < 3; k++) {
5574 time_t t;
5575
5576 de = dictGetRandomKey(server.db[j].expires);
5577 t = (time_t) dictGetEntryVal(de);
5578 if (minttl == -1 || t < minttl) {
5579 minkey = dictGetEntryKey(de);
5580 minttl = t;
5581 }
5582 }
5583 deleteKey(server.db+j,minkey);
5584 }
5585 }
5586 if (!freed) return; /* nothing to free... */
5587 }
5588 }
5589}
5590
f80dff62 5591/* ============================== Append Only file ========================== */
5592
5593static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5594 sds buf = sdsempty();
5595 int j;
5596 ssize_t nwritten;
5597 time_t now;
5598 robj *tmpargv[3];
5599
5600 /* The DB this command was targetting is not the same as the last command
5601 * we appendend. To issue a SELECT command is needed. */
5602 if (dictid != server.appendseldb) {
5603 char seldb[64];
5604
5605 snprintf(seldb,sizeof(seldb),"%d",dictid);
5606 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
5607 strlen(seldb),seldb);
5608 server.appendseldb = dictid;
5609 }
5610
5611 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5612 * EXPIREs into EXPIREATs calls */
5613 if (cmd->proc == expireCommand) {
5614 long when;
5615
5616 tmpargv[0] = createStringObject("EXPIREAT",8);
5617 tmpargv[1] = argv[1];
5618 incrRefCount(argv[1]);
5619 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5620 tmpargv[2] = createObject(REDIS_STRING,
5621 sdscatprintf(sdsempty(),"%ld",when));
5622 argv = tmpargv;
5623 }
5624
5625 /* Append the actual command */
5626 buf = sdscatprintf(buf,"*%d\r\n",argc);
5627 for (j = 0; j < argc; j++) {
5628 robj *o = argv[j];
5629
9d65a1bb 5630 o = getDecodedObject(o);
f80dff62 5631 buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
5632 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5633 buf = sdscatlen(buf,"\r\n",2);
9d65a1bb 5634 decrRefCount(o);
f80dff62 5635 }
5636
5637 /* Free the objects from the modified argv for EXPIREAT */
5638 if (cmd->proc == expireCommand) {
5639 for (j = 0; j < 3; j++)
5640 decrRefCount(argv[j]);
5641 }
5642
5643 /* We want to perform a single write. This should be guaranteed atomic
5644 * at least if the filesystem we are writing is a real physical one.
5645 * While this will save us against the server being killed I don't think
5646 * there is much to do about the whole server stopping for power problems
5647 * or alike */
5648 nwritten = write(server.appendfd,buf,sdslen(buf));
5649 if (nwritten != (signed)sdslen(buf)) {
5650 /* Ooops, we are in troubles. The best thing to do for now is
5651 * to simply exit instead to give the illusion that everything is
5652 * working as expected. */
5653 if (nwritten == -1) {
5654 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5655 } else {
5656 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5657 }
5658 exit(1);
5659 }
85a83172 5660 /* If a background append only file rewriting is in progress we want to
5661 * accumulate the differences between the child DB and the current one
5662 * in a buffer, so that when the child process will do its work we
5663 * can append the differences to the new append only file. */
5664 if (server.bgrewritechildpid != -1)
5665 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
5666
5667 sdsfree(buf);
f80dff62 5668 now = time(NULL);
5669 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5670 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5671 now-server.lastfsync > 1))
5672 {
5673 fsync(server.appendfd); /* Let's try to get this data on the disk */
5674 server.lastfsync = now;
5675 }
5676}
5677
5678/* In Redis commands are always executed in the context of a client, so in
5679 * order to load the append only file we need to create a fake client. */
5680static struct redisClient *createFakeClient(void) {
5681 struct redisClient *c = zmalloc(sizeof(*c));
5682
5683 selectDb(c,0);
5684 c->fd = -1;
5685 c->querybuf = sdsempty();
5686 c->argc = 0;
5687 c->argv = NULL;
5688 c->flags = 0;
9387d17d 5689 /* We set the fake client as a slave waiting for the synchronization
5690 * so that Redis will not try to send replies to this client. */
5691 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
f80dff62 5692 c->reply = listCreate();
5693 listSetFreeMethod(c->reply,decrRefCount);
5694 listSetDupMethod(c->reply,dupClientReplyValue);
5695 return c;
5696}
5697
5698static void freeFakeClient(struct redisClient *c) {
5699 sdsfree(c->querybuf);
5700 listRelease(c->reply);
5701 zfree(c);
5702}
5703
5704/* Replay the append log file. On error REDIS_OK is returned. On non fatal
5705 * error (the append only file is zero-length) REDIS_ERR is returned. On
5706 * fatal error an error message is logged and the program exists. */
5707int loadAppendOnlyFile(char *filename) {
5708 struct redisClient *fakeClient;
5709 FILE *fp = fopen(filename,"r");
5710 struct redis_stat sb;
5711
5712 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5713 return REDIS_ERR;
5714
5715 if (fp == NULL) {
5716 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5717 exit(1);
5718 }
5719
5720 fakeClient = createFakeClient();
5721 while(1) {
5722 int argc, j;
5723 unsigned long len;
5724 robj **argv;
5725 char buf[128];
5726 sds argsds;
5727 struct redisCommand *cmd;
5728
5729 if (fgets(buf,sizeof(buf),fp) == NULL) {
5730 if (feof(fp))
5731 break;
5732 else
5733 goto readerr;
5734 }
5735 if (buf[0] != '*') goto fmterr;
5736 argc = atoi(buf+1);
5737 argv = zmalloc(sizeof(robj*)*argc);
5738 for (j = 0; j < argc; j++) {
5739 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5740 if (buf[0] != '$') goto fmterr;
5741 len = strtol(buf+1,NULL,10);
5742 argsds = sdsnewlen(NULL,len);
5743 if (fread(argsds,len,1,fp) == 0) goto fmterr;
5744 argv[j] = createObject(REDIS_STRING,argsds);
5745 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5746 }
5747
5748 /* Command lookup */
5749 cmd = lookupCommand(argv[0]->ptr);
5750 if (!cmd) {
5751 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5752 exit(1);
5753 }
5754 /* Try object sharing and encoding */
5755 if (server.shareobjects) {
5756 int j;
5757 for(j = 1; j < argc; j++)
5758 argv[j] = tryObjectSharing(argv[j]);
5759 }
5760 if (cmd->flags & REDIS_CMD_BULK)
5761 tryObjectEncoding(argv[argc-1]);
5762 /* Run the command in the context of a fake client */
5763 fakeClient->argc = argc;
5764 fakeClient->argv = argv;
5765 cmd->proc(fakeClient);
5766 /* Discard the reply objects list from the fake client */
5767 while(listLength(fakeClient->reply))
5768 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5769 /* Clean up, ready for the next command */
5770 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5771 zfree(argv);
5772 }
5773 fclose(fp);
5774 freeFakeClient(fakeClient);
5775 return REDIS_OK;
5776
5777readerr:
5778 if (feof(fp)) {
5779 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5780 } else {
5781 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5782 }
5783 exit(1);
5784fmterr:
5785 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5786 exit(1);
5787}
5788
9d65a1bb 5789/* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
5790static int fwriteBulk(FILE *fp, robj *obj) {
5791 char buf[128];
5792 obj = getDecodedObject(obj);
5793 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
5794 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
5795 if (fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0) goto err;
5796 if (fwrite("\r\n",2,1,fp) == 0) goto err;
5797 decrRefCount(obj);
5798 return 1;
5799err:
5800 decrRefCount(obj);
5801 return 0;
5802}
5803
/* Emit the double "d" as a bulk argument: $<count>\r\n<payload>\r\n, with
 * the payload printed using 17 significant digits. Returns 1 when both
 * writes succeed, 0 otherwise. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char header[128], payload[128];
    size_t payloadlen;

    /* The payload carries its own CRLF, which the advertised count excludes. */
    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    payloadlen = strlen(payload);
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)payloadlen-2);

    if (fwrite(header,strlen(header),1,fp) == 0) return 0;
    return fwrite(payload,payloadlen,1,fp) == 0 ? 0 : 1;
}
5814
/* Emit the long "l" as a bulk argument: $<count>\r\n<payload>\r\n.
 * Returns 1 when both writes succeed, 0 otherwise. */
static int fwriteBulkLong(FILE *fp, long l) {
    char head[128], body[128];

    /* body ends with CRLF; the length in the header does not count it */
    snprintf(body,sizeof(body),"%ld\r\n",l);
    snprintf(head,sizeof(head),"$%lu\r\n",(unsigned long)strlen(body)-2);
    return fwrite(head,strlen(head),1,fp) != 0 &&
           fwrite(body,strlen(body),1,fp) != 0;
}
5825
5826/* Write a sequence of commands able to fully rebuild the dataset into
5827 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
5828static int rewriteAppendOnlyFile(char *filename) {
5829 dictIterator *di = NULL;
5830 dictEntry *de;
5831 FILE *fp;
5832 char tmpfile[256];
5833 int j;
5834 time_t now = time(NULL);
5835
5836 /* Note that we have to use a different temp name here compared to the
5837 * one used by rewriteAppendOnlyFileBackground() function. */
5838 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
5839 fp = fopen(tmpfile,"w");
5840 if (!fp) {
5841 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
5842 return REDIS_ERR;
5843 }
5844 for (j = 0; j < server.dbnum; j++) {
5845 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
5846 redisDb *db = server.db+j;
5847 dict *d = db->dict;
5848 if (dictSize(d) == 0) continue;
5849 di = dictGetIterator(d);
5850 if (!di) {
5851 fclose(fp);
5852 return REDIS_ERR;
5853 }
5854
5855 /* SELECT the new DB */
5856 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
85a83172 5857 if (fwriteBulkLong(fp,j) == 0) goto werr;
9d65a1bb 5858
5859 /* Iterate this DB writing every entry */
5860 while((de = dictNext(di)) != NULL) {
5861 robj *key = dictGetEntryKey(de);
5862 robj *o = dictGetEntryVal(de);
5863 time_t expiretime = getExpire(db,key);
5864
5865 /* Save the key and associated value */
9d65a1bb 5866 if (o->type == REDIS_STRING) {
5867 /* Emit a SET command */
5868 char cmd[]="*3\r\n$3\r\nSET\r\n";
5869 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5870 /* Key and value */
5871 if (fwriteBulk(fp,key) == 0) goto werr;
5872 if (fwriteBulk(fp,o) == 0) goto werr;
5873 } else if (o->type == REDIS_LIST) {
5874 /* Emit the RPUSHes needed to rebuild the list */
5875 list *list = o->ptr;
5876 listNode *ln;
5877
5878 listRewind(list);
5879 while((ln = listYield(list))) {
5880 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
5881 robj *eleobj = listNodeValue(ln);
5882
5883 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5884 if (fwriteBulk(fp,key) == 0) goto werr;
5885 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5886 }
5887 } else if (o->type == REDIS_SET) {
5888 /* Emit the SADDs needed to rebuild the set */
5889 dict *set = o->ptr;
5890 dictIterator *di = dictGetIterator(set);
5891 dictEntry *de;
5892
5893 while((de = dictNext(di)) != NULL) {
5894 char cmd[]="*3\r\n$4\r\nSADD\r\n";
5895 robj *eleobj = dictGetEntryKey(de);
5896
5897 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5898 if (fwriteBulk(fp,key) == 0) goto werr;
5899 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5900 }
5901 dictReleaseIterator(di);
5902 } else if (o->type == REDIS_ZSET) {
5903 /* Emit the ZADDs needed to rebuild the sorted set */
5904 zset *zs = o->ptr;
5905 dictIterator *di = dictGetIterator(zs->dict);
5906 dictEntry *de;
5907
5908 while((de = dictNext(di)) != NULL) {
5909 char cmd[]="*4\r\n$4\r\nZADD\r\n";
5910 robj *eleobj = dictGetEntryKey(de);
5911 double *score = dictGetEntryVal(de);
5912
5913 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5914 if (fwriteBulk(fp,key) == 0) goto werr;
5915 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
5916 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5917 }
5918 dictReleaseIterator(di);
5919 } else {
5920 assert(0 != 0);
5921 }
5922 /* Save the expire time */
5923 if (expiretime != -1) {
5924 char cmd[]="*3\r\n$6\r\nEXPIRE\r\n";
5925 /* If this key is already expired skip it */
5926 if (expiretime < now) continue;
5927 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5928 if (fwriteBulk(fp,key) == 0) goto werr;
5929 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
5930 }
5931 }
5932 dictReleaseIterator(di);
5933 }
5934
5935 /* Make sure data will not remain on the OS's output buffers */
5936 fflush(fp);
5937 fsync(fileno(fp));
5938 fclose(fp);
5939
5940 /* Use RENAME to make sure the DB file is changed atomically only
5941 * if the generate DB file is ok. */
5942 if (rename(tmpfile,filename) == -1) {
5943 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
5944 unlink(tmpfile);
5945 return REDIS_ERR;
5946 }
5947 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
5948 return REDIS_OK;
5949
5950werr:
5951 fclose(fp);
5952 unlink(tmpfile);
5953 redisLog(REDIS_WARNING,"Write error writing append only fileon disk: %s", strerror(errno));
5954 if (di) dictReleaseIterator(di);
5955 return REDIS_ERR;
5956}
5957
5958/* This is how rewriting of the append only file in background works:
5959 *
5960 * 1) The user calls BGREWRITEAOF
5961 * 2) Redis calls this function, that forks():
5962 * 2a) the child rewrite the append only file in a temp file.
5963 * 2b) the parent accumulates differences in server.bgrewritebuf.
5964 * 3) When the child finished '2a' exists.
5965 * 4) The parent will trap the exit code, if it's OK, will append the
5966 * data accumulated into server.bgrewritebuf into the temp file, and
5967 * finally will rename(2) the temp file in the actual file name.
5968 * The the new file is reopened as the new append only file. Profit!
5969 */
5970static int rewriteAppendOnlyFileBackground(void) {
5971 pid_t childpid;
5972
5973 if (server.bgrewritechildpid != -1) return REDIS_ERR;
5974 if ((childpid = fork()) == 0) {
5975 /* Child */
5976 char tmpfile[256];
5977 close(server.fd);
5978
5979 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
5980 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
5981 exit(0);
5982 } else {
5983 exit(1);
5984 }
5985 } else {
5986 /* Parent */
5987 if (childpid == -1) {
5988 redisLog(REDIS_WARNING,
5989 "Can't rewrite append only file in background: fork: %s",
5990 strerror(errno));
5991 return REDIS_ERR;
5992 }
5993 redisLog(REDIS_NOTICE,
5994 "Background append only file rewriting started by pid %d",childpid);
5995 server.bgrewritechildpid = childpid;
85a83172 5996 /* We set appendseldb to -1 in order to force the next call to the
5997 * feedAppendOnlyFile() to issue a SELECT command, so the differences
5998 * accumulated by the parent into server.bgrewritebuf will start
5999 * with a SELECT statement and it will be safe to merge. */
6000 server.appendseldb = -1;
9d65a1bb 6001 return REDIS_OK;
6002 }
6003 return REDIS_OK; /* unreached */
6004}
6005
6006static void bgrewriteaofCommand(redisClient *c) {
6007 if (server.bgrewritechildpid != -1) {
6008 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6009 return;
6010 }
6011 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6012 addReply(c,shared.ok);
6013 } else {
6014 addReply(c,shared.err);
6015 }
6016}
6017
/* Delete the temp file written by the background rewrite child "childpid"
 * (see rewriteAppendOnlyFileBackground() for the naming scheme). */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",(int) childpid);
    unlink(path);
}
6024
7f957c92 6025/* ================================= Debugging ============================== */
6026
6027static void debugCommand(redisClient *c) {
6028 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6029 *((char*)-1) = 'x';
210e29f7 6030 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6031 if (rdbSave(server.dbfilename) != REDIS_OK) {
6032 addReply(c,shared.err);
6033 return;
6034 }
6035 emptyDb();
6036 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6037 addReply(c,shared.err);
6038 return;
6039 }
6040 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6041 addReply(c,shared.ok);
333298da 6042 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6043 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6044 robj *key, *val;
6045
6046 if (!de) {
6047 addReply(c,shared.nokeyerr);
6048 return;
6049 }
6050 key = dictGetEntryKey(de);
6051 val = dictGetEntryVal(de);
6052 addReplySds(c,sdscatprintf(sdsempty(),
942a3961 6053 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6054 key, key->refcount, val, val->refcount, val->encoding));
7f957c92 6055 } else {
333298da 6056 addReplySds(c,sdsnew(
210e29f7 6057 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
7f957c92 6058 }
6059}
56906eef 6060
bcfc686d 6061/* =================================== Main! ================================ */
56906eef 6062
bcfc686d 6063#ifdef __linux__
/* Read the numeric value of /proc/sys/vm/overcommit_memory.
 * Returns -1 when the file can't be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    FILE *fp;
    int gotline;

    if ((fp = fopen("/proc/sys/vm/overcommit_memory","r")) == NULL)
        return -1;
    gotline = fgets(line,sizeof(line),fp) != NULL;
    fclose(fp);
    return gotline ? atoi(line) : -1;
}
6077
6078void linuxOvercommitMemoryWarning(void) {
6079 if (linuxOvercommitMemoryValue() == 0) {
6080 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6081 }
6082}
6083#endif /* __linux__ */
6084
6085static void daemonize(void) {
6086 int fd;
6087 FILE *fp;
6088
6089 if (fork() != 0) exit(0); /* parent exits */
6090 setsid(); /* create a new session */
6091
6092 /* Every output goes to /dev/null. If Redis is daemonized but
6093 * the 'logfile' is set to 'stdout' in the configuration file
6094 * it will not log at all. */
6095 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6096 dup2(fd, STDIN_FILENO);
6097 dup2(fd, STDOUT_FILENO);
6098 dup2(fd, STDERR_FILENO);
6099 if (fd > STDERR_FILENO) close(fd);
6100 }
6101 /* Try to write the pid file */
6102 fp = fopen(server.pidfile,"w");
6103 if (fp) {
6104 fprintf(fp,"%d\n",getpid());
6105 fclose(fp);
56906eef 6106 }
56906eef 6107}
6108
bcfc686d 6109int main(int argc, char **argv) {
6110 initServerConfig();
6111 if (argc == 2) {
6112 resetServerSaveParams();
6113 loadServerConfig(argv[1]);
6114 } else if (argc > 2) {
6115 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6116 exit(1);
6117 } else {
6118 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6119 }
6120 initServer();
6121 if (server.daemonize) daemonize();
6122 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6123#ifdef __linux__
6124 linuxOvercommitMemoryWarning();
6125#endif
6126 if (server.appendonly) {
6127 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6128 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6129 } else {
6130 if (rdbLoad(server.dbfilename) == REDIS_OK)
6131 redisLog(REDIS_NOTICE,"DB loaded from disk");
6132 }
6133 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
266373b2 6134 acceptHandler, NULL) == AE_ERR) oom("creating file event");
bcfc686d 6135 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6136 aeMain(server.el);
6137 aeDeleteEventLoop(server.el);
6138 return 0;
6139}
6140
6141/* ============================= Backtrace support ========================= */
6142
6143#ifdef HAVE_BACKTRACE
6144static char *findFuncName(void *pointer, unsigned long *offset);
6145
/* Return the program counter saved in the signal context "uc", that is the
 * address where the interrupted code was executing, or NULL when it can't
 * be obtained on this platform. */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
    return (void*) uc->uc_mcontext->__ss.__eip;
#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
  #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__i386__) /* Linux x86 */
    return (void*) uc->uc_mcontext.gregs[REG_EIP];
#elif defined(__x86_64__) || defined(__X86_64__) /* Linux x86_64 */
    /* Compilers predefine __x86_64__ (lowercase), and the 64 bit program
     * counter is REG_RIP, not REG_EIP. The REG_* names are typically only
     * exposed by glibc under _GNU_SOURCE; without them report "unknown",
     * which callers already handle. */
  #if defined(REG_RIP)
    return (void*) uc->uc_mcontext.gregs[REG_RIP];
  #else
    (void) uc;
    return NULL;
  #endif
#elif defined(__ia64__) /* Linux IA64 */
    return (void*) uc->uc_mcontext.sc_ip;
#else
    return NULL;
#endif
}
6167
6168static void segvHandler(int sig, siginfo_t *info, void *secret) {
6169 void *trace[100];
6170 char **messages = NULL;
6171 int i, trace_size = 0;
6172 unsigned long offset=0;
6173 time_t uptime = time(NULL)-server.stat_starttime;
6174 ucontext_t *uc = (ucontext_t*) secret;
6175 REDIS_NOTUSED(info);
6176
6177 redisLog(REDIS_WARNING,
6178 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
6179 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
433cc893 6180 "redis_version:%s; "
56906eef 6181 "uptime_in_seconds:%d; "
433cc893 6182 "connected_clients:%d; "
6183 "connected_slaves:%d; "
6184 "used_memory:%zu; "
6185 "changes_since_last_save:%lld; "
6186 "bgsave_in_progress:%d; "
6187 "last_save_time:%d; "
6188 "total_connections_received:%lld; "
6189 "total_commands_processed:%lld; "
6190 "role:%s;"
6191 ,REDIS_VERSION,
6192 uptime,
6193 listLength(server.clients)-listLength(server.slaves),
6194 listLength(server.slaves),
6195 server.usedmemory,
6196 server.dirty,
9d65a1bb 6197 server.bgsavechildpid != -1,
433cc893 6198 server.lastsave,
6199 server.stat_numconnections,
6200 server.stat_numcommands,
6201 server.masterhost == NULL ? "master" : "slave"
6202 ));
56906eef 6203
6204 trace_size = backtrace(trace, 100);
de96dbfe 6205 /* overwrite sigaction with caller's address */
b91cf5ef 6206 if (getMcontextEip(uc) != NULL) {
6207 trace[1] = getMcontextEip(uc);
6208 }
56906eef 6209 messages = backtrace_symbols(trace, trace_size);
fe3bbfbe 6210
d76412d1 6211 for (i=1; i<trace_size; ++i) {
56906eef 6212 char *fn = findFuncName(trace[i], &offset), *p;
6213
6214 p = strchr(messages[i],'+');
6215 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6216 redisLog(REDIS_WARNING,"%s", messages[i]);
6217 } else {
6218 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6219 }
6220 }
6221 free(messages);
6222 exit(0);
fe3bbfbe 6223}
56906eef 6224
6225static void setupSigSegvAction(void) {
6226 struct sigaction act;
6227
6228 sigemptyset (&act.sa_mask);
6229 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6230 * is used. Otherwise, sa_handler is used */
6231 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6232 act.sa_sigaction = segvHandler;
6233 sigaction (SIGSEGV, &act, NULL);
6234 sigaction (SIGBUS, &act, NULL);
12fea928 6235 sigaction (SIGFPE, &act, NULL);
6236 sigaction (SIGILL, &act, NULL);
6237 sigaction (SIGBUS, &act, NULL);
e65fdc78 6238 return;
56906eef 6239}
e65fdc78 6240
bcfc686d 6241#include "staticsymbols.h"
6242/* This function try to convert a pointer into a function name. It's used in
6243 * oreder to provide a backtrace under segmentation fault that's able to
6244 * display functions declared as static (otherwise the backtrace is useless). */
6245static char *findFuncName(void *pointer, unsigned long *offset){
6246 int i, ret = -1;
6247 unsigned long off, minoff = 0;
ed9b544e 6248
bcfc686d 6249 /* Try to match against the Symbol with the smallest offset */
6250 for (i=0; symsTable[i].pointer; i++) {
6251 unsigned long lp = (unsigned long) pointer;
0bc03378 6252
bcfc686d 6253 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6254 off=lp-symsTable[i].pointer;
6255 if (ret < 0 || off < minoff) {
6256 minoff=off;
6257 ret=i;
6258 }
6259 }
0bc03378 6260 }
bcfc686d 6261 if (ret == -1) return NULL;
6262 *offset = minoff;
6263 return symsTable[ret].name;
0bc03378 6264}
bcfc686d 6265#else /* HAVE_BACKTRACE */
static void setupSigSegvAction(void) {
    /* Built without backtrace support: no crash handler to install. */
}
bcfc686d 6268#endif /* HAVE_BACKTRACE */
0bc03378 6269
ed9b544e 6270
ed9b544e 6271
bcfc686d 6272/* The End */
6273
6274
ed9b544e 6275